OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.0.3
API documentation
EmdsHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <system/File.h>
20 #include <util/String.h>
21 
23 
24 #include "EmdsHandlerImpl.h"
25 #include "PacketHeader.h"
26 #include "Formatting.h"
27 #include "MessageOperator.h"
28 #include "FeedEngineImpl.h"
29 #include "Version.h"
30 
31 
32 namespace OnixS {
33 namespace Eurex {
34 namespace MarketData {
35 namespace Implementation {
36 
37 using namespace std;
38 using namespace Networking;
39 using namespace HandlerCore::MarketData;
40 using namespace HandlerCore::Common;
41 
42 
44  : base ("EMDS", NULL, settings.maxPacketSize, settings.licenseDirectory)
45  , settings_(settings)
46 {
47  initLogger(settings);
48 
49  forceLog (ONIXS_LOG_INFO[this] << "OnixS Eurex EMDS Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
50  forceLog (ONIXS_LOG_INFO[this] << settings.toString());
51 
52  try
53  {
54  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
55 
56  string xmlTemplates = System::File::readTextFile(settings.fastTemplatesFile);
57  fixDecoder_.reset(new FixDecoder(xmlTemplates, false, InputDataTraits::CompleteMessagesOnly));
58 
59  settlementFeedPacketProcessor.reset(
61  settings_.maxPacketWaitingTime,
62  settings.heartbeatInterval,
64  lock()
65  )
66  );
67  settlementFeedPacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onSettlementPacketData, this, _1, _2, _3, _4));
68  settlementFeedPacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onSettlementPacketGap, this));
69  settlementFeedPacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onSettlementSenderCompIdChange, this, _1, _2, _3, _4));
70  settlementFeedPacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onSettlementInactivity, this));
71  settlementFeedPacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onSettlementFeedInactivity, this, _1));
72 
73  openInterestPacketProcessor.reset(
75  settings_.maxPacketWaitingTime,
76  settings.heartbeatInterval,
78  lock())
79  );
80  openInterestPacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onOpenInterestPacketData, this, _1, _2, _3, _4));
81  openInterestPacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onOpenInterestPacketGap, this));
82  openInterestPacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onOpenInterestSenderCompIdChange, this, _1, _2, _3, _4));
83  openInterestPacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onOpenInterestInactivity, this));
84  openInterestPacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onOpenInterestFeedInactivity, this, _1));
85 
86  otcTradePacketProcessor.reset(
88  settings_.maxPacketWaitingTime,
89  settings.heartbeatInterval,
91  lock())
92  );
93  otcTradePacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onOtcTradePacketData, this, _1, _2, _3, _4));
94  otcTradePacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onOtcTradePacketGap, this));
95  otcTradePacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onOtcTradeSenderCompIdChange, this, _1, _2, _3, _4));
96  otcTradePacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onOtcTradeInactivity, this));
97  otcTradePacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onOtcTradeFeedInactivity, this, _1));
98 
99  exchangeTradePacketProcessor.reset(
101  settings_.maxPacketWaitingTime,
102  settings.heartbeatInterval,
104  lock())
105  );
106  exchangeTradePacketProcessor->subscribeOnPacketData(boost::bind(&EmdsHandlerImpl::onExchangeTradePacketData, this, _1, _2, _3, _4));
107  exchangeTradePacketProcessor->subscribeOnGap(boost::bind(&EmdsHandlerImpl::onExchangeTradePacketGap, this));
108  exchangeTradePacketProcessor->subscribeOnSenderCompIdChange(boost::bind(&EmdsHandlerImpl::onExchangeTradeSenderCompIdChange, this, _1, _2, _3, _4));
109  exchangeTradePacketProcessor->subscribeOnInactivity(boost::bind(&EmdsHandlerImpl::onExchangeTradeInactivity, this));
110  exchangeTradePacketProcessor->subscribeOnFeedInactivity(boost::bind(&EmdsHandlerImpl::onExchangeTradeFeedInactivity, this, _1));
111  }
112  catch(const std::exception& ex)
113  {
114  log(ONIXS_LOG_ERROR[this] << "Exception in EmdsHandlerImpl constructor: " << ex.what());
115  throw;
116  }
117  catch(...)
118  {
119  log(ONIXS_LOG_WARN [this] << "Unknown exception in EmdsHandlerImpl constructor");
120  throw;
121  }
122 }
123 
125 {
126  stop();
127 }
128 
129 void EmdsHandlerImpl::commonLog (int logLevel, const char * msg, size_t length)
130 {
131  if (logLevel < OnixS::Logging::MIN_LOG_LEVEL || logLevel > OnixS::Logging::MAX_LOG_LEVEL)
132  throw OperationException (
133  BOOST_CURRENT_FUNCTION,
134  OnixS::Util::format ("An error occurred during log operation. Invalid log level (logLevel=%d).", logLevel).c_str ());
135 
136  log (OnixS::Logging::LogMsg (OnixS::Logging::LogLevel (logLevel))["USER"] << OnixS::Util::ValuePtr (msg, length));
137 }
138 
140 {
141  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
142 
143  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
144  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
145  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
146 }
147 
149 {
150  base::start();
151 }
152 
154 {
155  LogPlayerLinks links;
156  links.push_back(LogPlayerLink("Settlement", settlementFeedPacketProcessor.get()));
157  links.push_back(LogPlayerLink("Interest", openInterestPacketProcessor.get()));
158  links.push_back(LogPlayerLink("OtcTrade", otcTradePacketProcessor.get()));
159  links.push_back(LogPlayerLink("ExchangeTrade", exchangeTradePacketProcessor.get()));
160 
161  BaseHandlerImp::start(options, links, "EMDS");
162 }
163 
164 //void EmdsHandlerImpl::onPrepareLogPlayer(LogPlayer* logPlayer)
165 //{
166 // settlementFeedPacketProcessor->reset();
167 // openInterestPacketProcessor->reset();
168 // otcTradePacketProcessor->reset();
169 // exchangeTradePacketProcessor->reset();
170 //}
171 
173 {
174  base::stop();
175 }
176 
177 void EmdsHandlerImpl::onStarted()
178 {
179  settlementFeedPacketProcessor->reset();
180  openInterestPacketProcessor->reset();
181  otcTradePacketProcessor->reset();
182  exchangeTradePacketProcessor->reset();
183 
184  FE::ConnectionInfo connectionA;
185  FE::ConnectionInfo connectionB;
186 
187  connectionA.role(FE::NetFeedRole::A);
188  connectionB.role(FE::NetFeedRole::B);
189 
190  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
191  if (!settings_.useFeedA)
192  layout = FE::MulticastFeedLayout::BOnly;
193  else if (!settings_.useFeedB)
194  layout = FE::MulticastFeedLayout::AOnly;
195 
196  FE::NifsByFeedRole nifs;
197  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
198  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
199 
200  connectionA.type(FE::NetFeedType::Incremental);
201  connectionB.type(FE::NetFeedType::Incremental);
202 
203  ///
204  connectionA.id("SettlementA");
205  connectionB.id("SettlementB");
206 
207  connectionA.ip(settings_.settlementFeedDescriptor.serviceA.address);
208  connectionB.ip(settings_.settlementFeedDescriptor.serviceB.address);
209 
210  connectionA.port(settings_.settlementFeedDescriptor.serviceA.port);
211  connectionB.port(settings_.settlementFeedDescriptor.serviceB.port);
212 
213  FE::ConnectionInfoList settlementConnections;
214  settlementConnections.push_back(connectionA);
215  settlementConnections.push_back(connectionB);
216 
217  settlementFeeds_.reset(
218  constructCluster(
219  FE::NetFeedType::Incremental,
220  layout,
221  settlementConnections,
222  nifs,
223  FE::TimeSpan(0, 0, 0, settings_.maxPacketWaitingTime * FE::TimeTraits::nanosecondsPerMillisecond()),
224  *settlementFeedPacketProcessor
225  )
226  );
227  settlementFeeds_->connect();
228 
229  ///
230  connectionA.id("OpenInterestA");
231  connectionB.id("OpenInterestB");
232 
233  connectionA.ip(settings_.openInterestFeedDescriptor.serviceA.address);
234  connectionB.ip(settings_.openInterestFeedDescriptor.serviceB.address);
235 
236  connectionA.port(settings_.openInterestFeedDescriptor.serviceA.port);
237  connectionB.port(settings_.openInterestFeedDescriptor.serviceB.port);
238 
239  FE::ConnectionInfoList openInterestConnections;
240  openInterestConnections.push_back(connectionA);
241  openInterestConnections.push_back(connectionB);
242 
243  openInterestFeeds_.reset(
244  constructCluster(
245  FE::NetFeedType::Incremental,
246  layout,
247  openInterestConnections,
248  nifs,
249  FE::TimeSpan(0, 0, 0, settings_.maxPacketWaitingTime * FE::TimeTraits::nanosecondsPerMillisecond()),
250  *openInterestPacketProcessor
251  )
252  );
253  openInterestFeeds_->connect();
254 
255  /// OtcTrade removed in 4.0.0.0
256 
257  ///
258  connectionA.id("ExchangeTradeA");
259  connectionB.id("ExchangeTradeB");
260 
261  connectionA.ip(settings_.exchangeTradeFeedDescriptor.serviceA.address);
262  connectionB.ip(settings_.exchangeTradeFeedDescriptor.serviceB.address);
263 
264  connectionA.port(settings_.exchangeTradeFeedDescriptor.serviceA.port);
265  connectionB.port(settings_.exchangeTradeFeedDescriptor.serviceB.port);
266 
267  FE::ConnectionInfoList ExchangeTradeConnections;
268  ExchangeTradeConnections.push_back(connectionA);
269  ExchangeTradeConnections.push_back(connectionB);
270 
271  exchangeTradeFeeds_.reset(
272  constructCluster(
273  FE::NetFeedType::Incremental,
274  layout,
275  ExchangeTradeConnections,
276  nifs,
277  FE::TimeSpan(0, 0, 0, settings_.maxPacketWaitingTime * FE::TimeTraits::nanosecondsPerMillisecond()),
278  *exchangeTradePacketProcessor
279  )
280  );
281  exchangeTradeFeeds_->connect();
282 }
283 
284 void EmdsHandlerImpl::onStopped()
285 {
286  settlementFeeds_.reset();
287  openInterestFeeds_.reset();
288  otcTradeFeeds_.reset();
289  exchangeTradeFeeds_.reset();
290 }
291 
292 void EmdsHandlerImpl::onPrepareFeeds()
293 {
294 }
295 
296 void EmdsHandlerImpl::onSettlementPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
297 {
298  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
299 
300  if(settings_.logSettings & LogSettings::LogPackets)
301  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
302 
303  onPacketData(logFacility, dataSource, data, length, headerSize);
304 }
305 
306 void EmdsHandlerImpl::onSettlementPacketGap()
307 {
308  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
309  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
310 }
311 
312 void EmdsHandlerImpl::onSettlementSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
313 {
314  static Logging::LogFacility logFacility("Settlement", this, Logging::LOG_LEVEL_DEBUG);
315  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
316 }
317 
318 void EmdsHandlerImpl::onOpenInterestPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
319 {
320  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
321 
322  if(settings_.logSettings & LogSettings::LogPackets)
323  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
324 
325  onPacketData(logFacility, dataSource, data, length, headerSize);
326 }
327 
328 void EmdsHandlerImpl::onOpenInterestPacketGap()
329 {
330  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
331  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
332 }
333 
334 void EmdsHandlerImpl::onOpenInterestSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
335 {
336  static Logging::LogFacility logFacility("Interest", this, Logging::LOG_LEVEL_DEBUG);
337  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
338 }
339 
340 void EmdsHandlerImpl::onOtcTradePacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
341 {
342  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
343 
344  if(settings_.logSettings & LogSettings::LogPackets)
345  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
346 
347  onPacketData(logFacility, dataSource, data, length, headerSize);
348 }
349 
350 void EmdsHandlerImpl::onOtcTradePacketGap()
351 {
352  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
353  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
354 }
355 
356 void EmdsHandlerImpl::onOtcTradeSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
357 {
358  static Logging::LogFacility logFacility("OtcTrade", this, Logging::LOG_LEVEL_DEBUG);
359  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
360 }
361 
362 void EmdsHandlerImpl::onExchangeTradePacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
363 {
364  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
365 
366  if(settings_.logSettings & LogSettings::LogPackets)
367  log( ONIXS_LOG_INFO[&logFacility] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
368 
369  onPacketData(logFacility, dataSource, data, length, headerSize);
370 }
371 
372 void EmdsHandlerImpl::onExchangeTradePacketGap()
373 {
374  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
375  log( ONIXS_LOG_DEBUG[&logFacility] << "Packet gap");
376 }
377 
378 void EmdsHandlerImpl::onExchangeTradeSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
379 {
380  static Logging::LogFacility logFacility("ExchangeTrade", this, Logging::LOG_LEVEL_DEBUG);
381  log( ONIXS_LOG_DEBUG[&logFacility] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum "<< lastSeqNum << "->" << newSeqNum);
382 }
383 
384 void EmdsHandlerImpl::onPacketData(const Logging::LogFacility& logFacility, DataSource& dataSource, const char* data, size_t length, size_t headerSize)
385 {
386  try
387  {
388  PacketHeaderForEmds* packetHeader = (PacketHeaderForEmds*)(data);
389  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
390 
391  if(packetHeaderTemplateId != PacketHeaderForEmds::TemplateIdValue)
392  {
393  throw OperationException (
394  BOOST_CURRENT_FUNCTION,
395  format("Wrong packet header id: expected %d, but received %d", PacketHeaderForEmds::TemplateIdValue, packetHeaderTemplateId).c_str()
396  );
397  }
398 
399  size_t offset = headerSize;
400  while(offset < length)
401  {
402  size_t chunkLength;
403  unsigned int templateId;
404  const CORE::Message* msg = fixDecoder_->decode(Util::ByteBuffer(data + offset, length - offset), &chunkLength, &templateId);
405 
406  if(chunkLength == 0)
407  break;
408 
409  offset += chunkLength;
410 
411  if( templateId == 120 ) //FastReset
412  {
413  fixDecoder_->resetDictionaries();
414  continue;
415  }
416 
417  dataSource.packetMessageSeqNum++;
418  dataSource.isLastInPacket = !(offset < length);
419 
420  processMessage(logFacility, dataSource, *msg, templateId);
421  }
422  }
423  catch(const std::exception& ex)
424  {
425  log(ONIXS_LOG_WARN [&logFacility] << "Exception while message decoding: " << ex.what());
426  }
427  catch(...)
428  {
429  log(ONIXS_LOG_WARN [&logFacility] << "Unknown exception while message decoding");
430  }
431 }
432 
433 void EmdsHandlerImpl::processMessage(const Logging::LogFacility& logFacility, const DataSource& dataSource, const CORE::Message& msg, unsigned int templateId)
434 {
435  log(ONIXS_LOG_DEBUG[&logFacility] << msg);
436 
437  const CORE::FieldValue& marketSegmentIdValue = msg[Tags::MarketSegmentID];
438 
439  if( CORE::DataTypes::Null != marketSegmentIdValue.type())
440  {
441  //filter by market segment id
442  UInt32 marketSegmentId;
443  if( !marketSegmentIdfilters_.empty() && marketSegmentIdValue.convertTo(marketSegmentId) && marketSegmentIdfilters_.end() == marketSegmentIdfilters_.find(marketSegmentId))
444  return;
445  }
446 
447  switch(templateId)
448  {
449  case 171:
450  {
451  const OpenInterestWrapper wrapper(msg);
452  listenerHolder_->invokeOpenInterest(&wrapper, dataSource);
453  break;
454  }
455  case 172:
456  {
457  const SettlementWrapper wrapper(msg);
458  listenerHolder_->invokeSettlement(&wrapper, dataSource);
459  break;
460  }
461  case 175:
462  {
463  const ExchangeTradeWrapper wrapper(msg);
464  listenerHolder_->invokeExchangeTrade(&wrapper, dataSource);
465  break;
466  }
467  case 152:
468  {
469  unsigned int mDCount = 0;
470  msg[Tags::MDReportCount].convertTo(mDCount);
471 
472  const CORE::FieldValue& mDReportEventValue = msg[Tags::MDReportEvent];
473  unsigned int mDReportEvent = 0;
474  if( CORE::DataTypes::Null != mDReportEventValue.type())
475  mDReportEventValue.convertTo(mDReportEvent);
476 
477  switch(mDReportEvent)
478  {
479  //case 2: listenerHolder_->invokeOtcTradeReplayCycleStart(mDCount, dataSource); break;
480  //case 3: listenerHolder_->invokeOtcTradeReplayCycleEnd(dataSource); break;
481  case 4: listenerHolder_->invokeExchangeTradeReplayCycleStart(mDCount, dataSource); break;
482  case 5: listenerHolder_->invokeExchangeTradeReplayCycleEnd(dataSource); break;
483  case 6: listenerHolder_->invokeOpenInterestReplayCycleStart(mDCount, dataSource); break;
484  case 7: listenerHolder_->invokeOpenInterestReplayCycleEnd(dataSource); break;
485  case 8: listenerHolder_->invokeSettlementReplayCycleStart(mDCount, dataSource); break;
486  case 9: listenerHolder_->invokeSettlementReplayCycleEnd(dataSource); break;
487  default: log(ONIXS_LOG_WARN[&logFacility] << "Unknown mDReportEvent: " << mDReportEvent);
488  }
489  }
490  break;
491  case 170: //heartbeat
492  break;
493  default:
494  log(ONIXS_LOG_WARN[&logFacility] << "Unknown message template id: " << templateId);
495  }
496 }
497 
499 {
500  Guard guard(marketSegmentIdfiltersLock_);
501  marketSegmentIdfilters_ = filters;
502 }
503 
505 {
506  Guard guard(marketSegmentIdfiltersLock_);
507  marketSegmentIdfilters_.clear();
508 }
509 
511 {
512  Guard guard(securityIdFiltersLock_);
513  securityIdFilters_ = filters;
514 }
515 
517 {
518  Guard guard(securityIdFiltersLock_);
519  securityIdFilters_.clear();
520 }
521 
522 void EmdsHandlerImpl::onSettlementInactivity()
523 {
524  listenerHolder_->invokeWarning("No settlement data");
525 }
526 
527 void EmdsHandlerImpl::onSettlementFeedInactivity(FE::NetFeedRole::Enum id)
528 {
529  listenerHolder_->invokeWarning("No settlement data on " + FE::toStr(id));
530 }
531 
532 void EmdsHandlerImpl::onOpenInterestInactivity()
533 {
534  listenerHolder_->invokeWarning("No open interest data");
535 }
536 
537 void EmdsHandlerImpl::onOpenInterestFeedInactivity(FE::NetFeedRole::Enum id)
538 {
539  listenerHolder_->invokeWarning("No open interest data on " + FE::toStr(id));
540 }
541 
542 void EmdsHandlerImpl::onOtcTradeInactivity()
543 {
544  listenerHolder_->invokeWarning("No OTC trade data");
545 }
546 
547 void EmdsHandlerImpl::onOtcTradeFeedInactivity(FE::NetFeedRole::Enum id)
548 {
549  listenerHolder_->invokeWarning("No OTC trade data on " + FE::toStr(id));
550 }
551 
552 void EmdsHandlerImpl::onExchangeTradeInactivity()
553 {
554  listenerHolder_->invokeWarning("No on-exchange trade data");
555 }
556 
557 void EmdsHandlerImpl::onExchangeTradeFeedInactivity(FE::NetFeedRole::Enum id)
558 {
559  listenerHolder_->invokeWarning("No on-exchange trade data on " + FE::toStr(id));
560 }
561 
562 void EmdsHandlerImpl::initLogger(const HandlerSettings &settings)
563 {
564  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
565 
566  setLogger(&logger_->getLogger());
567 }
568 
569 void EmdsHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
570 {
571  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
572 }
573 
574 
575 }}}}
576 
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:283
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FeedDescriptor settlementFeedDescriptor
Sets Settlement prices feed technical configuration.
FeedDescriptor openInterestFeedDescriptor
Sets Open Interest prices feed technical configuration.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
EMDI handler configuration settings.
STL namespace.
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:105
Handler base configuration settings.
unsigned int UInt32
Definition: Numeric.h:41
Definition: Group.h:25
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:74
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
void setSecurityIdFilters(const SecurityIdFilters &filters)
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used define behavior of given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FeedDescriptor exchangeTradeFeedDescriptor
Sets On-exchange trade prices feed technical configuration.
void commonLog(int logLevel, const char *msg, size_t length)
User&#39;s common log.
EmdsHandlerImpl(const EmdsHandlerSettings &settings)
Initialize new instance.
FilteringTraits::SecurityIdFilters SecurityIdFilters
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
std::string toString() const
Returns the string representation.