OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.4.2
API documentation
EmdsHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #include <boost/format.hpp>
21 
22 #include <system/File.h>
23 #include <util/String.h>
24 
25 #include <OnixS/HandlerCore/Utils.h>
26 
28 
29 #include "EmdsHandlerImpl.h"
30 #include "PacketHeader.h"
31 #include "Formatting.h"
32 #include "MessageOperator.h"
33 #include "FeedEngineImpl.h"
34 #include "Version.h"
35 
36 
37 namespace OnixS {
38 namespace Eurex {
39 namespace MarketData {
40 namespace Implementation {
41 
42 using namespace std;
43 using namespace Networking;
44 using namespace HandlerCore::MarketData;
45 using namespace HandlerCore::Common;
46 
47 
49  : base ("EMDS", nullptr, settings.maxPacketSize, settings.licenseDirectory)
50  , settings_(settings)
51 {
52  initLogger(settings);
53 
54  forceLog (ONIXS_LOG_INFO[this] << "OnixS Eurex EMDS Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
55  forceLog (ONIXS_LOG_INFO[this] << settings.toString());
56 
57  initListenerHolder();
58 
59  try
60  {
61  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
62 
63  string xmlTemplates = System::File::readTextFile(settings.fastTemplatesFile);
64  fixDecoder_.reset(new FixDecoder(xmlTemplates, false, InputDataTraits::CompleteMessagesOnly));
65 
66  settlementFeedPacketProcessor.reset(
68  settings_.lostPacketWaitTime,
69  settings.heartbeatInterval,
71  lock()
72  )
73  );
74  settlementFeedPacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onSettlementPacketData, this, _1, _2, _3, _4));
75  settlementFeedPacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onSettlementPacketGap, this));
76  settlementFeedPacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onSettlementSenderCompIdChange, this, _1, _2, _3, _4));
77  settlementFeedPacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onSettlementInactivity, this));
78  settlementFeedPacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onSettlementFeedInactivity, this, _1));
79 
80  openInterestPacketProcessor.reset(
82  settings_.lostPacketWaitTime,
83  settings.heartbeatInterval,
85  lock())
86  );
87  openInterestPacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onOpenInterestPacketData, this, _1, _2, _3, _4));
88  openInterestPacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onOpenInterestPacketGap, this));
89  openInterestPacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onOpenInterestSenderCompIdChange, this, _1, _2, _3, _4));
90  openInterestPacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onOpenInterestInactivity, this));
91  openInterestPacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onOpenInterestFeedInactivity, this, _1));
92 
93  otcTradePacketProcessor.reset(
95  settings_.lostPacketWaitTime,
96  settings.heartbeatInterval,
98  lock())
99  );
100  otcTradePacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onOtcTradePacketData, this, _1, _2, _3, _4));
101  otcTradePacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onOtcTradePacketGap, this));
102  otcTradePacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onOtcTradeSenderCompIdChange, this, _1, _2, _3, _4));
103  otcTradePacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onOtcTradeInactivity, this));
104  otcTradePacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onOtcTradeFeedInactivity, this, _1));
105 
106  exchangeTradePacketProcessor.reset(
108  settings_.lostPacketWaitTime,
109  settings.heartbeatInterval,
111  lock())
112  );
113  exchangeTradePacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onExchangeTradePacketData, this, _1, _2, _3, _4));
114  exchangeTradePacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onExchangeTradePacketGap, this));
115  exchangeTradePacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onExchangeTradeSenderCompIdChange, this, _1, _2, _3, _4));
116  exchangeTradePacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onExchangeTradeInactivity, this));
117  exchangeTradePacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onExchangeTradeFeedInactivity, this, _1));
118  }
119  catch (const std::exception& ex)
120  {
121  reportError(TextBuilder() << "Exception in EmdsHandlerImpl constructor: " << ex.what());
122  throw;
123  }
124  catch (...)
125  {
126  reportError("Unknown exception in EmdsHandlerImpl constructor");
127  throw;
128  }
129 }
130 
132 {
133  stop();
134 }
135 
137 {
138  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
139 
140  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
141  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
142  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
143 }
144 
146 {
147  base::start();
148 }
149 
151 {
152  settlementFeedPacketProcessor->activateReplayMode();
153  openInterestPacketProcessor->activateReplayMode();
154  otcTradePacketProcessor->activateReplayMode();
155  exchangeTradePacketProcessor->activateReplayMode();
156 
157 
158  LogPlayerLinks links;
159  links.push_back(LogPlayerLink("Settlement", settlementFeedPacketProcessor.get()));
160  links.push_back(LogPlayerLink("Interest", openInterestPacketProcessor.get()));
161  links.push_back(LogPlayerLink("OtcTrade", otcTradePacketProcessor.get()));
162  links.push_back(LogPlayerLink("ExchangeTrade", exchangeTradePacketProcessor.get()));
163 
164  BaseHandlerImp::start(options, links, "EMDS");
165 }
166 
168 {
169  base::stop();
170 }
171 
172 void EmdsHandlerImpl::onStarting()
173 {
174  settlementFeedPacketProcessor->reset();
175  openInterestPacketProcessor->reset();
176  otcTradePacketProcessor->reset();
177  exchangeTradePacketProcessor->reset();
178 
179  FE::ConnectionInfo connectionA;
180  FE::ConnectionInfo connectionB;
181 
182  connectionA.role(FE::NetFeedRole::A);
183  connectionB.role(FE::NetFeedRole::B);
184 
185  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
186  if (!settings_.useFeedA)
187  layout = FE::MulticastFeedLayout::BOnly;
188  else if (!settings_.useFeedB)
189  layout = FE::MulticastFeedLayout::AOnly;
190 
191  FE::NifsByFeedRole nifs;
192  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
193  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
194 
195  connectionA.type(FE::NetFeedType::Incremental);
196  connectionB.type(FE::NetFeedType::Incremental);
197 
198 
199  const bool anyValidFeed =
200  checkDescriptor(layout, settings_.settlementFeedDescriptor) ||
201  checkDescriptor(layout, settings_.exchangeTradeFeedDescriptor) ||
202  checkDescriptor(layout, settings_.openInterestFeedDescriptor);
203 
204  if(!anyValidFeed)
205  {
206  const std::string what = "No valid feeds provided.";
207 
208  reportError(what);
209  throw std::runtime_error(what);
210  }
211 
212  createSettlementFeeds(layout, connectionA, connectionB, nifs);
213  createOpenInterestsFeeds(layout, connectionA, connectionB, nifs);
214  createExchangeTradeFeeds(layout, connectionA, connectionB, nifs);
215 }
216 
217 void EmdsHandlerImpl::createSettlementFeeds(
218  FE::MulticastFeedLayout::Enum layout,
219  FE::ConnectionInfo& connectionA,
220  FE::ConnectionInfo& connectionB,
221  const FE::NifsByFeedRole& nifs)
222 {
223  const FeedDescriptor& descriptor = settings_.settlementFeedDescriptor;
224 
225  if(checkDescriptor(layout, descriptor))
226  {
227  connectionA.id("SettlementA");
228  connectionB.id("SettlementB");
229 
230  connectionA.ip(descriptor.serviceA.address);
231  connectionB.ip(descriptor.serviceB.address);
232 
233  connectionA.port(descriptor.serviceA.port);
234  connectionB.port(descriptor.serviceB.port);
235 
236  FE::ConnectionInfoList connections;
237  connections.push_back(connectionA);
238  connections.push_back(connectionB);
239 
240  settlementFeeds_.reset(
241  constructCluster(
242  FE::NetFeedType::Incremental,
243  layout,
244  connections,
245  nifs,
246  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
247  *settlementFeedPacketProcessor
248  )
249  );
250 
251  settlementFeeds_->connect(FE::CanThrowNow());
252  }
253  else
254  {
255  log(ONIXS_LOG_INFO[this]
256  << "No valid config for settlementFeedDescriptor provided, feed is not created.");
257  }
258 }
259 
260 void EmdsHandlerImpl::createExchangeTradeFeeds(
261  FE::MulticastFeedLayout::Enum layout,
262  FE::ConnectionInfo& connectionA,
263  FE::ConnectionInfo& connectionB,
264  const FE::NifsByFeedRole& nifs)
265 {
266  const FeedDescriptor& descriptor = settings_.exchangeTradeFeedDescriptor;
267 
268  if(checkDescriptor(layout, descriptor))
269  {
270  connectionA.id("ExchangeTradeA");
271  connectionB.id("ExchangeTradeB");
272 
273  connectionA.ip(descriptor.serviceA.address);
274  connectionB.ip(descriptor.serviceB.address);
275 
276  connectionA.port(descriptor.serviceA.port);
277  connectionB.port(descriptor.serviceB.port);
278 
279  FE::ConnectionInfoList connections;
280  connections.push_back(connectionA);
281  connections.push_back(connectionB);
282 
283  exchangeTradeFeeds_.reset(
284  constructCluster(
285  FE::NetFeedType::Incremental,
286  layout,
287  connections,
288  nifs,
289  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
290  *exchangeTradePacketProcessor
291  )
292  );
293 
294  exchangeTradeFeeds_->connect(FE::CanThrowNow());
295  }
296  else
297  {
298  log(ONIXS_LOG_INFO[this]
299  << "No valid config for exchangeTradeFeedDescriptor provided, feed is not created.");
300  }
301 }
302 
303 void EmdsHandlerImpl::createOpenInterestsFeeds(
304  FE::MulticastFeedLayout::Enum layout,
305  FE::ConnectionInfo& connectionA,
306  FE::ConnectionInfo& connectionB,
307  const FE::NifsByFeedRole& nifs)
308 {
309  const FeedDescriptor& descriptor = settings_.openInterestFeedDescriptor;
310 
311  if(checkDescriptor(layout, descriptor))
312  {
313  connectionA.id("OpenInterestA");
314  connectionB.id("OpenInterestB");
315 
316  connectionA.ip(descriptor.serviceA.address);
317  connectionB.ip(descriptor.serviceB.address);
318 
319  connectionA.port(descriptor.serviceA.port);
320  connectionB.port(descriptor.serviceB.port);
321 
322  FE::ConnectionInfoList connections;
323  connections.push_back(connectionA);
324  connections.push_back(connectionB);
325 
326  openInterestFeeds_.reset(
327  constructCluster(
328  FE::NetFeedType::Incremental,
329  layout,
330  connections,
331  nifs,
332  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
333  *openInterestPacketProcessor
334  )
335  );
336 
337  openInterestFeeds_->connect(FE::CanThrowNow());
338  }
339  else
340  {
341  log(ONIXS_LOG_INFO[this]
342  << "No valid config for openInterestFeedDescriptor provided, feed is not created.");
343  }
344 }
345 
346 bool EmdsHandlerImpl::checkDescriptor(FE::MulticastFeedLayout::Enum layout, const FeedDescriptor& descriptor) const
347 {
348  const bool isAConfigured = !descriptor.serviceA.empty();
349  const bool isBConfigured = !descriptor.serviceB.empty();
350 
351  if(FE::MulticastFeedLayout::Both == layout)
352  {
353  if(!isAConfigured || !isBConfigured)
354  return false;
355  }
356  else if(FE::MulticastFeedLayout::AOnly == layout)
357  {
358  if(!isAConfigured)
359  return false;
360  }
361  else if(FE::MulticastFeedLayout::BOnly == layout)
362  {
363  if(!isBConfigured)
364  return false;
365  }
366 
367  return true;
368 }
369 
370 void EmdsHandlerImpl::onStopping()
371 {
372  if(settlementFeeds_)
373  settlementFeeds_->disconnect();
374 
375  if(openInterestFeeds_)
376  openInterestFeeds_->disconnect();
377 
378  if(otcTradeFeeds_)
379  otcTradeFeeds_->disconnect();
380 
381  if(exchangeTradeFeeds_)
382  exchangeTradeFeeds_->disconnect();
383 }
384 
385 void EmdsHandlerImpl::onSettlementPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
386 {
387  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
388 
389  if(settings_.logSettings & LogSettings::LogPackets)
390  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
391 
392  onPacketData(logFacility, dataSource, data, length, headerSize);
393 }
394 
395 void EmdsHandlerImpl::onSettlementPacketGap()
396 {
397  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
398  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
399 }
400 
401 void EmdsHandlerImpl::onSettlementSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
402 {
403  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
404  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
405 }
406 
407 void EmdsHandlerImpl::onOpenInterestPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
408 {
409  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
410 
411  if(settings_.logSettings & LogSettings::LogPackets)
412  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
413 
414  onPacketData(logFacility, dataSource, data, length, headerSize);
415 }
416 
417 void EmdsHandlerImpl::onOpenInterestPacketGap()
418 {
419  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
420  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
421 }
422 
423 void EmdsHandlerImpl::onOpenInterestSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
424 {
425  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
426  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
427 }
428 
429 void EmdsHandlerImpl::onOtcTradePacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
430 {
431  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
432 
433  if(settings_.logSettings & LogSettings::LogPackets)
434  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
435 
436  onPacketData(logFacility, dataSource, data, length, headerSize);
437 }
438 
439 void EmdsHandlerImpl::onOtcTradePacketGap()
440 {
441  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
442  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
443 }
444 
445 void EmdsHandlerImpl::onOtcTradeSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
446 {
447  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
448  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
449 }
450 
451 void EmdsHandlerImpl::onExchangeTradePacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
452 {
453  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
454 
455  if(settings_.logSettings & LogSettings::LogPackets)
456  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
457 
458  onPacketData(logFacility, dataSource, data, length, headerSize);
459 }
460 
461 void EmdsHandlerImpl::onExchangeTradePacketGap()
462 {
463  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
464  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
465 }
466 
467 void EmdsHandlerImpl::onExchangeTradeSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
468 {
469  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
470  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
471 }
472 
473 void EmdsHandlerImpl::onPacketData(const Logging::LogFacility& logFacility, DataSource& dataSource, const char* data, size_t length, size_t headerSize)
474 {
475  try
476  {
477  PacketHeaderForEmds* packetHeader = (PacketHeaderForEmds*)(data);
478  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
479 
480  if(packetHeaderTemplateId != PacketHeaderForEmds::TemplateIdValue)
481  {
482  throw OperationException (
483  BOOST_CURRENT_FUNCTION,
484  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEmds::TemplateIdValue, packetHeaderTemplateId).c_str()
485  );
486  }
487 
488  size_t offset = headerSize;
489  while(offset < length)
490  {
491  size_t chunkLength;
492  unsigned int templateId;
493  const CORE::Message* msg = fixDecoder_->decode(Util::ByteBuffer(data + offset, length - offset), &chunkLength, &templateId);
494 
495  if(chunkLength == 0)
496  break;
497 
498  offset += chunkLength;
499 
500  if( templateId == 120 ) //FastReset
501  {
502  fixDecoder_->resetDictionaries();
503  continue;
504  }
505 
506  dataSource.packetMessageSeqNum++;
507  dataSource.isLastInPacket = offset >= length;
508 
509  processMessage(logFacility, dataSource, *msg, templateId);
510  }
511  }
512  catch (const std::exception& ex)
513  {
514  reportWarning(
515  TextBuilder()
516  << "Exception while message decoding or processing: "
517  << ex.what()
518  << ", Packet ("
519  << Base64Wrapper(data, length) << ")."
520  );
521  }
522  catch (...)
523  {
524  reportWarning(
525  TextBuilder()
526  << "Unknown exception while message decoding or processing"
527  << ", Packet ("
528  << Base64Wrapper(data, length) << ")."
529  );
530  }
531 }
532 
533 void EmdsHandlerImpl::processMessage(const Logging::LogFacility& logFacility, const DataSource& dataSource, const CORE::Message& msg, unsigned int templateId)
534 {
535  log(ONIXS_LOG_DEBUG[&logFacility] << msg);
536 
537  const CORE::FieldValue& marketSegmentIdValue = msg[Tags::MarketSegmentID];
538 
539  if( CORE::DataTypes::Null != marketSegmentIdValue.type())
540  {
541  //filter by market segment id
542  UInt32 marketSegmentId;
543  if( !marketSegmentIdfilters_.empty() && marketSegmentIdValue.convertTo(marketSegmentId) && marketSegmentIdfilters_.end() == marketSegmentIdfilters_.find(marketSegmentId))
544  return;
545  }
546 
547  switch(templateId)
548  {
549  case 171:
550  {
551  const OpenInterestWrapper wrapper(msg);
552  listenerHolder_->invokeOpenInterest(&wrapper, dataSource);
553  break;
554  }
555  case 172:
556  {
557  const SettlementWrapper wrapper(msg);
558  listenerHolder_->invokeSettlement(&wrapper, dataSource);
559  break;
560  }
561  case 175:
562  {
563  const ExchangeTradeWrapper wrapper(msg);
564  listenerHolder_->invokeExchangeTrade(&wrapper, dataSource);
565  break;
566  }
567  case 152:
568  {
569  unsigned int mDCount = 0;
570  msg[Tags::MDReportCount].convertTo(mDCount);
571 
572  const CORE::FieldValue& mDReportEventValue = msg[Tags::MDReportEvent];
573  unsigned int mDReportEvent = 0;
574  if( CORE::DataTypes::Null != mDReportEventValue.type())
575  mDReportEventValue.convertTo(mDReportEvent);
576 
577  switch(mDReportEvent)
578  {
579  //case 2: listenerHolder_->invokeOtcTradeReplayCycleStart(mDCount, dataSource); break;
580  //case 3: listenerHolder_->invokeOtcTradeReplayCycleEnd(dataSource); break;
581  case 4: listenerHolder_->invokeExchangeTradeReplayCycleStart(mDCount, dataSource); break;
582  case 5: listenerHolder_->invokeExchangeTradeReplayCycleEnd(dataSource); break;
583  case 6: listenerHolder_->invokeOpenInterestReplayCycleStart(mDCount, dataSource); break;
584  case 7: listenerHolder_->invokeOpenInterestReplayCycleEnd(dataSource); break;
585  case 8: listenerHolder_->invokeSettlementReplayCycleStart(mDCount, dataSource); break;
586  case 9: listenerHolder_->invokeSettlementReplayCycleEnd(dataSource); break;
587  default: reportWarning(TextBuilder() << "Unknown mDReportEvent: " << mDReportEvent);
588  }
589  }
590  break;
591  case 170: //heartbeat
592  break;
593  default:
594  reportWarning(TextBuilder()<< "Unknown message template id: " << templateId);
595  }
596 }
597 
599 {
600  Guard guard(marketSegmentIdfiltersLock_);
601  marketSegmentIdfilters_ = filters;
602 }
603 
605 {
606  Guard guard(marketSegmentIdfiltersLock_);
607  marketSegmentIdfilters_.clear();
608 }
609 
611 {
612  Guard guard(securityIdFiltersLock_);
613  securityIdFilters_ = filters;
614 }
615 
617 {
618  Guard guard(securityIdFiltersLock_);
619  securityIdFilters_.clear();
620 }
621 
622 void EmdsHandlerImpl::onSettlementInactivity()
623 {
624  reportWarning(TextBuilder() << "No settlement data");
625 }
626 
627 void EmdsHandlerImpl::onSettlementFeedInactivity(FE::NetFeedRole::Enum id)
628 {
629  reportWarning(TextBuilder() << "No settlement data on " << FE::toStr(id));
630 }
631 
632 void EmdsHandlerImpl::onOpenInterestInactivity()
633 {
634  reportWarning(TextBuilder() << "No open interest data");
635 }
636 
637 void EmdsHandlerImpl::onOpenInterestFeedInactivity(FE::NetFeedRole::Enum id)
638 {
639  reportWarning(TextBuilder() << "No open interest data on " << FE::toStr(id));
640 }
641 
642 void EmdsHandlerImpl::onOtcTradeInactivity()
643 {
644  reportWarning(TextBuilder() << "No OTC trade data");
645 }
646 
647 void EmdsHandlerImpl::onOtcTradeFeedInactivity(FE::NetFeedRole::Enum id)
648 {
649  reportWarning(TextBuilder() << "No OTC trade data on " << FE::toStr(id));
650 }
651 
652 void EmdsHandlerImpl::onExchangeTradeInactivity()
653 {
654  reportWarning(TextBuilder() << "No on-exchange trade data");
655 }
656 
657 void EmdsHandlerImpl::onExchangeTradeFeedInactivity(FE::NetFeedRole::Enum id)
658 {
659  reportWarning(TextBuilder() << "No on-exchange trade data on " << FE::toStr(id));
660 }
661 
662 void EmdsHandlerImpl::initLogger(const HandlerSettings &settings)
663 {
664  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
665 
666  setLogger(&logger_->getLogger());
667 }
668 
669 void EmdsHandlerImpl::initListenerHolder()
670 {
671  try
672  {
673  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
674  }
675  catch (const std::exception& ex)
676  {
677  log(ONIXS_LOG_ERROR[this] << "Exception in EmdiHandlerImpl constructor: " << ex.what());
678  throw;
679  }
680  catch (...)
681  {
682  log(ONIXS_LOG_ERROR[this] << "Unknown exception in EmdiHandlerImpl constructor");
683  throw;
684  }
685 }
686 
687 void EmdsHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
688 {
689  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
690 }
691 
692 void EmdsHandlerImpl::invokeError(const std::string& description)
693 {
695 }
696 
697 void EmdsHandlerImpl::invokeWarning(const std::string& description)
698 {
699  listenerHolder().invokeWarning(description);
700 }
701 
702 
703 }}}}
704 
void invokeWarning(const std::string &description) override
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:287
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FeedDescriptor settlementFeedDescriptor
Sets Settlement prices feed technical configuration.
FeedDescriptor openInterestFeedDescriptor
Sets Open Interest prices feed technical configuration.
bool empty() const
Checks whether the descriptor is empty.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
EMDI handler configuration settings.
STL namespace.
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:117
Handler base configuration settings.
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void invokeError(ErrorCode::Enum code, const std::string &description) ONIXS_NOEXCEPT
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:77
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
ListenerHolder & listenerHolder()
Returns commons services as shared object.
void setSecurityIdFilters(const SecurityIdFilters &filters)
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used to define the behavior of the given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FeedDescriptor exchangeTradeFeedDescriptor
Sets On-exchange trade prices feed technical configuration.
EmdsHandlerImpl(const EmdsHandlerSettings &settings)
Initialize new instance.
FilteringTraits::SecurityIdFilters SecurityIdFilters
void invokeError(const std::string &description) override
Invoke helpers.
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
Identifiers errors of generic nature.
Definition: ErrorListener.h:38
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
std::string toString() const
Returns the string representation.