OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  8.1.0
API documentation
EmdiDataRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <boost/bind.hpp>
20 
22 
23 #include "MessageOperator.h"
25 
26 #include "EmdiDataRepository.h"
27 
28 
29 namespace OnixS {
30 namespace Eurex {
31 namespace MarketData {
32 namespace Implementation {
33 
35 
36 namespace {
37 
38 bool extractMarketSegmentId(const CORE::FieldSet& msg, MarketSegmentId& marketSegmentId)
39 {
40  const CORE::FieldValue& marketSegmentIdValue = msg[Tags::MarketSegmentID];
41  if(CORE::DataTypes::Null == marketSegmentIdValue.type())
42  return false;
43  if(!marketSegmentIdValue.convertTo(marketSegmentId))
44  return false;
45 
46  BOOST_ASSERT(marketSegmentId != UndefinedMarketSegmentId);
47 
48  return true;
49 }
50 
51 bool extractSecurityID(const CORE::Message& msg, SecurityId& securityId)
52 {
53  const CORE::FieldValue& securityIdValue = msg[Tags::SecurityID];
54  if(CORE::DataTypes::Null == securityIdValue.type())
55  return false;
56  if(!securityIdValue.convertTo(securityId))
57  return false;
58 
59  BOOST_ASSERT(securityId != UndefinedSecurityId);
60 
61  return true;
62 }
63 
64 bool extractSenderSubId(const CORE::Message& msg, UInt32& senderSubId)
65 {
66  const CORE::FieldValue& senderSubIdValue = msg[Tags::SenderSubID];
67  if(CORE::DataTypes::Null == senderSubIdValue.type())
68  return false;
69 
70  if(!senderSubIdValue.convertTo(senderSubId))
71  return false;
72 
73  BOOST_ASSERT(senderSubId != 0);
74 
75  return true;
76 }
77 
78 MarketSegmentId getMarketSegmentId(const CORE::Message& msg, TemplateId templateId)
79 {
80  MarketSegmentId marketSegmentId = UndefinedMarketSegmentId;
81 
82  if (ComplexInstrumentTemplateId == templateId || FlexibleInstrumentTemplateId == templateId)
83  {
84  try
85  {
86  const CORE::FieldIndex fieldIndex = msg.structure().find(Tags::NoMarketSegments);
87 
88  BOOST_ASSERT(fieldIndex != static_cast<CORE::FieldIndex>(CORE::FieldIndexTraits::OutOfRange));
89 
90  const CORE::FieldValue& fieldValue = msg.indexed()[fieldIndex];
91  const CORE::RepeatingGroup* group = fieldValue.get<const CORE::RepeatingGroup*>();
92 
93  BOOST_ASSERT(group != nullptr);
94 
95  if(!extractMarketSegmentId((*group)[0], marketSegmentId))
96  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "Unable to extract marketSegmentId");
97  }
98  catch(...)
99  {
100  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "Unable to extract marketSegmentId");
101  }
102  }
103  else
104  {
105  if(!extractMarketSegmentId(msg, marketSegmentId))
106  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "Unable to extract marketSegmentId");
107  }
108 
109  return marketSegmentId;
110 }
111 }
112 
114  : base(config)
115  , recoveryStarted_(false)
116  , firstMarketSegmentId_(0)
117  , lastSnapshotPacketSeqNum_(0)
118  , recoveryCycleStarted_(false)
119  , recoveryCycleFinished_(false)
120 {
121  productDataRepositories_.reserve(100);
122 
123  snapsotRecoveryCycle_ = boost::bind(&EmdiDataRepository::snapshotRecoveryCycleMethodOne, this, _1, _2);
124  resetRecoveryProcess();
125 }
126 
128 {
129  clearProductDataRepositories();
130 }
131 
132 void EmdiDataRepository::clearProductDataRepositories()
133 {
134  BOOST_FOREACH(EmdiProductDataRepository* repo, productDataRepositories_)
135  {
136  BOOST_ASSERT(repo);
137  delete repo;
138  }
139 
140  productDataRepositories_.clear();
141 }
142 
144 {
145  bool operator() (const EmdiProductDataRepository* r, const EmdiProductDataRepository* l) const
146  {
147  BOOST_ASSERT(r);
148  BOOST_ASSERT(l);
149 
150  return r->marketSegmentId() < l->marketSegmentId();
151  }
152 
153  bool operator() (const EmdiProductDataRepository* r, MarketSegmentId l) const
154  {
155  BOOST_ASSERT(r);
156 
157  return r->marketSegmentId() < l;
158  }
159 };
160 
161 EmdiProductDataRepository* EmdiDataRepository::getOrCreateProductDataRepository(
162  MarketSegmentId marketSegmentId,
163  bool create)
164 {
165  std::vector<EmdiProductDataRepository*>::const_iterator it = std::lower_bound(
166  productDataRepositories_.begin(),
167  productDataRepositories_.end(),
168  marketSegmentId,
170  );
171 
172  if ((it != productDataRepositories_.end()) && ((*it)->marketSegmentId() == marketSegmentId))
173  return *it;
174 
175  if (!create)
176  return nullptr;
177 
178  MarketDepth marketDepth = 0;
179 
180  MarketSegmentId2Depth_::const_iterator marketSegmentId2DepthIt = marketSegmentId2Depth_.find(marketSegmentId);
181  if(marketSegmentId2DepthIt == marketSegmentId2Depth_.end())
182  {
183  if (shouldBuildInternalOrderBooks())
184  {
185  reportWarning(TextBuilder() << "No max market depth for " << marketSegmentId << ", book will not be built.");
186  }
187  }
188  else
189  {
190  marketDepth = marketSegmentId2DepthIt->second;
191 
192  log(ONIXS_LOG_DEBUG[this] << "Max market depth for " << marketSegmentId << ": " << marketDepth);
193  }
194 
195  const bool buildInternalOrderBooks = shouldBuildInternalOrderBooks() && (0 != marketDepth);
196 
198  marketSegmentId,
199  marketDepth,
200  buildInternalOrderBooks,
201  this,
202  config_.logger,
205  boost::bind(&EmdiDataRepository::filterSecurityId, this, _1));
206 
207  productDataRepositories_.push_back(rep);
208 
209  std::sort(
210  productDataRepositories_.begin(),
211  productDataRepositories_.end(),
213 
214  return rep;
215 }
216 
217 void EmdiDataRepository::resetRecoveryProcess()
218 {
219  firstMarketSegmentId_ = UndefinedMarketSegmentId;
220  lastSnapshotPacketSeqNum_ = UndefinedSeqNum;
221  recoveryCycleStarted_ = false;
222  recoveryCycleFinished_ = false;
223  securityIDs_.clear();
224  marketSegmentIDs_.clear();
225 }
226 
227 void EmdiDataRepository::onSnapshotMessage(const DataSource& dataSource, const CORE::Message& msg, bool* skipRestOfPacket)
228 {
229  if(recoveryCycleFinished_)
230  {
231  log(ONIXS_LOG_DEBUG[this] << "Recovery finished, packet will be skipped");
232  *skipRestOfPacket = true;
233  return;
234  }
235 
236  log(ONIXS_LOG_DEBUG[this] << "SM: " << msg);
237 
238  if(lastSnapshotPacketSeqNum_ != UndefinedSeqNum)
239  {
240  if( lastSnapshotPacketSeqNum_ + 1 < dataSource.packetSeqNum)
241  {
242  log(ONIXS_LOG_DEBUG[this] << "Snapshot gap detected: reset recovery process");
243  resetRecoveryProcess();
244  }
245  else
246  lastSnapshotPacketSeqNum_ = dataSource.packetSeqNum;
247  }
248 
249  snapsotRecoveryCycle_(dataSource, msg);
250 }
251 
252 void EmdiDataRepository::processBeacon(
253  const CORE::Message& msg,
254  bool* gapDetected)
255 {
256  UInt32 senderSubId = 0;
257  if(!extractSenderSubId(msg, senderSubId))
258  {
259  log(ONIXS_LOG_DEBUG[this] << "IM: " << msg);
260  return;
261  }
262 
263  EmdiProductDataRepository* const productDataRepository = getOrCreateProductDataRepository(
264  senderSubId,
265  marketSegmentIdfilters_.empty()
266  );
267 
268  if (!productDataRepository)
269  {
270  log(ONIXS_LOG_DEBUG[this] << "IM: " << msg);
271  log(ONIXS_LOG_DEBUG[this] << "IM: " << "heartbeat filtered, senderSubId " << senderSubId);
272  return;
273  }
274 
275  productDataRepository->processBeacon(msg, gapDetected);
276 }
277 
279  const DataSource& dataSource,
280  const CORE::Message& msg,
281  TemplateId templateId)
282 {
283  bool gapDetected = false;
284 
285  if (BeaconTemplateId == templateId)
286  {
287  processBeacon(msg, &gapDetected);
288  }
289  else
290  {
291  const MarketSegmentId marketSegmentId = getMarketSegmentId(msg, templateId);
292 
293  EmdiProductDataRepository* const productDataRepository = getOrCreateProductDataRepository(
294  marketSegmentId,
295  marketSegmentIdfilters_.empty()
296  );
297 
298  if (!productDataRepository)
299  {
300  log(ONIXS_LOG_DEBUG[this] << "IM: " << msg);
301  log(ONIXS_LOG_DEBUG[this] << "IM: " << "message filtered, marketSegmentId " << marketSegmentId);
302  return;
303  }
304 
305  productDataRepository->processIncremental(msg, templateId, dataSource, &gapDetected);
306  }
307 
308  if(gapDetected)
309  {
310  if(recoveryCycleStarted_)
311  log(ONIXS_LOG_DEBUG[this] << "Incremental gap detected: start-reset snapshot recovery process");
312 
313  if(!recoveryStarted_)
314  {
315  resetRecoveryProcess();
316 
318  recoveryStarted_ = true;
319  }
320  }
321 }
322 
324 {
325  if (shouldStartRecoveryOnPacketGap() && !recoveryStarted_)
326  {
327  resetRecoveryProcess();
328 
330  recoveryStarted_ = true;
331  }
332 }
333 
335 {
336  if(!recoveryStarted_)
337  {
338  resetRecoveryProcess();
340  recoveryStarted_ = true;
341 
342  BOOST_FOREACH(EmdiProductDataRepository* repo, productDataRepositories_)
343  {
344  BOOST_ASSERT(repo);
345  repo->reset();
346  }
347  }
348 }
349 
351 {
352  resetRecoveryProcess();
353 }
354 
356 {
357  BOOST_FOREACH(EmdiProductDataRepository* repo, productDataRepositories_)
358  {
359  BOOST_ASSERT(repo);
360  repo->noDataOnIncrementalFeeds();
361  }
362 }
363 
364 void EmdiDataRepository::snapshotRecoveryCycleMethodTwo(const DataSource& dataSource, const CORE::Message& msg)
365 {
366  MarketSegmentId marketSegmentId = UndefinedMarketSegmentId;
367  if(!extractMarketSegmentId(msg, marketSegmentId))
368  return;
369 
370  SecurityId securityId = UndefinedSecurityId;
371  if(!extractSecurityID(msg, securityId))
372  return;
373 
374  if(securityIDs_.empty())
375  {
376  securityIDs_.insert(securityId);
377  lastSnapshotPacketSeqNum_ = dataSource.packetSeqNum;
378  return;
379  }
380 
381  if(!recoveryCycleStarted_)
382  {
383  BOOST_ASSERT(securityIDs_.size() == 1);
384 
385  if(securityIDs_.find(securityId) != securityIDs_.end())
386  {
387  recoveryCycleStarted_ = true;
388  log(ONIXS_LOG_DEBUG[this] << "Snapshot cycle started, using Method 2");
389  }
390  else
391  return;
392  }
393  else
394  {
395  if(securityIDs_.find(securityId) != securityIDs_.end())
396  {
397  BOOST_ASSERT(securityIDs_.size() > 1);
398 
399  recoveryCycleFinished_ = true;
400  log(ONIXS_LOG_DEBUG[this] << "Snapshot cycle finished");
401 
402  bool allSyncronized = processCachedIncrementals();
403  if(allSyncronized)
404  {
405  log(ONIXS_LOG_DEBUG[this] << "Recovery finished");
407  recoveryStarted_ = false;
408  return;
409  }
410  else
411  {
412  log(ONIXS_LOG_DEBUG[this] << "Detected gap, will continue recovery");
413  resetRecoveryProcess();
414  return;
415  }
416  }
417  else
418  {
419  bool success = securityIDs_.insert(securityId).second;
420  BOOST_ASSERT(success);
421  (void)success;
422  }
423  }
424 
425  processDepthSnapshotMsg(dataSource, msg, marketSegmentId);
426 }
427 
428 //A change of product MarketSegmentID (1300) for a given SenderCompID (49) indicates the end of the
429 //depth snapshot messages for the respective product. This allows applications to easily determine when
430 //they've received a snapshot for every instrument in the products they're interested in and leave the snapshot
431 //feed.
432 void EmdiDataRepository::snapshotRecoveryCycleMethodOne(const DataSource& dataSource, const CORE::Message& msg)
433 {
434  MarketSegmentId marketSegmentId = UndefinedMarketSegmentId;
435  if(!extractMarketSegmentId(msg, marketSegmentId))
436  return;
437 
438  SecurityId securityId = UndefinedSecurityId;
439  if(!extractSecurityID(msg, securityId))
440  return;
441 
442  if( !recoveryCycleStarted_ )
443  {
444  if(firstMarketSegmentId_ == UndefinedMarketSegmentId)
445  {
446  firstMarketSegmentId_ = marketSegmentId;
447  lastSnapshotPacketSeqNum_ = dataSource.packetSeqNum;
448  return;
449  }
450  else
451  {
452  if( firstMarketSegmentId_ != marketSegmentId)
453  {
454  recoveryCycleStarted_ = true;
455  firstMarketSegmentId_ = marketSegmentId;
456  marketSegmentIDs_.insert(marketSegmentId);
457 
458  log(ONIXS_LOG_DEBUG[this] << "Snapshot cycle started, using Method 1, firstMarketSegmentId=" << firstMarketSegmentId_);
459  }
460  else
461  {
462  if(securityIDs_.find(securityId) != securityIDs_.end())
463  {
464  log(ONIXS_LOG_DEBUG[this] << "Changing Snapshot Recovery Method to 2");
465  snapsotRecoveryCycle_ = boost::bind(&EmdiDataRepository::snapshotRecoveryCycleMethodTwo, this, _1, _2);
466  resetRecoveryProcess();
467  return;
468  }
469  securityIDs_.insert(securityId);
470  return;
471  }
472  }
473  }
474  else
475  {
476  if(firstMarketSegmentId_ == marketSegmentId && marketSegmentIDs_.size() > 1)
477  {
478  recoveryCycleFinished_ = true;
479  log(ONIXS_LOG_DEBUG[this] << "Snapshot cycle finished");
480 
481  const bool allSyncronized = processCachedIncrementals();
482  if(allSyncronized)
483  {
484  log(ONIXS_LOG_DEBUG[this] << "Recovery finished");
486  recoveryStarted_ = false;
487  return;
488  }
489  else
490  {
491  log(ONIXS_LOG_DEBUG[this] << "Detected gap, will continue recovery");
492  resetRecoveryProcess();
493  return;
494  }
495  }
496 
497  marketSegmentIDs_.insert(marketSegmentId);
498  }
499 
500  processDepthSnapshotMsg(dataSource, msg, marketSegmentId);
501 }
502 
503 void EmdiDataRepository::processDepthSnapshotMsg(const DataSource& dataSource, const CORE::Message& msg, const MarketSegmentId& marketSegmentId)
504 {
505  BOOST_ASSERT(msg[Tags::MsgType] == "W");
506 
507  EmdiProductDataRepository* const productDataRepository = getOrCreateProductDataRepository(
508  marketSegmentId,
509  marketSegmentIdfilters_.empty()
510  );
511 
512  if (!productDataRepository)
513  {
514  log(ONIXS_LOG_DEBUG[this] << "SM: " << "message filtered, marketSegmentId " << marketSegmentId);
515  return;
516  }
517 
518  productDataRepository->processDepthSnapshot(msg, dataSource);
519 }
520 
521 bool EmdiDataRepository::processCachedIncrementals()
522 {
523  log(ONIXS_LOG_DEBUG[this] << "Playing cached incrementals");
524 
525  BOOST_FOREACH(EmdiProductDataRepository* repo, productDataRepositories_)
526  {
527  BOOST_ASSERT(repo);
529 
530  if(!repo->syncronized())
531  return false;
532  }
533 
534  return true;
535 }
536 
538 {
539  marketSegmentIdfilters_.clear();
540  std::copy(filters.begin(), filters.end(), std::inserter(marketSegmentIdfilters_, marketSegmentIdfilters_.end()));
541 
542  if (marketSegmentId2Depth_.empty() && shouldBuildInternalOrderBooks())
543  return;
544 }
545 
547 {
548  if(!config_.settings.buildInternalOrderBooks)
549  {
550  reportWarning(TextBuilder() << "Setting MarketSegmentId2Depth with book build disabled, action has no effect.");
551  return;
552  }
553 
554  marketSegmentId2Depth_.clear();
555  std::copy(map.begin(), map.end(), std::inserter(marketSegmentId2Depth_, marketSegmentId2Depth_.end()));
556 
557  if(marketSegmentIdfilters_.empty())
558  return;
559 }
560 
562 {
563  marketSegmentIdfilters_.clear();
564  clearProductDataRepositories();
565 }
566 
568 {
569  clearProductDataRepositories();
570 
571  if (!marketSegmentIdfilters_.empty())
572  {
573  BOOST_FOREACH(MarketSegmentId marketSegmentId, marketSegmentIdfilters_)
574  {
575  const EmdiProductDataRepository* const emdiProductDataRepository =
576  getOrCreateProductDataRepository(marketSegmentId, true);
577 
578  BOOST_ASSERT(emdiProductDataRepository != nullptr); (void) emdiProductDataRepository;
579  }
580  }
581  else
582  {
583  typedef std::pair<MarketSegmentId, MarketDepth> Pair;
584  BOOST_FOREACH(const Pair& pair, marketSegmentId2Depth_)
585  {
586  const EmdiProductDataRepository* const emdiProductDataRepository =
587  getOrCreateProductDataRepository(pair.first, true);
588 
589  BOOST_ASSERT(emdiProductDataRepository != nullptr); (void) emdiProductDataRepository;
590  }
591  }
592 }
593 
595 {
596  BOOST_FOREACH(EmdiProductDataRepository* repo, productDataRepositories_)
597  {
598  BOOST_ASSERT(repo);
599  if(repo->hasCashedItems())
600  return true;
601  }
602 
603  return false;
604 }
605 
607 {
608  securityIdFilters_.clear();
609  std::copy(filters.begin(), filters.end(), std::inserter(securityIdFilters_, securityIdFilters_.end()));
610 }
611 
613 {
614  securityIdFilters_.clear();
615 }
616 
617 bool EmdiDataRepository::filterSecurityId(const SecurityId& securityId) const
618 {
619  if(securityIdFilters_.empty())
620  return true;
621 
622  return securityIdFilters_.end() != securityIdFilters_.find(securityId);
623 }
624 
625 // for testing only
626 EmdiDataRepository::EmdiProductDataRepositories_ EmdiDataRepository::productDataRepositories()
627 {
628  EmdiDataRepository::EmdiProductDataRepositories_ productDataRepositories;
629 
630  BOOST_FOREACH(EmdiProductDataRepository* repo, productDataRepositories_)
631  {
632  BOOST_ASSERT(repo);
633 
634  productDataRepositories[repo->marketSegmentId()] = repo;
635  }
636 
637  return productDataRepositories;
638 }
639 
640 }}}}
MarketDepthTraits::MarketSegmentId2Depth MarketSegmentId2Depth
UInt32 MarketDepth
Alias for Market depth type.
Definition: Defines.h:43
void onSnapshotMessage(const DataSource &dataSource, const CORE::Message &msg, bool *skipRestOfPacket)
void setMarketSegmentId2Depth(const MarketSegmentId2Depth &map)
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void processIncremental(const CORE::Message &, const CORE::FieldValue &, TemplateId, const DataSource &, bool *)
unsigned int UInt32
Definition: Numeric.h:41
void onIncrementalMessage(const DataSource &dataSource, const CORE::Message &msg, TemplateId templateId)
Definition: Defines.h:30
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
const MarketSegmentId UndefinedMarketSegmentId
Definition: Defines.h:45
Int64 SecurityId
Alias for Security Id type.
Definition: Defines.h:51
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FilteringTraits::SecurityIdFilters SecurityIdFilters
UInt32 MarketSegmentId
Alias for Market Segment ID type.
Definition: Defines.h:40
void processDepthSnapshot(const CORE::Message &msg, const DataSource &dataSource)
OnixS::FIX::Core::FAST::TemplateId TemplateId