OnixS C++ SGX Titan ITCH Market Data Handler  1.2.2
API documentation
MessageRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #include "MessageRepository.h"
21 
24 
25 #include <OnixS/HandlerCore/OrderBook.h>
26 
27 #include <OnixS/ItchCore/MoldUDP64/PacketHeader.h>
28 #include <OnixS/ItchCore/MoldUDP64/PacketHelper.h>
29 
30 #include "ListenerHolder.h"
31 #include "HandlerLogger.h"
32 
33 #include "Formatting.h"
34 #include "OrderBookHolder.h"
36 #include "Formatting.Messages.h"
37 
38 #include <boost/bind.hpp>
39 
40 #include <util/String.h>
41 
42 using namespace OnixS::ItchCore;
43 
44 ONIXS_HANDLER_NAMESPACE_BEGIN
45 
46  template <typename MessageType>
47  bool isMarketOrder(const MessageType* message, const std::set<OrderId>& marketOrderIds)
48  {
49  BOOST_ASSERT(message != nullptr);
50 
51  return marketOrderIds.find(message->orderId()) != marketOrderIds.end();
52  }
53 
54  bool isMarketOrder(const AddOrderMsg* message)
55  {
56  BOOST_ASSERT(message != nullptr);
57 
58  return message->price().isNull();
59  }
60 
61  MessageRepository::MessageRepository(
62  ListenerHolder& listenerHolder,
63  const HandlerSettings& settings,
64  const Logging::LogFacility* parent,
65  HandlerCore::Common::HandlerLogger* logger,
66  OrderBookAllocator* bookAllocator) :
67  Logging::LogFacility("MessageRepository", parent, OnixS::Logging::LOG_LEVEL_DEBUG),
68  settings_(settings),
69  listenerHolder_(listenerHolder),
70  buildOrderBooks_(settings.buildInternalOrderBooks),
71  logger_(logger),
72  expectedMessageSeqNum_(1),
73  skipGaps_(false),
74  orderBookHolder_(buildOrderBooks_ ? new OrderBookHolder(boost::bind(&MessageRepository::onOrderBookUpdated, this, _1), bookAllocator, 10) : nullptr)
75  {
76  BOOST_ASSERT(logger_ != nullptr);
77  log(ONIXS_LOG_DEBUG[this] << "Skip gapped messages: " << (skipGaps_ ? "yes" : "no"));
78 
79  checkOrderBookListener();
80  }
81 
83  {
     // NOTE(review): the signature line for this empty body (listing line 82) is
     // missing from this extract; presumably the destructor - confirm against the
     // original source.
84  }
85 
86  template <typename MessageType>
87  bool MessageRepository::filterByInstrument(const MessageType* message, const std::set<OrderBookId>& filter)
88  {
89  BOOST_ASSERT(message != nullptr);
90 
91  if (filter.empty())
92  return false;
93 
94  if (filter.find(message->orderBookId()) == filter.end())
95  {
96  log(ONIXS_LOG_DEBUG[this] << "Message filtered by Instrument");
97  return true;
98  }
99 
100  return false;
101  }
102 
103  void MessageRepository::setOrderBookIdFilter(const std::set<OrderBookId>& orderBookIdFilter)
104  {
105  orderBookIdFilter_.clear();
106  orderBookIdFilter_.insert(orderBookIdFilter.begin(), orderBookIdFilter.end());
107  }
108 
109  void MessageRepository::checkOrderBookListener()
110  {
111  if (buildOrderBooks_ && !listenerHolder_.hasOrderBookListener())
112  listenerHolder_.invokeWarning("Order books are being built, but Order Book Listener is not registered");
113 
114  if (listenerHolder_.hasOrderBookListener() && !buildOrderBooks_)
115  listenerHolder_.invokeWarning("Order Book Listener is registered, but order books are not being built");
116 
117  if (skipGaps_ && buildOrderBooks_)
118  listenerHolder_.invokeWarning("impossible to build order books because gapped messages being skipped");
119  }
120 
121  template <typename MessageType>
122  inline const MessageType* MessageRepository::checkDowncastMsg(const IncomingMessage* message)
123  {
124  BOOST_ASSERT(message != nullptr);
125 
126  MessageType::validateSize(message->binarySize());
127 
128  const MessageType* const msg = static_cast<const MessageType*>(message);
129 
130  try
131  {
132  log(ONIXS_LOG_DEBUG << *msg << " received");
133  }
134  catch (const std::exception& ex)
135  {
136  const std::string what = std::string("Exception during message serialisation: ") + ex.what();
137  listenerHolder_.invokeWarning(what);
138  log(ONIXS_LOG_WARN[this] << what);
139  }
140  catch (...)
141  {
142  const std::string what = "Unknown exception during message serialisation.";
143  listenerHolder_.invokeWarning(what);
144  log(ONIXS_LOG_WARN[this] << what);
145  }
146 
147  return msg;
148  }
149 
151  {
     // NOTE(review): the signature line for this body (listing line 150) is missing
     // from this extract; judging by the log text this is presumably reset() - confirm.
152  log(ONIXS_LOG_DEBUG[this] << "reset");
153 
     // Same lock that onPacket() and processMessage() take before touching the queue.
154  Guard guard(messageQueueLock_);
155 
     // Drop every cached packet and restart sequencing from 1.
156  packetQueue_.clear();
157  expectedMessageSeqNum_ = 1;
158 
     // NOTE(review): listing line 159 is also missing here; whatever statement it
     // held is not visible in this extract.
160  }
161 
163  {
     // NOTE(review): the signature line for this body (listing line 162) is missing
     // from this extract - confirm the method name against the original source.
     // The holder is only created when books are built (see the constructor), so it
     // is only touched under the same flag.
164  if (buildOrderBooks_)
165  orderBookHolder_->reset();
166 
     // Forget every order id that was being tracked as a market order.
167  marketOrderIds_.clear();
168  }
169 
171  {
     // NOTE(review): the signature line for this body (listing line 170) is missing
     // from this extract - confirm the method name against the original source.
     // Start from a clean state before snapshot recovery.
172  reset();
173 
174  if (buildOrderBooks_)
175  {
     // Report the books as out of date through the listener callback first, then
     // tell the holder that snapshot recovery has started.
176  orderBookHolder_->onBookOutOfDate(
177  boost::bind(&ListenerHolder::invokeOrderBookOutOfDate, &listenerHolder_, _1));
178 
179  orderBookHolder_->snapshotRecoveryStarted();
180  }
181 
     // Listeners are notified whether or not books are built.
182  listenerHolder_.invokeSnapshotRecoveryStarted();
183  }
184 
186  {
     // NOTE(review): the signature line (listing line 185) and the first statement
     // (listing line 187) of this body are missing from this extract - confirm both
     // against the original source.
188 
     // Tell the order book holder (when books are built) that snapshot recovery started.
189  if (buildOrderBooks_)
190  orderBookHolder_->snapshotRecoveryStarted();
191 
     // Listeners are notified whether or not books are built.
192  listenerHolder_.invokeSnapshotRecoveryStarted();
193  }
194 
196  OnixS::HandlerCore::MarketData::FE::PacketContainer& packetContainer,
197  bool runProcessCachedMessages)
198  {
     // NOTE(review): the first signature line (listing line 195) is missing from this
     // extract; the member list at the end of the listing gives
     // onPacket(DataSource&, PacketContainer&, bool runProcessCachedMessages = true).
     //
     // Handles one MoldUDP64 packet: an in-sequence packet is split into messages and
     // dispatched, a packet from the future is cached, anything else is ignored.
     // Only replay and multicast origins are expected here.
199  BOOST_ASSERT((dataSource.origin == DataSource::replay) || (dataSource.origin == DataSource::multicast));
200 
201  Guard guard(messageQueueLock_);
202 
203  const MoldUDP64::PacketHeader* packetHeader = &MoldUDP64::PacketHelper::header(packetContainer);
204 
205  if (packetHeader->sequenceNumber() == expectedMessageSeqNum_)
206  {
     // Message blocks follow the packet header back to back: block header, then payload.
     // NOTE(review): no check against the container size is visible here; offsets and
     // lengths are taken from the packet as-is - confirm validation happens upstream.
207  const char* data = static_cast<char*>(packetContainer.data());
208  size_t offset = sizeof(MoldUDP64::PacketHeader);
209 
210  for (unsigned i = 0; i < packetHeader->messageCount(); ++i)
211  {
     // Stamp the data source with the position of this message inside the packet and
     // with its sequence number (header sequence number + index).
212  dataSource.packetMessageNumber = static_cast<UInt8>(i);
213  dataSource.messageSeqNum = packetHeader->sequenceNumber() + i;
214 
215  const MoldUDP64::MessageBlockHeader* const messageBlockHeader = reinterpret_cast<const MoldUDP64::MessageBlockHeader*>(data + offset);
216  offset += sizeof(MoldUDP64::MessageBlockHeader);
217 
218  IncomingMessage message = IncomingMessage(data + offset, messageBlockHeader->length());
219 
     // The expected sequence number advances once per message, not once per packet.
220  processMessage(dataSource, &message);
221  ++expectedMessageSeqNum_;
222 
223  offset += messageBlockHeader->length();
224  }
225 
     // processCachedMessages() passes false here because it is already draining the queue.
226  if(runProcessCachedMessages)
227  processCachedMessages();
228  }
229  else if (packetHeader->sequenceNumber() > expectedMessageSeqNum_)
230  {
     // Packet from the future: keep it until the missing ones arrive.
231  log(ONIXS_LOG_DEBUG[this] << packetHeader->sequenceNumber() << " packet cached, expected: " << expectedMessageSeqNum_);
232  packetQueue_.push(dataSource, packetContainer);
233  }
     // A packet whose sequence number is below the expected one matches neither
     // branch and is ignored without any log entry.
234  }
235 
236  void MessageRepository::processCachedMessages()
237  {
238  if (!packetQueue_.empty())
239  {
240  log(ONIXS_LOG_DEBUG[this] << "Processing cached messages");
241 
242  while (!packetQueue_.empty())
243  {
244  DataSource& dataSource = packetQueue_.front().context();
245 
246  if (dataSource.messageSeqNum == expectedMessageSeqNum_)
247  {
248  log(ONIXS_LOG_DEBUG[this] << dataSource.messageSeqNum << " packet processed from cache");
249 
250  onPacket(dataSource, packetQueue_.front().packetContainer(), false);
251 
252  packetQueue_.front().packetContainer().release();
253  packetQueue_.pop();
254  }
255  else if (dataSource.messageSeqNum > expectedMessageSeqNum_)
256  return;
257  else
258  {
259  log(ONIXS_LOG_DEBUG[this] << dataSource.messageSeqNum << " Unexpected packet - dropped, expected: " << expectedMessageSeqNum_);
260 
261  packetQueue_.front().packetContainer().release();
262  packetQueue_.pop();
263  }
264  }
265 
266  BOOST_ASSERT(packetQueue_.empty());
267  packetQueue_.clear();
268  }
269  }
270 
// Dispatches a single incoming message according to its type byte.
//
// Each supported type is size-checked and downcast through checkDowncastMsg(),
// dropped when filterByInstrument() rejects its order book, forwarded to the
// matching ListenerHolder callback and, when internal order books are built,
// applied to orderBookHolder_. Every exception raised on the way (including the
// one thrown for an unsupported type) is caught here and reported as a warning to
// the listener and to the log; nothing propagates to the caller.
//
// dataSource - origin and sequencing details passed through to the listener callbacks.
// message    - raw message to dispatch; dereferenced without a null check.
271  void MessageRepository::processMessage(const DataSource& dataSource, const IncomingMessage* message)
272  {
     // NOTE(review): onPacket() already holds messageQueueLock_ when it calls this
     // method, so the lock is presumably re-entrant - confirm the Guard/lock types.
273  Guard guard(messageQueueLock_);
274 
275  const Byte messageType = message->type();
276 
277  try
278  {
279  if (messageType == MessageType::Seconds)
280  {
281  listenerHolder_.invokeTime(checkDowncastMsg<SecondsMsg>(message), dataSource);
282  }
283  else if (messageType == MessageType::OrderBookDirectory)
284  {
285  const OrderBookDirectoryMsg* const msg = checkDowncastMsg<OrderBookDirectoryMsg>(message);
286 
     // filterByInstrument() returns true when the message must be dropped.
287  if(filterByInstrument(msg, orderBookIdFilter_))
288  return;
289 
290  listenerHolder_.invokeOrderBookDirectory(msg, dataSource);
291  }
292  else if (messageType == MessageType::CombinationOrderBookLeg)
293  {
294  const CombinationOrderBookLegMsg* const msg = checkDowncastMsg<CombinationOrderBookLegMsg>(message);
295 
296  if(filterByInstrument(msg, orderBookIdFilter_))
297  return;
298 
299  listenerHolder_.invokeCombinationOrderBookLeg(msg, dataSource);
300  }
301  else if (messageType == MessageType::TickSizeTableEntry)
302  {
303  const TickSizeTableEntryMsg* const msg = checkDowncastMsg<TickSizeTableEntryMsg>(message);
304 
305  if(filterByInstrument(msg, orderBookIdFilter_))
306  return;
307 
308  listenerHolder_.invokeTickSizeTableEntry(msg, dataSource);
309  }
310  else if (messageType == MessageType::SystemEvent)
311  {
     // System events are not subject to the instrument filter.
312  const SystemEventMsg* const msg = checkDowncastMsg<SystemEventMsg>(message);
313 
314  listenerHolder_.invokeSystemEvent(msg, dataSource);
315  }
316  else if (messageType == MessageType::OrderBookState)
317  {
318  const OrderBookStateMsg* const msg = checkDowncastMsg<OrderBookStateMsg>(message);
319 
320  if(filterByInstrument(msg, orderBookIdFilter_))
321  return;
322 
323  listenerHolder_.invokeOrderBookState(msg, dataSource);
324  }
325  else if (messageType == MessageType::AddOrder)
326  {
327  const AddOrderMsg* const msg = checkDowncastMsg<AddOrderMsg>(message);
328 
329  if(filterByInstrument(msg, orderBookIdFilter_))
330  return;
331 
332  listenerHolder_.invokeAddOrder(msg, dataSource);
333 
334  if (buildOrderBooks_)
335  {
     // Market orders (null price) are not added to the book; only their id is
     // remembered so that later messages for the same order can be recognised.
336  if (isMarketOrder(msg))
337  marketOrderIds_.insert(msg->orderId());
338  else
339  orderBookHolder_->onOrderAdd(*msg);
340  }
341  }
342  else if (messageType == MessageType::OrderExecuted)
343  {
     // This message type is not expected from the glimpse origin.
344  BOOST_ASSERT(dataSource.origin != DataSource::glimpse);
345 
346  const OrderExecutedMsg* const msg = checkDowncastMsg<OrderExecutedMsg>(message);
347 
348  if(filterByInstrument(msg, orderBookIdFilter_))
349  return;
350 
351  listenerHolder_.invokeOrderExecuted(msg, dataSource);
352 
353  if (buildOrderBooks_)
354  {
     // An execution of a tracked market order stops the tracking; it never reaches the book.
355  if (isMarketOrder(msg, marketOrderIds_))
356  marketOrderIds_.erase(msg->orderId());
357  else
358  orderBookHolder_->onOrderExecuted(*msg);
359  }
360  }
361  else if (messageType == MessageType::OrderExecutedWithPrice)
362  {
363  BOOST_ASSERT(dataSource.origin != DataSource::glimpse);
364 
365  const OrderExecutedWithPriceMsg* const msg = checkDowncastMsg<OrderExecutedWithPriceMsg>(message);
366 
367  if(filterByInstrument(msg, orderBookIdFilter_))
368  return;
369 
370  listenerHolder_.invokeOrderExecutedWithPrice(msg, dataSource);
371 
372  if (buildOrderBooks_)
373  {
     // Unlike OrderExecuted, the market order id stays tracked here because several
     // such messages can arrive for the same market order (see the note below).
374  if (isMarketOrder(msg, marketOrderIds_))
375  {
376  //skipped, can see a few OrderExecutedWithPrice for the market orders
377  //marketOrderIds_.erase(msg->orderId());
378  }
379  else
380  orderBookHolder_->onOrderExecutedWithPrice(*msg);
381  }
382  }
383  else if (messageType == MessageType::OrderReplace)
384  {
385  BOOST_ASSERT(dataSource.origin != DataSource::glimpse);
386 
387  const OrderReplaceMsg* const msg = checkDowncastMsg<OrderReplaceMsg>(message);
388 
389  if(filterByInstrument(msg, orderBookIdFilter_))
390  return;
391 
392  listenerHolder_.invokeOrderReplace(msg, dataSource);
393 
     // Replacements of tracked market orders do not touch the book.
394  if (buildOrderBooks_ && !isMarketOrder(msg, marketOrderIds_))
395  orderBookHolder_->onOrderReplace(*msg);
396  }
397  else if (messageType == MessageType::OrderDelete)
398  {
399  BOOST_ASSERT(dataSource.origin != DataSource::glimpse);
400 
401  const OrderDeleteMsg* const msg = checkDowncastMsg<OrderDeleteMsg>(message);
402 
403  if(filterByInstrument(msg, orderBookIdFilter_))
404  return;
405 
406  listenerHolder_.invokeOrderDelete(msg, dataSource);
407 
408  if (buildOrderBooks_)
409  {
     // Deleting a tracked market order only stops the tracking.
410  if (isMarketOrder(msg, marketOrderIds_))
411  marketOrderIds_.erase(msg->orderId());
412  else
413  orderBookHolder_->onOrderDelete(*msg);
414  }
415  }
416  else if (messageType == MessageType::Trade)
417  {
418  const TradeMsg* const msg = checkDowncastMsg<TradeMsg>(message);
419 
420  if(filterByInstrument(msg, orderBookIdFilter_))
421  return;
422 
423  listenerHolder_.invokeTrade(msg, dataSource);
424  }
425  else if (messageType == MessageType::EquilibriumPriceUpdate)
426  {
427  const EquilibriumPriceUpdateMsg* const msg = checkDowncastMsg<EquilibriumPriceUpdateMsg>(message);
428 
429  if(filterByInstrument(msg, orderBookIdFilter_))
430  return;
431 
432  listenerHolder_.invokeEquilibriumPriceUpdate(msg, dataSource);
433  }
434  else if (static_cast<GlimpseMessageType::Enum>(messageType) == (GlimpseMessageType::EndOfSnapshot))
435  {
436  const EndOfSnapshotMsg* const msg = checkDowncastMsg<EndOfSnapshotMsg>(message);
437 
438  if(buildOrderBooks_)
439  orderBookHolder_->snapshotRecoveryFinished();
440 
441  listenerHolder_.invokeSnapshotRecoveryFinished();
442 
     // Resume sequencing from the number carried by the end-of-snapshot message,
     // then replay whatever was cached while the snapshot was being received.
443  expectedMessageSeqNum_ = msg->sequenceNumber();
444  processCachedMessages();
445  }
446  else
     // Caught by the handler below, so an unknown type surfaces as a warning.
447  throw OperationException(__FUNCTION__, OnixS::Util::format("Unsupported message type (messageType=%d).", messageType).c_str());
448  }
449  catch (const std::exception& ex)
450  {
451  listenerHolder_.invokeWarning(ex.what());
452  log(ONIXS_LOG_WARN[this] << ex.what());
453  }
454  catch (...)
455  {
456  const std::string what = "Unhandled exception in processMessage method";
457  listenerHolder_.invokeWarning(what);
458  log(ONIXS_LOG_WARN[this] << what);
459  }
460  }
461 
462  void MessageRepository::onGap(SequenceNumber expectedSeqNum, SequenceNumber receivedSeqNum)
463  {
464  BOOST_ASSERT(expectedSeqNum < receivedSeqNum);
465 
466  if (skipGaps_)
467  skipPackets(expectedSeqNum, receivedSeqNum);
468 
469  listenerHolder_.invokeGap(expectedSeqNum, receivedSeqNum);
470  }
471 
473  {
     // NOTE(review): the signature line (listing line 472) is missing from this
     // extract; the member list at the end of the listing gives
     // skipPackets(SequenceNumber seqNum1, SequenceNumber seqNum2).
     //
     // Skips the range starting at seqNum1 by moving the expected sequence number
     // forward to seqNum2, then replays any cached packets that became processable.
474  BOOST_ASSERT(seqNum1 < seqNum2);
475  BOOST_ASSERT(seqNum1 != 0);
476 
     // NOTE(review): this assertion reads expectedMessageSeqNum_ before the lock
     // below is taken - confirm that is intended.
477  BOOST_ASSERT(seqNum1 == expectedMessageSeqNum_);
478 
479  Guard guard(messageQueueLock_);
480 
     // Only ever move the expected sequence number forward.
481  if (expectedMessageSeqNum_ < seqNum2)
482  expectedMessageSeqNum_ = seqNum2;
483 
484  log(ONIXS_LOG_DEBUG[this] << "Packets " << seqNum1 << "-" << seqNum2 << " is skipped");
485  processCachedMessages();
486  }
487 
489  {
     // NOTE(review): the signature line for this body (listing line 488) is missing
     // from this extract - confirm the method name against the original source.
     // Report inactivity to the listener first.
490  listenerHolder_.invokeInactivity();
491 
     // When books are built, have the holder report them as out of date through the
     // listener's invokeOrderBookOutOfDate callback.
492  if (buildOrderBooks_)
493  orderBookHolder_->onBookOutOfDate(boost::bind(&ListenerHolder::invokeOrderBookOutOfDate, &listenerHolder_, _1));
494  }
495 
496  void MessageRepository::onOrderBookUpdated(const OrderBookInternal& book)
497  {
498  if (settings_.logSettings & LogSettings::LogBooks)
499  log(ONIXS_LOG_DEBUG[this] << "Order book updated:" << "\n" << book);
500 
501  listenerHolder_.invokeOrderBookUpdated(book);
502  }
503 
505  {
     // NOTE(review): the signature line for this body (listing line 504) is missing
     // from this extract - confirm the method name against the original source.
     // Turns gap skipping off; onGap() then only reports gaps to the listener.
506  skipGaps_ = false;
507  }
508 
509 ONIXS_HANDLER_NAMESPACE_END
Origin origin
the way data received
Definition: Defines.h:77
void invokeOrderExecutedWithPrice(const OrderExecutedWithPriceMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeAddOrder(const AddOrderMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeGap(SequenceNumber expectedSeqNum, SequenceNumber receivedSeqNum) ONIXS_NOEXCEPT
void invokeOrderReplace(const OrderReplaceMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeOrderBookState(const OrderBookStateMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
bool isMarketOrder(const MessageType *message, const std::set< OrderId > &marketOrderIds)
UInt64 SequenceNumber
Alias for Sequence Number type.
Definition: Defines.h:37
void invokeEquilibriumPriceUpdate(const EquilibriumPriceUpdateMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeOrderBookUpdated(const OrderBookInternal &book) ONIXS_NOEXCEPT
void invokeOrderBookDirectory(const OrderBookDirectoryMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeOrderBookOutOfDate(const OrderBookInternal &book) ONIXS_NOEXCEPT
Price price() const
The display price of the new order.
Definition: AddOrderMsg.h:71
void invokeTime(const SecondsMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
MessageSize binarySize() const
Size of message.
UInt64 sequenceNumber() const
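Sequence number carried by the message. (The generated description here, "Human readable long name of Instrument series.", does not match this accessor and appears to be a copy-paste error in the header comment.)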
void invokeOrderDelete(const OrderDeleteMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void processMessage(const DataSource &dataSource, const IncomingMessage *message)
void invokeTrade(const TradeMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
bool filterByInstrument(const MessageType *message, const std::set< OrderBookId > &filter)
OrderId orderId() const
The identifier assigned to the new order.
Definition: AddOrderMsg.h:40
void setOrderBookIdFilter(const std::set< OrderBookId > &orderBookIdFilter)
void onGap(SequenceNumber expectedSeqNum, SequenceNumber receivedSeqNum)
UInt8 packetMessageNumber
Number of message in packet.
Definition: Defines.h:68
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
bool isNull() const
Check whether the value is nullPrice.
Definition: Price.h:207
void onPacket(DataSource &dataSource, OnixS::HandlerCore::MarketData::FE::PacketContainer &packetContainer, bool runProcessCachedMessages=true)
Log updated order book, applied only for Debug log level.
Definition: LogSettings.h:81
OrderId orderId() const
The order ID is associated with the executed order.
void invokeOrderExecuted(const OrderExecutedMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt8 Byte
Alias for Byte.
Definition: Memory.h:30
void invokeCombinationOrderBookLeg(const CombinationOrderBookLegMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void skipPackets(SequenceNumber seqNum1, SequenceNumber seqNum2)
void invokeTickSizeTableEntry(const TickSizeTableEntryMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt64 messageSeqNum
Message sequence number.
Definition: Defines.h:71
OrderId orderId() const
The ID of the order being deleted.
bool hasOrderBookListener() const ONIXS_NOEXCEPT
void invokeSystemEvent(const SystemEventMsg *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
bool isMarketOrder(const AddOrderMsg *message)