OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  8.1.0
API documentation
RdiDataRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
20 
21 #include "MessageOperator.h"
22 #include "RdiDataRepository.h"
23 
24 #include "InternalDefines.h"
25 
26 
27 namespace OnixS {
28 namespace Eurex {
29 namespace MarketData {
30 namespace Implementation {
31 
33 
35 {
36 }
37 
40  mDCount_(0),
41  lastMsgSeqNumProcessed_(0),
42  nextSnapshotMsgSeqNum_(0)
43 {
44 }
45 
46 template <typename MessageType>
47 void RdiDataRepository::process(
48  const MessageType& msg,
49  const DataSource& dataSource,
50  bool* /*skipRestOfPacket*/)
51 {
52  const CORE::SeqNumber seqNum = msg.getUInt32(Tags::MsgSeqNum);
53 
54  if( seqNum == nextSnapshotMsgSeqNum_)
55  {
56  ++nextSnapshotMsgSeqNum_;
57 
58  RdiDataRepositoryGeneric::apply(msg, dataSource);
59  }
60  else
61  {
62  log(ONIXS_LOG_INFO[this] << "Message gap detected, expected " << nextSnapshotMsgSeqNum_ << ", received " << seqNum);
63  nextSnapshotMsgSeqNum_ = 0;
64  }
65 }
66 
// Snapshot-feed message handler: decides whether a message is ignored, treated
// as a MarketDataReport, or dispatched to process() through a typed wrapper.
// NOTE(review): the line carrying the return type and function name is missing
// from this listing; presumably this is RdiDataRepository::onSnapshotMessage,
// whose signature in the cross-reference list at the end of the listing matches
// these parameters - confirm against the real source.
//
// dataSource       - forwarded to process() for the wrapped message types.
// msg              - the decoded message.
// templateId       - template id of msg; selects the handling branch.
// skipRestOfPacket - out-flag, must be non-null (asserted below).
68  const DataSource& dataSource,
69  const CORE::Message& msg,
70  TemplateId templateId,
71  bool* skipRestOfPacket)
72 {
73  BOOST_ASSERT(skipRestOfPacket);
74 
    // When synchronized() returns true the message is ignored and the
    // rest of the packet is flagged to be skipped.
75  if(synchronized())
76  {
77  *skipRestOfPacket = true;
78  return;
79  }
80 
    // Reset messages are ignored; *skipRestOfPacket is not written on this path.
81  if(templateId == ResetTemplateId)
82  return;
83 
    // Sanity check: a message that passed the template-id test above must not
    // report "Reset" as its MsgType.
84  BOOST_ASSERT(msg[Tags::MsgType] != "Reset");
85 
86  log(ONIXS_LOG_DEBUG[this] << msg);
87 
88 
    // MarketDataReport is handled by its own routine and does not go through process().
89  if(templateId == MarketDataReportTemplateId)
90  {
91  processMarketDataReportMessage(msg);
92  }
93  else
94  {
    // Every other template is wrapped in its typed wrapper and passed to process().
    // NOTE(review): the `case ...:` label that should precede each branch is
    // missing from this listing; presumably each label is the template id
    // matching the wrapper name - confirm against the real source.
95  switch (templateId)
96  {
98  process(ProductSnapshotWrapper(msg), dataSource, skipRestOfPacket);
99  break;
100 
102  process(InstrumentSnapshotWrapper(msg), dataSource, skipRestOfPacket);
103  break;
104 
106  process(InstrumentIncrementalWrapper(msg), dataSource, skipRestOfPacket);
107  break;
108 
110  process(VarianceFuturesStatusWrapper(msg), dataSource, skipRestOfPacket);
111  break;
112 
114  process(TotalReturnFuturesStatusWrapper(msg), dataSource, skipRestOfPacket);
115  break;
116 
118  process(TradeAtReferencePriceStatusWrapper(msg), dataSource, skipRestOfPacket);
119  break;
120 
    // Any template id without a branch above is rejected with an exception;
    // the `break` after the throw is unreachable.
121  default:
122  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "templateId", templateId);
123  break;
124  }
125  }
126 
    // The rest of the packet is skipped whenever the expected snapshot sequence
    // number is 0 at this point (process() sets it to 0 on a gap).
127  *skipRestOfPacket = nextSnapshotMsgSeqNum_ == 0 ;
128 }
129 
130 void RdiDataRepository::processMarketDataReportMessage(const CORE::Message& msg)
131 {
132  const UInt32 mDReportEvent = msg[Tags::MDReportEvent].get<UInt32>();
133 
134  if(mDReportEvent == 0) //StartOfReferenceData
135  {
136  nextSnapshotMsgSeqNum_ = 1;
137  msg[Tags::MDReportCount].convertTo(mDCount_);
138  msg[Tags::LastMsgSeqNumProcessed].convertTo(lastMsgSeqNumProcessed_);
139 
140  UInt32 totNoMarketSegments;
141  UInt32 totNoInstruments;
142  msg[Tags::TotNoMarketSegmentReports].convertTo(totNoMarketSegments);
143  msg[Tags::TotNoInstrumentReports].convertTo(totNoInstruments);
144 
145  log(ONIXS_LOG_DEBUG[this] << "MDCount=" << mDCount_);
146  log(ONIXS_LOG_DEBUG[this] << "LastMsgSeqNumProcessed=" << lastMsgSeqNumProcessed_);
147  log(ONIXS_LOG_DEBUG[this] << "TotNoMarketSegmentReports=" << totNoMarketSegments);
148  log(ONIXS_LOG_DEBUG[this] << "TotNoInstrumentReports=" << totNoInstruments);
149 
150  nextIncrementalMsgSeqNum() = lastMsgSeqNumProcessed_ - mDCount_ + 1;
151 
152  config_.listenerHolder->invokeReferenceDataStart();
153  }
154  else if(mDReportEvent == 1 && nextSnapshotMsgSeqNum_ > 0) //EndOfReferenceData
155  {
156  log(ONIXS_LOG_INFO[this] << "Recovery finished, NextIncrementalMsgSeqNum=" << nextIncrementalMsgSeqNum());
157 
158  if(nextSnapshotMsgSeqNum_ == lastMsgSeqNumProcessed_ + 1 )
159  {
160  finishRecovery();
161  }
162  else
163  log(ONIXS_LOG_DEBUG[this] << "NextSnapshotMsgSeqNum != LastMsgSeqNumProcessed + 1, " << nextSnapshotMsgSeqNum_ << " != " << lastMsgSeqNumProcessed_ << "+ 1");
164  }
165 }
166 
167 }}}}
168 
const Tag LastMsgSeqNumProcessed
Definition: Tags.h:65
const TemplateId TradeAtReferencePriceStatusTemplateId
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
void onSnapshotMessage(const DataSource &, const CORE::Message &, TemplateId, bool *skipRestOfPacket) override
const Tag TotNoMarketSegmentReports
Definition: Tags.h:141
const Tag TotNoInstrumentReports
Definition: Tags.h:142
OnixS::FIX::Core::FAST::TemplateId TemplateId