OnixS C++ SGX Titan ITCH Market Data Handler  1.2.2
API documentation
HandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <boost/regex.hpp>
20 #include <boost/scope_exit.hpp>
21 #include <boost/format.hpp>
22 
23 #include <util/TextBuilder.h>
24 
25 #include <OnixS/HandlerCore/FeedEngine/PacketContainer.h>
26 #include <OnixS/HandlerCore/TimeHelper.h>
27 
32 
33 #include <OnixS/ItchCore/MoldUDP64/PacketHeader.h>
34 #include <OnixS/ItchCore/MoldUDP64/RetransmissionService.h>
35 
36 #include "FeedEngineImpl.h"
37 #include "ListenerHolder.h"
38 #include "MessageRepository.h"
39 #include "Version.h"
40 #include "GlimpseService.h"
41 
42 #include "HandlerLogger.h"
43 #include "Formatting.h"
44 
45 #include "HandlerImpl.h"
46 
47 ONIXS_HANDLER_NAMESPACE_BEGIN
48 
49 using namespace std;
50 using namespace HandlerCore::Common;
51 using namespace HandlerCore::MarketData;
52 
/// Constructs the handler from the given settings.
///
/// The initializer list sets up the license checker base (license directory and product id),
/// the handler base (project name and maximum packet size), stores the settings, and creates
/// an OrderBookAllocator only when settings_.buildInternalOrderBooks is set. All remaining
/// members start out empty.
///
/// The body creates the logger, logs version/OS information and the settings, and then creates
/// the listener holder, the realtime packet processor and the message repository.
/// Any exception thrown while creating those is logged and rethrown to the caller.
///
/// NOTE(review): listing lines 71, 84 and 86 are absent from this listing, so the first
/// forceLog() expression and the realtimePacketProcessor_.reset() argument list are
/// incomplete here - consult the original file.
53 HandlerImpl::HandlerImpl(const HandlerSettings& settings)
54  : Licensing::LicenseChecker(settings.licenseDirectory, HANDLER_NAMESPACE::productId())
55  , base (HANDLER_NAMESPACE::projectName(), nullptr, settings.maxPacketSize)
56  , settings_(settings)
57  , bookAllocator_(settings_.buildInternalOrderBooks ? new OrderBookAllocator(settings_.maxBooksObjectAmount) : nullptr)
58  , logger_(nullptr)
59  , listenerHolder_(nullptr)
60  , realtimeFeeds_(nullptr)
61  , gapResponseFeeds_(nullptr)
62  , realtimePacketProcessor_(nullptr)
63  , messageRepository_(nullptr)
64  , retransmissionService_(nullptr)
65  , glimpseService_(nullptr)
66  , backtestingMode_(false)
67 {
// The logger has to exist before anything below can log.
68  initLogger(settings);
69 
70  forceLog (ONIXS_LOG_INFO[this]
72  ", version " << HANDLER_NAMESPACE::version()
73  << ", "
74  << os()
75  );
76 
77  forceLog(ONIXS_LOG_INFO[this] << settings.toString());
78 
79  try
80  {
81  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
82 
83  realtimePacketProcessor_.reset(
85  settings_.lostPacketWaitTime,
87  settings_.heartbeatInterval,
88  logger_.get(),
89  this,
90  lock(),
91  this
92  )
93  );
94 
95  messageRepository_.reset(new MessageRepository(*listenerHolder_.get(), settings_, this, logger_.get(), bookAllocator_.get()));
96  }
97  catch(const std::exception& ex)
98  {
99  log(ONIXS_LOG_ERROR[this] << "Exception in HandlerImpl constructor: " << ex.what());
100  throw;
101  }
102  catch(...)
103  {
// NOTE(review): this path logs at WARN level although, like the std::exception path above
// (which logs at ERROR), it aborts construction by rethrowing - confirm the intended level.
104  log(ONIXS_LOG_WARN [this] << "Unknown exception in HandlerImpl constructor");
105  throw;
106  }
107 }
108 
// Body of a definition whose signature line (listing line 109) is absent from this listing;
// presumably HandlerImpl::~HandlerImpl() - confirm against the original file.
// It calls stop() with the `wait` argument set to true.
110 {
111  stop(true);
112 }
113 
114 void HandlerImpl::setOrderBookIdFilter(const std::set<OrderBookId>& orderBookIdFilter)
115 {
116  BOOST_ASSERT(messageRepository_.get() != nullptr);
117  messageRepository_->setOrderBookIdFilter(orderBookIdFilter);
118 }
119 
// Body of a definition whose signature line (listing line 120) is absent from this listing;
// presumably void HandlerImpl::bindFeedEngine(FeedEngine& feedEngine) - confirm against the
// original file.
// It installs the engine obtained from FeedEngineImpl::getFeedEngine() via setFeedEngine()
// and then logs the binding together with the engine's settings.
121 {
122  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
123 
124  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
125  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
126 }
127 
128 void HandlerImpl::invokeError(const std::string& description)
129 {
130  BOOST_ASSERT(listenerHolder_.get() != nullptr);
131  listenerHolder_->invokeError(ErrorCode::General, description);
132 
133  log(ONIXS_LOG_ERROR[this] << description);
134 }
135 
136 void HandlerImpl::invokeWarning (const std::string& description)
137 {
138  BOOST_ASSERT(listenerHolder_.get() != nullptr);
139  listenerHolder_->invokeWarning(description);
140 
141  log(ONIXS_LOG_WARN[this] << description);
142 }
143 
// Body of a definition whose signature line (listing line 144) is absent from this listing;
// presumably ListenerHolder& HandlerImpl::listenerHolder() - confirm against the original file.
// Returns a reference to the object owned by listenerHolder_, which must not be empty.
145 {
146  BOOST_ASSERT(listenerHolder_.get() != nullptr);
147  return *listenerHolder_;
148 }
149 
150 std::string HandlerImpl::os()
151 {
152 #if BOOST_OS_WINDOWS
153  return "Windows";
154 #elif BOOST_OS_UNIX
155  std::string res = "Linux";
156  res += ", ";
157  res += __VERSION__;
158  return res;
159 #else
160 # error "Unknown OS"
161 #endif
162 }
163 
164 void HandlerImpl::commonLog (int logLevel, const char* msg, size_t length)
165 {
166  BOOST_ASSERT(msg != nullptr);
167  BOOST_ASSERT(length != 0);
168 
169  if (logLevel < OnixS::Logging::MIN_LOG_LEVEL || logLevel > OnixS::Logging::MAX_LOG_LEVEL)
170  throw OperationException (
171  BOOST_CURRENT_FUNCTION,
172  (boost::format("An error occurred during log operation. Invalid log level (logLevel=%d).") % logLevel).str().c_str()
173  );
174 
175  log (OnixS::Logging::LogMsg (OnixS::Logging::LogLevel (logLevel))[std::string("USER")] << OnixS::Util::ValuePtr (msg, length));
176 }
177 
// Body of a definition whose signature line (listing line 178) is absent from this listing;
// presumably void HandlerImpl::start() - confirm against the original file.
//
// Holds the handler lock for the duration of the call, creates and wires the GLIMPSE
// service when a GLIMPSE feed is configured, and then delegates to base::start().
179 {
180  BOOST_ASSERT(lock());
181 
182  lock()->acquire();
183 
// Releases the lock on every exit path from this scope, including exceptions.
184  BOOST_SCOPE_EXIT(this_){
185  this_->lock()->release();
186  } BOOST_SCOPE_EXIT_END
187 
188 
// The GLIMPSE service is only created when at least one of its endpoints (A or B) is valid.
189  if (settings_.glimpseFeed.serviceA.valid() || settings_.glimpseFeed.serviceB.valid())
190  {
191  glimpseService_.reset(new GlimpseService(settings_, this, logger_.get()));
192  glimpseService_->subscribeOnFailure(boost::bind(&HandlerImpl::onGlimpseFailure, this, _1));
193  glimpseService_->subscribeOnWarning(boost::bind(&HandlerImpl::onGlimpseWarning, this, _1));
194 
195  glimpseService_->subscribeOnMessage(boost::bind(&HandlerImpl::onGlimpseMessage, this, _1, _2));
196  glimpseService_->subscribeOnRestarted(boost::bind(&HandlerImpl::onGlimpseRestarted, this));
197  }
198 
199  base::start();
200 }
201 
202 void HandlerImpl::stop(bool wait)
203 {
204  BOOST_ASSERT(lock());
205 
206  if (retransmissionService_.get() != nullptr)
207  retransmissionService_->stop();
208 
209  if (glimpseService_.get() && glimpseService_->inProgress())
210  glimpseService_->stop(wait);
211 
212  lock()->acquire();
213 
214  BOOST_SCOPE_EXIT(this_) {
215  this_->lock()->release();
216  } BOOST_SCOPE_EXIT_END
217 
218  base::stop();
219 }
220 
221 FE::MulticastFeedLayout::Enum getRetransmissionFeedLayout(const HandlerSettings& settings)
222 {
223  if (settings.retransmissionFeed.serviceA.valid() && settings.retransmissionFeed.serviceB.valid())
224  return FE::MulticastFeedLayout::AWithFailoverToB;
225 
226  if (settings.retransmissionFeed.serviceA.valid())
227  return FE::MulticastFeedLayout::AOnly;
228 
229  if (settings.retransmissionFeed.serviceA.valid())
230  return FE::MulticastFeedLayout::BOnly;
231 
232  throw std::invalid_argument("Retransmission Server is not set up");
233 }
234 
/// Creates the gap-response (retransmission) feed cluster and the MoldUDP64 retransmission
/// service from settings_.retransmissionFeed, and subscribes the handler to the service's
/// failure, packet and warning notifications.
///
/// When std::invalid_argument is thrown inside the try block (getRetransmissionFeedLayout()
/// throws it when no retransmission endpoint is valid), both retransmissionService_ and
/// gapResponseFeeds_ are reset and the function returns normally. Other exceptions propagate.
///
/// NOTE(review): listing line 288 (one argument of the RetransmissionService constructor)
/// is absent from this listing - consult the original file.
235 void HandlerImpl::initRetransmissionService()
236 {
237  try
238  {
239  const FE::MulticastFeedLayout::Enum layout = getRetransmissionFeedLayout(settings_);
240  FE::ConnectionInfoList incrementalConnections;
241  FE::NifsByFeedRole nifs;
242 
243  if (settings_.retransmissionFeed.serviceA.valid())
244  {
245  FE::ConnectionInfo connectionA;
246  connectionA.role(FE::NetFeedRole::A);
247 
// Feed-specific interface A, falling back to the common interface when it is empty.
248  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
249 
250  connectionA.type(FE::NetFeedType::Historical);
251  connectionA.id("RetransmissionA");
252  connectionA.ip(settings_.retransmissionFeed.serviceA.address);
253  connectionA.port(settings_.retransmissionFeed.serviceA.port);
254 
255  incrementalConnections.push_back(connectionA);
256  }
257 
258  if (settings_.retransmissionFeed.serviceB.valid())
259  {
260  FE::ConnectionInfo connectionB;
261  connectionB.role(FE::NetFeedRole::B);
262 
// NOTE(review): the condition tests networkInterfaceA.empty() but the value selected is
// networkInterfaceB; onStarting() tests networkInterfaceB.empty() for feed B. This looks
// like a copy-paste slip (an empty networkInterfaceB is used whenever A is set) - confirm.
263  nifs.B(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
264 
265  connectionB.type(FE::NetFeedType::Historical);
266  connectionB.id("RetransmissionB");
267  connectionB.ip(settings_.retransmissionFeed.serviceB.address);
268  connectionB.port(settings_.retransmissionFeed.serviceB.port);
269 
270  incrementalConnections.push_back(connectionB);
271  }
272 
273  gapResponseFeeds_.reset(
274  constructUnicastCluster(
275  FE::NetFeedType::Historical,
276  layout,
277  incrementalConnections,
278  nifs,
// Time span derived from lostPacketWaitTime; NOTE(review): units of lostPacketWaitTime and
// of FE::TimeSpan ticks are not visible here - confirm the "/ 10" scaling.
279  FE::TimeSpan((FE::TimeSpan::Ticks(settings_.lostPacketWaitTime) * FE::TimeTraits::nanosecondsPerMicrosecond()) / 10),
280  OnixS::ItchCore::MoldUDP64::getDummySharedReceiver()
281  )
282  );
283 
284  retransmissionService_.reset(
285  new ItchCore::MoldUDP64::RetransmissionService
286  (
287  *gapResponseFeeds_,
289  settings_.lostPacketWaitTime,
290  logger_.get(),
291  this,
// True when the layout is one of the two failover layouts.
292  (layout == FE::MulticastFeedLayout::AWithFailoverToB) || (layout == FE::MulticastFeedLayout::BWithFailoverToA)
293  ));
294 
295  retransmissionService_->subscribeOnFailure(boost::bind(&HandlerImpl::onReplayFailure, this, _1, _2, _3));
296  retransmissionService_->subscribeOnPacket(boost::bind(&HandlerImpl::onReplayPacket, this, _1));
297  retransmissionService_->subscribeOnWarning(boost::bind(&HandlerImpl::invokeWarning, this, _1));
298  }
299  catch (const std::invalid_argument&)
300  {
// Retransmission is left disabled: both members are cleared and nothing is reported.
301  retransmissionService_.reset();
302  gapResponseFeeds_.reset();
303  }
304 }
305 
306 void HandlerImpl::onStarting()
307 {
308  realtimePacketProcessor_->reset();
309  messageRepository_->reset();
310 
311  {
312  FE::ConnectionInfo connectionA;
313  FE::ConnectionInfo connectionB;
314 
315  connectionA.role(FE::NetFeedRole::A);
316  connectionB.role(FE::NetFeedRole::B);
317 
318  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
319  if (!settings_.useFeedA)
320  layout = FE::MulticastFeedLayout::BOnly;
321  else if (!settings_.useFeedB)
322  layout = FE::MulticastFeedLayout::AOnly;
323 
324  FE::NifsByFeedRole nifs;
325  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
326  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
327 
328  ///
329  connectionA.type(FE::NetFeedType::Incremental);
330  connectionB.type(FE::NetFeedType::Incremental);
331 
332  connectionA.id("MulticastA");
333  connectionB.id("MulticastB");
334 
335  connectionA.ip(settings_.itchFeed.serviceA.address);
336  connectionB.ip(settings_.itchFeed.serviceB.address);
337 
338  connectionA.port(settings_.itchFeed.serviceA.port);
339  connectionB.port(settings_.itchFeed.serviceB.port);
340 
341  FE::ConnectionInfoList incrementalConnections;
342  incrementalConnections.push_back(connectionA);
343  incrementalConnections.push_back(connectionB);
344 
345  realtimeFeeds_.reset(
346  constructCluster(
347  FE::NetFeedType::Incremental,
348  layout,
349  incrementalConnections,
350  nifs,
351  FE::TimeSpan(0, 0, settings_.heartbeatInterval, 0),
352  *realtimePacketProcessor_
353  )
354  );
355  }
356 
357  initRetransmissionService();
358 
359  realtimeFeeds_->connect(FE::CanThrowNow());
360 }
361 
362 void HandlerImpl::onStopping()
363 {
364  if(realtimeFeeds_)
365  realtimeFeeds_->disconnect();
366 
367  if (retransmissionService_)
368  gapResponseFeeds_->disconnect();
369 }
370 
371 void HandlerImpl::onHeartbeat(FE::PacketContainer& packetContainer)
372 {
373  static Logging::LogFacility logFacility("Realtime", this, Logging::LOG_LEVEL_DEBUG);
374 
375  const OnixS::ItchCore::MoldUDP64::PacketHeader* unitHeader = &OnixS::ItchCore::MoldUDP64::PacketHelper::header(packetContainer);
376 
377  BOOST_ASSERT(unitHeader != nullptr);
378 
379  if(settings_.logSettings & LogSettings::LogPackets)
380  log( ONIXS_LOG_INFO[&logFacility] << unitHeader->sequenceNumber() << " Heartbeat received (" << Base64Source(packetContainer.data(), packetContainer.size()) << ").");
381 }
382 
383 void HandlerImpl::onPacket(FE::PacketContainer& packetContainer, bool fromCache)
384 {
385  static Logging::LogFacility logFacility("Realtime", this, Logging::LOG_LEVEL_DEBUG);
386 
387  const OnixS::ItchCore::MoldUDP64::PacketHeader* unitHeader = &OnixS::ItchCore::MoldUDP64::PacketHelper::header(packetContainer);
388 
389  BOOST_ASSERT(unitHeader->messageCount() != 0);
390 
391  if (settings_.logSettings & LogSettings::LogPackets)
392  log( ONIXS_LOG_INFO[&logFacility]
393  << unitHeader->sequenceNumber()
394  << ":"
395  << unitHeader->messageCount()
396  << " Packet received ("
397  << Base64Source(packetContainer.data(), packetContainer.size())
398  << ").");
399 
400  DataSource dataSource;
401  dataSource.origin = DataSource::multicast;
402  dataSource.packetMessageCount = unitHeader->messageCount();
403  dataSource.messageSeqNum = unitHeader->sequenceNumber();
404  dataSource.session = (Byte*)unitHeader->session();
405 
406  if(!fromCache)
407  dataSource.packetReceptionTime = TimeHelper::convert<Timestamp>(packetContainer.receiveTime());
408  else
409  dataSource.packetReceptionTime = Timestamp();
410 
411  messageRepository_->onPacket(dataSource, packetContainer);
412 }
413 
414 void HandlerImpl::onReplayPacket(OnixS::HandlerCore::MarketData::FE::PacketContainer& packetContainer)
415 {
416  const OnixS::ItchCore::MoldUDP64::PacketHeader* unitHeader =
417  &OnixS::ItchCore::MoldUDP64::PacketHelper::header(packetContainer);
418 
419  static Logging::LogFacility logFacility("RetransmissionService", this, Logging::LOG_LEVEL_DEBUG);
420 
421  BOOST_ASSERT(unitHeader->messageCount() != 0);
422 
423  if (settings_.logSettings & LogSettings::LogPackets)
424  log( ONIXS_LOG_INFO[&logFacility]
425  << unitHeader->sequenceNumber()
426  << ":"
427  << unitHeader->messageCount()
428  << " Packet received ("
429  << Base64Source(packetContainer.data(), packetContainer.size())
430  << ").");
431 
432 
433  BOOST_ASSERT(unitHeader->messageCount() != 0);
434 
435  DataSource dataSource;
436  dataSource.packetReceptionTime = HANDLER_NAMESPACE::Timestamp::utcNow();
437  dataSource.packetMessageCount = unitHeader->messageCount();
438  dataSource.messageSeqNum = unitHeader->sequenceNumber();
439  dataSource.session = (Byte*)unitHeader->session();
440  dataSource.origin = DataSource::replay;
441 
442  messageRepository_->onPacket(dataSource, packetContainer);
443 }
444 
445 void HandlerImpl::onMulticastPacketGap(const OnixS::ItchCore::SessionType& session, SequenceNumber expectedSeqNum, SequenceNumber receivedSeqNum)
446 {
447  BOOST_ASSERT(receivedSeqNum != 0);
448  BOOST_ASSERT(expectedSeqNum < receivedSeqNum);
449 
450  static Logging::LogFacility logFacility("Multicast", this, Logging::LOG_LEVEL_DEBUG);
451  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap: expected " << expectedSeqNum << ", but received " << receivedSeqNum);
452 
453  BOOST_ASSERT(messageRepository_.get() != nullptr);
454  messageRepository_->onGap(expectedSeqNum, receivedSeqNum);
455 
456  if (!backtestingMode_)
457  {
458  const SequenceNumber missedPacketNumber = receivedSeqNum - expectedSeqNum;
459 
460  if (missedPacketNumber <= settings_.retransmissionMaxPacketNumber && retransmissionService_.get() != nullptr)
461  {
462  retransmissionService_->requestReplay(session, expectedSeqNum, receivedSeqNum - expectedSeqNum);
463  }
464  else if (glimpseService_.get() != nullptr)
465  {
466  if (glimpseService_->inProgress())
467  {
468  invokeWarning("Glimpse service in progress, unable to restore multicast packet gap. Resetting.");
469 
470  glimpseService_->stop(true);
471  }
472 
473  messageRepository_->onSnapshotRecoveryStarted();
474 
475  glimpseService_->request(session, receivedSeqNum - 1);
476  }
477  else
478  {
479  const std::string message = "Glimpse service is not initialized";
480 
481  if (settings_.buildInternalOrderBooks)
482  {
483  invokeError(message + ", unable to recovery state, stopping.");
484  stop(false);
485  }
486  else
487  {
488  invokeWarning(message + ", packets will be skipped.");
489  messageRepository_->skipPackets(expectedSeqNum, receivedSeqNum);
490  }
491  }
492 
493  }
494 }
495 
496 void HandlerImpl::onMulticastInactivity()
497 {
498  BOOST_ASSERT(listenerHolder_.get() != nullptr);
499  listenerHolder_->invokeWarning("No multicast data");
500 
501  BOOST_ASSERT(messageRepository_.get() != nullptr);
502  messageRepository_->onInactivity();
503 }
504 
505 void HandlerImpl::initLogger(const HandlerSettings &settings)
506 {
507  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
508  setLogger(&logger_->getLogger());
509 }
510 
511 void HandlerImpl::start(const ReplayOptions& options)
512 {
513  backtestingMode_ = true;
514 
515  LogPlayerLinks links;
516 
517  BOOST_ASSERT(realtimePacketProcessor_.get() != nullptr);
518  realtimePacketProcessor_->activateImmediateGapReplayMode();
519  links.push_back(LogPlayerLink("Realtime", realtimePacketProcessor_.get()));
520 
521  retransmissionPacketProcessor_.reset(new RetransmissionPacketProcessor(logger_.get(), this, messageRepository_.get()));
522  links.push_back(LogPlayerLink("RetransmissionService", retransmissionPacketProcessor_.get()));
523 
524  glimpsePacketProcessor_.reset(new GlimpsePacketProcessor(logger_.get(), this, messageRepository_.get()));
525  links.push_back(LogPlayerLink("GlimpseService", glimpsePacketProcessor_.get()));
526 
527  const std::string parser =
528  std::string("(\\S+).*") +
529  std::string("\\[") + HANDLER_NAMESPACE::projectName() + std::string("\\.(\\w+)\\].*") +
530  std::string("(Packet received|Heartbeat received|Message received)") +
531  std::string(" \\((.*)\\)\\.?");
532 
533  BOOST_ASSERT(options.listener);
534 
535  HandlerCore::MarketData::MulticastFeedHandler::startReplay(
536  options.logs,
537  0,
538  0,
539  links,
540  boost::bind(&ReplayListener::onReplayError, options.listener, _1),
541  boost::bind(&ReplayListener::onReplayFinished, options.listener),
542  parser,
543  0,
544  1,
545  3
546  );
547 }
548 
549 void HandlerImpl::onPrepareLogPlayer(LogPlayer* logPlayer)
550 {
551  if (logPlayer)
552  {
553  logPlayer->setDefaultHandler(boost::bind(&HandlerImpl::parseGlimpseStart, this, _1, _2));
554  }
555 }
556 
557 bool HandlerImpl::parseGlimpseStart(const std::string& logLine, HandlerCore::Common::LogEntry& /*logEntry*/)
558 {
559  const std::string parser =
560  std::string("(\\S+).*") +
561  std::string("\\[") + HANDLER_NAMESPACE::projectName() + std::string("\\.(\\w+)\\].*") +
562  std::string("(SnapshotRecoveryRequested|SnapshotRecoveryRestarted)");
563 
564  const boost::regex regEx(parser);
565  boost::smatch smatch;
566  const bool ok = boost::regex_match(logLine, smatch, regEx);
567 
568  if (!ok)
569  return false;
570 
571  std::string action = smatch[3];
572  if(action == "SnapshotRecoveryRequested")
573  messageRepository_->onSnapshotRecoveryStarted();
574  else if (action == "SnapshotRecoveryRestarted")
575  messageRepository_->onSnapshotRecoveryRestarted();
576 
577  return false;
578 }
579 
580 void HandlerImpl::onReplayFailure(const ItchCore::SessionType&, SequenceNumber begin, ItchCore::Binary8 count)
581 {
582  const std::string what =
583  (boost::format("Retransmission service failure, required packets: %d - %d") % begin % (begin + count)).str();
584 
585  if (settings_.buildInternalOrderBooks)
586  {
587  invokeError(what + ", unable to recovery state, stopping.");
588  stop(false);
589  }
590  else
591  {
592  invokeWarning(what + ", packets will be skipped.");
593 
594  BOOST_ASSERT(messageRepository_.get() != nullptr);
595  messageRepository_->skipPackets(begin, begin + count);
596  }
597 }
598 
599 void HandlerImpl::onGlimpseFailure(const std::string& what)
600 {
601  const std::string message = "GLIMPSE service failure: " + what;
602  invokeError(message);
603  stop(false);
604 }
605 
606 void HandlerImpl::onGlimpseWarning(const std::string& what)
607 {
608  const std::string message = "GLIMPSE service warning: " + what;
609  invokeWarning(message);
610 }
611 
612 void HandlerImpl::onGlimpseMessage(const DataSource& dataSource, const IncomingMessage* message)
613 {
614  if (settings_.logSettings & LogSettings::LogPackets)
615  log(ONIXS_LOG_INFO[glimpseService_.get()]
616  << " Message received ("
617  << Base64Source(reinterpret_cast<const char*>(message->binary()), message->binarySize())
618  << ").");
619 
620 
621  messageRepository_->processMessage(dataSource, message);
622 }
623 
624 void HandlerImpl::onGlimpseRestarted()
625 {
626  BOOST_ASSERT(messageRepository_);
627  messageRepository_->onSnapshotRecoveryRestarted();
628 }
629 
630 void HandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
631 {
632  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
633 }
634 
635 ONIXS_HANDLER_NAMESPACE_END
636 
637 
Origin origin
the way data received
Definition: Defines.h:77
bool parseGlimpseStart(const std::string &logLine, HandlerCore::Common::LogEntry &)
void setOrderBookIdFilter(const std::set< OrderBookId > &orderBookIdFilter)
UInt16 packetMessageCount
Message sequence number.
Definition: Defines.h:65
static HandlerCore::MarketData::FE::MultithreadedFeedEngine * getFeedEngine(FeedEngine &feedEngine)
ReplayListener * listener
Instance to notify about replay events.
Definition: Replay.h:62
UInt64 SequenceNumber
Alias for Sequence Number type.
Definition: Defines.h:37
const void * binary() const
Message content.
unsigned int maxRetransmissionResponseTime
Maximum timeout to wait retransmission responses (sec), default value is 30 sec.
Represents timestamp without time-zone information.
Definition: Timestamp.h:84
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:78
STL namespace.
FE::MulticastFeedLayout::Enum getRetransmissionFeedLayout(const HandlerSettings &settings)
MessageSize binarySize() const
Size of message.
OnixS::Licensing::ProductId productId()
Definition: Version.cpp:33
Identifies errors of generic nature.
Definition: ErrorListener.h:36
const Byte * session
Session Id.
Definition: Defines.h:74
void commonLog(int logLevel, const char *msg, size_t length)
User's common log.
OnixS::ItchCore::MoldUDP64::PacketProcessor< HandlerImpl > RealtimePacketProcessor
Definition: HandlerImpl.h:60
void bindFeedEngine(FeedEngine &feedEngine)
void onPrepareLogPlayer(HandlerCore::Common::LogPlayer *logPlayer)
std::string toString() const
Returns the string representation.
std::string toString() const
Returns the string representation.
void invokeError(const std::string &description) override
Invoke helpers.
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
Defines ONIXS_SGXTITAN_ITCH_API which affect logs replay.
Definition: Replay.h:55
virtual void onStateChanged(State::Enum newState)
bool valid() const
Returns true if the descriptor points to valid ip address.
UInt8 Byte
Alias for Byte.
Definition: Memory.h:30
virtual void onReplayError(const std::string &errorDescription)=0
Is called once error occurs while replaying logs.
ListenerHolder & listenerHolder()
Returns commons services as shared object.
virtual void onReplayFinished()=0
Is called once all the logs are replayed.
UInt64 messageSeqNum
Message sequence number.
Definition: Defines.h:71
Timestamp packetReceptionTime
Time when the packet was received by Handler from UDP, in system ticks.
Definition: Defines.h:62
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
Definition: FeedEngine.cpp:59
const char * projectDescription()
Definition: Version.cpp:43
FeedDescriptor retransmissionFeed
Retransmission feed.
void invokeWarning(const std::string &description) override
unsigned int retransmissionMaxPacketNumber
Lost packets threshold when the Handler prefers retransmission.
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:254