OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.2.1
API documentation
EmdiHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <system/File.h>
20 #include <util/String.h>
21 
22 #include <OnixS/Core/Bootstrap/Platform.h>
23 
25 
26 #include <OnixS/HandlerCore/FeedEngine/NetFeedAttributes.h>
27 
28 #include "EmdiHandlerImpl.h"
29 #include "Formatting.h"
30 #include "FeedEngineImpl.h"
31 #include "Version.h"
32 
33 
34 namespace OnixS {
35 namespace Eurex {
36 namespace MarketData {
37 namespace Implementation {
38 
39 using namespace HandlerCore::MarketData;
40 using namespace HandlerCore::Common;
41 using namespace Networking;
42 
43 
45  : base("EMDI", nullptr, settings.maxPacketSize, settings.licenseDirectory)
46  , settings_(settings)
47  , logFacilityForSnapshots_("Snapshot", this, Logging::LOG_LEVEL_DEBUG)
48  , logFacilityForIncrementals_("Incremental", this, Logging::LOG_LEVEL_DEBUG)
49  , incrementalMessagesPool_(settings.messagePoolSize)
50  , snapShotContentAllocator_(settings.messagePoolSize)
51 {
52  initLogger(settings);
53 
54  forceLog(ONIXS_LOG_INFO[this] << "OnixS Eurex T7 Emdi Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
55  forceLog(ONIXS_LOG_INFO[this] << settings.toString());
56 
57  try
58  {
59  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
60 
61  checkXmlFile();
62 
63  const std::string xml(removeInvalidTemplateNodes(System::File::readTextFile(settings.fastTemplatesFile), "EMDPacketHeader"));
64 
65  incrementalFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
66  snapshotFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
67 
68  EmdiDataRepositoryConfig repositoryConfig;
69  repositoryConfig.name = "DataRepository";
70  repositoryConfig.parent = this;
71  repositoryConfig.logger = logger_.get();
72  repositoryConfig.listenerHolder = listenerHolder_.get();
73  repositoryConfig.settings = settings;
74  dataRepository_.reset(new EmdiDataRepository(repositoryConfig));
75 
76  dataRepository_->subscribeOnSnapshotStartRequest(boost::bind(&EmdiHandlerImpl::onSnapshotStartRequest, this));
77  dataRepository_->subscribeOnSnapshotStopRequest(boost::bind(&EmdiHandlerImpl::onSnapshotStopRequest, this));
78 
79  incrementalPacketProcessor_.reset(
81  settings.lostPacketWaitTime,
82  settings.heartbeatInterval,
84  lock(),
85  boost::bind(&EmdiHandlerImpl::filterPacket, this, _1))
86  );
87  incrementalPacketProcessor_->subscribeOnPacketData(boost::bind(&EmdiHandlerImpl::onIncrementalPacketData, this, _1, _2, _3, _4));
88  incrementalPacketProcessor_->subscribeOnGap(boost::bind(&EmdiHandlerImpl::onIncrementalPacketGap, this));
89  incrementalPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&EmdiHandlerImpl::onIncrementalSenderCompIdChange, this, _1, _2, _3, _4));
90  incrementalPacketProcessor_->subscribeOnInactivity(boost::bind(&EmdiHandlerImpl::onIncrementalInactivity, this));
91  incrementalPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&EmdiHandlerImpl::onIncrementalFeedInactivity, this, _1));
92 
93  snapshotPacketProcessor_.reset(
95  settings.lostPacketWaitTime,
96  settings.heartbeatInterval,
98  lock(),
99  boost::bind(&EmdiHandlerImpl::filterPacket, this, _1))
100  );
101  snapshotPacketProcessor_->subscribeOnPacketData(boost::bind(&EmdiHandlerImpl::onSnapshotPacketData, this, _1, _2, _3, _4));
102  snapshotPacketProcessor_->subscribeOnGap(boost::bind(&EmdiHandlerImpl::onSnapshotPacketGap, this));
103  snapshotPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&EmdiHandlerImpl::onSnapshotSenderCompIdChange, this, _1, _2, _3, _4));
104  snapshotPacketProcessor_->subscribeOnInactivity(boost::bind(&EmdiHandlerImpl::onSnapshotInactivity, this));
105  snapshotPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&EmdiHandlerImpl::onSnapshotFeedInactivity, this, _1));
106  }
107  catch (const std::exception& ex)
108  {
109  log(ONIXS_LOG_ERROR[this] << "Exception in EmdiHandlerImpl constructor: " << ex.what());
110  throw;
111  }
112  catch (...)
113  {
114  log(ONIXS_LOG_WARN[this] << "Unknown exception in EmdiHandlerImpl constructor");
115  throw;
116  }
117 }
118 
120 {
121  incrementalMessagesPool_.reset();
122  snapShotContentAllocator_.reset();
123 
124  stop();
125 }
126 
127 void EmdiHandlerImpl::checkXmlFile()
128 {
129  std::string detectedXmlVersion;
130 
131  if (!checkXmlVersion(settings_.fastTemplatesFile, detectedXmlVersion, EmdiXmlMajorVersion, EmdiXmlMinorVersion))
132  {
133  const std::string what = "Can not extract template file version.";
134 
135  listenerHolder_->invokeWarning(what);
136  forceLog(ONIXS_LOG_WARN [this] << what);
137  }
138  else
139  forceLog(ONIXS_LOG_INFO[this] << "Detected template file version: " << detectedXmlVersion);
140 }
141 
143 {
144  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
145 
146  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
147  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
148  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
149 }
150 
152 {
153  dataRepository_->preBuildProductDataRepositories();
154 
155  base::start();
156 }
157 
159 {
160  dataRepository_->preBuildProductDataRepositories();
161 
162  LogPlayerLinks links;
163  links.push_back(LogPlayerLink("Incremental", incrementalPacketProcessor_.get()));
164  links.push_back(LogPlayerLink("Snapshot", snapshotPacketProcessor_.get()));
165 
166  BaseHandlerImp::start(options, links, "EMDI");
167 }
168 
170 {
171  base::stop();
172 }
173 
174 void EmdiHandlerImpl::onStarted()
175 {
176  incrementalPacketProcessor_->reset();
177  snapshotPacketProcessor_->reset();
178 
179  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
180  if (!settings_.useFeedA)
181  layout = FE::MulticastFeedLayout::BOnly;
182  else if (!settings_.useFeedB)
183  layout = FE::MulticastFeedLayout::AOnly;
184 
185  FE::NifsByFeedRole nifs;
186  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
187  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
188 
189 
190  {
191  FE::ConnectionInfo connectionA;
192  FE::ConnectionInfo connectionB;
193 
194  connectionA.role(FE::NetFeedRole::A);
195  connectionB.role(FE::NetFeedRole::B);
196 
197  ///
198  connectionA.type(FE::NetFeedType::Incremental);
199  connectionB.type(FE::NetFeedType::Incremental);
200 
201  connectionA.id("IncrementalA");
202  connectionB.id("IncrementalB");
203 
204  connectionA.ip(settings_.interfaceDescriptor.incrementalFeed.serviceA.address);
205  connectionB.ip(settings_.interfaceDescriptor.incrementalFeed.serviceB.address);
206 
207  connectionA.port(settings_.interfaceDescriptor.incrementalFeed.serviceA.port);
208  connectionB.port(settings_.interfaceDescriptor.incrementalFeed.serviceB.port);
209 
210  FE::ConnectionInfoList incrementalConnections;
211  incrementalConnections.push_back(connectionA);
212  incrementalConnections.push_back(connectionB);
213 
214  BOOST_ASSERT(incrementalPacketProcessor_.get());
215 
216  incrementalFeeds_.reset(
217  constructCluster(
218  FE::NetFeedType::Incremental,
219  layout,
220  incrementalConnections,
221  nifs,
222  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
223  *incrementalPacketProcessor_.get()
224  )
225  );
226  }
227 
228  {
229  FE::ConnectionInfo connectionA;
230  FE::ConnectionInfo connectionB;
231 
232  connectionA.role(FE::NetFeedRole::A);
233  connectionB.role(FE::NetFeedRole::B);
234 
235  connectionA.type(FE::NetFeedType::Snapshot);
236  connectionB.type(FE::NetFeedType::Snapshot);
237 
238  connectionA.id("SnapshotA");
239  connectionB.id("SnapshotB");
240 
241  connectionA.ip(settings_.interfaceDescriptor.snapshotFeed.serviceA.address);
242  connectionB.ip(settings_.interfaceDescriptor.snapshotFeed.serviceB.address);
243 
244  connectionA.port(settings_.interfaceDescriptor.snapshotFeed.serviceA.port);
245  connectionB.port(settings_.interfaceDescriptor.snapshotFeed.serviceB.port);
246 
247  FE::ConnectionInfoList snapshotConnections;
248  snapshotConnections.push_back(connectionA);
249  snapshotConnections.push_back(connectionB);
250 
251  BOOST_ASSERT(snapshotPacketProcessor_.get());
252 
253  snapshotFeeds_.reset(
254  constructCluster(
255  FE::NetFeedType::Snapshot,
256  layout,
257  snapshotConnections,
258  nifs,
259  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
260  *snapshotPacketProcessor_.get()
261  )
262  );
263  }
264 
265 
266  incrementalFeeds_->connect();
267 }
268 
269 void EmdiHandlerImpl::onStopped()
270 {
271  incrementalFeeds_.reset();
272  snapshotFeeds_.reset();
273 }
274 
275 void EmdiHandlerImpl::onPrepareFeeds()
276 {
277 }
278 
279 void EmdiHandlerImpl::onSnapshotStartRequest()
280 {
281  if (!isBacktrestingMode())
282  {
283  log(ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
284  snapshotPacketProcessor_->reset();
285 
286  snapshotFeeds_->connect();
287  }
288  else
289  log(ONIXS_LOG_DEBUG[this] << "Starting snapshot feed");
290 }
291 
292 void EmdiHandlerImpl::onSnapshotStopRequest()
293 {
294  if (!isBacktrestingMode())
295  {
296  snapshotFeeds_->disconnect();
297  }
298 
299  log(ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
300  snapshotPacketProcessor_->reset();
301 
302  snapShotContentAllocator_.reset();
303 }
304 
305 void EmdiHandlerImpl::onIncrementalPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
306 {
307  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet " << dataSource.packetSeqNum);
308 
309  if (settings_.logSettings& LogSettings::LogPackets)
310  log(ONIXS_LOG_INFO[&logFacilityForIncrementals_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
311 
312  size_t offset = headerSize;
313  size_t chunkLength;
314  TemplateId templateId = 0;
315  bool result;
316 
317  const PacketHeaderForEMDI* const packetHeader = (PacketHeaderForEMDI*)(data);
318  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
319 
320  if (packetHeaderTemplateId != PacketHeaderForEMDI::TemplateIdValue)
321  {
322  throw OperationException(
323  BOOST_CURRENT_FUNCTION,
324  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEMDI::TemplateIdValue, packetHeaderTemplateId).c_str()
325  );
326  }
327 
328 
329  while (offset < length)
330  {
331  CORE::Message* const msg = incrementalMessagesPool_.create();
332  result = incrementalFixDecoder_->decode(reinterpret_cast<const uint8*>(data), offset, length - offset, msg, &chunkLength, &templateId);
333 
334  if (chunkLength == 0)
335  break;
336 
337  if (!result)
338  break;
339 
340  offset += chunkLength;
341  ++dataSource.packetMessageSeqNum;
342  dataSource.isLastInPacket = offset >= length;
343 
344  if (templateId == ResetTemplateId)
345  continue;
346 
347  dataRepository_->onIncrementalMessage(dataSource, *msg, templateId);
348  }
349 
350  if (incrementalMessagesPool_.recommendReset())
351  {
352  if (!dataRepository_->hasCashedIncrementals())
353  {
354  incrementalMessagesPool_.reset();
355  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
356  }
357  else
358  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Unable to Reset incremental Messages Pool : cashed messages exist");
359  }
360 }
361 
362 void EmdiHandlerImpl::onIncrementalPacketGap()
363 {
364  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet gap");
365  dataRepository_->onIncrementalPacketGap();
366 
367  if (!dataRepository_->hasCashedIncrementals())
368  {
369  incrementalMessagesPool_.reset();
370  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
371  }
372 }
373 
374 void EmdiHandlerImpl::onIncrementalSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
375 {
376  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
377  dataRepository_->onIncrementalSenderCompIdChange();
378 
379  if (!dataRepository_->hasCashedIncrementals())
380  {
381  incrementalMessagesPool_.reset();
382  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
383  }
384 }
385 
386 void EmdiHandlerImpl::onIncrementalInactivity()
387 {
388  TextBuilder tb;
389  tb << "No " << LogFacility::name() << " incremental data";
390  listenerHolder_->invokeWarning(tb.toString());
391  dataRepository_->noDataOnIncrementalFeeds();
392 }
393 
394 void EmdiHandlerImpl::onIncrementalFeedInactivity(FE::NetFeedRole::Enum id)
395 {
396  TextBuilder tb;
397  tb << "No " << LogFacility::name() << " incremental data on " << FE::toStr(id);
398  listenerHolder_->invokeWarning(tb.toString());
399 }
400 
401 void EmdiHandlerImpl::onSnapshotPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
402 {
403  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet " << dataSource.packetSeqNum);
404 
405  if (settings_.logSettings & LogSettings::LogPackets)
406  log(ONIXS_LOG_INFO[&logFacilityForSnapshots_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
407 
408  try
409  {
410  const PacketHeaderForEMDI* const packetHeader = reinterpret_cast<const PacketHeaderForEMDI*>(data);
411  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
412 
413  if (packetHeaderTemplateId != PacketHeaderForEMDI::TemplateIdValue)
414  {
415  throw OperationException(
416  BOOST_CURRENT_FUNCTION,
417  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEMDI::TemplateIdValue, packetHeaderTemplateId).c_str()
418  );
419  }
420 
421  size_t offset = headerSize;
422  bool result = false;
423  size_t chunkLength;
424  TemplateId templateId = 0;
425 
426  while (offset < length)
427  {
428  CORE::Message msg(snapShotContentAllocator_);
429  result = snapshotFixDecoder_->decode(reinterpret_cast<const uint8*>(data), offset, length - offset, &msg, &chunkLength, &templateId);
430 
431  if (chunkLength == 0)
432  break;
433 
434  if (!result)
435  break;
436 
437  offset += chunkLength;
438  ++dataSource.packetMessageSeqNum;
439  dataSource.isLastInPacket = offset >= length;
440 
441  if (templateId == ResetTemplateId)
442  continue;
443 
444  bool skipRestOfPacket = false;
445 
446  dataRepository_->onSnapshotMessage(dataSource, msg, &skipRestOfPacket);
447  if (skipRestOfPacket)
448  {
449  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Rest of packet skipped");
450  break;
451  }
452  }
453  }
454  catch (const std::exception& ex)
455  {
456  log(ONIXS_LOG_WARN[&logFacilityForSnapshots_] << "Exception while message decoding: " << ex.what());
457  }
458  catch (...)
459  {
460  log(ONIXS_LOG_WARN[&logFacilityForSnapshots_] << "Unknown exception while message decoding");
461  }
462 }
463 
464 void EmdiHandlerImpl::onSnapshotPacketGap()
465 {
466  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet gap detected");
467  snapShotContentAllocator_.reset();
468 }
469 
470 void EmdiHandlerImpl::onSnapshotSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
471 {
472  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
473  dataRepository_->onSnapshotSenderCompIdChange();
474 }
475 
476 void EmdiHandlerImpl::onSnapshotInactivity()
477 {
478  TextBuilder tb;
479  tb << "No " << LogFacility::name() << " snapshot data";
480  listenerHolder_->invokeWarning(tb.toString());
481  //dataRepository_->noDataOnSnapshotFeeds();
482  snapShotContentAllocator_.reset();
483 }
484 
485 void EmdiHandlerImpl::onSnapshotFeedInactivity(FE::NetFeedRole::Enum id)
486 {
487  TextBuilder tb;
488  tb << "No " << LogFacility::name() << " snapshot data on " << FE::toStr(id);
489  listenerHolder_->invokeWarning(tb.toString());
490 }
491 
493 {
494  dataRepository_->setMarketSegmentId2Depth(map);
495 
496  if (map.size() > 0)
497  {
498  TextBuilder tb;
499  tb << "Market SegmentId to Depth Map: ";
500  for (MarketSegmentId2Depth::const_iterator it = map.begin(), end = map.end(); it != end; ++it)
501  tb << '\n' << it->first << " => " << it->second;
502  log(ONIXS_LOG_INFO[this] << tb.toString());
503  }
504 }
505 
507 {
508  partitionIdFilters_.clear();
509  std::copy(filters.begin(), filters.end(), std::inserter(partitionIdFilters_, partitionIdFilters_.end()));
510 
511  if (filters.size() > 0)
512  {
513  TextBuilder tb;
514  tb << "Partition Id Filters: ";
515  for (PartitionIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
516  tb << '\n' << *it;
517  log(ONIXS_LOG_INFO[this] << tb.toString());
518  }
519 }
520 
522 {
523  partitionIdFilters_.clear();
524  log(ONIXS_LOG_INFO[this] << "removeAllPartitionIdFilters");
525 }
526 
528 {
529  BOOST_ASSERT(dataRepository_.get() != nullptr);
530  BOOST_ASSERT(!filters.empty());
531 
532  dataRepository_->setMarketSegmentIdFilters(filters);
533 
534  if (filters.size() > 0)
535  {
536  TextBuilder tb;
537  tb << "Market Segment Id Filters: ";
538  for (MarketSegmentIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
539  tb << '\n' << *it;
540  log(ONIXS_LOG_INFO[this] << tb.toString());
541  }
542 }
543 
545 {
546  BOOST_ASSERT(dataRepository_.get() != nullptr);
547  dataRepository_->clearMarketSegmentIdFilters();
548  log(ONIXS_LOG_INFO[this] << "removeAllMarketSegmentIdFilters");
549 }
550 
552 {
553  dataRepository_->setSecurityIdFilters(filters);
554 
555  if (filters.size() > 0)
556  {
557  TextBuilder tb;
558  tb << "Security Id Filters: ";
559  for (SecurityIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
560  tb << '\n' << *it;
561  log(ONIXS_LOG_INFO[this] << tb.toString());
562  }
563 }
564 
566 {
567  dataRepository_->removeAllSecurityIdFilters();
568  log(ONIXS_LOG_INFO[this] << "removeAllSecurityIdFilters");
569 }
570 
571 bool EmdiHandlerImpl::filterPacket(PartitionId partitionId)
572 {
573  if (partitionIdFilters_.empty())
574  return true;
575 
576  return partitionIdFilters_.end() != partitionIdFilters_.find(partitionId);
577 }
578 
579 void EmdiHandlerImpl::initLogger(const HandlerSettings &settings)
580 {
581  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
582 
583  setLogger(&logger_->getLogger());
584 }
585 
586 void EmdiHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
587 {
588  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
589 }
590 
591 }
592 }
593 }
594 }
595 
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:287
MarketDepthTraits::MarketSegmentId2Depth MarketSegmentId2Depth
void setSecurityIdFilters(const SecurityIdFilters &filters)
UInt32 PartitionId
Alias for Partition ID type.
Definition: Defines.h:48
InterfaceDescriptor interfaceDescriptor
Sets data interface technical configuration.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
FilteringTraits::PartitionIdFilters PartitionIdFilters
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:105
Handler base configuration settings.
Util::TextBuilder TextBuilder
Definition: Formatting.h:42
EmdiHandlerImpl(const EmdiHandlerSettings &settings)
Initialize new instance.
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:74
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
EMDI handler configuration settings.
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FilteringTraits::SecurityIdFilters SecurityIdFilters
bool checkXmlVersion(const std::string &filename, std::string &detectedVersion, UInt32 expectedXmlMajorVersion, UInt32 expectedEmlMinorVersion)
Definition: Utils.cpp:37
void setPartitionIdFilters(const PartitionIdFilters &filters)
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
std::string toString() const
Returns the string representation.
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
OnixS::FIX::Core::FAST::TemplateId TemplateId
void setMarketSegmentId2Depth(const MarketSegmentId2Depth &map)
std::string removeInvalidTemplateNodes(std::string xml, const std::string &node)
Definition: Utils.cpp:121