OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.2.1
API documentation
EmdsHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <boost/format.hpp>
20 
21 #include <system/File.h>
22 #include <util/String.h>
23 
25 
26 #include "EmdsHandlerImpl.h"
27 #include "PacketHeader.h"
28 #include "Formatting.h"
29 #include "MessageOperator.h"
30 #include "FeedEngineImpl.h"
31 #include "Version.h"
32 
33 
34 namespace OnixS {
35 namespace Eurex {
36 namespace MarketData {
37 namespace Implementation {
38 
39 using namespace std;
40 using namespace Networking;
41 using namespace HandlerCore::MarketData;
42 using namespace HandlerCore::Common;
43 
44 
46  : base ("EMDS", nullptr, settings.maxPacketSize, settings.licenseDirectory)
47  , settings_(settings)
48 {
49  initLogger(settings);
50 
51  forceLog (ONIXS_LOG_INFO[this] << "OnixS Eurex EMDS Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
52  forceLog (ONIXS_LOG_INFO[this] << settings.toString());
53 
54  try
55  {
56  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
57 
58  string xmlTemplates = System::File::readTextFile(settings.fastTemplatesFile);
59  fixDecoder_.reset(new FixDecoder(xmlTemplates, false, InputDataTraits::CompleteMessagesOnly));
60 
61  settlementFeedPacketProcessor.reset(
63  settings_.lostPacketWaitTime,
64  settings.heartbeatInterval,
66  lock()
67  )
68  );
69  settlementFeedPacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onSettlementPacketData, this, _1, _2, _3, _4));
70  settlementFeedPacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onSettlementPacketGap, this));
71  settlementFeedPacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onSettlementSenderCompIdChange, this, _1, _2, _3, _4));
72  settlementFeedPacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onSettlementInactivity, this));
73  settlementFeedPacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onSettlementFeedInactivity, this, _1));
74 
75  openInterestPacketProcessor.reset(
77  settings_.lostPacketWaitTime,
78  settings.heartbeatInterval,
80  lock())
81  );
82  openInterestPacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onOpenInterestPacketData, this, _1, _2, _3, _4));
83  openInterestPacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onOpenInterestPacketGap, this));
84  openInterestPacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onOpenInterestSenderCompIdChange, this, _1, _2, _3, _4));
85  openInterestPacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onOpenInterestInactivity, this));
86  openInterestPacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onOpenInterestFeedInactivity, this, _1));
87 
88  otcTradePacketProcessor.reset(
90  settings_.lostPacketWaitTime,
91  settings.heartbeatInterval,
93  lock())
94  );
95  otcTradePacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onOtcTradePacketData, this, _1, _2, _3, _4));
96  otcTradePacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onOtcTradePacketGap, this));
97  otcTradePacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onOtcTradeSenderCompIdChange, this, _1, _2, _3, _4));
98  otcTradePacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onOtcTradeInactivity, this));
99  otcTradePacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onOtcTradeFeedInactivity, this, _1));
100 
101  exchangeTradePacketProcessor.reset(
103  settings_.lostPacketWaitTime,
104  settings.heartbeatInterval,
106  lock())
107  );
108  exchangeTradePacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onExchangeTradePacketData, this, _1, _2, _3, _4));
109  exchangeTradePacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onExchangeTradePacketGap, this));
110  exchangeTradePacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onExchangeTradeSenderCompIdChange, this, _1, _2, _3, _4));
111  exchangeTradePacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onExchangeTradeInactivity, this));
112  exchangeTradePacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onExchangeTradeFeedInactivity, this, _1));
113  }
114  catch(const std::exception& ex)
115  {
116  log(ONIXS_LOG_ERROR[this] << "Exception in EmdsHandlerImpl constructor: " << ex.what());
117  throw;
118  }
119  catch(...)
120  {
121  log(ONIXS_LOG_WARN [this] << "Unknown exception in EmdsHandlerImpl constructor");
122  throw;
123  }
124 }
125 
127 {
128  stop();
129 }
130 
132 {
133  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
134 
135  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
136  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
137  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
138 }
139 
141 {
142  base::start();
143 }
144 
146 {
147  LogPlayerLinks links;
148  links.push_back(LogPlayerLink("Settlement", settlementFeedPacketProcessor.get()));
149  links.push_back(LogPlayerLink("Interest", openInterestPacketProcessor.get()));
150  links.push_back(LogPlayerLink("OtcTrade", otcTradePacketProcessor.get()));
151  links.push_back(LogPlayerLink("ExchangeTrade", exchangeTradePacketProcessor.get()));
152 
153  BaseHandlerImp::start(options, links, "EMDS");
154 }
155 
156 //void EmdsHandlerImpl::onPrepareLogPlayer(LogPlayer* logPlayer)
157 //{
158 // settlementFeedPacketProcessor->reset();
159 // openInterestPacketProcessor->reset();
160 // otcTradePacketProcessor->reset();
161 // exchangeTradePacketProcessor->reset();
162 //}
163 
165 {
166  base::stop();
167 }
168 
169 void EmdsHandlerImpl::onStarted()
170 {
171  settlementFeedPacketProcessor->reset();
172  openInterestPacketProcessor->reset();
173  otcTradePacketProcessor->reset();
174  exchangeTradePacketProcessor->reset();
175 
176  FE::ConnectionInfo connectionA;
177  FE::ConnectionInfo connectionB;
178 
179  connectionA.role(FE::NetFeedRole::A);
180  connectionB.role(FE::NetFeedRole::B);
181 
182  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
183  if (!settings_.useFeedA)
184  layout = FE::MulticastFeedLayout::BOnly;
185  else if (!settings_.useFeedB)
186  layout = FE::MulticastFeedLayout::AOnly;
187 
188  FE::NifsByFeedRole nifs;
189  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
190  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
191 
192  connectionA.type(FE::NetFeedType::Incremental);
193  connectionB.type(FE::NetFeedType::Incremental);
194 
195  ///
196  connectionA.id("SettlementA");
197  connectionB.id("SettlementB");
198 
199  connectionA.ip(settings_.settlementFeedDescriptor.serviceA.address);
200  connectionB.ip(settings_.settlementFeedDescriptor.serviceB.address);
201 
202  connectionA.port(settings_.settlementFeedDescriptor.serviceA.port);
203  connectionB.port(settings_.settlementFeedDescriptor.serviceB.port);
204 
205  FE::ConnectionInfoList settlementConnections;
206  settlementConnections.push_back(connectionA);
207  settlementConnections.push_back(connectionB);
208 
209  settlementFeeds_.reset(
210  constructCluster(
211  FE::NetFeedType::Incremental,
212  layout,
213  settlementConnections,
214  nifs,
215  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
216  *settlementFeedPacketProcessor
217  )
218  );
219  settlementFeeds_->connect();
220 
221  ///
222  connectionA.id("OpenInterestA");
223  connectionB.id("OpenInterestB");
224 
225  connectionA.ip(settings_.openInterestFeedDescriptor.serviceA.address);
226  connectionB.ip(settings_.openInterestFeedDescriptor.serviceB.address);
227 
228  connectionA.port(settings_.openInterestFeedDescriptor.serviceA.port);
229  connectionB.port(settings_.openInterestFeedDescriptor.serviceB.port);
230 
231  FE::ConnectionInfoList openInterestConnections;
232  openInterestConnections.push_back(connectionA);
233  openInterestConnections.push_back(connectionB);
234 
235  openInterestFeeds_.reset(
236  constructCluster(
237  FE::NetFeedType::Incremental,
238  layout,
239  openInterestConnections,
240  nifs,
241  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
242  *openInterestPacketProcessor
243  )
244  );
245  openInterestFeeds_->connect();
246 
247  /// OtcTrade removed in 4.0.0.0
248 
249  ///
250  connectionA.id("ExchangeTradeA");
251  connectionB.id("ExchangeTradeB");
252 
253  connectionA.ip(settings_.exchangeTradeFeedDescriptor.serviceA.address);
254  connectionB.ip(settings_.exchangeTradeFeedDescriptor.serviceB.address);
255 
256  connectionA.port(settings_.exchangeTradeFeedDescriptor.serviceA.port);
257  connectionB.port(settings_.exchangeTradeFeedDescriptor.serviceB.port);
258 
259  FE::ConnectionInfoList ExchangeTradeConnections;
260  ExchangeTradeConnections.push_back(connectionA);
261  ExchangeTradeConnections.push_back(connectionB);
262 
263  exchangeTradeFeeds_.reset(
264  constructCluster(
265  FE::NetFeedType::Incremental,
266  layout,
267  ExchangeTradeConnections,
268  nifs,
269  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
270  *exchangeTradePacketProcessor
271  )
272  );
273  exchangeTradeFeeds_->connect();
274 }
275 
276 void EmdsHandlerImpl::onStopped()
277 {
278  settlementFeeds_.reset();
279  openInterestFeeds_.reset();
280  otcTradeFeeds_.reset();
281  exchangeTradeFeeds_.reset();
282 }
283 
284 void EmdsHandlerImpl::onPrepareFeeds()
285 {
286 }
287 
288 void EmdsHandlerImpl::onSettlementPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
289 {
290  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
291 
292  if(settings_.logSettings & LogSettings::LogPackets)
293  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
294 
295  onPacketData(logFacility, dataSource, data, length, headerSize);
296 }
297 
298 void EmdsHandlerImpl::onSettlementPacketGap()
299 {
300  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
301  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
302 }
303 
304 void EmdsHandlerImpl::onSettlementSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
305 {
306  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
307  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
308 }
309 
310 void EmdsHandlerImpl::onOpenInterestPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
311 {
312  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
313 
314  if(settings_.logSettings & LogSettings::LogPackets)
315  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
316 
317  onPacketData(logFacility, dataSource, data, length, headerSize);
318 }
319 
320 void EmdsHandlerImpl::onOpenInterestPacketGap()
321 {
322  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
323  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
324 }
325 
326 void EmdsHandlerImpl::onOpenInterestSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
327 {
328  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
329  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
330 }
331 
332 void EmdsHandlerImpl::onOtcTradePacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
333 {
334  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
335 
336  if(settings_.logSettings & LogSettings::LogPackets)
337  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
338 
339  onPacketData(logFacility, dataSource, data, length, headerSize);
340 }
341 
342 void EmdsHandlerImpl::onOtcTradePacketGap()
343 {
344  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
345  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
346 }
347 
348 void EmdsHandlerImpl::onOtcTradeSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
349 {
350  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
351  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
352 }
353 
354 void EmdsHandlerImpl::onExchangeTradePacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
355 {
356  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
357 
358  if(settings_.logSettings & LogSettings::LogPackets)
359  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
360 
361  onPacketData(logFacility, dataSource, data, length, headerSize);
362 }
363 
364 void EmdsHandlerImpl::onExchangeTradePacketGap()
365 {
366  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
367  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
368 }
369 
370 void EmdsHandlerImpl::onExchangeTradeSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
371 {
372  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
373  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
374 }
375 
376 void EmdsHandlerImpl::onPacketData(const Logging::LogFacility& logFacility, DataSource& dataSource, const char* data, size_t length, size_t headerSize)
377 {
378  try
379  {
380  PacketHeaderForEmds* packetHeader = (PacketHeaderForEmds*)(data);
381  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
382 
383  if(packetHeaderTemplateId != PacketHeaderForEmds::TemplateIdValue)
384  {
385  throw OperationException (
386  BOOST_CURRENT_FUNCTION,
387  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEmds::TemplateIdValue, packetHeaderTemplateId).c_str()
388  );
389  }
390 
391  size_t offset = headerSize;
392  while(offset < length)
393  {
394  size_t chunkLength;
395  unsigned int templateId;
396  const CORE::Message* msg = fixDecoder_->decode(Util::ByteBuffer(data + offset, length - offset), &chunkLength, &templateId);
397 
398  if(chunkLength == 0)
399  break;
400 
401  offset += chunkLength;
402 
403  if( templateId == 120 ) //FastReset
404  {
405  fixDecoder_->resetDictionaries();
406  continue;
407  }
408 
409  dataSource.packetMessageSeqNum++;
410  dataSource.isLastInPacket = offset >= length;
411 
412  processMessage(logFacility, dataSource, *msg, templateId);
413  }
414  }
415  catch(const std::exception& ex)
416  {
417  log(ONIXS_LOG_WARN [&logFacility] << "Exception while message decoding: " << ex.what());
418  }
419  catch(...)
420  {
421  log(ONIXS_LOG_WARN [&logFacility] << "Unknown exception while message decoding");
422  }
423 }
424 
425 void EmdsHandlerImpl::processMessage(const Logging::LogFacility& logFacility, const DataSource& dataSource, const CORE::Message& msg, unsigned int templateId)
426 {
427  log(ONIXS_LOG_DEBUG[&logFacility] << msg);
428 
429  const CORE::FieldValue& marketSegmentIdValue = msg[Tags::MarketSegmentID];
430 
431  if( CORE::DataTypes::Null != marketSegmentIdValue.type())
432  {
433  //filter by market segment id
434  UInt32 marketSegmentId;
435  if( !marketSegmentIdfilters_.empty() && marketSegmentIdValue.convertTo(marketSegmentId) && marketSegmentIdfilters_.end() == marketSegmentIdfilters_.find(marketSegmentId))
436  return;
437  }
438 
439  switch(templateId)
440  {
441  case 171:
442  {
443  const OpenInterestWrapper wrapper(msg);
444  listenerHolder_->invokeOpenInterest(&wrapper, dataSource);
445  break;
446  }
447  case 172:
448  {
449  const SettlementWrapper wrapper(msg);
450  listenerHolder_->invokeSettlement(&wrapper, dataSource);
451  break;
452  }
453  case 175:
454  {
455  const ExchangeTradeWrapper wrapper(msg);
456  listenerHolder_->invokeExchangeTrade(&wrapper, dataSource);
457  break;
458  }
459  case 152:
460  {
461  unsigned int mDCount = 0;
462  msg[Tags::MDReportCount].convertTo(mDCount);
463 
464  const CORE::FieldValue& mDReportEventValue = msg[Tags::MDReportEvent];
465  unsigned int mDReportEvent = 0;
466  if( CORE::DataTypes::Null != mDReportEventValue.type())
467  mDReportEventValue.convertTo(mDReportEvent);
468 
469  switch(mDReportEvent)
470  {
471  //case 2: listenerHolder_->invokeOtcTradeReplayCycleStart(mDCount, dataSource); break;
472  //case 3: listenerHolder_->invokeOtcTradeReplayCycleEnd(dataSource); break;
473  case 4: listenerHolder_->invokeExchangeTradeReplayCycleStart(mDCount, dataSource); break;
474  case 5: listenerHolder_->invokeExchangeTradeReplayCycleEnd(dataSource); break;
475  case 6: listenerHolder_->invokeOpenInterestReplayCycleStart(mDCount, dataSource); break;
476  case 7: listenerHolder_->invokeOpenInterestReplayCycleEnd(dataSource); break;
477  case 8: listenerHolder_->invokeSettlementReplayCycleStart(mDCount, dataSource); break;
478  case 9: listenerHolder_->invokeSettlementReplayCycleEnd(dataSource); break;
479  default: log(ONIXS_LOG_WARN[&logFacility] << "Unknown mDReportEvent: " << mDReportEvent);
480  }
481  }
482  break;
483  case 170: //heartbeat
484  break;
485  default:
486  log(ONIXS_LOG_WARN[&logFacility] << "Unknown message template id: " << templateId);
487  }
488 }
489 
491 {
492  Guard guard(marketSegmentIdfiltersLock_);
493  marketSegmentIdfilters_ = filters;
494 }
495 
497 {
498  Guard guard(marketSegmentIdfiltersLock_);
499  marketSegmentIdfilters_.clear();
500 }
501 
503 {
504  Guard guard(securityIdFiltersLock_);
505  securityIdFilters_ = filters;
506 }
507 
509 {
510  Guard guard(securityIdFiltersLock_);
511  securityIdFilters_.clear();
512 }
513 
514 void EmdsHandlerImpl::onSettlementInactivity()
515 {
516  listenerHolder_->invokeWarning("No settlement data");
517 }
518 
519 void EmdsHandlerImpl::onSettlementFeedInactivity(FE::NetFeedRole::Enum id)
520 {
521  listenerHolder_->invokeWarning("No settlement data on " + FE::toStr(id));
522 }
523 
524 void EmdsHandlerImpl::onOpenInterestInactivity()
525 {
526  listenerHolder_->invokeWarning("No open interest data");
527 }
528 
529 void EmdsHandlerImpl::onOpenInterestFeedInactivity(FE::NetFeedRole::Enum id)
530 {
531  listenerHolder_->invokeWarning("No open interest data on " + FE::toStr(id));
532 }
533 
534 void EmdsHandlerImpl::onOtcTradeInactivity()
535 {
536  listenerHolder_->invokeWarning("No OTC trade data");
537 }
538 
539 void EmdsHandlerImpl::onOtcTradeFeedInactivity(FE::NetFeedRole::Enum id)
540 {
541  listenerHolder_->invokeWarning("No OTC trade data on " + FE::toStr(id));
542 }
543 
544 void EmdsHandlerImpl::onExchangeTradeInactivity()
545 {
546  listenerHolder_->invokeWarning("No on-exchange trade data");
547 }
548 
549 void EmdsHandlerImpl::onExchangeTradeFeedInactivity(FE::NetFeedRole::Enum id)
550 {
551  listenerHolder_->invokeWarning("No on-exchange trade data on " + FE::toStr(id));
552 }
553 
554 void EmdsHandlerImpl::initLogger(const HandlerSettings &settings)
555 {
556  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
557 
558  setLogger(&logger_->getLogger());
559 }
560 
561 void EmdsHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
562 {
563  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
564 }
565 
566 
567 }}}}
568 
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:287
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FeedDescriptor settlementFeedDescriptor
Sets Settlement prices feed technical configuration.
FeedDescriptor openInterestFeedDescriptor
Sets Open Interest prices feed technical configuration.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
EMDI handler configuration settings.
STL namespace.
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:105
Handler base configuration settings.
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:74
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
void setSecurityIdFilters(const SecurityIdFilters &filters)
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FeedDescriptor exchangeTradeFeedDescriptor
Sets On-exchange trade prices feed technical configuration.
EmdsHandlerImpl(const EmdsHandlerSettings &settings)
Initialize new instance.
FilteringTraits::SecurityIdFilters SecurityIdFilters
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
std::string toString() const
Returns the string representation.