25 #include <OnixS/HandlerCore/OrderBook.h> 27 #include <OnixS/ItchCore/MoldUDP64/PacketHeader.h> 28 #include <OnixS/ItchCore/MoldUDP64/PacketHelper.h> 38 #include <boost/bind.hpp> 40 #include <util/String.h> 44 ONIXS_HANDLER_NAMESPACE_BEGIN
// Reports whether the order id carried by `message` is present in the
// supplied `marketOrderIds` set.
// NOTE(review): the signature line is elided in this listing; presumably
// isMarketOrder(const MessageType*, const std::set<OrderId>&) - confirm.
46 template <
typename MessageType>
// Precondition: the message pointer must not be null.
49 BOOST_ASSERT(message !=
nullptr);
// Set membership test: true when the order id is found.
51 return marketOrderIds.find(message->orderId()) != marketOrderIds.end();
56 BOOST_ASSERT(message !=
nullptr);
// Constructs the repository: initialises the base log facility and members,
// logs the gap-skipping setting and checks the order book listener setup.
// NOTE(review): several parameters and member initialisers are elided in this
// listing (e.g. listenerHolder, settings, logger_, skipGaps_).
61 MessageRepository::MessageRepository(
64 const Logging::LogFacility* parent,
65 HandlerCore::Common::HandlerLogger* logger,
// Base facility is named "MessageRepository" and created with the debug level.
67 Logging::LogFacility(
"MessageRepository", parent,
OnixS::Logging::LOG_LEVEL_DEBUG),
69 listenerHolder_(listenerHolder),
70 buildOrderBooks_(settings.buildInternalOrderBooks),
// The first expected sequence number is 1.
72 expectedMessageSeqNum_(1),
// Precondition: a logger must have been supplied.
76 BOOST_ASSERT(logger_ !=
nullptr);
77 log(ONIXS_LOG_DEBUG[
this] <<
"Skip gapped messages: " << (skipGaps_ ?
"yes" :
"no"));
// Emit warnings about inconsistent order book settings right away.
79 checkOrderBookListener();
// Checks the message's order book id against the `filter` set.
// NOTE(review): the signature, the return statements and lines 90-93 are
// elided in this listing; presumably
// filterByInstrument(const MessageType*, const std::set<OrderBookId>&) - confirm.
86 template <
typename MessageType>
// Precondition: the message pointer must not be null.
89 BOOST_ASSERT(message !=
nullptr);
// The order book id is not in the filter set: log that the message is filtered.
94 if (filter.find(message->orderBookId()) == filter.end())
96 log(ONIXS_LOG_DEBUG[
this] <<
"Message filtered by Instrument");
105 orderBookIdFilter_.clear();
106 orderBookIdFilter_.insert(orderBookIdFilter.begin(), orderBookIdFilter.end());
// Reports inconsistent order-book configuration as warnings through the
// listener holder.
// NOTE(review): the conditions guarding the first two warnings (lines 110-111
// and 113-114) are elided in this listing; presumably they compare
// buildOrderBooks_ with hasOrderBookListener() - confirm.
109 void MessageRepository::checkOrderBookListener()
112 listenerHolder_.
invokeWarning(
"Order books are being built, but Order Book Listener is not registered");
115 listenerHolder_.
invokeWarning(
"Order Book Listener is registered, but order books are not being built");
// Gap skipping and order book building are both enabled: warn about the combination.
117 if (skipGaps_ && buildOrderBooks_)
118 listenerHolder_.
invokeWarning(
"impossible to build order books because gapped messages being skipped");
// Validates the size of an incoming message for the requested MessageType and
// logs the message at debug level.
// NOTE(review): the signature, the cast that produces `msg` and the return
// statement are elided in this listing; the name is presumably
// checkDowncastMsg, as used by the message dispatch code - confirm.
121 template <
typename MessageType>
// Precondition: the message pointer must not be null.
124 BOOST_ASSERT(message !=
nullptr);
// Let the concrete message type check the reported binary size.
126 MessageType::validateSize(message->
binarySize());
// NOTE(review): unlike the other log statements here, this one does not use
// the ONIXS_LOG_DEBUG[this] form - confirm that this is intentional.
132 log(ONIXS_LOG_DEBUG << *msg <<
" received");
// A failure while streaming the message is reported as a warning.
134 catch (
const std::exception& ex)
136 const std::string what = std::string(
"Exception during message serialisation: ") + ex.what();
138 log(ONIXS_LOG_WARN[
this] << what);
// NOTE(review): presumably the body of a catch (...) handler whose header
// line is elided.
142 const std::string what =
"Unknown exception during message serialisation.";
144 log(ONIXS_LOG_WARN[
this] << what);
152 log(ONIXS_LOG_DEBUG[
this] <<
"reset");
154 Guard guard(messageQueueLock_);
156 packetQueue_.clear();
157 expectedMessageSeqNum_ = 1;
164 if (buildOrderBooks_)
165 orderBookHolder_->reset();
167 marketOrderIds_.clear();
174 if (buildOrderBooks_)
176 orderBookHolder_->onBookOutOfDate(
179 orderBookHolder_->snapshotRecoveryStarted();
189 if (buildOrderBooks_)
190 orderBookHolder_->snapshotRecoveryStarted();
// Handles one MoldUDP64 packet. A packet whose sequence number equals the
// expected one is walked message block by message block; a packet from the
// future is queued in packetQueue_ for later replay.
// NOTE(review): the first signature line and the per-message processing call
// (lines 217-220) are elided in this listing.
196 OnixS::HandlerCore::MarketData::FE::PacketContainer& packetContainer,
197 bool runProcessCachedMessages)
// NOTE(review): processCachedMessages() calls back into this function while
// this guard is held, so messageQueueLock_ presumably has to be re-entrant -
// confirm.
201 Guard guard(messageQueueLock_);
203 const MoldUDP64::PacketHeader* packetHeader = &MoldUDP64::PacketHelper::header(packetContainer);
// In-sequence packet: process it now.
205 if (packetHeader->sequenceNumber() == expectedMessageSeqNum_)
207 const char* data =
static_cast<char*
>(packetContainer.data());
// Message blocks start right after the packet header.
208 size_t offset =
sizeof(MoldUDP64::PacketHeader);
210 for (
unsigned i = 0; i < packetHeader->messageCount(); ++i)
// The header carries the sequence number of the first message; the i-th
// message gets that number plus i.
213 dataSource.
messageSeqNum = packetHeader->sequenceNumber() + i;
// NOTE(review): no check of `offset` against the packet size is visible in
// this listing - confirm that the packet is validated elsewhere.
215 const MoldUDP64::MessageBlockHeader*
const messageBlockHeader =
reinterpret_cast<const MoldUDP64::MessageBlockHeader*
>(data + offset);
216 offset +=
sizeof(MoldUDP64::MessageBlockHeader);
// One message consumed: advance the expected sequence number.
221 ++expectedMessageSeqNum_;
// Step over the message payload to the next message block.
223 offset += messageBlockHeader->length();
// Replay queued packets unless the caller asked not to.
226 if(runProcessCachedMessages)
227 processCachedMessages();
// Packet from the future: queue it until the gap is resolved.
229 else if (packetHeader->sequenceNumber() > expectedMessageSeqNum_)
231 log(ONIXS_LOG_DEBUG[
this] << packetHeader->sequenceNumber() <<
" packet cached, expected: " << expectedMessageSeqNum_);
232 packetQueue_.push(dataSource, packetContainer);
// Drains packetQueue_: queued packets are either replayed through onPacket()
// or dropped, and their containers are released.
// NOTE(review): the conditions that choose between replay and drop, and the
// statements that pop the queue (lines 245-247, 253-258, 262-265), are elided
// in this listing.
236 void MessageRepository::processCachedMessages()
238 if (!packetQueue_.empty())
240 log(ONIXS_LOG_DEBUG[
this] <<
"Processing cached messages");
242 while (!packetQueue_.empty())
244 DataSource& dataSource = packetQueue_.front().context();
248 log(ONIXS_LOG_DEBUG[
this] << dataSource.
messageSeqNum <<
" packet processed from cache");
// Pass false for runProcessCachedMessages so that onPacket() does not call
// back into this function.
250 onPacket(dataSource, packetQueue_.front().packetContainer(),
false);
252 packetQueue_.front().packetContainer().release();
// The queued packet is not the expected one: it is logged and released
// without being processed.
259 log(ONIXS_LOG_DEBUG[
this] << dataSource.
messageSeqNum <<
" Unexpected packet - dropped, expected: " << expectedMessageSeqNum_);
261 packetQueue_.front().packetContainer().release();
// NOTE(review): the clear() after the assertion is redundant when the
// assertion holds; presumably a fallback for builds where BOOST_ASSERT is
// disabled - confirm.
266 BOOST_ASSERT(packetQueue_.empty());
267 packetQueue_.clear();
// Dispatches one incoming message by its type byte: the message is downcast
// with checkDowncastMsg<>() and, when buildOrderBooks_ is set, applied to the
// order book holder.
// NOTE(review): the signature, the switch/case labels, the listener
// invocations and the filter checks are mostly elided in this listing.
273 Guard guard(messageQueueLock_);
275 const Byte messageType = message->
type();
281 listenerHolder_.
invokeTime(checkDowncastMsg<SecondsMsg>(message), dataSource);
312 const SystemEventMsg*
const msg = checkDowncastMsg<SystemEventMsg>(message);
318 const OrderBookStateMsg*
const msg = checkDowncastMsg<OrderBookStateMsg>(message);
327 const AddOrderMsg*
const msg = checkDowncastMsg<AddOrderMsg>(message);
334 if (buildOrderBooks_)
// NOTE(review): the condition guarding this insert is elided; presumably
// only market orders are tracked in marketOrderIds_ - confirm.
337 marketOrderIds_.insert(msg->
orderId());
339 orderBookHolder_->onOrderAdd(*msg);
346 const OrderExecutedMsg*
const msg = checkDowncastMsg<OrderExecutedMsg>(message);
353 if (buildOrderBooks_)
356 marketOrderIds_.erase(msg->
orderId());
358 orderBookHolder_->onOrderExecuted(*msg);
372 if (buildOrderBooks_)
380 orderBookHolder_->onOrderExecutedWithPrice(*msg);
387 const OrderReplaceMsg*
const msg = checkDowncastMsg<OrderReplaceMsg>(message);
// A replace is applied to the book only when the order id is not tracked as
// a market order.
394 if (buildOrderBooks_ && !
isMarketOrder(msg, marketOrderIds_))
395 orderBookHolder_->onOrderReplace(*msg);
401 const OrderDeleteMsg*
const msg = checkDowncastMsg<OrderDeleteMsg>(message);
408 if (buildOrderBooks_)
411 marketOrderIds_.erase(msg->
orderId());
413 orderBookHolder_->onOrderDelete(*msg);
418 const TradeMsg*
const msg = checkDowncastMsg<TradeMsg>(message);
// End of snapshot: finish snapshot recovery, then replay queued packets.
436 const EndOfSnapshotMsg*
const msg = checkDowncastMsg<EndOfSnapshotMsg>(message);
439 orderBookHolder_->snapshotRecoveryFinished();
444 processCachedMessages();
// Message types without a handler are reported through OperationException.
447 throw OperationException(__FUNCTION__, OnixS::Util::format(
"Unsupported message type (messageType=%d).", messageType).c_str());
// Exceptions are logged as warnings here.
// NOTE(review): whether they are rethrown afterwards is elided.
449 catch (
const std::exception& ex)
452 log(ONIXS_LOG_WARN[
this] << ex.what());
456 const std::string what =
"Unhandled exception in processMessage method";
458 log(ONIXS_LOG_WARN[
this] << what);
464 BOOST_ASSERT(expectedSeqNum < receivedSeqNum);
469 listenerHolder_.
invokeGap(expectedSeqNum, receivedSeqNum);
// Skips the packet range starting at seqNum1: moves the expected sequence
// number forward to seqNum2 and then replays queued packets.
// NOTE(review): the signature line is elided in this listing; presumably
// skipPackets(SequenceNumber seqNum1, SequenceNumber seqNum2) - confirm.
// Preconditions: a non-empty range that starts at a non-zero sequence number.
474 BOOST_ASSERT(seqNum1 < seqNum2);
475 BOOST_ASSERT(seqNum1 != 0);
// NOTE(review): this assertion reads expectedMessageSeqNum_ before the guard
// below is taken - confirm that the unsynchronised read is safe.
477 BOOST_ASSERT(seqNum1 == expectedMessageSeqNum_);
479 Guard guard(messageQueueLock_);
// Only ever move the expected sequence number forward.
481 if (expectedMessageSeqNum_ < seqNum2)
482 expectedMessageSeqNum_ = seqNum2;
484 log(ONIXS_LOG_DEBUG[
this] <<
"Packets " << seqNum1 <<
"-" << seqNum2 <<
" is skipped");
// Queued packets may now be in sequence: replay them.
485 processCachedMessages();
492 if (buildOrderBooks_)
499 log(ONIXS_LOG_DEBUG[
this] <<
"Order book updated:" <<
"\n" << book);
509 ONIXS_HANDLER_NAMESPACE_END
Origin origin
The way the data was received.
void invokeOrderExecutedWithPrice(const OrderExecutedWithPriceMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeAddOrder(const AddOrderMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeInactivity() ONIXS_NOEXCEPT
void invokeGap(SequenceNumber expectedSeqNum, SequenceNumber receivedSeqNum) ONIXS_NOEXCEPT
void invokeOrderReplace(const OrderReplaceMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeOrderBookState(const OrderBookStateMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
bool isMarketOrder(const MessageType *message, const std::set< OrderId > &marketOrderIds)
UInt64 SequenceNumber
Alias for Sequence Number type.
void invokeSnapshotRecoveryFinished() ONIXS_NOEXCEPT
void invokeEquilibriumPriceUpdate(const EquilibriumPriceUpdateMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeOrderBookUpdated(const OrderBookInternal &book) ONIXS_NOEXCEPT
void invokeOrderBookDirectory(const OrderBookDirectoryMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeOrderBookOutOfDate(const OrderBookInternal &book) ONIXS_NOEXCEPT
void onSnapshotRecoveryRestarted()
Price price() const
The display price of the new order.
MessageType::Enum type() const
Type.
void invokeTime(const SecondsMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
MessageSize binarySize() const
Size of message.
UInt64 sequenceNumber() const
Sequence number of the first message in the packet.
void invokeOrderDelete(const OrderDeleteMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeSnapshotRecoveryStarted() ONIXS_NOEXCEPT
void processMessage(const DataSource &dataSource, const IncomingMessage *message)
void invokeTrade(const TradeMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
bool filterByInstrument(const MessageType *message, const std::set< OrderBookId > &filter)
OrderId orderId() const
The identifier assigned to the new order.
void setOrderBookIdFilter(const std::set< OrderBookId > &orderBookIdFilter)
void onGap(SequenceNumber expectedSeqNum, SequenceNumber receivedSeqNum)
UInt8 packetMessageNumber
Number of the message in the packet.
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
bool isNull() const
Check whether the value is nullPrice.
void onPacket(DataSource &dataSource, OnixS::HandlerCore::MarketData::FE::PacketContainer &packetContainer, bool runProcessCachedMessages=true)
Logs the updated order book; applies only at the Debug log level.
OrderId orderId() const
The order ID is associated with the executed order.
void onSnapshotRecoveryStarted()
void log(const TMsg &msg)
Log helper.
void invokeOrderExecuted(const OrderExecutedMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt8 Byte
Alias for Byte.
void invokeCombinationOrderBookLeg(const CombinationOrderBookLegMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void skipPackets(SequenceNumber seqNum1, SequenceNumber seqNum2)
void invokeTickSizeTableEntry(const TickSizeTableEntryMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt64 messageSeqNum
Message sequence number.
OrderId orderId() const
The ID of the order being deleted.
bool hasOrderBookListener() const ONIXS_NOEXCEPT
void invokeSystemEvent(const SystemEventMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
virtual ~MessageRepository()
bool isMarketOrder(const AddOrderMsg *message)
Handler configuration settings.