OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.0.3
API documentation
EobiDataRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #include "EobiDataRepository.h"
23 #include "../Formatting.h"
24 #include <boost/foreach.hpp>
25 
26 using namespace OnixS;
28 
30  const EobiDataRepositoryConfig &config,
31  bool buildInternalOrderBooks,
32  OrderBookAllocator* orderBookAllocator)
33  : base(config)
34  , state_(Synchronized)
35  , repositories_()
36  , partitionFilters_()
37  , productFilters_()
38  , instrumentFilters_()
39  , nextSnapshotMsgSeqNum_(0)
40  , orderBookAllocator_(orderBookAllocator)
41 {
42  BOOST_ASSERT(!buildInternalOrderBooks || orderBookAllocator);
43  config_.settings.buildInternalOrderBooks = buildInternalOrderBooks;
44 }
45 
47 {
48  clearProductDataRepositories();
49 }
50 
53 {
54  partitionFilters_.clear();
55 
56  std::copy(
57  filters.begin(),
58  filters.end(),
59  std::inserter(partitionFilters_, partitionFilters_.end()));
60 }
61 
63 {
64  partitionFilters_.clear();
65 }
66 
69 {
70  productFilters_.clear();
71 
72  std::copy(
73  filters.begin(),
74  filters.end(),
75  std::inserter(productFilters_, productFilters_.end()));
76 }
77 
79 {
80  productFilters_.clear();
81 }
82 
85 {
86  instrumentFilters_ = filters;
87 
88  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
89  {
90  entry.second->setSecurityIdFilters(filters);
91  }
92 }
93 
95 {
96  instrumentFilters_.clear();
97 }
98 
100  const Eurex::MarketData::DataSource& dataSource,
101  const BaseMsgData& msgData)
102 {
103  if (shouldSkipMessage(dataSource))
104  {
105  return ;
106  }
107 
108  log(ONIXS_LOG_DEBUG[this] << "IM: " << dataSource.marketSegmentId << ": " << msgData);
109 
110  EobiProductDataRepository* repository
111  = getOrCreateProductDataRepository(dataSource.marketSegmentId);
112 
113  BOOST_ASSERT(repository != NULL);
114 
115  bool gapDetected = false;
116  repository->processIncrementalMessage(dataSource, msgData, &gapDetected);
117  if (gapDetected)
118  {
119  repository->startRecovery();
120  startRecovery();
121  }
122 }
123 
125 {
126  log(ONIXS_LOG_DEBUG[this] << "Incremental packet gap.");
127 
128 
129  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
130  {
131  BOOST_ASSERT(entry.second != NULL);
132  entry.second->onGap();
133  }
134 
135  if (shouldStartRecoveryOnPacketGap())
136  {
137  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
138  {
139  entry.second->restartRecoveryIfInProgress();
140  }
141 
142  startRecovery();
143  }
144 }
145 
147  const Eurex::MarketData::DataSource &dataSource,
148  const BaseMsgData &msgData)
149 {
150  if (msgData.MsgSeqNum == 0)
151  {
152  processRecoveryCycleStart(dataSource, msgData);
153  }
154  else if (state_ == RecoveryInProgress)
155  {
156  processRecoveryCycleEntry(dataSource, msgData);
157  }
158 }
159 
160 void EobiDataRepository::processRecoveryCycleStart(
161  const Eurex::MarketData::DataSource &dataSource,
162  const BaseMsgData &msgData)
163 {
164  log(ONIXS_LOG_DEBUG[this] << "processRecoveryCycleStart()");
165 
166  if (state_ == RecoveryPending) // waiting for the cycle start
167  {
168  state_ = RecoveryInProgress;
169 
170  processRecoveryCycleEntry(dataSource, msgData);
171  }
172  else if (state_ == RecoveryInProgress) // waiting for the next cycle start
173  {
174  if (!completeRecovery())
175  {
176  restartRecovery();
177 
178  processRecoveryCycleEntry(dataSource, msgData);
179  }
180  }
181 }
182 
183 bool EobiDataRepository::processRecoveryCycleEntry(
184  const Eurex::MarketData::DataSource &dataSource,
185  const BaseMsgData &msgData)
186 {
187  if (nextSnapshotMsgSeqNum_ != msgData.MsgSeqNum)
188  {
189  log(ONIXS_LOG_DEBUG[this] << "Snapshot message gap (expected " << nextSnapshotMsgSeqNum_ << ", actual: " << msgData.MsgSeqNum << ")");
190 
191  restartRecovery();
192 
193  return false;
194  }
195 
196  if (!shouldSkipMessage(dataSource))
197  {
198  log(ONIXS_LOG_DEBUG[this] << "SM: " << msgData);
199 
200  EobiProductDataRepository *repository
201  = getOrCreateProductDataRepository(dataSource.marketSegmentId);
202 
203  repository->processSnapshotMessage(dataSource, msgData);
204  }
205 
206  ++nextSnapshotMsgSeqNum_;
207 
208  return true;
209 }
210 
211 void EobiDataRepository::startRecovery()
212 {
213  if (state_ == Synchronized)
214  {
215  log(ONIXS_LOG_DEBUG[this] << "Starting recovery.");
216 
217  state_ = RecoveryPending;
218 
219  BOOST_ASSERT(config_.listenerHolder != NULL);
221 
223  }
224 }
225 
226 void EobiDataRepository::restartRecovery()
227 {
228  log(ONIXS_LOG_DEBUG[this] << "Restarting recovery.");
229 
230  state_ = RecoveryPending;
231  nextSnapshotMsgSeqNum_ = 0;
232 
233  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
234  {
235  entry.second->restartRecoveryIfInProgress();
236  }
237 }
238 
239 bool EobiDataRepository::completeRecovery()
240 {
241  log(ONIXS_LOG_DEBUG[this] << "Completing recovery...");
242 
243  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
244  {
245  if (!entry.second->completeRecovery())
246  {
247  return false;
248  }
249  }
250 
251  state_ = Synchronized;
252  log(ONIXS_LOG_DEBUG[this] << "Recovery completed");
253 
255 
256  BOOST_ASSERT(config_.listenerHolder != NULL);
258 
259  return true;
260 }
261 
262 EobiProductDataRepository* EobiDataRepository::getOrCreateProductDataRepository(
264 {
265  ProductDataRepositories::const_iterator it = repositories_.find(product);
266  if (it != repositories_.end())
267  {
268  return (*it).second;
269  }
270 
272  config.parent = this;
273 
274  EobiProductDataRepository *repository
275  = new EobiProductDataRepository(product, getMarketDepth(product), config, orderBookAllocator_, shouldBuildInternalOrderBooks());
276 
277  repository->setSecurityIdFilters(instrumentFilters_);
278 
279  repositories_[product] = repository;
280 
281  return repository;
282 }
283 
284 void EobiDataRepository::clearProductDataRepositories()
285 {
286  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
287  {
288  BOOST_ASSERT(entry.second != NULL);
289  delete entry.second;
290  }
291 
292  repositories_.clear();
293 }
294 
295 Eurex::MarketData::MarketDepth EobiDataRepository::getMarketDepth(
296  Eurex::MarketData::MarketSegmentId /*product*/) const
297 {
298  return config_.settings.bookDepth;
299 }
300 
301 bool EobiDataRepository::shouldSkipMessage(
302  const Eurex::MarketData::DataSource &dataSource) const
303 {
304  const bool skipPartition = !partitionFilters_.empty()
305  && (partitionFilters_.find(dataSource.partitionId) == partitionFilters_.end());
306 
307  const bool skipProduct = !productFilters_.empty()
308  && (productFilters_.find(dataSource.marketSegmentId) == productFilters_.end());
309 
310  return (skipPartition || skipProduct);
311 }
312 
314 {
315  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
316  {
317  BOOST_ASSERT(entry.second != NULL);
318  entry.second->noDataOnIncrementalFeeds();
319  }
320 }
321 
323 {
324  log(ONIXS_LOG_DEBUG[this] << "Stopping recovery...");
325 
326  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
327  {
328  entry.second->onApplSeqReset();
329  }
330 
331  state_ = Synchronized;
332  log(ONIXS_LOG_DEBUG[this] << "Recovery stopped");
333 
335 }
EobiDataRepository(const EobiDataRepositoryConfig &config, bool buildInternalOrderBooks, OrderBookAllocator *orderBookAllocator)
void onIncrementalMessage(const DataSource &dataSource, const BaseMsgData &msgData)
void processIncrementalMessage(const DataSource &dataSource, const BaseMsgData &msgData, bool *gapDetected)
UInt32 MarketDepth
Alias for Market depth type.
Definition: Defines.h:43
void processSnapshotMessage(const DataSource &dataSource, const BaseMsgData &msgData)
FilteringTraits::PartitionIdFilters PartitionIdFilters
Definition: Group.h:25
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
FilteringTraits::SecurityIdFilters SecurityIdFilters
UInt32 MarketSegmentId
Alias for Market Segment ID type.
Definition: Defines.h:40
void onSnapshotMessage(const DataSource &dataSource, const BaseMsgData &msgData)