OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.2.1
API documentation
RdiDataRepositoryEx.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
20 
21 #include "InternalDefines.h"
22 #include "MessageOperator.h"
23 
24 #include "RdiDataRepositoryEx.h"
25 
26 
27 namespace OnixS {
28 namespace Eurex {
29 namespace MarketData {
30 namespace Implementation {
31 
33 
34 
36 {
37 }
38 
41  mDCount_(0),
42  lastMsgSeqNumProcessed_(0)
43 {
44 }
45 
46 template <typename MessageType>
47 void RdiDataRepositoryEx::process(
48  const MessageType& msg,
49  const DataSource& dataSource,
50  bool* /*skipRestOfPacket*/)
51 {
52  const CORE::SeqNumber seqNum = msg.getUInt32(Tags::MsgSeqNum);
53 
54  if(msgSnapshotSeqNums_.find(seqNum) == msgSnapshotSeqNums_.end())
55  {
56  RdiDataRepositoryGeneric::apply(msg, dataSource);
57 
58  msgSnapshotSeqNums_.insert(seqNum);
59 
60  if (msgSnapshotSeqNums_.size() == lastMsgSeqNumProcessed_)
62  }
63 }
64 
66  const DataSource& dataSource,
67  const CORE::Message& msg,
68  TemplateId templateId,
69  bool* skipRestOfPacket)
70 {
71  BOOST_ASSERT(skipRestOfPacket);
72 
73  if(synchronized())
74  {
75  *skipRestOfPacket = true;
76  return;
77  }
78 
79  *skipRestOfPacket = false;
80 
81  if(templateId == ResetTemplateId)
82  return;
83 
84  BOOST_ASSERT(msg[Tags::MsgType] != "Reset");
85 
86  log(ONIXS_LOG_DEBUG[this] << msg);
87 
88 
89  if(templateId == MarketDataReportTemplateId)
90  {
91  processMarketDataReportMessage(msg);
92  }
93  else
94  {
95  if (msgSnapshotSeqNums_.empty())
96  config_.listenerHolder->invokeReferenceDataStart();
97 
98  switch (templateId)
99  {
101  process(ProductSnapshotWrapper(msg), dataSource, skipRestOfPacket);
102  break;
103 
105  process(InstrumentSnapshotWrapper(msg), dataSource, skipRestOfPacket);
106  break;
107 
109  process(InstrumentIncrementalWrapper(msg), dataSource, skipRestOfPacket);
110  break;
111 
113  process(VarianceFuturesStatusWrapper(msg), dataSource, skipRestOfPacket);
114  break;
115 
117  process(TotalReturnFuturesStatusWrapper(msg), dataSource, skipRestOfPacket);
118  break;
119 
121  process(TradeAtReferencePriceStatusWrapper(msg), dataSource, skipRestOfPacket);
122  break;
123 
124  default:
125  throw OnixS::BadArgumentException(BOOST_CURRENT_FUNCTION, "templateId", templateId);
126  break;
127  }
128  }
129 }
130 
131 void RdiDataRepositoryEx::processMarketDataReportMessage(const CORE::Message& msg)
132 {
133  const UInt32 mDReportEvent = msg[Tags::MDReportEvent].get<UInt32>();
134 
135  if(mDReportEvent == 0) //StartOfReferenceData
136  {
137  msg[Tags::MDReportCount].convertTo(mDCount_);
138  msg[Tags::LastMsgSeqNumProcessed].convertTo(lastMsgSeqNumProcessed_);
139 
140  UInt32 totNoMarketSegments;
141  UInt32 totNoInstruments;
142  msg[Tags::TotNoMarketSegmentReports].convertTo(totNoMarketSegments);
143  msg[Tags::TotNoInstrumentReports].convertTo(totNoInstruments);
144 
145  log(ONIXS_LOG_DEBUG[this] << "MDCount=" << mDCount_);
146  log(ONIXS_LOG_DEBUG[this] << "LastMsgSeqNumProcessed=" << lastMsgSeqNumProcessed_);
147  log(ONIXS_LOG_DEBUG[this] << "TotNoMarketSegmentReports=" << totNoMarketSegments);
148  log(ONIXS_LOG_DEBUG[this] << "TotNoInstrumentReports=" << totNoInstruments);
149 
150  if( nextIncrementalMsgSeqNum() == 0 )
151  nextIncrementalMsgSeqNum() = lastMsgSeqNumProcessed_ - mDCount_ + 1;
152  }
153 }
154 
155 void RdiDataRepositoryEx::onIncrementalSeqNumProcessed(SequenceNumber seqNum)
156 {
157  msgSnapshotSeqNums_.insert(mDCount_ + seqNum);
158 }
159 
160 }}}}
161 
unsigned int SequenceNumber
Alias for sequence numbers.
const Tag LastMsgSeqNumProcessed
Definition: Tags.h:67
const TemplateId TradeAtReferencePriceStatusTemplateId
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
void onSnapshotMessage(const DataSource &, const CORE::Message &, TemplateId, bool *skipRestOfPacket) override
const Tag TotNoMarketSegmentReports
Definition: Tags.h:157
const Tag TotNoInstrumentReports
Definition: Tags.h:158
OnixS::FIX::Core::FAST::TemplateId TemplateId