OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.4.2
API documentation
RdiDataRepositoryGeneric.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
21 
22 #include "Formatting.h"
23 #include "MessageOperator.h"
25 
26 
27 
28 namespace OnixS {
29 namespace Eurex {
30 namespace MarketData {
31 namespace Implementation {
32 
33 
35 {
36  return descriptors_.findAllEmdiDescriptors();
37 }
38 
40 {
41  return descriptors_.findEmdiDescriptors(segments);
42 }
43 
45 {
46  return descriptors_.findAllMdiDescriptors();
47 }
48 
50 {
51  return descriptors_.findMdiDescriptors(segments);
52 }
53 
55 {
56  return descriptors_.findAllEobiDescriptors();
57 }
58 
60 {
61  return descriptors_.findEobiDescriptors(segments);
62 }
63 
64 
67  synchronized_(false),
68  nextIncrementalMsgSeqNum_(UndefinedSeqNum),
69  senderCompId_(0),
70  cachedIncremetalQueue_(config.settings.messagePoolSize / 4)
71 {
72 
73 }
74 
76 {
77 }
78 
79 void RdiDataRepositoryGeneric::dispatchIncremental(
80  const DataSource& dataSource,
81  const CORE::Message& msg,
82  TemplateId templateId)
83 {
84  switch(templateId)
85  {
87  {
88  const InstrumentIncrementalWrapper wrapper(msg);
89  config_.listenerHolder->invokeReferenceDataInstrumentUpdate(&wrapper, dataSource);
90  }
91  break;
92 
94  {
95  const VarianceFuturesStatusWrapper wrapper(msg);
96  config_.listenerHolder->invokeReferenceDataVarianceFuturesStatus(&wrapper, dataSource);
97  }
98  break;
99 
101  {
102  const TotalReturnFuturesStatusWrapper wrapper(msg);
103  config_.listenerHolder->invokeReferenceDataTotalReturnFuturesStatus(&wrapper, dataSource);
104  }
105  break;
106 
108  {
109  const TradeAtReferencePriceStatusWrapper wrapper(msg);
110  config_.listenerHolder->invokeReferenceDataTradeAtReferencePriceStatus(&wrapper, dataSource);
111  }
112  break;
113 
114  default:
115  reportWarning(TextBuilder() << "Unexpected incremental message with templateId=" << templateId);
116  break;
117  }
118 }
119 
120 void RdiDataRepositoryGeneric::enqueueIncremental(
121  const DataSource& dataSource,
122  const CORE::Message& msg,
123  TemplateId templateId)
124 {
125  cachedIncremetalQueue_.push(msg, dataSource, templateId);
126 }
127 
128 void RdiDataRepositoryGeneric::applyIncremental(
129  const DataSource& dataSource,
130  const CORE::Message& msg,
131  TemplateId templateId)
132 {
133  if (senderCompIdChanged(dataSource.senderCompID))
134  {
135  processGapDetected();
136  return;
137  }
138 
140 
141  msg[Tags::MsgSeqNum].convertTo(seqNum);
142 
143  if(seqNum == nextIncrementalMsgSeqNum_)
144  {
146  ++nextIncrementalMsgSeqNum_;
147 
148  dispatchIncremental(dataSource, msg, templateId);
149  }
150  else if(seqNum > nextIncrementalMsgSeqNum_)
151  {
152  processGapDetected();
153  }
154  else
155  log(ONIXS_LOG_DEBUG[this] << "Ignored message: " << msg);
156 }
157 
158 void RdiDataRepositoryGeneric::processIncremental(
159  const DataSource& dataSource,
160  const CORE::Message& msg,
161  TemplateId templateId)
162 {
163  if(synchronized_)
164  {
165  applyIncremental(dataSource, msg, templateId);
166  }
167 
168  if(!synchronized_)
169  {
170  enqueueIncremental(dataSource, msg, templateId);
171  }
172 }
173 
175  const DataSource& dataSource,
176  const CORE::Message& msg,
177  TemplateId templateId)
178 {
179  log(ONIXS_LOG_DEBUG[this] << msg);
180 
181  BOOST_ASSERT(msg[Tags::MsgType] != "Reset");
182 
183  if(templateId == BeaconTemplateId)
184  {
185  processBeaconMsg(dataSource, msg);
186  return;
187  }
188 
189  processIncremental(dataSource, msg, templateId);
190 }
191 
193 {
194  if (senderCompId_ == 0)
195  senderCompId_ = value;
196 
197  if (senderCompId_ != value)
198  {
199  senderCompId_ = value;
200  return true;
201  }
202  return false;
203 }
204 
205 void RdiDataRepositoryGeneric::processBeaconMsg(const DataSource& dataSource, const CORE::Message& msg)
206 {
207  if(synchronized_)
208  {
209  UInt32 lastMsgSeqNumProcessed;
210  msg[Tags::LastMsgSeqNumProcessed].convertTo(lastMsgSeqNumProcessed);
211 
212  if (senderCompIdChanged(dataSource.senderCompID))
213  {
214  if (nextIncrementalMsgSeqNum_ != lastMsgSeqNumProcessed + 1)
215  {
216  nextIncrementalMsgSeqNum_ = lastMsgSeqNumProcessed + 1;
217  log(ONIXS_LOG_DEBUG[this] << "senderCompIdChanged, nextIncrementalMsgSeqNum = " << nextIncrementalMsgSeqNum_);
218  }
219  }
220  else
221  {
222  if (lastMsgSeqNumProcessed + 1 > nextIncrementalMsgSeqNum_)
223  processGapDetected();
224  }
225  }
226 }
227 
228 void RdiDataRepositoryGeneric::processGapDetected()
229 {
230  log(ONIXS_LOG_INFO[this]
231  << "Message gap detected: expected "
232  << nextIncrementalMsgSeqNum_);
233 
234  synchronized_ = false;
235  nextIncrementalMsgSeqNum_ = UndefinedSeqNum;
236 
237  cachedIncremetalQueue_.reset();
238 
240 }
241 
242 void RdiDataRepositoryGeneric::playCashedIncrementals()
243 {
244  while(!cachedIncremetalQueue_.empty())
245  {
246  MessageInfo messageInfo = cachedIncremetalQueue_.pop();
247  const CORE::Message& msg = *messageInfo.message;
248 
249  const TemplateId templateId = messageInfo.templateId;
250  DataSource& dataSource = *messageInfo.dataSource;
251 
252  dataSource.cached = true;
253  dataSource.origin = DataSource::Undefined;
254 
255  applyIncremental(dataSource, msg, templateId);
256  }
257 
258  cachedIncremetalQueue_.reset();
259 }
260 
262 {
263  return !cachedIncremetalQueue_.empty();
264 }
265 
266 bool RdiDataRepositoryGeneric::shouldStartRecoveryOnPacketGap() const
267 {
268  return config_.settings.startRecoveryOnPacketGap;
269 }
270 
272 {
273  if (shouldStartRecoveryOnPacketGap() && synchronized_)
274  {
275  processGapDetected();
276  }
277 }
278 
280 {
281  descriptors_.referenceDataReady();
282 
283  synchronized_ = true;
285 
286  config_.listenerHolder->invokeReferenceDataEnd();
287 
288  playCashedIncrementals();
289 
290  log(ONIXS_LOG_DEBUG[this] << descriptors_.printDescriptorsInfo());
291 }
292 
293 void RdiDataRepositoryGeneric::apply(const ProductSnapshot& msg, const DataSource& dataSource)
294 {
295  config_.listenerHolder->invokeReferenceDataProduct(&msg, dataSource );
296 
297  descriptors_.apply(msg);
298 }
299 
301 {
302  config_.listenerHolder->invokeReferenceDataInstrument(&msg, dataSource);
303 }
304 
306 {
307  config_.listenerHolder->invokeReferenceDataInstrumentUpdate(&msg, dataSource);
308 }
309 
311 {
312  config_.listenerHolder->invokeReferenceDataVarianceFuturesStatus(&msg, dataSource);
313 }
314 
316 {
317  config_.listenerHolder->invokeReferenceDataTotalReturnFuturesStatus(&msg, dataSource);
318 }
319 
321 {
322  config_.listenerHolder->invokeReferenceDataTradeAtReferencePriceStatus(&msg, dataSource);
323 }
324 
325 
326 }}}}
unsigned int SequenceNumber
Alias for sequence numbers.
const Tag LastMsgSeqNumProcessed
Definition: Tags.h:67
const TemplateId TradeAtReferencePriceStatusTemplateId
UInt32 senderCompID
Unique id for a sender.
Definition: Defines.h:85
EobiDescriptor::Collection EobiDescriptors
void push(const CORE::Message &message, const DataSource &dataSource, TemplateId templateId)
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
EmdiDescriptor::Collection EmdiDescriptors
MdiDescriptor::Collection MdiDescriptors
void onIncrementalMessage(const DataSource &, const CORE::Message &, TemplateId)
OnixS::FIX::Core::FAST::TemplateId TemplateId
IInterfaceDescriptorProvider::MarketSegments MarketSegments