OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  8.1.0
API documentation
EobiHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <boost/foreach.hpp>
20 #include <boost/scope_exit.hpp>
21 
22 #include <util/String.h>
23 
25 
26 #include "EobiHandlerImpl.h"
27 #include "EobiPacketProcessor.h"
28 #include "EobiPacketParser.h"
29 #include "../FeedEngineImpl.h"
30 #include "../InternalDefines.h"
31 
32 
33 namespace OnixS {
34 namespace Eurex {
35 namespace MarketData {
36 namespace EOBI {
37 namespace Implementation {
38 
39 
40 using namespace HandlerCore::MarketData;
41 using namespace HandlerCore::Common;
42 using namespace Networking;
43 
44 
46  : base ("EOBI", nullptr, settings.maxPacketSize, settings.licenseDirectory)
47  , settings_(settings)
48  , bookAllocator_(settings_.buildInternalOrderBooks ? new OrderBookAllocator(settings_.maxBooksObjectAmount) : nullptr)
49  , logger_(createAndInitLogger(settings))
50  , logFacilityForSnapshots_("Snapshot", this, Logging::LOG_LEVEL_DEBUG)
51  , logFacilityForIncrementals_("Incremental", this, Logging::LOG_LEVEL_DEBUG)
52  , snapshotPacketProcessor_(nullptr)
53  , incrementalPacketProcessor_(nullptr)
54  , listenerHolder_(new EobiListenerHolder(this, logger_.get()))
55  , dataRepository_(createAndInitDataRepository(settings))
56 {
57  forceLog (ONIXS_LOG_INFO[this] << "OnixS Eurex T7 Eobi Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
58  forceLog (ONIXS_LOG_INFO[this] << settings.toString());
59 
60  initListenerHolder();
61 
62  try
63  {
64  snapshotPacketProcessor_.reset(
66  settings.lostPacketWaitTime,
67  settings.heartbeatInterval,
69  lock())
70  );
71 
72  snapshotPacketProcessor_->subscribeOnPacketData(
73  boost::bind(&EobiHandlerImpl::onSnapshotPacketData, this, _1, _2, _3));
74  snapshotPacketProcessor_->subscribeOnGap(boost::bind(&EobiHandlerImpl::onSnapshotPacketGap, this));
75  snapshotPacketProcessor_->subscribeOnInactivity(boost::bind(&EobiHandlerImpl::onSnapshotInactivity, this));
76  snapshotPacketProcessor_->subscribeOnFeedInactivity(
77  boost::bind(&EobiHandlerImpl::onSnapshotFeedInactivity, this, _1));
78 
79  incrementalPacketProcessor_.reset(
81  settings.lostPacketWaitTime,
82  settings.heartbeatInterval,
84  lock())
85  );
86 
87  incrementalPacketProcessor_->subscribeOnPacketData(
88  boost::bind(&EobiHandlerImpl::onIncrementalPacketData, this, _1, _2, _3));
89  incrementalPacketProcessor_->subscribeOnGap(boost::bind(&EobiHandlerImpl::onIncrementalPacketGap, this));
90  incrementalPacketProcessor_->subscribeOnApplSeqReset(boost::bind(&EobiHandlerImpl::onApplSeqReset, this));
91  incrementalPacketProcessor_->subscribeOnInactivity(
92  boost::bind(&EobiHandlerImpl::onIncrementalInactivity, this));
93  incrementalPacketProcessor_->subscribeOnFeedInactivity(
94  boost::bind(&EobiHandlerImpl::onIncrementalFeedInactivity, this, _1));
95 
96  dataRepository_.reset(createAndInitDataRepository(settings));
97 
98  }
99  catch (const std::exception& ex)
100  {
101  reportError(TextBuilder() << "Exception in EobiHandlerImpl constructor: " << ex.what());
102  throw;
103  }
104  catch (...)
105  {
106  reportError("Unknown exception in EobiHandlerImpl constructor");
107  throw;
108  }
109 }
110 
112 {
113  stop();
114 }
115 
116 void EobiHandlerImpl::initListenerHolder()
117 {
118  try
119  {
120  listenerHolder_.reset(new EobiListenerHolder(this, logger_.get()));
121  }
122  catch (const std::exception& ex)
123  {
124  log(ONIXS_LOG_ERROR[this] << "Exception while constructing EobiListenerHolder: " << ex.what());
125  throw;
126  }
127  catch (...)
128  {
129  log(ONIXS_LOG_WARN[this] << "Unknown exception while constructing EobiListenerHolder");
130  throw;
131  }
132 }
133 
135 {
136  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
137 
138  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
139  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
140  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
141 }
142 
144 {
145  base::start();
146 }
147 
149 {
150  incrementalPacketProcessor_->activateReplayMode();
151  snapshotPacketProcessor_->activateReplayMode();
152 
153  LogPlayerLinks links;
154 
155  links.push_back(LogPlayerLink("Incremental", incrementalPacketProcessor_.get()));
156  links.push_back(LogPlayerLink("Snapshot", snapshotPacketProcessor_.get()));
157 
158  BaseHandlerImp::start(options, links, "EOBI");
159 }
160 
162 {
163  base::stop();
164 }
165 
167 {
168  incrementalPacketProcessor_->reset();
169  snapshotPacketProcessor_->reset();
170 
171  FE::ConnectionInfo connectionA;
172  FE::ConnectionInfo connectionB;
173 
174  connectionA.role(FE::NetFeedRole::A);
175  connectionB.role(FE::NetFeedRole::B);
176 
177  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
178  if (!settings_.useFeedA)
179  layout = FE::MulticastFeedLayout::BOnly;
180  else if (!settings_.useFeedB)
181  layout = FE::MulticastFeedLayout::AOnly;
182 
183  FE::NifsByFeedRole nifs;
184  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
185  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
186 
187  ///
188  connectionA.type(FE::NetFeedType::Incremental);
189  connectionB.type(FE::NetFeedType::Incremental);
190 
191  connectionA.id("IncrementalA");
192  connectionB.id("IncrementalB");
193 
194  connectionA.ip(settings_.interfaceDescriptor.incrementalFeed.serviceA.address);
195  connectionB.ip(settings_.interfaceDescriptor.incrementalFeed.serviceB.address);
196 
197  connectionA.port(settings_.interfaceDescriptor.incrementalFeed.serviceA.port);
198  connectionB.port(settings_.interfaceDescriptor.incrementalFeed.serviceB.port);
199 
200  FE::ConnectionInfoList incrementalConnections;
201  incrementalConnections.push_back(connectionA);
202  incrementalConnections.push_back(connectionB);
203 
204  incrementalFeeds_.reset(
205  constructCluster(
206  FE::NetFeedType::Incremental,
207  layout,
208  incrementalConnections,
209  nifs,
210  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
211  *incrementalPacketProcessor_
212  )
213  );
214 
215  incrementalFeeds_->connect(FE::CanThrowNow());
216 
217  ///
218  connectionA.type(FE::NetFeedType::Snapshot);
219  connectionB.type(FE::NetFeedType::Snapshot);
220 
221  connectionA.id("SnapshotA");
222  connectionB.id("SnapshotB");
223 
224  connectionA.ip(settings_.interfaceDescriptor.snapshotFeed.serviceA.address);
225  connectionB.ip(settings_.interfaceDescriptor.snapshotFeed.serviceB.address);
226 
227  connectionA.port(settings_.interfaceDescriptor.snapshotFeed.serviceA.port);
228  connectionB.port(settings_.interfaceDescriptor.snapshotFeed.serviceB.port);
229 
230  FE::ConnectionInfoList snapshotConnections;
231  snapshotConnections.push_back(connectionA);
232  snapshotConnections.push_back(connectionB);
233 
234  snapshotFeeds_.reset(
235  constructCluster(
236  FE::NetFeedType::Snapshot,
237  layout,
238  snapshotConnections,
239  nifs,
240  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
241  *snapshotPacketProcessor_
242  )
243  );
244 }
245 
247 {
248  if(incrementalFeeds_)
249  incrementalFeeds_->disconnect();
250 
251  if(snapshotFeeds_)
252  snapshotFeeds_->disconnect();
253 }
254 
256 {
257  dataRepository_->setPartitionIdFilters(filters);
258 
259  if (filters.size() > 0)
260  {
261  TextBuilder tb;
262  tb << "PartitionId filters: ";
263 
264  BOOST_FOREACH(MarketSegmentIdFilters::const_reference item, filters)
265  {
266  tb << '\n' << item;
267  }
268 
269  log( ONIXS_LOG_INFO[this] << tb.toString());
270  }
271 }
272 
274 {
275  dataRepository_->removeAllPartitionIdFilters();
276 
277  log( ONIXS_LOG_INFO[this] << "Remove all PartitionId filters");
278 }
279 
281 {
282  dataRepository_->setMarketSegmentIdFilters(filters);
283 
284  if (filters.size() > 0)
285  {
286  TextBuilder tb;
287  tb << "MarketSegmentId filters: ";
288 
289  BOOST_FOREACH(MarketSegmentIdFilters::const_reference item, filters)
290  {
291  tb << '\n' << item;
292  }
293 
294  log( ONIXS_LOG_INFO[this] << tb.toString());
295  }
296 }
297 
299 {
300  dataRepository_->removeAllMarketSegmentIdFilters();
301 
302  log( ONIXS_LOG_INFO[this] << "Remove all MarketSegmentId filters");
303 }
304 
306 {
307  dataRepository_->setSecurityIdFilters(filters);
308 
309  if (filters.size() > 0)
310  {
311  TextBuilder tb;
312  tb << "SecurityId Filters: ";
313 
314  BOOST_FOREACH(SecurityIdFilters::const_reference item, filters)
315  {
316  tb << '\n' << item;
317  }
318 
319  log( ONIXS_LOG_INFO[this] << tb.toString());
320  }
321 }
322 
324 {
325  dataRepository_->removeAllSecurityIdFilters();
326 
327  log( ONIXS_LOG_INFO[this] << "Remove all SecurityId filters");
328 }
329 
331  DataSource &dataSource,
332  const char *data,
333  size_t size)
334 {
335  log( ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet " << dataSource.packetSeqNum);
336 
337  if (shouldLogPackets())
338  {
339  log( ONIXS_LOG_INFO[&logFacilityForIncrementals_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, size) << ").");
340  }
341 
342  EobiPacketParser packetParser;
343 
344  if (!packetParser.attachPacketData(reinterpret_cast<const Networking::Byte*>(data), size))
345  {
346  reportError(TextBuilder() << "Invalid packet (" << Base64Wrapper(data, size) << ").");
347  }
348 
349  listenerHolder_->invokePacketProcessingBegin(dataSource);
350 
351  BOOST_SCOPE_EXIT(&dataSource, &listenerHolder_)
352  {
353  listenerHolder_->invokePacketProcessingEnd(dataSource);
354  }
355  BOOST_SCOPE_EXIT_END
356 
357  while (packetParser.hasMoreData())
358  {
359  const BaseMsgData *msg = packetParser.getNextMessage();
360 
361  ++dataSource.packetMessageSeqNum;
362  dataSource.isLastInPacket = !packetParser.hasMoreData();
363 
364  dataRepository_->onIncrementalMessage(dataSource, *msg);
365  }
366 }
367 
369 {
370  log( ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet gap");
371  dataRepository_->onIncrementalPacketGap();
372 }
373 
375 {
376  reportWarning(
377  TextBuilder()
378  << "No " << LogFacility::name() << " incremental data"
379  );
380 
381  dataRepository_->noDataOnIncrementalFeeds();
382 }
383 
384 void EobiHandlerImpl::onIncrementalFeedInactivity(FE::NetFeedRole::Enum id)
385 {
386  reportWarning(
387  TextBuilder()
388  << "No " << LogFacility::name() << " incremental data on " << FE::toStr(id)
389  );
390 }
391 
393 {
394  log( ONIXS_LOG_DEBUG[this] << "onApplSeqReset");
395  dataRepository_->onApplSeqReset();
396 }
397 
399  DataSource &dataSource,
400  const char *data,
401  size_t size)
402 {
403  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet " << dataSource.packetSeqNum);
404 
405  if (shouldLogPackets())
406  {
407  log(ONIXS_LOG_INFO[&logFacilityForSnapshots_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, size) << ").");
408  }
409 
410  EobiPacketParser packetParser;
411 
412  if (!packetParser.attachPacketData(reinterpret_cast<const Networking::Byte*>(data), size))
413  {
414  reportError(TextBuilder() << "Invalid packet (" << Base64Wrapper(data, size) << ").");
415  }
416 
417  listenerHolder_->invokePacketProcessingBegin(dataSource);
418 
419  BOOST_SCOPE_EXIT(&dataSource, &listenerHolder_)
420  {
421  listenerHolder_->invokePacketProcessingEnd(dataSource);
422  }
423  BOOST_SCOPE_EXIT_END
424 
425  while (packetParser.hasMoreData())
426  {
427  const BaseMsgData *msg = packetParser.getNextMessage();
428 
429  ++dataSource.packetMessageSeqNum;
430  dataSource.isLastInPacket = !packetParser.hasMoreData();
431 
432  dataRepository_->onSnapshotMessage(dataSource, *msg);
433  }
434 }
435 
437 {
438  log( ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet gap detected");
439 }
440 
442 {
443  reportWarning(
444  TextBuilder()
445  << "No " << LogFacility::name() << " snapshot data"
446  );
447 }
448 
449 void EobiHandlerImpl::onSnapshotFeedInactivity(FE::NetFeedRole::Enum id)
450 {
451  reportWarning(
452  TextBuilder()
453  << "No " << LogFacility::name() << " snapshot data on " << FE::toStr(id)
454  );
455 }
456 
458 {
459  if (!isBacktrestingMode())
460  {
461  log( ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
462  snapshotPacketProcessor_->reset();
463 
464  snapshotFeeds_->connectSwallowException();
465  }
466  else
467  log( ONIXS_LOG_DEBUG[this] << "Starting snapshot feed");
468 }
469 
471 {
472  if (!isBacktrestingMode())
473  snapshotFeeds_->disconnect();
474 
475  log( ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
476 
477  snapshotPacketProcessor_->reset();
478 }
479 
480 EobiDataRepository* EobiHandlerImpl::createAndInitDataRepository(const EobiHandlerSettings &settings)
481 {
482  EobiDataRepositoryConfig repositoryConfig;
483 
484  repositoryConfig.name = "EobiDataRepository";
485  repositoryConfig.parent = this;
486  repositoryConfig.logger = logger_.get();
487  repositoryConfig.listenerHolder = listenerHolder_.get();
488  repositoryConfig.settings = settings;
489 
490  EobiDataRepository *repository = new EobiDataRepository(
491  repositoryConfig,
492  settings.buildInternalOrderBooks,
493  bookAllocator_.get());
494 
497 
498  return repository;
499 }
500 
501 HandlerLogger* EobiHandlerImpl::createAndInitLogger(const EobiHandlerSettings &settings)
502 {
503  HandlerLogger *logger = new HandlerLogger(convertLogSettings(settings));
504 
505  setLogger(&(logger->getLogger()));
506 
507  return logger;
508 }
509 
510 bool EobiHandlerImpl::shouldLogPackets() const
511 {
513 }
514 
515 void EobiHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
516 {
517  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
518 }
519 
520 void EobiHandlerImpl::invokeError(const std::string& description)
521 {
523 }
524 
525 void EobiHandlerImpl::invokeWarning(const std::string& description)
526 {
527  listenerHolder().invokeWarning(description);
528 }
529 
530 }}}}}
std::string toString() const
Returns the string representation.
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:287
EobiHandlerImpl(const EobiHandlerSettings &settings)
Initialize new instance.
InterfaceDescriptor interfaceDescriptor
Sets data interface technical configuration.
void onIncrementalPacketData(DataSource &dataSource, const char *data, size_t length)
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
FilteringTraits::PartitionIdFilters PartitionIdFilters
ServiceDescriptor serviceB
Service B.
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void invokeWarning(const std::string &description) override
bool attachPacketData(const Networking::Byte *packet, size_t size)
Definition: Defines.h:30
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:77
void subscribeOnSnapshotStopRequest(const OnRequest callback)
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used to define the behavior of the given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
void onIncrementalFeedInactivity(HandlerCore::MarketData::FE::NetFeedRole::Enum id)
void setPartitionIdFilters(const PartitionIdFilters &filters)
void onSnapshotFeedInactivity(HandlerCore::MarketData::FE::NetFeedRole::Enum id)
EOBI handler configuration settings.
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FilteringTraits::SecurityIdFilters SecurityIdFilters
void invokeError(const std::string &description) override
Invoke helpers.
void subscribeOnSnapshotStartRequest(const OnRequest callback)
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
EobiListenerHolder & listenerHolder()
Returns commons services as shared object.
Identifies errors of a generic nature.
Definition: ErrorListener.h:38
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
std::string toString() const
Returns the string representation.
void invokeError(ErrorCode::Enum code, const std::string &description) ONIXS_NOEXCEPT
void onSnapshotPacketData(DataSource &dataSource, const char *data, size_t length)