OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  8.1.0
API documentation
MdiProductDataRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <util/Converter.h>
20 
22 
24 #include "MessageOperator.h"
25 #include "Formatting.h"
26 
27 
28 namespace OnixS {
29 namespace Eurex {
30 namespace MarketData {
31 namespace Implementation {
32 
// File-local subclass of SnapshotTrade. The visible part of its constructor
// does nothing except forward id, sequenceNumber and tradeEntry to the
// SnapshotTrade base class.
// NOTE(review): presumably exists so this translation unit can construct
// SnapshotTrade objects directly - confirm against the SnapshotTrade declaration.
33 class SnapshotTradeWrapper : public SnapshotTrade
34 {
35 public :
37  : SnapshotTrade(id, sequenceNumber, tradeEntry) {}
38 };
39 
41 {
42 public :
44  : IncrementalTrade(id, sequenceNumber, tradeEntry) {}
45 };
46 
48  : synchronized_(false)
49  , book_(nullptr)
50 {
51  if(createBook)
52  book_.reset(new EditableOrderBook(securityId, repository->marketDepth_, repository, repository->logger_));
53 }
54 
56 {
57  synchronized_ = false;
58  if (book_)
59  book_->clear();
60 }
61 
63 using namespace HandlerCore::Common;
64 
66  MarketSegmentId marketSegmentId,
67  MarketDepth marketDepth,
68  bool buildInternalOrderBooks,
69  LogFacility* parent,
70  HandlerLogger* logger,
71  ListenerHolder* listenerHolder,
72  const HandlerSettings& settings,
73  SecurityIdFilter filterSecurityId)
74  : base(i2str(marketSegmentId), parent, Logging::LOG_LEVEL_DEBUG)
75  , logger_(logger)
76  , listenerHolder_(listenerHolder)
77  , settings_(settings)
78  , marketSegmentId_(marketSegmentId)
79  , marketDepth_(marketDepth)
80  , buildInternalOrderBooks_(buildInternalOrderBooks)
81  , filterSecurityId_(filterSecurityId)
82  , nextMsgSeqNum_(1)
83  , synchronized_(true)
84 {
85  BOOST_ASSERT((!buildInternalOrderBooks_) || marketDepth_ != 0);
86 }
87 
89 {
90  const bool needReset = synchronized_;
91 
92  log(ONIXS_LOG_INFO[this] << "Reset");
93 
94  nextMsgSeqNum_ = 1;
95  synchronized_ = false;
96 
97  for(SecurityContextsEntry securityContext = securityContexts_.begin(), end = securityContexts_.end();
98  securityContext != end;
99  ++securityContext)
100  {
101  securityContext->second->reset();
102  }
103 
104  if (needReset)
105  listenerHolder_->invokeDepthReset(marketSegmentId_);
106 }
107 
109 {
110  SecurityContextsEntry securityContext = securityContexts_.find(id);
111  if (securityContexts_.end() != securityContext)
112  return securityContext->second.get();
113  else
114  {
115  SecurityContextPtr newSecurityContext(new SecurityContext(buildInternalOrderBooks_, id, this));
116 
117  const ONIXS_UNUSED bool inserted =
118  securityContexts_.insert(std::make_pair(id, newSecurityContext)).second;
119 
120  BOOST_ASSERT(inserted);
121 
122  return newSecurityContext.get();
123  }
124 }
125 
127  const CORE::Message& msg,
128  TemplateId templateId,
129  const DataSource& dataSource)
130 {
131  log(ONIXS_LOG_DEBUG[this] << msg);
132 
133  CORE::SeqNumber seqNum = 0;
134  msg[Tags::MsgSeqNum].convertTo(seqNum);
135 
136  if (synchronized_)
137  {
138  if (seqNum != nextMsgSeqNum_)
139  {
140  log(ONIXS_LOG_DEBUG[this] << "Incremental gap detected");
141  reset();
142  }
143  else
144  ++nextMsgSeqNum_;
145  }
146 
147  try
148  {
149  switch (templateId)
150  {
152  {
153  processDepthSnapshot(msg, dataSource);
154 
155  synchronized_ = true;
156  nextMsgSeqNum_ = seqNum + 1;
157  }
158  break;
159 
161  {
162  if (synchronized_)
163  processDepthIncremental(msg, dataSource);
164  else
165  {
166  const DepthIncrementalWrapper wrapper(msg);
167  listenerHolder_->invokeCachedDepthIncremental(&wrapper, dataSource);
168  }
169  }
170  break;
171 
173  {
174  const TopOfBookImpliedWrapper wrapper(msg);
175  listenerHolder_->invokeTopOfBookImplied(&wrapper, dataSource);
176  }
177  break;
178 
180  {
181  const ProductStateChangeWrapper wrapper(msg);
182  listenerHolder_->invokeProductStateChange(&wrapper, dataSource);
183  }
184  break;
185 
187  {
188  const MassInstrumentStateChangeWrapper wrapper(msg);
189  listenerHolder_->invokeMassInstrumentStateChange(&wrapper, dataSource);
190  }
191  break;
192 
194  {
195  const InstrumentStateChangeWrapper wrapper(msg);
196  listenerHolder_->invokeInstrumentStateChange(&wrapper, dataSource);
197  }
198  break;
199 
201  {
202  const QuoteRequestWrapper wrapper(msg);
203  listenerHolder_->invokeQuoteRequest(&wrapper, dataSource);
204  }
205  break;
206 
208  {
209  const CrossRequestWrapper wrapper(msg);
210  listenerHolder_->invokeCrossRequest(&wrapper, dataSource);
211  }
212  break;
213 
215  {
216  const ComplexInstrumentUpdateWrapper wrapper(msg);
217  listenerHolder_->invokeComplexInstrumentUpdate(&wrapper, dataSource);
218  }
219  break;
220 
222  {
223  const FlexibleInstrumentUpdateWrapper wrapper(msg);
224  listenerHolder_->invokeFlexibleInstrumentUpdate(&wrapper, dataSource);
225  }
226  break;
227 
228  default:
229  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "templateId", templateId);
230  break;
231  }
232  }
233  catch (const std::exception& ex)
234  {
235  TextBuilder tb;
236  tb << "Exception while processing incremenatl message: " << ex.what();
237  log(ONIXS_LOG_WARN[this] << tb.toString());
238  reset();
239  }
240  catch (...)
241  {
242  TextBuilder tb;
243  tb << "Unknown exception while processing incremenatl message!";
244  log(ONIXS_LOG_WARN[this] << "Unknown exception while processing incremenatl message!");
245  reset();
246  }
247 }
248 
// Handles one depth snapshot message.
//
// The message is wrapped in a DepthSnapshotWrapper and the per-security context
// is obtained from getOrCreateSecurityContext(). The snapshot is acted upon only
// when that context is not yet synchronized or the snapshot carries
// RefreshIndicator::MandatoryRefresh; otherwise the function does nothing.
// When acted upon: listeners receive the snapshot, the context is marked
// synchronized and, if internal books are enabled and the security passes
// filterSecurityId_, the order book is cleared and rebuilt from the entries.
//
// msg        - message to wrap as a depth snapshot.
// dataSource - passed through unchanged to invokeDepthSnapshot().
249 void MdiProductDataRepository::processDepthSnapshot(const CORE::Message& msg, const DataSource& dataSource)
250 {
251  const DepthSnapshotWrapper snapshot(msg);
252 
253  const SecurityId securityId = snapshot.securityId();
254  SecurityContext* const securityContext = getOrCreateSecurityContext(securityId);
255 
 // A snapshot for an already synchronized security is skipped unless it is a
 // mandatory refresh.
256  if(!securityContext->synchronized() || snapshot.refreshIndicator() == RefreshIndicator::MandatoryRefresh)
257  {
258  listenerHolder_->invokeDepthSnapshot(&snapshot, dataSource);
259  securityContext->synchronized(true);
260 
 // Listeners have been notified; everything below only maintains the
 // internal order book.
 // NOTE(review): because of these two early returns, the Trade branch of the
 // switch below (invokeSnapshotTrade) is reached only when internal books are
 // enabled and the security passes the filter - confirm this is intended.
261  if (!buildInternalOrderBooks_)
262  return;
263 
264  if (!filterSecurityId_(securityId))
265  return;
266 
 // The book is emptied first and then repopulated from the snapshot entries.
267  EditableOrderBook* book = securityContext->book();
268  book->clear();
269 
270  const MDSnapshotEntries& entries = snapshot.mdEntries();
271  for (size_t i = 0, size = entries.size(); i < size; ++i)
272  {
273  const MDSnapshotEntry& entry = entries[i];
274 
 // NOTE(review): in this listing the case labels that precede the
 // emptyBook() call, the bare break and the "Unsupported" log line are
 // not present - consult the original source for the exact entry types.
275  switch (entry.mdEntryType())
276  {
277  case MDEntryType::Bid:
278  book->apply(entry, MDEntryType::Bid);
279  break;
280  case MDEntryType::Offer:
281  book->apply(entry, MDEntryType::Offer);
282  break;
284  book->emptyBook(entry);
285  break;
286  case MDEntryType::Trade:
287  {
 // The trade is built with a sequence number of 0, logged at debug level
 // and handed to the listeners.
288  const SnapshotTradeWrapper trade(securityId, 0, entry);
289  log(ONIXS_LOG_DEBUG[this] << trade);
290  listenerHolder_->invokeSnapshotTrade(trade);
291  break;
292  }
298  break;
299 
301  log(ONIXS_LOG_DEBUG[this] << "Unsupported snapshot entry type: " << (int)entry.mdEntryType());
302  break;
303 
 // Any entry type not listed above trips an assertion.
304  default:
305  BOOST_ASSERT(false);
306  break;
307  }
308  }
309 
 // The formatted book is logged only when the LogBooks flag is set.
310  if (settings_.logSettings & LogSettings::LogBooks)
311  log(ONIXS_LOG_DEBUG[this] << "Order book updated: " << "\n" << book->toFormattedString());
312 
 // A single update notification is sent after all entries were processed.
313  listenerHolder_->invokeOrderBookUpdated(*book);
314  }
315 }
316 
// Handles one depth incremental message.
//
// The message is wrapped in a DepthIncrementalWrapper and always forwarded to
// the listeners. If internal books are enabled, each entry whose security
// passes filterSecurityId_ is then processed by entry type; book-changing
// entries are applied to the book of the entry's security, and each touched
// book is reported once per run of consecutive entries for the same security.
//
// msg        - message to wrap as a depth incremental.
// dataSource - passed through unchanged to invokeDepthIncremental().
317 void MdiProductDataRepository::processDepthIncremental(const CORE::Message& msg, const DataSource& dataSource)
318 {
319  const DepthIncrementalWrapper incremental(msg);
320  listenerHolder_->invokeDepthIncremental(&incremental, dataSource);
321 
322  if (!buildInternalOrderBooks_)
323  return;
324 
 // Book modified by the entries seen so far and not yet reported; set back
 // to nullptr right after invokeOrderBookUpdated() is called for it.
325  EditableOrderBook* updatedBook = nullptr;
326 
327  const MDIncrementalEntries& entries = incremental.mdEntries();
328  for (size_t i = 0, size = entries.size(); i < size; ++i)
329  {
330  const MDIncrementalEntry& entry = entries[i];
331 
332  const SecurityId securityId = entry.securityId();
333 
 // Entries for filtered-out securities are skipped entirely.
334  if (!filterSecurityId_(securityId))
335  continue;
336 
 // NOTE(review): in this listing the case labels that precede the clear()
 // block, the bare break and the "Unsupported" log line are not present -
 // consult the original source for the exact entry types.
337  switch (entry.mdEntryType())
338  {
339  case MDEntryType::Bid:
340  {
 // Entries for a security whose context is not synchronized are ignored.
341  SecurityContext* const securityContext = getOrCreateSecurityContext(securityId);
342  if (!securityContext->synchronized())
343  break;
344 
345  updatedBook = securityContext->book();
346  BOOST_ASSERT(updatedBook != nullptr);
347 
348  updatedBook->apply(entry, MDEntryType::Bid);
349  break;
350  }
351  case MDEntryType::Offer:
352  {
 // Same handling as Bid, applied to the offer side.
353  SecurityContext* const securityContext = getOrCreateSecurityContext(securityId);
354  if (!securityContext->synchronized())
355  break;
356 
357  updatedBook = securityContext->book();
358  BOOST_ASSERT(updatedBook != nullptr);
359 
360  updatedBook->apply(entry, MDEntryType::Offer);
361  break;
362  }
364  {
 // This branch empties the whole book of a synchronized security.
365  SecurityContext* const securityContext = getOrCreateSecurityContext(securityId);
366  if (!securityContext->synchronized())
367  break;
368 
369  updatedBook = securityContext->book();
370  BOOST_ASSERT(updatedBook != nullptr);
371 
372  updatedBook->clear();
373  break;
374  }
375  case MDEntryType::Trade:
376  {
 // The trade is built with a sequence number of 0, logged at debug level
 // and handed to the listeners; it does not touch updatedBook.
377  const IncrementalTradeWrapper trade(securityId, 0, entry);
378  log(ONIXS_LOG_DEBUG[this] << trade);
379  listenerHolder_->invokeIncrementalTrade(trade);
380  break;
381  }
387  break;
388 
 // NOTE(review): the text below says "snapshot" although this is the
 // incremental path - looks copied from processDepthSnapshot; confirm
 // nobody depends on the log text before correcting it.
390  log(ONIXS_LOG_DEBUG[this] << "Unsupported snapshot entry type: " << (int)entry.mdEntryType());
391  break;
392 
 // Any entry type not listed above trips an assertion.
393  default:
394  BOOST_ASSERT(false);
395  break;
396  }
397 
 // Report a touched book when this is the last entry or the next entry
 // belongs to a different security. The (i == size - 1) test is evaluated
 // first, so entries[i + 1] is never read past the end.
398  if (updatedBook != nullptr)
399  {
400  if ((i == size - 1) || (securityId != entries[i + 1].securityId()))
401  {
 // The formatted book is logged only when the LogBooks flag is set.
402  if (settings_.logSettings & LogSettings::LogBooks)
403  log(ONIXS_LOG_DEBUG[this] << "Order book updated: " << "\n" << updatedBook->toFormattedString());
404 
405  listenerHolder_->invokeOrderBookUpdated(*updatedBook);
406 
407  updatedBook = nullptr;
408  }
409  }
410  }
411 }
412 
413 //void MdiProductDataRepository::processIncremental(
414 // const CORE::Message& msg,
415 // const CORE::FieldValue&,
416 // TemplateId templateId,
417 // const DataSource& dataSource,
418 // bool* gapDetected)
419 //{
420 // processMessage(msg, templateId, dataSource);
421 //}
422 
423 //void MdiProductDataRepository::dispatchIncremental(
424 // const CORE::Message& msg,
425 // TemplateId templateId,
426 // const DataSource& dataSource,
427 // bool* gapDetected)
428 //{
429 // switch (templateId)
430 // {
431 // case MdiDepthIncrementalTemplateId:
432 // {
433 // const DepthIncrementalWrapper wrapper(msg);
434 // listenerHolder_->invokeDepthIncremental(&wrapper, dataSource);
435 //
436 // const bool invalidMessage = applyDepth(wrapper);
437 //
438 // if (invalidMessage)
439 // {
440 // if (gapDetected)
441 // *gapDetected = true;
442 //
443 // return;
444 // }
445 // }
446 // break;
447 //
448 // case MdiTopOfBookImpliedTemplateId:
449 // {
450 // const TopOfBookImpliedWrapper wrapper(msg);
451 // listenerHolder_->invokeTopOfBookImplied(&wrapper, dataSource);
452 // }
453 // break;
454 //
455 // case MdiProductStateChangeTemplateId:
456 // {
457 // const ProductStateChangeWrapper wrapper(msg);
458 // listenerHolder_->invokeProductStateChange(&wrapper, dataSource);
459 // }
460 // break;
461 //
462 // case MdiMassInstrumentStateChangeTemplateId:
463 // {
464 // const MassInstrumentStateChangeWrapper wrapper(msg);
465 // listenerHolder_->invokeMassInstrumentStateChange(&wrapper, dataSource);
466 // }
467 // break;
468 //
469 // case MdiInstrumentStateChangeTemplateId:
470 // {
471 // const InstrumentStateChangeWrapper wrapper(msg);
472 // listenerHolder_->invokeInstrumentStateChange(&wrapper, dataSource);
473 // }
474 // break;
475 //
476 // case MdiQuoteRequestTemplateId:
477 // {
478 // const QuoteRequestWrapper wrapper(msg);
479 // listenerHolder_->invokeQuoteRequest(&wrapper, dataSource);
480 // }
481 // break;
482 //
483 // case MdiCrossRequestTemplateId:
484 // {
485 // const CrossRequestWrapper wrapper(msg);
486 // listenerHolder_->invokeCrossRequest(&wrapper, dataSource);
487 // }
488 // break;
489 //
490 // case MdiComplexInstrumentTemplateId:
491 // {
492 // const ComplexInstrumentUpdateWrapper wrapper(msg);
493 // listenerHolder_->invokeComplexInstrumentUpdate(&wrapper, dataSource);
494 // }
495 // break;
496 //
497 // case MdiFlexibleInstrumentTemplateId:
498 // {
499 // const FlexibleInstrumentUpdateWrapper wrapper(msg);
500 // listenerHolder_->invokeFlexibleInstrumentUpdate(&wrapper, dataSource);
501 // }
502 // break;
503 //
504 // default:
505 // throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "templateId", templateId);
506 // break;
507 // }
508 //}
509 
510 //template<typename MessageType>
511 //bool MdiProductDataRepository::applyDepth(const MessageType& wrapper)
512 //{
513 // if (!buildInternalOrderBooks_)
514 // return false;
515 //
516 // bool invalidMessage = false;
517 //
518 // try
519 // {
520 // updateBook(wrapper);
521 // }
522 // catch (const std::exception& ex)
523 // {
524 // invalidMessage = true;
525 // log(ONIXS_LOG_WARN[this] << "Message cannot be applied: " << ex.what());
526 // }
527 // catch (...)
528 // {
529 // invalidMessage = true;
530 // log(ONIXS_LOG_WARN[this] << "Message cannot be applied: unknown exception");
531 // }
532 //
533 // if (invalidMessage)
534 // {
535 // //treat as lost
536 // nextMsgSeqNum_ = UndefinedSeqNum;
537 // synchronized_ = false;
538 // listenerHolder_->invokeDepthReset(marketSegmentId_);
539 // }
540 //
541 // return invalidMessage;
542 //}
543 
545 {
546  listenerHolder_->invokeDepthOutOfDate(marketSegmentId_);
547 
548  if(buildInternalOrderBooks_)
549  {
550  for (SecurityContextsEntry securityContext = securityContexts_.begin(), end = securityContexts_.end();
551  securityContext != end; ++securityContext)
552  {
553  if(securityContext->second->synchronized())
554  listenerHolder_->invokeOrderBookOutOfDate(*(securityContext->second->book()));
555  }
556  }
557 }
558 
560 {
561  return marketSegmentId_;
562 }
563 
564 }}}}
565 
566 
const TemplateId MdiMassInstrumentStateChangeTemplateId
void invokeFlexibleInstrumentUpdate(const FlexibleInstrumentUpdate *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeTopOfBookImplied(const TopOfBookImplied *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void processDepthIncremental(const CORE::Message &msg, const DataSource &dataSource)
SecurityId securityId() const
Instrument identifier.
void invokeIncrementalTrade(const IncrementalTrade &trade) ONIXS_NOEXCEPT
void invokeComplexInstrumentUpdate(const ComplexInstrumentUpdate *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt32 MarketDepth
Alias for Market depth type.
Definition: Defines.h:43
MDEntryType::Enum mdEntryType() const
Defines the entry type.
Definition: DepthSnapshot.h:62
SnapshotTradeWrapper(SecurityId id, UInt32 sequenceNumber, const MDSnapshotEntry &tradeEntry)
Log updated order book, applied only for Debug log level.
Definition: LogSettings.h:78
void invokeMassInstrumentStateChange(const MassInstrumentStateChange *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
IncrementalTradeWrapper(SecurityId id, UInt32 sequenceNumber, const MDIncrementalEntry &tradeEntry)
Handler base configuration settings.
void invokeDepthReset(MarketSegmentId marketSegmentId) ONIXS_NOEXCEPT
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void invokeCachedDepthIncremental(const DepthIncremental *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeInstrumentStateChange(const InstrumentStateChange *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
SecurityId securityId() const
Instrument identifier.
void invokeOrderBookUpdated(const OrderBook &book) ONIXS_NOEXCEPT
void processMessage(const CORE::Message &, TemplateId, const DataSource &)
size_t size() const
Return number of instances in repeating group.
Definition: Group.h:123
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
void invokeSnapshotTrade(const SnapshotTrade &trade) ONIXS_NOEXCEPT
UInt32 sequenceNumber() const
Sequence number of the last processed message.
Definition: Trade.h:79
Int64 SecurityId
Alias for Security Id type.
Definition: Defines.h:51
void invokeQuoteRequest(const QuoteRequest *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeDepthOutOfDate(MarketSegmentId marketSegmentId) ONIXS_NOEXCEPT
void apply(const MDSnapshotEntry &entry, MDEntryType::Enum entryType)
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
void processDepthSnapshot(const CORE::Message &msg, const DataSource &dataSource)
Used to identify absence of value.
Definition: Defines.h:224
void invokeOrderBookOutOfDate(const OrderBook &book) ONIXS_NOEXCEPT
SecurityContext(bool createBook, SecurityId securityId, MdiProductDataRepository *repository)
RefreshIndicator::Enum refreshIndicator() const
Refresh Indicator.
std::string toFormattedString() const
Returns formatted presentation of the book.
Definition: OrderBook.h:267
MdiProductDataRepository(MarketSegmentId marketSegmentId, MarketDepth marketDepth, bool buildInternalOrderBooks, LogFacility *parent, HandlerCore::Common::HandlerLogger *logger, ListenerHolder *listenerHolder, const HandlerSettings &settings, SecurityIdFilter filterSecurityId)
void invokeDepthSnapshot(const DepthSnapshot *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
UInt32 MarketSegmentId
Alias for Market Segment ID type.
Definition: Defines.h:40
void invokeProductStateChange(const ProductStateChange *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
OnixS::FIX::Core::FAST::TemplateId TemplateId
MDEntryType::Enum mdEntryType() const
Defines the entry type.
MDIncrementalEntries mdEntries() const
Entries.
void invokeCrossRequest(const CrossRequest *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
void invokeDepthIncremental(const DepthIncremental *msg, const DataSource &dataSource) ONIXS_NOEXCEPT
MDSnapshotEntries mdEntries() const
Entries.
Market data snapshot entry.
Definition: DepthSnapshot.h:51