OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.0.3
API documentation
EmdiHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <system/File.h>
20 #include <util/String.h>
21 
22 #include <OnixS/Core/Bootstrap/Platform.h>
23 
25 
26 #include <OnixS/HandlerCore/FeedEngine/NetFeedAttributes.h>
27 
28 #include "EmdiHandlerImpl.h"
29 #include "Formatting.h"
30 #include "FeedEngineImpl.h"
31 #include "Version.h"
32 
33 
34 namespace OnixS {
35 namespace Eurex {
36 namespace MarketData {
37 namespace Implementation {
38 
39 using namespace HandlerCore::MarketData;
40 using namespace HandlerCore::Common;
41 using namespace Networking;
42 
43 
45  : base("EMDI", NULL, settings.maxPacketSize, settings.licenseDirectory)
46  , settings_(settings)
47  , logFacilityForSnapshots_("Snapshot", this, Logging::LOG_LEVEL_DEBUG)
48  , logFacilityForIncrementals_("Incremental", this, Logging::LOG_LEVEL_DEBUG)
49  , incrementalMessagesPool_(settings.messagePoolSize)
50  , snapShotContentAllocator_(settings.messagePoolSize)
51 {
52  initLogger(settings);
53 
54  forceLog(ONIXS_LOG_INFO[this] << "OnixS Eurex T7 Emdi Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
55  forceLog(ONIXS_LOG_INFO[this] << settings.toString());
56 
57  try
58  {
59  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
60 
61  std::string detectedXmlVersion;
62  if (!checkXmlVersion(settings.fastTemplatesFile, detectedXmlVersion))
63  {
64  listenerHolder_->invokeWarning("Can not extract template file version.");
65  forceLog(ONIXS_LOG_WARN [this] << "Can not extract template file version.");
66  }
67  else
68  forceLog(ONIXS_LOG_INFO[this] << "Detected template file version: " << detectedXmlVersion);
69 
70  const std::string xml(removeInvalidTemplateNodes(System::File::readTextFile(settings.fastTemplatesFile), "EMDPacketHeader"));
71 
72  incrementalFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
73  snapshotFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
74 
75  EmdiDataRepositoryConfig repositoryConfig;
76  repositoryConfig.name = "DataRepository";
77  repositoryConfig.parent = this;
78  repositoryConfig.logger = logger_.get();
79  repositoryConfig.listenerHolder = listenerHolder_.get();
80  repositoryConfig.settings = settings;
81  dataRepository_.reset(new EmdiDataRepository(repositoryConfig));
82 
83  dataRepository_->subscribeOnSnapshotStartRequest(boost::bind(&EmdiHandlerImpl::onSnapshotStartRequest, this));
84  dataRepository_->subscribeOnSnapshotStopRequest(boost::bind(&EmdiHandlerImpl::onSnapshotStopRequest, this));
85 
86  incrementalPacketProcessor_.reset(
88  settings.maxPacketWaitingTime,
89  settings.heartbeatInterval,
91  lock(),
92  boost::bind(&EmdiHandlerImpl::filterPacket, this, _1))
93  );
94  incrementalPacketProcessor_->subscribeOnPacketData(boost::bind(&EmdiHandlerImpl::onIncrementalPacketData, this, _1, _2, _3, _4));
95  incrementalPacketProcessor_->subscribeOnGap(boost::bind(&EmdiHandlerImpl::onIncrementalPacketGap, this));
96  incrementalPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&EmdiHandlerImpl::onIncrementalSenderCompIdChange, this, _1, _2, _3, _4));
97  incrementalPacketProcessor_->subscribeOnInactivity(boost::bind(&EmdiHandlerImpl::onIncrementalInactivity, this));
98  incrementalPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&EmdiHandlerImpl::onIncrementalFeedInactivity, this, _1));
99 
100  snapshotPacketProcessor_.reset(
102  settings.maxPacketWaitingTime,
103  settings.heartbeatInterval,
105  lock(),
106  boost::bind(&EmdiHandlerImpl::filterPacket, this, _1))
107  );
108  snapshotPacketProcessor_->subscribeOnPacketData(boost::bind(&EmdiHandlerImpl::onSnapshotPacketData, this, _1, _2, _3, _4));
109  snapshotPacketProcessor_->subscribeOnGap(boost::bind(&EmdiHandlerImpl::onSnapshotPacketGap, this));
110  snapshotPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&EmdiHandlerImpl::onSnapshotSenderCompIdChange, this, _1, _2, _3, _4));
111  snapshotPacketProcessor_->subscribeOnInactivity(boost::bind(&EmdiHandlerImpl::onSnapshotInactivity, this));
112  snapshotPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&EmdiHandlerImpl::onSnapshotFeedInactivity, this, _1));
113  }
114  catch (const std::exception& ex)
115  {
116  log(ONIXS_LOG_ERROR[this] << "Exception in EmdiHandlerImpl constructor: " << ex.what());
117  throw;
118  }
119  catch (...)
120  {
121  log(ONIXS_LOG_WARN[this] << "Unknown exception in EmdiHandlerImpl constructor");
122  throw;
123  }
124 }
125 
127 {
128  incrementalMessagesPool_.reset();
129  snapShotContentAllocator_.reset();
130 
131  stop();
132 }
133 
134 void EmdiHandlerImpl::commonLog(int logLevel, const char * msg, size_t length)
135 {
136  if (logLevel < OnixS::Logging::MIN_LOG_LEVEL || logLevel > OnixS::Logging::MAX_LOG_LEVEL)
137  throw OperationException(
138  BOOST_CURRENT_FUNCTION,
139  OnixS::Util::format("An error occurred during log operation. Invalid log level (logLevel=%d).", logLevel).c_str());
140 
141  log(OnixS::Logging::LogMsg(OnixS::Logging::LogLevel(logLevel))["USER"] << OnixS::Util::ValuePtr(msg, length));
142 }
143 
145 {
146  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
147 
148  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
149  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
150  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
151 }
152 
154 {
155  dataRepository_->preBuildProductDataRepositories();
156 
157  base::start();
158 }
159 
161 {
162  dataRepository_->preBuildProductDataRepositories();
163 
164  LogPlayerLinks links;
165  links.push_back(LogPlayerLink("Incremental", incrementalPacketProcessor_.get()));
166  links.push_back(LogPlayerLink("Snapshot", snapshotPacketProcessor_.get()));
167 
168  BaseHandlerImp::start(options, links, "EMDI");
169 }
170 
172 {
173  base::stop();
174 }
175 
176 void EmdiHandlerImpl::onStarted()
177 {
178  incrementalPacketProcessor_->reset();
179  snapshotPacketProcessor_->reset();
180 
181  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
182  if (!settings_.useFeedA)
183  layout = FE::MulticastFeedLayout::BOnly;
184  else if (!settings_.useFeedB)
185  layout = FE::MulticastFeedLayout::AOnly;
186 
187  FE::NifsByFeedRole nifs;
188  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
189  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
190 
191 
192  {
193  FE::ConnectionInfo connectionA;
194  FE::ConnectionInfo connectionB;
195 
196  connectionA.role(FE::NetFeedRole::A);
197  connectionB.role(FE::NetFeedRole::B);
198 
199  ///
200  connectionA.type(FE::NetFeedType::Incremental);
201  connectionB.type(FE::NetFeedType::Incremental);
202 
203  connectionA.id("IncrementalA");
204  connectionB.id("IncrementalB");
205 
206  connectionA.ip(settings_.interfaceDescriptor.incrementalFeed.serviceA.address);
207  connectionB.ip(settings_.interfaceDescriptor.incrementalFeed.serviceB.address);
208 
209  connectionA.port(settings_.interfaceDescriptor.incrementalFeed.serviceA.port);
210  connectionB.port(settings_.interfaceDescriptor.incrementalFeed.serviceB.port);
211 
212  FE::ConnectionInfoList incrementalConnections;
213  incrementalConnections.push_back(connectionA);
214  incrementalConnections.push_back(connectionB);
215 
216  BOOST_ASSERT(incrementalPacketProcessor_.get());
217 
218  incrementalFeeds_.reset(
219  constructCluster(
220  FE::NetFeedType::Incremental,
221  layout,
222  incrementalConnections,
223  nifs,
224  FE::TimeSpan(0, 0, 0, settings_.maxPacketWaitingTime * FE::TimeTraits::nanosecondsPerMillisecond()),
225  *incrementalPacketProcessor_.get()
226  )
227  );
228  }
229 
230  {
231  FE::ConnectionInfo connectionA;
232  FE::ConnectionInfo connectionB;
233 
234  connectionA.role(FE::NetFeedRole::A);
235  connectionB.role(FE::NetFeedRole::B);
236 
237  connectionA.type(FE::NetFeedType::Snapshot);
238  connectionB.type(FE::NetFeedType::Snapshot);
239 
240  connectionA.id("SnapshotA");
241  connectionB.id("SnapshotB");
242 
243  connectionA.ip(settings_.interfaceDescriptor.snapshotFeed.serviceA.address);
244  connectionB.ip(settings_.interfaceDescriptor.snapshotFeed.serviceB.address);
245 
246  connectionA.port(settings_.interfaceDescriptor.snapshotFeed.serviceA.port);
247  connectionB.port(settings_.interfaceDescriptor.snapshotFeed.serviceB.port);
248 
249  FE::ConnectionInfoList snapshotConnections;
250  snapshotConnections.push_back(connectionA);
251  snapshotConnections.push_back(connectionB);
252 
253  BOOST_ASSERT(snapshotPacketProcessor_.get());
254 
255  snapshotFeeds_.reset(
256  constructCluster(
257  FE::NetFeedType::Snapshot,
258  layout,
259  snapshotConnections,
260  nifs,
261  FE::TimeSpan(0, 0, 0, settings_.maxPacketWaitingTime * FE::TimeTraits::nanosecondsPerMillisecond()),
262  *snapshotPacketProcessor_.get()
263  )
264  );
265  }
266 
267 
268  incrementalFeeds_->connect();
269 }
270 
271 void EmdiHandlerImpl::onStopped()
272 {
273  incrementalFeeds_.reset();
274  snapshotFeeds_.reset();
275 }
276 
277 void EmdiHandlerImpl::onPrepareFeeds()
278 {
279 }
280 
281 void EmdiHandlerImpl::onSnapshotStartRequest()
282 {
283  if (!isBacktrestingMode())
284  {
285  log(ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
286  snapshotPacketProcessor_->reset();
287 
288  snapshotFeeds_->connect();
289  }
290  else
291  log(ONIXS_LOG_DEBUG[this] << "Starting snapshot feed");
292 }
293 
294 void EmdiHandlerImpl::onSnapshotStopRequest()
295 {
296  if (!isBacktrestingMode())
297  {
298  snapshotFeeds_->disconnect();
299  }
300 
301  log(ONIXS_LOG_DEBUG[this] << "Reseting snapshot packet processor");
302  snapshotPacketProcessor_->reset();
303 
304  snapShotContentAllocator_.reset();
305 }
306 
307 void EmdiHandlerImpl::onIncrementalPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
308 {
309  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet " << dataSource.packetSeqNum);
310 
311  if (settings_.logSettings& LogSettings::LogPackets)
312  log(ONIXS_LOG_INFO[&logFacilityForIncrementals_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
313 
314  size_t offset = headerSize;
315  size_t chunkLength;
316  TemplateId templateId = 0;
317  bool result;
318 
319  const PacketHeaderForEMDI* const packetHeader = (PacketHeaderForEMDI*)(data);
320  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
321 
322  if (packetHeaderTemplateId != PacketHeaderForEMDI::TemplateIdValue)
323  {
324  throw OperationException(
325  BOOST_CURRENT_FUNCTION,
326  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEMDI::TemplateIdValue, packetHeaderTemplateId).c_str()
327  );
328  }
329 
330 
331  while (offset < length)
332  {
333  CORE::Message* const msg = incrementalMessagesPool_.create();
334  result = incrementalFixDecoder_->decode(reinterpret_cast<const uint8*>(data), offset, length - offset, msg, &chunkLength, &templateId);
335 
336  if (chunkLength == 0)
337  break;
338 
339  if (!result)
340  break;
341 
342  offset += chunkLength;
343  ++dataSource.packetMessageSeqNum;
344  dataSource.isLastInPacket = offset >= length;
345 
346  if (templateId == ResetTemplateId)
347  continue;
348 
349  dataRepository_->onIncrementalMessage(dataSource, *msg, templateId);
350  }
351 
352  if (incrementalMessagesPool_.recommendReset())
353  {
354  if (!dataRepository_->hasCashedIncrementals())
355  {
356  incrementalMessagesPool_.reset();
357  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
358  }
359  else
360  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Unable to Reset incremental Messages Pool : cashed messages exist");
361  }
362 }
363 
364 void EmdiHandlerImpl::onIncrementalPacketGap()
365 {
366  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet gap");
367  dataRepository_->onIncrementalPacketGap();
368 
369  if (!dataRepository_->hasCashedIncrementals())
370  {
371  incrementalMessagesPool_.reset();
372  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
373  }
374 }
375 
376 void EmdiHandlerImpl::onIncrementalSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
377 {
378  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
379  dataRepository_->onIncrementalSenderCompIdChange();
380 
381  if (!dataRepository_->hasCashedIncrementals())
382  {
383  incrementalMessagesPool_.reset();
384  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Resetting incremental Messages Pool");
385  }
386 }
387 
388 void EmdiHandlerImpl::onIncrementalInactivity()
389 {
390  TextBuilder tb;
391  tb << "No " << LogFacility::name() << " incremental data";
392  listenerHolder_->invokeWarning(tb.toString());
393  dataRepository_->noDataOnIncrementalFeeds();
394 }
395 
396 void EmdiHandlerImpl::onIncrementalFeedInactivity(FE::NetFeedRole::Enum id)
397 {
398  TextBuilder tb;
399  tb << "No " << LogFacility::name() << " incremental data on " << FE::toStr(id);
400  listenerHolder_->invokeWarning(tb.toString());
401 }
402 
403 void EmdiHandlerImpl::onSnapshotPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
404 {
405  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet " << dataSource.packetSeqNum);
406 
407  if (settings_.logSettings & LogSettings::LogPackets)
408  log(ONIXS_LOG_INFO[&logFacilityForSnapshots_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
409 
410  try
411  {
412  const PacketHeaderForEMDI* const packetHeader = reinterpret_cast<const PacketHeaderForEMDI*>(data);
413  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
414 
415  if (packetHeaderTemplateId != PacketHeaderForEMDI::TemplateIdValue)
416  {
417  throw OperationException(
418  BOOST_CURRENT_FUNCTION,
419  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEMDI::TemplateIdValue, packetHeaderTemplateId).c_str()
420  );
421  }
422 
423  size_t offset = headerSize;
424  bool result = false;
425  size_t chunkLength;
426  TemplateId templateId = 0;
427 
428  while (offset < length)
429  {
430  CORE::Message msg(snapShotContentAllocator_);
431  result = snapshotFixDecoder_->decode(reinterpret_cast<const uint8*>(data), offset, length - offset, &msg, &chunkLength, &templateId);
432 
433  if (chunkLength == 0)
434  break;
435 
436  if (!result)
437  break;
438 
439  offset += chunkLength;
440  ++dataSource.packetMessageSeqNum;
441  dataSource.isLastInPacket = !(offset < length);
442 
443  if (templateId == ResetTemplateId)
444  continue;
445 
446  bool skipRestOfPacket = false;
447 
448  dataRepository_->onSnapshotMessage(dataSource, msg, &skipRestOfPacket);
449  if (skipRestOfPacket)
450  {
451  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Rest of packet skipped");
452  break;
453  }
454  }
455  }
456  catch (const std::exception& ex)
457  {
458  log(ONIXS_LOG_WARN[&logFacilityForSnapshots_] << "Exception while message decoding: " << ex.what());
459  }
460  catch (...)
461  {
462  log(ONIXS_LOG_WARN[&logFacilityForSnapshots_] << "Unknown exception while message decoding");
463  }
464 }
465 
466 void EmdiHandlerImpl::onSnapshotPacketGap()
467 {
468  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "Packet gap detected");
469  snapShotContentAllocator_.reset();
470 }
471 
472 void EmdiHandlerImpl::onSnapshotSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
473 {
474  log(ONIXS_LOG_DEBUG[&logFacilityForSnapshots_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
475  dataRepository_->onSnapshotSenderCompIdChange();
476 }
477 
478 void EmdiHandlerImpl::onSnapshotInactivity()
479 {
480  TextBuilder tb;
481  tb << "No " << LogFacility::name() << " snapshot data";
482  listenerHolder_->invokeWarning(tb.toString());
483  //dataRepository_->noDataOnSnapshotFeeds();
484  snapShotContentAllocator_.reset();
485 }
486 
487 void EmdiHandlerImpl::onSnapshotFeedInactivity(FE::NetFeedRole::Enum id)
488 {
489  TextBuilder tb;
490  tb << "No " << LogFacility::name() << " snapshot data on " << FE::toStr(id);
491  listenerHolder_->invokeWarning(tb.toString());
492 }
493 
495 {
496  dataRepository_->setMarketSegmentId2Depth(map);
497 
498  if (map.size() > 0)
499  {
500  TextBuilder tb;
501  tb << "Market SegmentId to Depth Map: ";
502  for (MarketSegmentId2Depth::const_iterator it = map.begin(), end = map.end(); it != end; ++it)
503  tb << '\n' << it->first << " => " << it->second;
504  log(ONIXS_LOG_INFO[this] << tb.toString());
505  }
506 }
507 
509 {
510  partitionIdFilters_.clear();
511  std::copy(filters.begin(), filters.end(), std::inserter(partitionIdFilters_, partitionIdFilters_.end()));
512 
513  if (filters.size() > 0)
514  {
515  TextBuilder tb;
516  tb << "Partition Id Filters: ";
517  for (PartitionIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
518  tb << '\n' << *it;
519  log(ONIXS_LOG_INFO[this] << tb.toString());
520  }
521 }
522 
524 {
525  partitionIdFilters_.clear();
526  log(ONIXS_LOG_INFO[this] << "removeAllPartitionIdFilters");
527 }
528 
530 {
531  BOOST_ASSERT(dataRepository_.get() != NULL);
532  BOOST_ASSERT(!filters.empty());
533 
534  dataRepository_->setMarketSegmentIdFilters(filters);
535 
536  if (filters.size() > 0)
537  {
538  TextBuilder tb;
539  tb << "Market Segment Id Filters: ";
540  for (MarketSegmentIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
541  tb << '\n' << *it;
542  log(ONIXS_LOG_INFO[this] << tb.toString());
543  }
544 }
545 
547 {
548  BOOST_ASSERT(dataRepository_.get() != NULL);
549  dataRepository_->clearMarketSegmentIdFilters();
550  log(ONIXS_LOG_INFO[this] << "removeAllMarketSegmentIdFilters");
551 }
552 
554 {
555  dataRepository_->setSecurityIdFilters(filters);
556 
557  if (filters.size() > 0)
558  {
559  TextBuilder tb;
560  tb << "Security Id Filters: ";
561  for (SecurityIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
562  tb << '\n' << *it;
563  log(ONIXS_LOG_INFO[this] << tb.toString());
564  }
565 }
566 
568 {
569  dataRepository_->removeAllSecurityIdFilters();
570  log(ONIXS_LOG_INFO[this] << "removeAllSecurityIdFilters");
571 }
572 
573 bool EmdiHandlerImpl::filterPacket(PartitionId partitionId)
574 {
575  if (partitionIdFilters_.empty())
576  return true;
577 
578  if (partitionIdFilters_.end() != partitionIdFilters_.find(partitionId))
579  return true;
580 
581  return false;
582 }
583 
584 void EmdiHandlerImpl::initLogger(const HandlerSettings &settings)
585 {
586  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
587 
588  setLogger(&logger_->getLogger());
589 }
590 
591 void EmdiHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
592 {
593  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
594 }
595 
596 }
597 }
598 }
599 }
600 
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:283
MarketDepthTraits::MarketSegmentId2Depth MarketSegmentId2Depth
void setSecurityIdFilters(const SecurityIdFilters &filters)
UInt32 PartitionId
Alias for Partition ID type.
Definition: Defines.h:48
InterfaceDescriptor interfaceDescriptor
Sets data interface technical configuration.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
FilteringTraits::PartitionIdFilters PartitionIdFilters
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:105
Handler base configuration settings.
Util::TextBuilder TextBuilder
Definition: Formatting.h:42
EmdiHandlerImpl(const EmdiHandlerSettings &settings)
Initialize new instance.
unsigned int UInt32
Definition: Numeric.h:41
Definition: Group.h:25
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:74
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
EMDI handler configuration settings.
void commonLog(int logLevel, const char *msg, size_t length)
User's common log.
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FilteringTraits::SecurityIdFilters SecurityIdFilters
void setPartitionIdFilters(const PartitionIdFilters &filters)
bool checkXmlVersion(const std::string &filename, std::string &detectedVersion)
Definition: Utils.cpp:37
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
std::string toString() const
Returns the string representation.
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
OnixS::FIX::Core::FAST::TemplateId TemplateId
void setMarketSegmentId2Depth(const MarketSegmentId2Depth &map)
std::string removeInvalidTemplateNodes(std::string xml, const std::string &node)
Definition: Utils.cpp:117