OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.4.2
API documentation
EmdiHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <system/File.h>
20 #include <util/String.h>
21 
22 #include <OnixS/Core/Bootstrap/Platform.h>
23 
25 
26 #include <OnixS/HandlerCore/FeedEngine/NetFeedAttributes.h>
27 
28 #include "EmdiHandlerImpl.h"
29 #include "Formatting.h"
30 #include "FeedEngineImpl.h"
31 #include "Version.h"
32 
33 
34 namespace OnixS {
35 namespace Eurex {
36 namespace MarketData {
37 namespace Implementation {
38 
39 using namespace HandlerCore::MarketData;
40 using namespace HandlerCore::Common;
41 using namespace Networking;
42 
43 
45  : base("EMDI", nullptr, settings.maxPacketSize, settings.licenseDirectory)
46  , settings_(settings)
47  , logFacilityForSnapshots_("Snapshot", this, Logging::LOG_LEVEL_DEBUG)
48  , logFacilityForIncrementals_("Incremental", this, Logging::LOG_LEVEL_DEBUG)
49  , incrementalMessagesPool_(settings.messagePoolSize)
50  , snapShotContentAllocator_(settings.messagePoolSize)
51 {
52  initLogger(settings);
53 
54  forceLog(ONIXS_LOG_INFO[this] << "OnixS Eurex T7 Emdi Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
55  forceLog(ONIXS_LOG_INFO[this] << settings.toString());
56 
57  initListenerHolder();
58 
59  try
60  {
61  checkXmlFile();
62 
63  const std::string xml(removeInvalidTemplateNodes(System::File::readTextFile(settings.fastTemplatesFile), "EMDPacketHeader"));
64 
65  incrementalFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
66  snapshotFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
67 
68  EmdiDataRepositoryConfig repositoryConfig;
69  repositoryConfig.name = "DataRepository";
70  repositoryConfig.parent = this;
71  repositoryConfig.logger = logger_.get();
72  repositoryConfig.listenerHolder = listenerHolder_.get();
73  repositoryConfig.settings = settings;
74  dataRepository_.reset(new EmdiDataRepository(repositoryConfig));
75 
76  dataRepository_->subscribeOnSnapshotStartRequest(boost::bind(&EmdiHandlerImpl::onSnapshotStartRequest, this));
77  dataRepository_->subscribeOnSnapshotStopRequest(boost::bind(&EmdiHandlerImpl::onSnapshotStopRequest, this));
78 
79  incrementalPacketProcessor_.reset(
81  settings.lostPacketWaitTime,
82  settings.heartbeatInterval,
84  lock(),
85  boost::bind(&EmdiHandlerImpl::filterPacket, this, _1))
86  );
87  incrementalPacketProcessor_->subscribeOnPacketData(boost::bind(&EmdiHandlerImpl::onIncrementalPacketData, this, _1, _2, _3, _4));
88  incrementalPacketProcessor_->subscribeOnGap(boost::bind(&EmdiHandlerImpl::onIncrementalPacketGap, this));
89  incrementalPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&EmdiHandlerImpl::onIncrementalSenderCompIdChange, this, _1, _2, _3, _4));
90  incrementalPacketProcessor_->subscribeOnInactivity(boost::bind(&EmdiHandlerImpl::onIncrementalInactivity, this));
91  incrementalPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&EmdiHandlerImpl::onIncrementalFeedInactivity, this, _1));
92 
93  snapshotPacketProcessor_.reset(
95  settings.lostPacketWaitTime,
96  settings.heartbeatInterval,
98  lock(),
99  boost::bind(&EmdiHandlerImpl::filterPacket, this, _1))
100  );
101  snapshotPacketProcessor_->subscribeOnPacketData(boost::bind(&EmdiHandlerImpl::onSnapshotPacketData, this, _1, _2, _3, _4));
102  snapshotPacketProcessor_->subscribeOnGap(boost::bind(&EmdiHandlerImpl::onSnapshotPacketGap, this));
103  snapshotPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&EmdiHandlerImpl::onSnapshotSenderCompIdChange, this, _1, _2, _3, _4));
104  snapshotPacketProcessor_->subscribeOnInactivity(boost::bind(&EmdiHandlerImpl::onSnapshotInactivity, this));
105  snapshotPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&EmdiHandlerImpl::onSnapshotFeedInactivity, this, _1));
106  }
107  catch (const std::exception& ex)
108  {
109  reportError(TextBuilder() << "Exception in EmdiHandlerImpl constructor: " << ex.what());
110  throw;
111  }
112  catch (...)
113  {
114  reportError("Unknown exception in EmdiHandlerImpl constructor");
115  throw;
116  }
117 }
118 
120 {
121  incrementalMessagesPool_.reset();
122  snapShotContentAllocator_.reset();
123 
124  stop();
125 }
126 
127 void EmdiHandlerImpl::checkXmlFile()
128 {
129  std::string detectedXmlVersion;
130 
131  if (!checkXmlVersion(settings_.fastTemplatesFile, detectedXmlVersion, EmdiXmlMajorVersion, EmdiXmlMinorVersion))
132  {
133  reportWarning("Can not extract template file version.");
134  }
135  else
136  forceLog(ONIXS_LOG_INFO[this] << "Detected template file version: " << detectedXmlVersion);
137 }
138 
140 {
141  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
142 
143  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
144  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
145  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
146 }
147 
149 {
150  dataRepository_->preBuildProductDataRepositories();
151 
152  base::start();
153 }
154 
156 {
157  dataRepository_->preBuildProductDataRepositories();
158 
159  incrementalPacketProcessor_->activateReplayMode();
160  snapshotPacketProcessor_->activateReplayMode();
161 
162  LogPlayerLinks links;
163  links.push_back(LogPlayerLink("Incremental", incrementalPacketProcessor_.get()));
164  links.push_back(LogPlayerLink("Snapshot", snapshotPacketProcessor_.get()));
165 
166  BaseHandlerImp::start(options, links, "EMDI");
167 }
168 
170 {
171  base::stop();
172 }
173 
174 void EmdiHandlerImpl::onStarting()
175 {
176  incrementalPacketProcessor_->reset();
177  snapshotPacketProcessor_->reset();
178 
179  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
180  if (!settings_.useFeedA)
181  layout = FE::MulticastFeedLayout::BOnly;
182  else if (!settings_.useFeedB)
183  layout = FE::MulticastFeedLayout::AOnly;
184 
185  FE::NifsByFeedRole nifs;
186  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
187  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
188 
189 
190  {
191  FE::ConnectionInfo connectionA;
192  FE::ConnectionInfo connectionB;
193 
194  connectionA.role(FE::NetFeedRole::A);
195  connectionB.role(FE::NetFeedRole::B);
196 
197  ///
198  connectionA.type(FE::NetFeedType::Incremental);
199  connectionB.type(FE::NetFeedType::Incremental);
200 
201  connectionA.id("IncrementalA");
202  connectionB.id("IncrementalB");
203 
204  connectionA.ip(settings_.interfaceDescriptor.incrementalFeed.serviceA.address);
205  connectionB.ip(settings_.interfaceDescriptor.incrementalFeed.serviceB.address);
206 
207  connectionA.port(settings_.interfaceDescriptor.incrementalFeed.serviceA.port);
208  connectionB.port(settings_.interfaceDescriptor.incrementalFeed.serviceB.port);
209 
210  FE::ConnectionInfoList incrementalConnections;
211  incrementalConnections.push_back(connectionA);
212  incrementalConnections.push_back(connectionB);
213 
214  BOOST_ASSERT(incrementalPacketProcessor_.get());
215 
216  incrementalFeeds_.reset(
217  constructCluster(
218  FE::NetFeedType::Incremental,
219  layout,
220  incrementalConnections,
221  nifs,
222  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
223  *incrementalPacketProcessor_.get()
224  )
225  );
226  }
227 
228  {
229  FE::ConnectionInfo connectionA;
230  FE::ConnectionInfo connectionB;
231 
232  connectionA.role(FE::NetFeedRole::A);
233  connectionB.role(FE::NetFeedRole::B);
234 
235  connectionA.type(FE::NetFeedType::Snapshot);
236  connectionB.type(FE::NetFeedType::Snapshot);
237 
238  connectionA.id("SnapshotA");
239  connectionB.id("SnapshotB");
240 
241  connectionA.ip(settings_.interfaceDescriptor.snapshotFeed.serviceA.address);
242  connectionB.ip(settings_.interfaceDescriptor.snapshotFeed.serviceB.address);
243 
244  connectionA.port(settings_.interfaceDescriptor.snapshotFeed.serviceA.port);
245  connectionB.port(settings_.interfaceDescriptor.snapshotFeed.serviceB.port);
246 
247  FE::ConnectionInfoList snapshotConnections;
248  snapshotConnections.push_back(connectionA);
249  snapshotConnections.push_back(connectionB);
250 
251  BOOST_ASSERT(snapshotPacketProcessor_.get());
252 
253  snapshotFeeds_.reset(
254  constructCluster(
255  FE::NetFeedType::Snapshot,
256  layout,
257  snapshotConnections,
258  nifs,
259  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
260  *snapshotPacketProcessor_.get()
261  )
262  );
263  }
264 
265  incrementalFeeds_->connect(FE::CanThrowNow());
266 }
267 
268 void EmdiHandlerImpl::onStopping()
269 {
270  if(incrementalFeeds_)
271  incrementalFeeds_->disconnect();
272 
273  if(snapshotFeeds_)
274  snapshotFeeds_->disconnect();
275 }
276 
277 void EmdiHandlerImpl::onSnapshotStartRequest()
278 {
279  if (!isBacktrestingMode())
280  {
281  log(ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
282  snapshotPacketProcessor_->reset();
283 
284  snapshotFeeds_->connectSwallowException();
285  }
286  else
287  log(ONIXS_LOG_DEBUG[this] << "Starting snapshot feed");
288 }
289 
290 void EmdiHandlerImpl::onSnapshotStopRequest()
291 {
292  if (!isBacktrestingMode())
293  {
294  snapshotFeeds_->disconnect();
295  }
296 
297  log(ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
298  snapshotPacketProcessor_->reset();
299 
300  snapShotContentAllocator_.reset();
301 }
302 
303 void EmdiHandlerImpl::onIncrementalPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
304 {
305  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet " << dataSource.packetSeqNum);
306 
307  if (settings_.logSettings& LogSettings::LogPackets)
308  log(ONIXS_LOG_INFO[&logFacilityForIncrementals_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
309 
310  try
311  {
312  size_t offset = headerSize;
313  size_t chunkLength;
314  TemplateId templateId = 0;
315  bool result;
316 
317  const PacketHeaderForEMDI* const packetHeader = (PacketHeaderForEMDI*) (data);
318  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
319 
320  if(packetHeaderTemplateId != PacketHeaderForEMDI::TemplateIdValue)
321  {
322  throw OperationException(
323  BOOST_CURRENT_FUNCTION,
324  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEMDI::TemplateIdValue,
325  packetHeaderTemplateId).c_str()
326  );
327  }
328 
329  while(offset < length)
330  {
331  CORE::Message* const msg = incrementalMessagesPool_.create();
332 
333  try
334  {
335  result = incrementalFixDecoder_->decode(
336  reinterpret_cast<const uint8*>(data), offset, length - offset, msg, &chunkLength, &templateId);
337  }
338  catch (const std::exception& ex)
339  {
340  reportWarning(
341  TextBuilder()
342  << "Exception while message decoding: "
343  << ex.what()
344  << ", Packet ("
345  << Base64Wrapper(data, length) << ")."
346  );
347 
348  break;
349  }
350  catch (...)
351  {
352  reportWarning(
353  TextBuilder()
354  << "Unknown exception while message decoding"
355  << ", Packet ("
356  << Base64Wrapper(data, length) << ")."
357  );
358 
359  break;
360  }
361 
362  if(chunkLength == 0)
363  break;
364 
365  if(!result)
366  break;
367 
368  offset += chunkLength;
369  ++dataSource.packetMessageSeqNum;
370  dataSource.isLastInPacket = offset >= length;
371 
372  if(templateId == ResetTemplateId)
373  continue;
374 
375  dataRepository_->onIncrementalMessage(dataSource, *msg, templateId);
376  }
377  }
378  catch (const std::exception& ex)
379  {
380  reportWarning(
381  TextBuilder()
382  << "Exception while message processing: "
383  << ex.what()
384  << ", Packet ("
385  << Base64Wrapper(data, length) << ")."
386  );
387  }
388  catch (...)
389  {
390  reportWarning(
391  TextBuilder()
392  << "Unknown exception while message processing"
393  << ", Packet ("
394  << Base64Wrapper(data, length) << ")."
395  );
396  }
397 
398  if (incrementalMessagesPool_.recommendReset())
399  {
400  if (!dataRepository_->hasCashedIncrementals())
401  {
402  incrementalMessagesPool_.reset();
403  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
404  }
405  else
406  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Unable to Reset incremental Messages Pool : cashed messages exist");
407  }
408 }
409 
410 void EmdiHandlerImpl::onIncrementalPacketGap()
411 {
412  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet gap");
413  dataRepository_->onIncrementalPacketGap();
414 
415  if (!dataRepository_->hasCashedIncrementals())
416  {
417  incrementalMessagesPool_.reset();
418  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
419  }
420 }
421 
422 void EmdiHandlerImpl::onIncrementalSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
423 {
424  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
425  dataRepository_->onIncrementalSenderCompIdChange();
426 
427  if (!dataRepository_->hasCashedIncrementals())
428  {
429  incrementalMessagesPool_.reset();
430  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
431  }
432 }
433 
434 void EmdiHandlerImpl::onIncrementalInactivity()
435 {
436  reportWarning(
437  TextBuilder()
438  << "No " << LogFacility::name() << " incremental data"
439  );
440 
441  dataRepository_->noDataOnIncrementalFeeds();
442 }
443 
444 void EmdiHandlerImpl::onIncrementalFeedInactivity(FE::NetFeedRole::Enum id)
445 {
446  reportWarning(
447  TextBuilder()
448  << "No " << LogFacility::name() << " incremental data on " << FE::toStr(id)
449  );
450 }
451 
452 void EmdiHandlerImpl::onSnapshotPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
453 {
454  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet " << dataSource.packetSeqNum);
455 
456  if (settings_.logSettings & LogSettings::LogPackets)
457  log(ONIXS_LOG_INFO[&logFacilityForSnapshots_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
458 
459  try
460  {
461  const PacketHeaderForEMDI* const packetHeader = reinterpret_cast<const PacketHeaderForEMDI*>(data);
462  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
463 
464  if (packetHeaderTemplateId != PacketHeaderForEMDI::TemplateIdValue)
465  {
466  throw OperationException(
467  BOOST_CURRENT_FUNCTION,
468  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEMDI::TemplateIdValue, packetHeaderTemplateId).c_str()
469  );
470  }
471 
472  size_t offset = headerSize;
473  bool result = false;
474  size_t chunkLength;
475  TemplateId templateId = 0;
476 
477  while (offset < length)
478  {
479  CORE::Message msg(snapShotContentAllocator_);
480 
481  try
482  {
483  result = snapshotFixDecoder_->decode(
484  reinterpret_cast<const uint8*>(data), offset, length - offset, &msg, &chunkLength, &templateId);
485  }
486  catch (const std::exception& ex)
487  {
488  reportWarning(
489  TextBuilder()
490  << "Exception while message decoding: "
491  << ex.what()
492  << ", Packet ("
493  << Base64Wrapper(data, length) << ")."
494  );
495 
496  break;
497  }
498  catch (...)
499  {
500  reportWarning(
501  TextBuilder()
502  << "Unknown exception while message decoding"
503  << ", Packet ("
504  << Base64Wrapper(data, length) << ")."
505  );
506 
507  break;
508  }
509 
510  if (chunkLength == 0)
511  break;
512 
513  if (!result)
514  break;
515 
516  offset += chunkLength;
517  ++dataSource.packetMessageSeqNum;
518  dataSource.isLastInPacket = offset >= length;
519 
520  if (templateId == ResetTemplateId)
521  continue;
522 
523  bool skipRestOfPacket = false;
524 
525  dataRepository_->onSnapshotMessage(dataSource, msg, &skipRestOfPacket);
526  if (skipRestOfPacket)
527  {
528  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Rest of packet skipped");
529  break;
530  }
531  }
532  }
533  catch (const std::exception& ex)
534  {
535  reportWarning(
536  TextBuilder()
537  << "Exception while message processing: "
538  << ex.what()
539  );
540  }
541  catch (...)
542  {
543  reportWarning(
544  TextBuilder()
545  << "Unknown exception while message processing"
546  );
547  }
548 }
549 
550 void EmdiHandlerImpl::onSnapshotPacketGap()
551 {
552  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet gap detected");
553  snapShotContentAllocator_.reset();
554 }
555 
556 void EmdiHandlerImpl::onSnapshotSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
557 {
558  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
559  dataRepository_->onSnapshotSenderCompIdChange();
560 }
561 
562 void EmdiHandlerImpl::onSnapshotInactivity()
563 {
564  reportWarning(
565  TextBuilder()
566  << "No " << LogFacility::name() << " snapshot data"
567  );
568 
569  snapShotContentAllocator_.reset();
570 }
571 
572 void EmdiHandlerImpl::onSnapshotFeedInactivity(FE::NetFeedRole::Enum id)
573 {
574  reportWarning(
575  TextBuilder()
576  << "No " << LogFacility::name() << " snapshot data on " << FE::toStr(id)
577  );
578 }
579 
581 {
582  dataRepository_->setMarketSegmentId2Depth(map);
583 
584  if (map.size() > 0)
585  {
586  TextBuilder tb;
587  tb << "Market SegmentId to Depth Map: ";
588  for (MarketSegmentId2Depth::const_iterator it = map.begin(), end = map.end(); it != end; ++it)
589  tb << '\n' << it->first << " => " << it->second;
590  log(ONIXS_LOG_INFO[this] << tb.toString());
591  }
592 }
593 
595 {
596  partitionIdFilters_.clear();
597  std::copy(filters.begin(), filters.end(), std::inserter(partitionIdFilters_, partitionIdFilters_.end()));
598 
599  if (filters.size() > 0)
600  {
601  TextBuilder tb;
602  tb << "Partition Id Filters: ";
603  for (PartitionIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
604  tb << '\n' << *it;
605  log(ONIXS_LOG_INFO[this] << tb.toString());
606  }
607 }
608 
610 {
611  partitionIdFilters_.clear();
612  log(ONIXS_LOG_INFO[this] << "removeAllPartitionIdFilters");
613 }
614 
616 {
617  BOOST_ASSERT(dataRepository_.get() != nullptr);
618  //BOOST_ASSERT(!filters.empty());
619 
620  dataRepository_->setMarketSegmentIdFilters(filters);
621 
622  if (filters.size() > 0)
623  {
624  TextBuilder tb;
625  tb << "Market Segment Id Filters: ";
626  for (MarketSegmentIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
627  tb << '\n' << *it;
628  log(ONIXS_LOG_INFO[this] << tb.toString());
629  }
630 }
631 
633 {
634  BOOST_ASSERT(dataRepository_.get() != nullptr);
635  dataRepository_->clearMarketSegmentIdFilters();
636  log(ONIXS_LOG_INFO[this] << "removeAllMarketSegmentIdFilters");
637 }
638 
640 {
641  dataRepository_->setSecurityIdFilters(filters);
642 
643  if (filters.size() > 0)
644  {
645  TextBuilder tb;
646  tb << "Security Id Filters: ";
647  for (SecurityIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
648  tb << '\n' << *it;
649  log(ONIXS_LOG_INFO[this] << tb.toString());
650  }
651 }
652 
654 {
655  dataRepository_->removeAllSecurityIdFilters();
656  log(ONIXS_LOG_INFO[this] << "removeAllSecurityIdFilters");
657 }
658 
659 bool EmdiHandlerImpl::filterPacket(PartitionId partitionId)
660 {
661  if (partitionIdFilters_.empty())
662  return true;
663 
664  return partitionIdFilters_.end() != partitionIdFilters_.find(partitionId);
665 }
666 
667 void EmdiHandlerImpl::initLogger(const HandlerSettings &settings)
668 {
669  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
670 
671  setLogger(&logger_->getLogger());
672 }
673 
674 void EmdiHandlerImpl::initListenerHolder()
675 {
676  try
677  {
678  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
679  }
680  catch (const std::exception& ex)
681  {
682  log(ONIXS_LOG_ERROR[this] << "Exception while constructing ListenerHolder: " << ex.what());
683  throw;
684  }
685  catch (...)
686  {
687  log(ONIXS_LOG_ERROR[this] << "Unknown exception while constructing ListenerHolder");
688  throw;
689  }
690 }
691 
692 void EmdiHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
693 {
694  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
695 }
696 
697 void EmdiHandlerImpl::invokeError(const std::string& description)
698 {
700 }
701 
702 void EmdiHandlerImpl::invokeWarning(const std::string& description)
703 {
704  listenerHolder().invokeWarning(description);
705 }
706 
707 }
708 }
709 }
710 }
711 
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:287
MarketDepthTraits::MarketSegmentId2Depth MarketSegmentId2Depth
void setSecurityIdFilters(const SecurityIdFilters &filters)
UInt32 PartitionId
Alias for Partition ID type.
Definition: Defines.h:48
InterfaceDescriptor interfaceDescriptor
Sets data interface technical configuration.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
FilteringTraits::PartitionIdFilters PartitionIdFilters
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:117
Handler base configuration settings.
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void invokeError(ErrorCode::Enum code, const std::string &description) ONIXS_NOEXCEPT
EmdiHandlerImpl(const EmdiHandlerSettings &settings)
Initialize new instance.
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:77
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
ListenerHolder & listenerHolder()
Returns commons services as shared object.
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
EMDI handler configuration settings.
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FilteringTraits::SecurityIdFilters SecurityIdFilters
bool checkXmlVersion(const std::string &filename, std::string &detectedVersion, UInt32 expectedXmlMajorVersion, UInt32 expectedEmlMinorVersion)
Definition: Utils.cpp:37
void setPartitionIdFilters(const PartitionIdFilters &filters)
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
Identifiers errors of generic nature.
Definition: ErrorListener.h:38
std::string toString() const
Returns the string representation.
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
OnixS::FIX::Core::FAST::TemplateId TemplateId
void invokeError(const std::string &description) override
Invoke helpers.
void setMarketSegmentId2Depth(const MarketSegmentId2Depth &map)
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
std::string removeInvalidTemplateNodes(std::string xml, const std::string &node)
Definition: Utils.cpp:121
void invokeWarning(const std::string &description) override