OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.2.2
API documentation
EobiDataRepository.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #include "EobiDataRepository.h"
22 
23 
24 using namespace OnixS;
26 
28  const EobiDataRepositoryConfig &config,
29  bool buildInternalOrderBooks,
30  OrderBookAllocator* orderBookAllocator)
31  : base(config)
// The repository starts out synchronized with empty filter sets, i.e. nothing
// is filtered until filters are installed (see shouldSkipMessage()).
32  , state_(Synchronized)
33  , repositories_()
34  , partitionFilters_()
35  , productFilters_()
36  , instrumentFilters_()
// Snapshot messages are expected to start from sequence number 0.
37  , nextSnapshotMsgSeqNum_(0)
38  , orderBookAllocator_(orderBookAllocator)
39 {
// An allocator is mandatory whenever internal order books are requested.
40  BOOST_ASSERT(!buildInternalOrderBooks || orderBookAllocator);
// Store the constructor flag in the settings held by this object.
41  config_.settings.buildInternalOrderBooks = buildInternalOrderBooks;
42 }
43 
45 {
// Deletes every product repository held in repositories_ and empties the map.
46  clearProductDataRepositories();
47 }
48 
51 {
// Replace (rather than merge with) any previously installed partition filters.
52  partitionFilters_.clear();
53 
54  std::copy(
55  filters.begin(),
56  filters.end(),
57  std::inserter(partitionFilters_, partitionFilters_.end()));
58 }
59 
61 {
// An empty partition filter set disables partition filtering
// (shouldSkipMessage() only filters when the set is non-empty).
62  partitionFilters_.clear();
63 }
64 
67 {
// Replace (rather than merge with) any previously installed product filters.
68  productFilters_.clear();
69 
70  std::copy(
71  filters.begin(),
72  filters.end(),
73  std::inserter(productFilters_, productFilters_.end()));
74 }
75 
77 {
// An empty product filter set disables product filtering
// (shouldSkipMessage() only filters when the set is non-empty).
78  productFilters_.clear();
79 }
80 
83 {
// Remember the filters so that product repositories created later receive
// them too (see getOrCreateProductDataRepository()).
84  instrumentFilters_ = filters;
85 
// Push the new filters to every product repository that already exists.
86  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
87  {
88  entry.second->setSecurityIdFilters(filters);
89  }
90 }
91 
93 {
// NOTE(review): unlike the setter above, this only clears the local copy and
// does not propagate to already created product repositories - confirm
// whether that is intended.
94  instrumentFilters_.clear();
95 }
96 
98  const Eurex::MarketData::DataSource& dataSource,
99  const BaseMsgData& msgData)
100 {
// Drop messages excluded by the partition/product filters before doing any work.
101  if (shouldSkipMessage(dataSource))
102  {
103  return ;
104  }
105 
106  log(ONIXS_LOG_DEBUG[this] << "IM: " << dataSource.marketSegmentId << ": " << msgData);
107 
// Product repositories are created lazily, on the first message for a market segment.
108  EobiProductDataRepository* repository
109  = getOrCreateProductDataRepository(dataSource.marketSegmentId);
110 
111  BOOST_ASSERT(repository != nullptr);
112 
// The product repository reports a detected gap through the out-parameter.
113  bool gapDetected = false;
114  repository->processIncrementalMessage(dataSource, msgData, &gapDetected);
115  if (gapDetected)
116  {
// Start recovery for the affected product first, then for this repository.
// The latter is a no-op unless this repository is currently Synchronized.
117  repository->startRecovery();
118  startRecovery();
119  }
120 }
121 
123 {
124  log(ONIXS_LOG_DEBUG[this] << "Incremental packet gap.");
125 
126 
// Notify every known product repository about the gap.
127  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
128  {
129  BOOST_ASSERT(entry.second != nullptr);
130  entry.second->onGap();
131  }
132 
// shouldStartRecoveryOnPacketGap() is defined outside this listing; when it
// returns true, product-level recoveries already in progress are restarted and
// this repository's own recovery is started (no-op unless Synchronized).
133  if (shouldStartRecoveryOnPacketGap())
134  {
135  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
136  {
137  entry.second->restartRecoveryIfInProgress();
138  }
139 
140  startRecovery();
141  }
142 }
143 
145  const Eurex::MarketData::DataSource &dataSource,
146  const BaseMsgData &msgData)
147 {
// A snapshot message with sequence number 0 is treated as the start of a
// snapshot cycle.
148  if (msgData.MsgSeqNum == 0)
149  {
150  processRecoveryCycleStart(dataSource, msgData);
151  }
// Any other snapshot message is consumed only while a recovery is in progress;
// in every other state it is ignored.
152  else if (state_ == RecoveryInProgress)
153  {
154  processRecoveryCycleEntry(dataSource, msgData);
155  }
156 }
157 
158 void EobiDataRepository::processRecoveryCycleStart(
159  const Eurex::MarketData::DataSource &dataSource,
160  const BaseMsgData &msgData)
161 {
162  log(ONIXS_LOG_DEBUG[this] << "processRecoveryCycleStart()");
163 
164  if (state_ == RecoveryPending) // waiting for the cycle start
165  {
166  state_ = RecoveryInProgress;
167 
168  processRecoveryCycleEntry(dataSource, msgData);
169  }
170  else if (state_ == RecoveryInProgress) // waiting for the next cycle start
171  {
172  if (!completeRecovery())
173  {
174  restartRecovery();
175 
176  processRecoveryCycleEntry(dataSource, msgData);
177  }
178  }
179 }
180 
181 bool EobiDataRepository::processRecoveryCycleEntry(
182  const Eurex::MarketData::DataSource &dataSource,
183  const BaseMsgData &msgData)
184 {
185  if (nextSnapshotMsgSeqNum_ != msgData.MsgSeqNum)
186  {
187  log(ONIXS_LOG_DEBUG[this] << "Snapshot message gap (expected " << nextSnapshotMsgSeqNum_ << ", actual: " << msgData.MsgSeqNum << ")");
188 
189  restartRecovery();
190 
191  return false;
192  }
193 
194  if (!shouldSkipMessage(dataSource))
195  {
196  log(ONIXS_LOG_DEBUG[this] << "SM: " << msgData);
197 
198  EobiProductDataRepository *repository
199  = getOrCreateProductDataRepository(dataSource.marketSegmentId);
200 
201  repository->processSnapshotMessage(dataSource, msgData);
202  }
203 
204  ++nextSnapshotMsgSeqNum_;
205 
206  return true;
207 }
208 
// Moves the repository from Synchronized to RecoveryPending. In any other
// state the call does nothing, so repeated gap notifications do not disturb a
// recovery that is already pending or in progress.
209 void EobiDataRepository::startRecovery()
210 {
211  if (state_ == Synchronized)
212  {
213  log(ONIXS_LOG_DEBUG[this] << "Starting recovery.");
214 
215  state_ = RecoveryPending;
216 
217  BOOST_ASSERT(config_.listenerHolder != nullptr);
// NOTE(review): the embedded numbering skips source lines 218 and 220 here -
// presumably the listener notification guarded by the assertion above.
// Confirm against the original file.
219 
221  }
222 }
223 
224 void EobiDataRepository::restartRecovery()
225 {
226  log(ONIXS_LOG_DEBUG[this] << "Restarting recovery.");
227 
228  state_ = RecoveryPending;
229  nextSnapshotMsgSeqNum_ = 0;
230 
231  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
232  {
233  entry.second->restartRecoveryIfInProgress();
234  }
235 }
236 
// Tries to finish the recovery for every product repository.
// Returns false as soon as one product repository reports that it cannot
// complete; state_ is left unchanged in that case, and repositories visited
// earlier in the loop have already completed. Returns true once all of them
// completed and the repository is Synchronized again.
237 bool EobiDataRepository::completeRecovery()
238 {
239  log(ONIXS_LOG_DEBUG[this] << "Completing recovery...");
240 
241  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
242  {
243  if (!entry.second->completeRecovery())
244  {
245  return false;
246  }
247  }
248 
249  state_ = Synchronized;
250  log(ONIXS_LOG_DEBUG[this] << "Recovery completed");
251 
// NOTE(review): the embedded numbering skips source lines 252 and 255 around
// the assertion below - presumably a listener notification among them.
// Confirm against the original file.
253 
254  BOOST_ASSERT(config_.listenerHolder != nullptr);
256 
257  return true;
258 }
259 
// Returns the product repository registered under `product`, creating and
// registering a new one on first use. The returned pointer is held in
// repositories_ and released by clearProductDataRepositories().
260 EobiProductDataRepository* EobiDataRepository::getOrCreateProductDataRepository(
// NOTE(review): source line 261 (the parameter list) is missing from this
// listing; the body refers to a parameter named `product`.
262 {
// Fast path: a repository for this product already exists.
263  ProductDataRepositories::const_iterator it = repositories_.find(product);
264  if (it != repositories_.end())
265  {
266  return (*it).second;
267  }
268 
// NOTE(review): source line 269, which declares the local `config`, is
// missing from this listing.
270  config.parent = this;
271 
272  EobiProductDataRepository *repository
273  = new EobiProductDataRepository(product, getMarketDepth(product), config, orderBookAllocator_, shouldBuildInternalOrderBooks());
274 
// A newly created repository inherits the currently installed instrument filters.
275  repository->setSecurityIdFilters(instrumentFilters_);
276 
277  repositories_[product] = repository;
278 
279  return repository;
280 }
281 
282 void EobiDataRepository::clearProductDataRepositories()
283 {
284  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
285  {
286  BOOST_ASSERT(entry.second != nullptr);
287  delete entry.second;
288  }
289 
290  repositories_.clear();
291 }
292 
293 Eurex::MarketData::MarketDepth EobiDataRepository::getMarketDepth(
294  Eurex::MarketData::MarketSegmentId /*product*/) const
295 {
296  return config_.settings.bookDepth;
297 }
298 
299 bool EobiDataRepository::shouldSkipMessage(
300  const Eurex::MarketData::DataSource &dataSource) const
301 {
302  const bool skipPartition = !partitionFilters_.empty()
303  && (partitionFilters_.find(dataSource.partitionId) == partitionFilters_.end());
304 
305  const bool skipProduct = !productFilters_.empty()
306  && (productFilters_.find(dataSource.marketSegmentId) == productFilters_.end());
307 
308  return (skipPartition || skipProduct);
309 }
310 
312 {
// Forward the notification to every known product repository.
313  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
314  {
315  BOOST_ASSERT(entry.second != nullptr);
316  entry.second->noDataOnIncrementalFeeds();
317  }
318 }
319 
321 {
322  log(ONIXS_LOG_DEBUG[this] << "Stopping recovery...");
323 
// Let every known product repository handle the sequence reset.
324  BOOST_FOREACH(ProductDataRepositories::const_reference entry, repositories_)
325  {
326  entry.second->onApplSeqReset();
327  }
328 
// The repository is forced back to Synchronized regardless of its previous state.
329  state_ = Synchronized;
330  log(ONIXS_LOG_DEBUG[this] << "Recovery stopped");
331 
// NOTE(review): source line 332 is missing from this listing; confirm against
// the original file what follows the log statement above.
333 }
EobiDataRepository(const EobiDataRepositoryConfig &config, bool buildInternalOrderBooks, OrderBookAllocator *orderBookAllocator)
void onIncrementalMessage(const DataSource &dataSource, const BaseMsgData &msgData)
void processIncrementalMessage(const DataSource &dataSource, const BaseMsgData &msgData, bool *gapDetected)
UInt32 MarketDepth
Alias for Market depth type.
Definition: Defines.h:43
void processSnapshotMessage(const DataSource &dataSource, const BaseMsgData &msgData)
FilteringTraits::PartitionIdFilters PartitionIdFilters
Definition: Defines.h:30
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
FilteringTraits::SecurityIdFilters SecurityIdFilters
UInt32 MarketSegmentId
Alias for Market Segment ID type.
Definition: Defines.h:40
void onSnapshotMessage(const DataSource &dataSource, const BaseMsgData &msgData)