19 #include <boost/regex.hpp> 20 #include <boost/scope_exit.hpp> 21 #include <boost/format.hpp> 23 #include <util/TextBuilder.h> 25 #include <OnixS/HandlerCore/FeedEngine/PacketContainer.h> 26 #include <OnixS/HandlerCore/TimeHelper.h> 33 #include <OnixS/ItchCore/MoldUDP64/PacketHeader.h> 34 #include <OnixS/ItchCore/MoldUDP64/RetransmissionService.h> 47 ONIXS_HANDLER_NAMESPACE_BEGIN
50 using namespace HandlerCore::Common;
51 using namespace HandlerCore::MarketData;
54 : Licensing::LicenseChecker(settings.licenseDirectory, HANDLER_NAMESPACE::
productId())
55 , base (HANDLER_NAMESPACE::
projectName(), nullptr, settings.maxPacketSize)
57 , bookAllocator_(settings_.buildInternalOrderBooks ? new
OrderBookAllocator(settings_.maxBooksObjectAmount) : nullptr)
59 , listenerHolder_(nullptr)
60 , realtimeFeeds_(nullptr)
61 , gapResponseFeeds_(nullptr)
62 , realtimePacketProcessor_(nullptr)
63 , messageRepository_(nullptr)
64 , retransmissionService_(nullptr)
65 , glimpseService_(nullptr)
66 , backtestingMode_(false)
70 forceLog (ONIXS_LOG_INFO[
this]
77 forceLog(ONIXS_LOG_INFO[
this] << settings.
toString());
83 realtimePacketProcessor_.reset(
95 messageRepository_.reset(
new MessageRepository(*listenerHolder_.get(), settings_,
this, logger_.get(), bookAllocator_.get()));
97 catch(
const std::exception& ex)
99 log(ONIXS_LOG_ERROR[
this] <<
"Exception in HandlerImpl constructor: " << ex.what());
104 log(ONIXS_LOG_WARN [
this] <<
"Unknown exception in HandlerImpl constructor");
116 BOOST_ASSERT(messageRepository_.get() !=
nullptr);
117 messageRepository_->setOrderBookIdFilter(orderBookIdFilter);
124 forceLog(ONIXS_LOG_INFO[
this] <<
"Bound FeedEngine");
130 BOOST_ASSERT(listenerHolder_.get() !=
nullptr);
133 log(ONIXS_LOG_ERROR[
this] << description);
138 BOOST_ASSERT(listenerHolder_.get() !=
nullptr);
139 listenerHolder_->invokeWarning(description);
141 log(ONIXS_LOG_WARN[
this] << description);
146 BOOST_ASSERT(listenerHolder_.get() !=
nullptr);
147 return *listenerHolder_;
150 std::string HandlerImpl::os()
155 std::string res =
"Linux";
166 BOOST_ASSERT(msg !=
nullptr);
167 BOOST_ASSERT(length != 0);
169 if (logLevel < OnixS::Logging::MIN_LOG_LEVEL || logLevel > OnixS::Logging::MAX_LOG_LEVEL)
171 BOOST_CURRENT_FUNCTION,
172 (boost::format(
"An error occurred during log operation. Invalid log level (logLevel=%d).") % logLevel).str().c_str()
175 log (OnixS::Logging::LogMsg (OnixS::Logging::LogLevel (logLevel))[std::string(
"USER")] << OnixS::Util::ValuePtr (msg, length));
180 BOOST_ASSERT(lock());
184 BOOST_SCOPE_EXIT(this_){
185 this_->lock()->release();
186 } BOOST_SCOPE_EXIT_END
191 glimpseService_.reset(
new GlimpseService(settings_,
this, logger_.get()));
192 glimpseService_->subscribeOnFailure(boost::bind(&HandlerImpl::onGlimpseFailure,
this, _1));
193 glimpseService_->subscribeOnWarning(boost::bind(&HandlerImpl::onGlimpseWarning,
this, _1));
195 glimpseService_->subscribeOnMessage(boost::bind(&HandlerImpl::onGlimpseMessage,
this, _1, _2));
196 glimpseService_->subscribeOnRestarted(boost::bind(&HandlerImpl::onGlimpseRestarted,
this));
204 BOOST_ASSERT(lock());
206 if (retransmissionService_.get() !=
nullptr)
207 retransmissionService_->stop();
209 if (glimpseService_.get() && glimpseService_->inProgress())
210 glimpseService_->stop(wait);
214 BOOST_SCOPE_EXIT(this_) {
215 this_->lock()->release();
216 } BOOST_SCOPE_EXIT_END
224 return FE::MulticastFeedLayout::AWithFailoverToB;
227 return FE::MulticastFeedLayout::AOnly;
230 return FE::MulticastFeedLayout::BOnly;
232 throw std::invalid_argument(
"Retransmission Server is not set up");
235 void HandlerImpl::initRetransmissionService()
240 FE::ConnectionInfoList incrementalConnections;
241 FE::NifsByFeedRole nifs;
245 FE::ConnectionInfo connectionA;
246 connectionA.role(FE::NetFeedRole::A);
250 connectionA.type(FE::NetFeedType::Historical);
251 connectionA.id(
"RetransmissionA");
255 incrementalConnections.push_back(connectionA);
260 FE::ConnectionInfo connectionB;
261 connectionB.role(FE::NetFeedRole::B);
265 connectionB.type(FE::NetFeedType::Historical);
266 connectionB.id(
"RetransmissionB");
270 incrementalConnections.push_back(connectionB);
273 gapResponseFeeds_.reset(
274 constructUnicastCluster(
275 FE::NetFeedType::Historical,
277 incrementalConnections,
279 FE::TimeSpan((FE::TimeSpan::Ticks(settings_.
lostPacketWaitTime) * FE::TimeTraits::nanosecondsPerMicrosecond()) / 10),
280 OnixS::ItchCore::MoldUDP64::getDummySharedReceiver()
284 retransmissionService_.reset(
285 new ItchCore::MoldUDP64::RetransmissionService
292 (layout == FE::MulticastFeedLayout::AWithFailoverToB) || (layout == FE::MulticastFeedLayout::BWithFailoverToA)
295 retransmissionService_->subscribeOnFailure(boost::bind(&HandlerImpl::onReplayFailure,
this, _1, _2, _3));
296 retransmissionService_->subscribeOnPacket(boost::bind(&HandlerImpl::onReplayPacket,
this, _1));
299 catch (
const std::invalid_argument&)
301 retransmissionService_.reset();
302 gapResponseFeeds_.reset();
306 void HandlerImpl::onStarting()
308 realtimePacketProcessor_->reset();
309 messageRepository_->reset();
312 FE::ConnectionInfo connectionA;
313 FE::ConnectionInfo connectionB;
315 connectionA.role(FE::NetFeedRole::A);
316 connectionB.role(FE::NetFeedRole::B);
318 FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
320 layout = FE::MulticastFeedLayout::BOnly;
322 layout = FE::MulticastFeedLayout::AOnly;
324 FE::NifsByFeedRole nifs;
329 connectionA.type(FE::NetFeedType::Incremental);
330 connectionB.type(FE::NetFeedType::Incremental);
332 connectionA.id(
"MulticastA");
333 connectionB.id(
"MulticastB");
341 FE::ConnectionInfoList incrementalConnections;
342 incrementalConnections.push_back(connectionA);
343 incrementalConnections.push_back(connectionB);
345 realtimeFeeds_.reset(
347 FE::NetFeedType::Incremental,
349 incrementalConnections,
352 *realtimePacketProcessor_
357 initRetransmissionService();
359 realtimeFeeds_->connect(FE::CanThrowNow());
362 void HandlerImpl::onStopping()
365 realtimeFeeds_->disconnect();
367 if (retransmissionService_)
368 gapResponseFeeds_->disconnect();
371 void HandlerImpl::onHeartbeat(FE::PacketContainer& packetContainer)
373 static Logging::LogFacility logFacility(
"Realtime",
this, Logging::LOG_LEVEL_DEBUG);
375 const OnixS::ItchCore::MoldUDP64::PacketHeader* unitHeader = &OnixS::ItchCore::MoldUDP64::PacketHelper::header(packetContainer);
377 BOOST_ASSERT(unitHeader !=
nullptr);
380 log( ONIXS_LOG_INFO[&logFacility] << unitHeader->sequenceNumber() <<
" Heartbeat received (" << Base64Source(packetContainer.data(), packetContainer.size()) <<
").");
383 void HandlerImpl::onPacket(FE::PacketContainer& packetContainer,
bool fromCache)
385 static Logging::LogFacility logFacility(
"Realtime",
this, Logging::LOG_LEVEL_DEBUG);
387 const OnixS::ItchCore::MoldUDP64::PacketHeader* unitHeader = &OnixS::ItchCore::MoldUDP64::PacketHelper::header(packetContainer);
389 BOOST_ASSERT(unitHeader->messageCount() != 0);
392 log( ONIXS_LOG_INFO[&logFacility]
393 << unitHeader->sequenceNumber()
395 << unitHeader->messageCount()
396 <<
" Packet received (" 397 << Base64Source(packetContainer.data(), packetContainer.size())
407 dataSource.
packetReceptionTime = TimeHelper::convert<Timestamp>(packetContainer.receiveTime());
411 messageRepository_->onPacket(dataSource, packetContainer);
414 void HandlerImpl::onReplayPacket(OnixS::HandlerCore::MarketData::FE::PacketContainer& packetContainer)
416 const OnixS::ItchCore::MoldUDP64::PacketHeader* unitHeader =
417 &OnixS::ItchCore::MoldUDP64::PacketHelper::header(packetContainer);
419 static Logging::LogFacility logFacility(
"RetransmissionService",
this, Logging::LOG_LEVEL_DEBUG);
421 BOOST_ASSERT(unitHeader->messageCount() != 0);
424 log( ONIXS_LOG_INFO[&logFacility]
425 << unitHeader->sequenceNumber()
427 << unitHeader->messageCount()
428 <<
" Packet received (" 429 << Base64Source(packetContainer.data(), packetContainer.size())
433 BOOST_ASSERT(unitHeader->messageCount() != 0);
437 dataSource.packetMessageCount = unitHeader->messageCount();
438 dataSource.messageSeqNum = unitHeader->sequenceNumber();
439 dataSource.session = (
Byte*)unitHeader->session();
442 messageRepository_->onPacket(dataSource, packetContainer);
445 void HandlerImpl::onMulticastPacketGap(
const OnixS::ItchCore::SessionType& session,
SequenceNumber expectedSeqNum,
SequenceNumber receivedSeqNum)
447 BOOST_ASSERT(receivedSeqNum != 0);
448 BOOST_ASSERT(expectedSeqNum < receivedSeqNum);
450 static Logging::LogFacility logFacility(
"Multicast",
this, Logging::LOG_LEVEL_DEBUG);
451 log( ONIXS_LOG_DEBUG[&logFacility] <<
"Packet gap: expected " << expectedSeqNum <<
", but received " << receivedSeqNum);
453 BOOST_ASSERT(messageRepository_.get() !=
nullptr);
454 messageRepository_->onGap(expectedSeqNum, receivedSeqNum);
456 if (!backtestingMode_)
458 const SequenceNumber missedPacketNumber = receivedSeqNum - expectedSeqNum;
462 retransmissionService_->requestReplay(session, expectedSeqNum, receivedSeqNum - expectedSeqNum);
464 else if (glimpseService_.get() !=
nullptr)
466 if (glimpseService_->inProgress())
468 invokeWarning(
"Glimpse service in progress, unable to restore multicast packet gap. Resetting.");
470 glimpseService_->stop(
true);
473 messageRepository_->onSnapshotRecoveryStarted();
475 glimpseService_->request(session, receivedSeqNum - 1);
479 const std::string message =
"Glimpse service is not initialized";
483 invokeError(message +
", unable to recovery state, stopping.");
489 messageRepository_->skipPackets(expectedSeqNum, receivedSeqNum);
496 void HandlerImpl::onMulticastInactivity()
498 BOOST_ASSERT(listenerHolder_.get() !=
nullptr);
499 listenerHolder_->invokeWarning(
"No multicast data");
501 BOOST_ASSERT(messageRepository_.get() !=
nullptr);
502 messageRepository_->onInactivity();
507 logger_.reset(
new HandlerLogger(convertLogSettings(settings)));
508 setLogger(&logger_->getLogger());
513 backtestingMode_ =
true;
515 LogPlayerLinks links;
517 BOOST_ASSERT(realtimePacketProcessor_.get() !=
nullptr);
518 realtimePacketProcessor_->activateImmediateGapReplayMode();
519 links.push_back(LogPlayerLink(
"Realtime", realtimePacketProcessor_.get()));
522 links.push_back(LogPlayerLink(
"RetransmissionService", retransmissionPacketProcessor_.get()));
525 links.push_back(LogPlayerLink(
"GlimpseService", glimpsePacketProcessor_.get()));
527 const std::string parser =
528 std::string(
"(\\S+).*") +
530 std::string(
"(Packet received|Heartbeat received|Message received)") +
531 std::string(
" \\((.*)\\)\\.?");
535 HandlerCore::MarketData::MulticastFeedHandler::startReplay(
559 const std::string parser =
560 std::string(
"(\\S+).*") +
562 std::string(
"(SnapshotRecoveryRequested|SnapshotRecoveryRestarted)");
564 const boost::regex regEx(parser);
565 boost::smatch smatch;
566 const bool ok = boost::regex_match(logLine, smatch, regEx);
571 std::string action = smatch[3];
572 if(action ==
"SnapshotRecoveryRequested")
573 messageRepository_->onSnapshotRecoveryStarted();
574 else if (action ==
"SnapshotRecoveryRestarted")
575 messageRepository_->onSnapshotRecoveryRestarted();
580 void HandlerImpl::onReplayFailure(
const ItchCore::SessionType&,
SequenceNumber begin, ItchCore::Binary8 count)
582 const std::string what =
583 (boost::format(
"Retransmission service failure, required packets: %d - %d") % begin % (begin + count)).str();
587 invokeError(what +
", unable to recovery state, stopping.");
594 BOOST_ASSERT(messageRepository_.get() !=
nullptr);
595 messageRepository_->skipPackets(begin, begin + count);
599 void HandlerImpl::onGlimpseFailure(
const std::string& what)
601 const std::string message =
"GLIMPSE service failure: " + what;
606 void HandlerImpl::onGlimpseWarning(
const std::string& what)
608 const std::string message =
"GLIMPSE service warning: " + what;
615 log(ONIXS_LOG_INFO[glimpseService_.get()]
616 <<
" Message received (" 617 << Base64Source(reinterpret_cast<const char*>(message->
binary()), message->
binarySize())
621 messageRepository_->processMessage(dataSource, message);
624 void HandlerImpl::onGlimpseRestarted()
626 BOOST_ASSERT(messageRepository_);
627 messageRepository_->onSnapshotRecoveryRestarted();
632 listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
635 ONIXS_HANDLER_NAMESPACE_END
Origin origin
the way data received
bool parseGlimpseStart(const std::string &logLine, HandlerCore::Common::LogEntry &)
void setOrderBookIdFilter(const std::set< OrderBookId > &orderBookIdFilter)
UInt16 packetMessageCount
Message sequence number.
static HandlerCore::MarketData::FE::MultithreadedFeedEngine * getFeedEngine(FeedEngine &feedEngine)
ReplayListener * listener
Instance to notify about replay events.
UInt64 SequenceNumber
Alias for Sequence Number type.
const void * binary() const
Message content.
unsigned int maxRetransmissionResponseTime
Maximum timeout to wait retransmission responses (sec), default value is 30 sec.
Represents timestamp without time-zone information.
Log binary data of received packets, applied only for Info log level and below.
unsigned port
Port number.
void start()
Start handler.
FE::MulticastFeedLayout::Enum getRetransmissionFeedLayout(const HandlerSettings &settings)
FeedDescriptor glimpseFeed
GLIMPSE feed.
MessageSize binarySize() const
Size of message.
OnixS::Licensing::ProductId productId()
Identifies errors of generic nature.
UInt32 lostPacketWaitTime
void stop(bool wait)
Stop handler.
const Byte * session
Session Id.
FeedDescriptor itchFeed
ITCH Live feed.
void commonLog(int logLevel, const char *msg, size_t length)
User's common log.
OnixS::ItchCore::MoldUDP64::PacketProcessor< HandlerImpl > RealtimePacketProcessor
void bindFeedEngine(FeedEngine &feedEngine)
void onPrepareLogPlayer(HandlerCore::Common::LogPlayer *logPlayer)
std::string toString() const
Returns the string representation.
std::string toString() const
Returns the string representation.
void invokeError(const std::string &description) override
Invoke helpers.
std::string networkInterfaceB
const char * projectName()
std::string networkInterface
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
std::string networkInterfaceA
Defines ONIXS_SGXTITAN_ITCH_API which affect logs replay.
virtual void onStateChanged(State::Enum newState)
bool valid() const
Returns true if the descriptor points to valid ip address.
UInt8 Byte
Alias for Byte.
virtual void onReplayError(const std::string &errorDescription)=0
Is called once error occurs while replaying logs.
ServiceDescriptor serviceB
Service B.
ListenerHolder & listenerHolder()
Returns commons services as shared object.
virtual void onReplayFinished()=0
Is called once all the logs are replayed.
bool buildInternalOrderBooks
Build internal books.
UInt64 messageSeqNum
Message sequence number.
Timestamp packetReceptionTime
Time when the packet was received by Handler from UDP, in system ticks,.
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
const char * projectDescription()
std::string address
Ip address.
Handler configuration settings.
FeedDescriptor retransmissionFeed
Retransmission feed.
ServiceDescriptor serviceA
Service A.
void invokeWarning(const std::string &description) override
unsigned int retransmissionMaxPacketNumber
Lost packets threshold when the Handler prefers retransmission.
Manages processing machinery for market data received from feeds.
UInt32 outOfOrderPacketMaxInterval