OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  8.1.0
API documentation
EmdiProductDataRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <util/Converter.h>
20 
22 
24 #include "MessageOperator.h"
25 #include "Formatting.h"
26 
27 
28 namespace OnixS {
29 namespace Eurex {
30 namespace MarketData {
31 namespace Implementation {
32 
// Class body fragment: the class head and the constructor's signature line are not part of this listing.
// NOTE(review): presumably this is SnapshotTradeWrapper deriving from SnapshotTrade - confirm against the original file.
34 {
35 public :
// The constructor has an empty body; it only forwards id, sequenceNumber and tradeEntry to SnapshotTrade.
37  : SnapshotTrade(id, sequenceNumber, tradeEntry) {}
38 };
39 
// Class body fragment: the class head and the constructor's signature line are not part of this listing.
// NOTE(review): presumably this is IncrementalTradeWrapper deriving from IncrementalTrade - confirm against the original file.
41 {
42 public :
// The constructor has an empty body; it only forwards id, sequenceNumber and tradeEntry to IncrementalTrade.
44  : IncrementalTrade(id, sequenceNumber, tradeEntry) {}
45 };
46 
47 
49 using namespace HandlerCore::Common;
50 
// Constructor parameter list and body; the line that opens the signature is not part of this listing.
// The repository starts with an expected sequence number of 1 and in the synchronized state.
52  MarketSegmentId marketSegmentId,
53  MarketDepth marketDepth,
54  bool buildInternalOrderBooks,
55  LogFacility* parent,
56  HandlerLogger* logger,
57  ListenerHolder* listenerHolder,
58  const HandlerSettings& settings,
59  SecurityIdFilter filterSecurityId)
// The base is named after the textual form of the market segment id.
60  : base(i2str(marketSegmentId), parent, Logging::LOG_LEVEL_DEBUG)
61  , logger_(logger)
62  , listenerHolder_(listenerHolder)
63  , settings_(settings)
64  , marketSegmentId_(marketSegmentId)
65  , marketDepth_(marketDepth)
66  , buildInternalOrderBooks_(buildInternalOrderBooks)
67  , filterSecurityId_(filterSecurityId)
68  , nextMsgSeqNum_(1)
69  , synchronized_(true)
// NOTE(review): this reads the member settings_, not the parameter, so it relies on settings_ being
// declared before cachedIncremetalQueue_ in the class - confirm. The argument is presumably the queue capacity.
70  , cachedIncremetalQueue_(settings_.messagePoolSize / 4)
71 {
// Building internal order books requires a non-zero market depth.
72  BOOST_ASSERT((!buildInternalOrderBooks_) || marketDepth_ != 0);
73 }
74 
// Empty function body; its signature line is not part of this listing.
// NOTE(review): presumably the destructor - confirm against the original file.
76 {
77 }
78 
79 template<typename MessageType>
80 bool EmdiProductDataRepository::applyDepth(const MessageType& wrapper)
81 {
82  if(!buildInternalOrderBooks_)
83  return false;
84 
85  bool invalidMessage = false;
86 
87  try
88  {
89  updateBook(wrapper);
90  }
91  catch(const std::exception& ex)
92  {
93  invalidMessage = true;
94  reportWarning(TextBuilder() << "Message cannot be applied: " << ex.what());
95  }
96  catch(...)
97  {
98  invalidMessage = true;
99  reportWarning(TextBuilder() << "Message cannot be applied: unknown exception");
100  }
101 
102  if(invalidMessage)
103  {
104  //treat as lost
105  nextMsgSeqNum_ = UndefinedSeqNum;
106  synchronized_ = false;
107  listenerHolder_->invokeDepthReset(marketSegmentId_);
108  }
109 
110  return invalidMessage;
111 }
112 
113 void EmdiProductDataRepository::processDepthSnapshot(const CORE::Message& msg, const DataSource& dataSource)
114 {
115  log(ONIXS_LOG_DEBUG[this] << msg);
116 
117  UInt32 lastMsgSeqNumProcessed;
118  msg[Tags::LastMsgSeqNumProcessed].convertTo(lastMsgSeqNumProcessed);
119 
120  if(nextMsgSeqNum_ < lastMsgSeqNumProcessed + 1)
121  synchronized_ = false;
122 
123  if(!synchronized_)
124  {
125  nextMsgSeqNum_ = lastMsgSeqNumProcessed + 1;
126 
127  const DepthSnapshotWrapper wrapper(msg);
128  listenerHolder_->invokeDepthSnapshot(&wrapper, dataSource);
129 
130  applyDepth(wrapper);
131  }
132 }
133 
// Routes one incremental message to the listener callback that matches its template id.
//
// Every branch wraps msg in the corresponding *Wrapper type and passes it, together with
// dataSource, to listenerHolder_. The depth-incremental branch additionally hands the wrapper
// to applyDepth(); when that reports an invalid message, *gapDetected is set to true (only if
// the pointer is non-null) and the function returns immediately.
//
// Throws OnixS::BadArgumentException when templateId reaches the default branch.
//
// NOTE(review): the case labels of the switch are absent from this listing (only the default
// label is visible) - restore them from the original file before compiling.
134 void EmdiProductDataRepository::dispatchIncremental(
135  const CORE::Message& msg,
136  TemplateId templateId,
137  const DataSource& dataSource,
138  bool* gapDetected)
139 {
140  switch (templateId)
141  {
// Depth incremental: the listener is notified first, then the internal books are updated.
143  {
144  const DepthIncrementalWrapper wrapper(msg);
145  listenerHolder_->invokeDepthIncremental(&wrapper, dataSource);
146 
147  const bool invalidMessage = applyDepth(wrapper);
148 
149  if(invalidMessage)
150  {
// gapDetected is optional here; a null pointer is tolerated.
151  if(gapDetected)
152  *gapDetected = true;
153 
154  return;
155  }
156  }
157  break;
158 
// Top of book implied.
160  {
161  const TopOfBookImpliedWrapper wrapper(msg);
162  listenerHolder_->invokeTopOfBookImplied(&wrapper, dataSource);
163  }
164  break;
165 
// Product state change.
167  {
168  const ProductStateChangeWrapper wrapper(msg);
169  listenerHolder_->invokeProductStateChange(&wrapper, dataSource);
170  }
171  break;
172 
// Mass instrument state change.
174  {
175  const MassInstrumentStateChangeWrapper wrapper(msg);
176  listenerHolder_->invokeMassInstrumentStateChange(&wrapper, dataSource);
177  }
178  break;
179 
// Instrument state change.
181  {
182  const InstrumentStateChangeWrapper wrapper(msg);
183  listenerHolder_->invokeInstrumentStateChange(&wrapper, dataSource);
184  }
185  break;
186 
// Quote request.
188  {
189  const QuoteRequestWrapper wrapper(msg);
190  listenerHolder_->invokeQuoteRequest(&wrapper, dataSource);
191  }
192  break;
193 
// Cross request.
195  {
196  const CrossRequestWrapper wrapper(msg);
197  listenerHolder_->invokeCrossRequest(&wrapper, dataSource);
198  }
199  break;
200 
// Complex instrument update.
202  {
203  const ComplexInstrumentUpdateWrapper wrapper(msg);
204  listenerHolder_->invokeComplexInstrumentUpdate(&wrapper, dataSource);
205  }
206  break;
207 
// Flexible instrument update.
209  {
210  const FlexibleInstrumentUpdateWrapper wrapper(msg);
211  listenerHolder_->invokeFlexibleInstrumentUpdate(&wrapper, dataSource);
212  }
213  break;
214 
215  default:
216  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "templateId", templateId);
// Never reached: the statement above always throws.
217  break;
218  }
219 }
220 
// Parameter list and body of the incremental-message handler; the line that opens the
// signature is not part of this listing.
//
// While synchronized, a message carrying the expected sequence number is dispatched and the
// expected number is advanced; any other sequence number is treated as a gap: the expected
// number becomes UndefinedSeqNum, the synchronized flag is cleared, *gapDetected is set and
// the depth-reset callback is invoked. While not synchronized, the message is pushed to the
// cached-incremental queue and, for depth incrementals, the cached-depth-incremental callback
// is invoked.
//
// gapDetected must not be null.
222  const CORE::Message& msg,
223  TemplateId templateId,
224  const DataSource& dataSource,
225  bool* gapDetected)
226 {
227  BOOST_ASSERT(gapDetected != nullptr);
228 
229  log(ONIXS_LOG_DEBUG[this] << msg);
230 
231  if(synchronized_)
232  {
233  CORE::SeqNumber seqNum = 0;
234  msg[Tags::MsgSeqNum].convertTo(seqNum);
235 
236  if(seqNum == nextMsgSeqNum_)
237  {
238  dispatchIncremental(msg, templateId, dataSource, gapDetected);
// NOTE(review): if dispatchIncremental() hit an invalid depth message, applyDepth() has already
// set nextMsgSeqNum_ to UndefinedSeqNum and cleared synchronized_; this increment then modifies
// that sentinel value - confirm this is intended.
239  ++nextMsgSeqNum_;
240  }
241  else
242  {
243  //gap detected
244  log(ONIXS_LOG_INFO[this] << "Message gap detected: expected " << nextMsgSeqNum_ << ", but received " << seqNum);
245 
246  nextMsgSeqNum_ = UndefinedSeqNum;
247  synchronized_ = false;
248  *gapDetected = true;
249 
250  listenerHolder_->invokeDepthReset(marketSegmentId_);
251  }
252  }
253 
// Not an else-branch: when synchronization was lost just above, the current message is cached as well.
254  if(!synchronized_)
255  {
256  cachedIncremetalQueue_.push(msg, dataSource, templateId);
257 
258  if(templateId == DepthIncrementalTemplateId)
259  {
260  const DepthIncrementalWrapper wrapper(msg);
261  listenerHolder_->invokeCachedDepthIncremental(&wrapper, dataSource);
262  }
263  }
264 }
265 
// Parameter list and body of a forwarding overload; the line that opens the signature is not
// part of this listing. The unnamed CORE::FieldValue argument is ignored and the remaining
// arguments are passed on unchanged to the four-argument processIncremental().
267  const CORE::Message& msg,
268  const CORE::FieldValue&,
269  TemplateId templateId,
270  const DataSource& dataSource,
271  bool* gapDetected)
272 {
273  processIncremental(msg, templateId, dataSource, gapDetected);
274 }
275 
// Body of the routine that replays the cached incremental messages; its signature line is not
// part of this listing.
//
// The queue is drained in order. A message carrying the expected sequence number is dispatched
// and the expected number is advanced; a message with a lower number is ignored; a message with
// a higher number is a gap: the expected number becomes UndefinedSeqNum, the synchronized flag
// is cleared, the depth-reset callback is invoked, the queue is reset and the routine returns.
// When the queue is drained without a gap, the repository is marked as synchronized.
277 {
278  while(!cachedIncremetalQueue_.empty())
279  {
280  MessageInfo messageInfo = cachedIncremetalQueue_.pop();
281  const CORE::Message& msg = *messageInfo.message;
282 
283  CORE::SeqNumber seqNum = 0;
284  msg[Tags::MsgSeqNum].convertTo(seqNum);
285 
286  if(seqNum == nextMsgSeqNum_)
287  {
288  log(ONIXS_LOG_DEBUG[this] << "Applied cached message: " << msg);
289 
290  const TemplateId templateId = messageInfo.templateId;
291  DataSource& dataSource = *messageInfo.dataSource;
292 
// The stored data source is modified in place to mark the message as replayed from the cache.
293  dataSource.cached = true;
294  dataSource.origin = DataSource::Undefined;
295 
// No gap flag is passed, so an invalid depth message is not reported to the caller from here.
296  dispatchIncremental(msg, templateId, dataSource, nullptr);
297 
298  ++nextMsgSeqNum_;
299  }
300  else if(seqNum > nextMsgSeqNum_)
301  {
302  log(ONIXS_LOG_INFO[this] << "Cached message gap detected: expected " << nextMsgSeqNum_ << ", but received " << seqNum);
303 
304  nextMsgSeqNum_ = UndefinedSeqNum;
305  synchronized_ = false;
306 
307  listenerHolder_->invokeDepthReset(marketSegmentId_);
308 
309  cachedIncremetalQueue_.reset();
310 
311  return;
312  }
313  else
314  {
// The sequence number is lower than the expected one.
315  log(ONIXS_LOG_DEBUG[this] << "Ignored cached message: " << msg);
316  }
317  }
318 
319  synchronized_ = true;
320  cachedIncremetalQueue_.reset();
321 }
322 
323 void EmdiProductDataRepository::processBeacon(const CORE::Message &msg, bool *gapDetected)
324 {
325  log(ONIXS_LOG_DEBUG[this] << msg);
326 
327  if(!synchronized_)
328  return;
329 
330  UInt32 lastMsgSeqNumProcessed;
331  msg[Tags::LastMsgSeqNumProcessed].convertTo(lastMsgSeqNumProcessed);
332 
333  if(lastMsgSeqNumProcessed + 1 != nextMsgSeqNum_)
334  {
335  //gap detected
336  log(ONIXS_LOG_INFO[this] << "Message gap detected: expected " << nextMsgSeqNum_ << ", but lastMsgSeqNumProcessed " << lastMsgSeqNumProcessed);
337 
338  nextMsgSeqNum_ = UndefinedSeqNum;
339  synchronized_ = false;
340  *gapDetected = true;
341 
342  listenerHolder_->invokeDepthReset(marketSegmentId_);
343  }
344 }
345 
346 EditableOrderBook* EmdiProductDataRepository::getOrCreateBook(SecurityId id)
347 {
348  BOOST_ASSERT(buildInternalOrderBooks_);
349 
350  BooksEntry book = books_.find(id);
351  if (books_.end() != book)
352  return book->second.get();
353  else
354  {
355  EditableOrderBookPtr book(new EditableOrderBook(id, marketDepth_, this, logger_));
356  books_.insert(std::make_pair(id, book));
357 
358  return book.get();
359  }
360 }
361 
// Rebuilds the order book of the snapshot's security from a depth snapshot.
//
// Nothing is done when filterSecurityId_ rejects the security. Otherwise the book is fetched
// (or created), cleared, stamped with the snapshot's LastMsgSeqNumProcessed and then filled
// entry by entry. Trade entries do not touch the book; they are forwarded to the
// snapshot-trade listener callback. Finally the order-book-updated callback is invoked.
//
// NOTE(review): several case labels of the switch are absent from this listing - restore them
// from the original file before compiling.
362 void EmdiProductDataRepository::updateBook(const DepthSnapshot& snapshot)
363 {
364  BOOST_ASSERT(buildInternalOrderBooks_);
365 
366  const SecurityId securityId = snapshot.securityId();
367 
368  if(!filterSecurityId_(securityId))
369  return;
370 
371  EditableOrderBook* const book = getOrCreateBook(securityId);
372  BOOST_ASSERT(book != nullptr);
373 
// The snapshot replaces the previous book content.
374  book->clear();
375 
// Stays 0 if lastMsgSeqNumProcessed() does not assign a value; its bool result is not checked.
376  UInt32 sequenceNumber = 0;
377  snapshot.lastMsgSeqNumProcessed(sequenceNumber);
378 
379  book->setLastMsgSeqNumProcessed(sequenceNumber);
380 
381  const MDSnapshotEntries& entries = snapshot.mdEntries();
382  for(size_t i = 0, size = entries.size(); i < size; ++i)
383  {
384  const MDSnapshotEntry& entry = entries[i];
385 
386  switch(entry.mdEntryType())
387  {
388  case MDEntryType::Bid:
389  book->apply(entry, MDEntryType::Bid);
390  break;
391  case MDEntryType::Offer:
392  book->apply(entry, MDEntryType::Offer);
393  break;
// (case label missing in this listing)
395  book->emptyBook(entry);
396  break;
397  case MDEntryType::Trade:
398  {
399  const SnapshotTradeWrapper trade(securityId, sequenceNumber, entry);
400  listenerHolder_->invokeSnapshotTrade(trade);
401  }
402  break;
// (case labels missing in this listing; these entry types are skipped without any action)
408  break;
409 
// (case label missing in this listing; the entry type is only logged)
411  {
412  log(ONIXS_LOG_DEBUG[this] << "Unsupported snapshot entry type: " << (int) entry.mdEntryType());
413  }
414  break;
415 
416  default:
// Any other entry type is considered a programming error.
417  BOOST_ASSERT(false);
418  break;
419  }
420  }
421 
// The formatted book is logged only when the LogBooks flag is set in the log settings.
422  if( settings_.logSettings & LogSettings::LogBooks)
423  log(ONIXS_LOG_DEBUG[this] << "Order book updated: " << "\n" << book->toFormattedString() );
424 
425  listenerHolder_->invokeOrderBookUpdated(*book);
426 }
427 
// Applies the entries of a depth incremental message to the affected order books.
//
// Entries whose security is rejected by filterSecurityId_ are skipped. Bid and offer entries
// are applied to the security's book (created on demand); trade entries are forwarded to the
// incremental-trade listener callback. Once the last entry of a run of entries for the same
// security has been handled, the touched book is stamped with the message sequence number and
// the order-book-updated callback is invoked for it.
//
// NOTE(review): several case labels of the switch are absent from this listing - restore them
// from the original file before compiling.
428 void EmdiProductDataRepository::updateBook(const DepthIncremental& incremental)
429 {
430  BOOST_ASSERT(buildInternalOrderBooks_);
431 
432  const SequenceNumber seqNum = incremental.seqNum();
// Book modified by the current run of entries; null while no book has pending changes.
433  EditableOrderBook* updatedBook = nullptr;
434 
435  const MDIncrementalEntries& entries = incremental.mdEntries();
436  for(size_t i = 0, size = entries.size(); i < size; ++i)
437  {
438  const MDIncrementalEntry& entry = entries[i];
439 
440  const SecurityId securityId = entry.securityId();
441 
442  if(!filterSecurityId_(securityId))
443  continue;
444 
445  switch(entry.mdEntryType())
446  {
447  case MDEntryType::Bid:
448  {
449  updatedBook = getOrCreateBook(securityId);
450  BOOST_ASSERT(updatedBook != nullptr);
451  updatedBook->apply(entry, MDEntryType::Bid);
452  }
453  break;
454  case MDEntryType::Offer:
455  {
456  updatedBook = getOrCreateBook(securityId);
457  BOOST_ASSERT(updatedBook != nullptr);
458  updatedBook->apply(entry, MDEntryType::Offer);
459  }
460  break;
// (case label missing in this listing; the book of the security is cleared)
462  {
463  updatedBook = getOrCreateBook(securityId);
464  BOOST_ASSERT(updatedBook != nullptr);
465  updatedBook->clear();
466  }
467  break;
468  case MDEntryType::Trade:
469  {
470  const IncrementalTradeWrapper trade(securityId, seqNum, entry);
471  listenerHolder_->invokeIncrementalTrade(trade);
472  }
473  break;
// (case labels missing in this listing; these entry types are skipped without any action)
479  break;
480 
// (case label missing in this listing; the entry type is only logged)
// NOTE(review): the text says "snapshot" although this function handles incremental entries -
// looks copied from the snapshot overload; confirm before changing the message.
482  {
483  log(ONIXS_LOG_DEBUG[this] << "Unsupported snapshot entry type: " << (int)entry.mdEntryType());
484  }
485  break;
486 
487  default:
// Any other entry type is considered a programming error.
488  BOOST_ASSERT(false);
489  break;
490  }
491 
492  if(updatedBook != nullptr)
493  {
// Flush when this is the last entry of the message or the next entry belongs to another security.
494  if ((i == size - 1) || (securityId != entries[i + 1].securityId()))
495  {
496  if( settings_.logSettings & LogSettings::LogBooks)
497  log(ONIXS_LOG_DEBUG[this] << "Order book updated: " << "\n" << updatedBook->toFormattedString() );
498 
499  updatedBook->setLastMsgSeqNumProcessed(seqNum);
500  listenerHolder_->invokeOrderBookUpdated(*updatedBook);
501 
502  updatedBook = nullptr;
503  }
504  }
505  }
506 }
507 
// Body of the reset routine; its signature line is not part of this listing.
// Drops all cached incremental messages, sets the expected sequence number to UndefinedSeqNum
// and leaves the repository in the not-synchronized state.
509 {
// The depth-reset callback is invoked only if the repository was synchronized before the reset.
510  const bool needReset = synchronized_;
511 
512  log(ONIXS_LOG_INFO[this] << "Reset");
513 
514  cachedIncremetalQueue_.reset();
515  nextMsgSeqNum_ = UndefinedSeqNum;
516  synchronized_ = false;
517 
518  if(needReset)
519  listenerHolder_->invokeDepthReset(marketSegmentId_);
520 }
521 
// Body of the out-of-date notification routine; its signature line is not part of this listing.
// Invokes the depth-out-of-date callback for this market segment and, when internal order books
// are built, the order-book-out-of-date callback for every book held in books_.
523 {
524  listenerHolder_->invokeDepthOutOfDate(marketSegmentId_);
525 
526  if(buildInternalOrderBooks_)
527  {
528  for (BooksEntry book = books_.begin() ; book != books_.end(); book++)
529  listenerHolder_->invokeOrderBookOutOfDate(*(book->second.get()));
530  }
531 }
532 
// Bodies of five small accessors; their signature lines are not part of this listing.

// Returns true while the cached-incremental queue still holds messages.
534 {
535  return !cachedIncremetalQueue_.empty();
536 }
537 
// Returns whether internal order books are built.
539 {
540  return buildInternalOrderBooks_;
541 }
542 
// Returns the current synchronized flag.
544 {
545  return synchronized_;
546 }
547 
// Returns the market depth given at construction.
549 {
550  return marketDepth_;
551 }
552 
// Returns the market segment id given at construction.
554 {
555  return marketSegmentId_;
556 }
557 
558 void EmdiProductDataRepository::reportWarning(const OnixS::Util::TextBuilder& tb)
559 {
560  listenerHolder_->invokeWarning(tb.toString());
561  log(ONIXS_LOG_WARN[this] << tb.toString());
562 }
563 
564 
565 }}}}
566 
567 
unsigned int SequenceNumber
Alias for sequence numbers.
void invokeFlexibleInstrumentUpdate(const FlexibleInstrumentUpdate *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeTopOfBookImplied(const TopOfBookImplied *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
SecurityId securityId() const
Instrument identifier.
SequenceNumber seqNum() const
Definition: Message.cpp:178
const Tag LastMsgSeqNumProcessed
Definition: Tags.h:65
void invokeIncrementalTrade(const IncrementalTrade &trade) ONIXS_NOEXCEPT
void invokeComplexInstrumentUpdate(const ComplexInstrumentUpdate *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt32 MarketDepth
Alias for Market depth type.
Definition: Defines.h:43
MDEntryType::Enum mdEntryType() const
Defines the entry type.
Definition: DepthSnapshot.h:62
SnapshotTradeWrapper(SecurityId id, UInt32 sequenceNumber, const MDSnapshotEntry &tradeEntry)
Log updated order book, applied only for Debug log level.
Definition: LogSettings.h:78
void invokeMassInstrumentStateChange(const MassInstrumentStateChange *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void push(const CORE::Message &message, const DataSource &dataSource, TemplateId templateId)
IncrementalTradeWrapper(SecurityId id, UInt32 sequenceNumber, const MDIncrementalEntry &tradeEntry)
Handler base configuration settings.
void invokeDepthReset(MarketSegmentId marketSegmentId) ONIXS_NOEXCEPT
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void invokeCachedDepthIncremental(const DepthIncremental *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeInstrumentStateChange(const InstrumentStateChange *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
SecurityId securityId() const
Instrument identifier.
void processIncremental(const CORE::Message &, const CORE::FieldValue &, TemplateId, const DataSource &, bool *)
void invokeOrderBookUpdated(const OrderBook &book) ONIXS_NOEXCEPT
size_t size() const
Return number of instances in repeating group.
Definition: Group.h:123
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
void invokeSnapshotTrade(const SnapshotTrade &trade) ONIXS_NOEXCEPT
UInt32 sequenceNumber() const
Sequence number of the last processed message.
Definition: Trade.h:79
bool lastMsgSeqNumProcessed(UInt32 &seqNum) const
Last message sequence number sent regardless of message type.
Int64 SecurityId
Alias for Security Id type.
Definition: Defines.h:51
void invokeQuoteRequest(const QuoteRequest *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeDepthOutOfDate(MarketSegmentId marketSegmentId) ONIXS_NOEXCEPT
void apply(const MDSnapshotEntry &entry, MDEntryType::Enum entryType)
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
Used to identify absence of value.
Definition: Defines.h:224
void invokeOrderBookOutOfDate(const OrderBook &book) ONIXS_NOEXCEPT
std::string toFormattedString() const
Returns formatted presentation of the book.
Definition: OrderBook.h:267
void invokeDepthSnapshot(const DepthSnapshot *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
EmdiProductDataRepository(MarketSegmentId marketSegmentId, MarketDepth marketDepth, bool buildInternalOrderBooks, LogFacility *parent, HandlerCore::Common::HandlerLogger *logger, ListenerHolder *listenerHolder, const HandlerSettings &settings, SecurityIdFilter filterSecurityId)
UInt32 MarketSegmentId
Alias for Market Segment ID type.
Definition: Defines.h:40
void processDepthSnapshot(const CORE::Message &msg, const DataSource &dataSource)
void invokeProductStateChange(const ProductStateChange *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
OnixS::FIX::Core::FAST::TemplateId TemplateId
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
MDEntryType::Enum mdEntryType() const
Defines the entry type.
MDIncrementalEntries mdEntries() const
Entries.
void invokeCrossRequest(const CrossRequest *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeDepthIncremental(const DepthIncremental *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
MDSnapshotEntries mdEntries() const
Entries.
Market data snapshot entry.
Definition: DepthSnapshot.h:51