OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  8.1.0
API documentation
MdiHandlerImpl.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 #include <system/File.h>
20 #include <util/String.h>
21 
22 #include <OnixS/Core/Bootstrap/Platform.h>
23 
25 
26 #include <OnixS/HandlerCore/FeedEngine/NetFeedAttributes.h>
27 
28 #include "MdiHandlerImpl.h"
29 #include "Formatting.h"
30 #include "FeedEngineImpl.h"
31 #include "Version.h"
32 
33 
34 namespace OnixS {
35 namespace Eurex {
36 namespace MarketData {
37 namespace Implementation {
38 
39 using namespace HandlerCore::MarketData;
40 using namespace HandlerCore::Common;
41 using namespace Networking;
42 
43 
45  : base("MDI", nullptr, settings.maxPacketSize, settings.licenseDirectory)
46  , settings_(settings)
47  , logFacilityForIncrementals_("Incremental", this, Logging::LOG_LEVEL_DEBUG)
48  , messageContentAllocator_(settings.messagePoolSize)
49 {
50  initLogger(settings);
51 
52  forceLog(ONIXS_LOG_INFO[this] << "OnixS Eurex T7 Mdi Handler, version " << OnixS::Eurex::MarketData::Implementation::version() << ", " << os());
53  forceLog(ONIXS_LOG_INFO[this] << settings.toString());
54 
55  initListenerHolder();
56 
57  try
58  {
59  checkXmlFile();
60 
61  const std::string xml(removeInvalidTemplateNodes(System::File::readTextFile(settings.fastTemplatesFile), "MDPacketHeader"));
62 
63  incrementalFixDecoder_.reset(new FixDecoder(xml, false, InputDataTraits::CompleteMessagesOnly));
64 
65  MdiDataRepositoryConfig repositoryConfig;
66  repositoryConfig.name = "DataRepository";
67  repositoryConfig.parent = this;
68  repositoryConfig.logger = logger_.get();
69  repositoryConfig.listenerHolder = listenerHolder_.get();
70  repositoryConfig.settings = settings;
71  dataRepository_.reset(new MdiDataRepository(repositoryConfig));
72 
73  incrementalPacketProcessor_.reset(
75  settings.lostPacketWaitTime,
76  settings.heartbeatInterval,
78  lock())
79  );
80 
81  incrementalPacketProcessor_->subscribeOnPacketData(boost::bind(&MdiHandlerImpl::onPacketData, this, _1, _2, _3, _4));
82  incrementalPacketProcessor_->subscribeOnGap(boost::bind(&MdiHandlerImpl::onPacketGap, this));
83  incrementalPacketProcessor_->subscribeOnSenderCompIdChange(boost::bind(&MdiHandlerImpl::onSenderCompIdChange, this, _1, _2, _3, _4));
84  incrementalPacketProcessor_->subscribeOnInactivity(boost::bind(&MdiHandlerImpl::onInactivity, this));
85  incrementalPacketProcessor_->subscribeOnFeedInactivity(boost::bind(&MdiHandlerImpl::onFeedInactivity, this, _1));
86  }
87  catch (const std::exception& ex)
88  {
89  reportError(TextBuilder() << "Exception in MdiHandlerImpl constructor: " << ex.what());
90  throw;
91  }
92  catch (...)
93  {
94  reportError("Unknown exception in MdiHandlerImpl constructor");
95  throw;
96  }
97 }
98 
100 {
101  messageContentAllocator_.reset();
102 
103  stop();
104 }
105 
106 void MdiHandlerImpl::checkXmlFile()
107 {
108  std::string detectedXmlVersion;
109 
110  if (!checkXmlVersion(settings_.fastTemplatesFile, detectedXmlVersion, EmdiXmlMajorVersion, EmdiXmlMinorVersion))
111  {
112  reportWarning("Can not extract template file version.");
113  }
114  else
115  forceLog(ONIXS_LOG_INFO[this] << "Detected template file version: " << detectedXmlVersion);
116 }
117 
119 {
120  setFeedEngine(FeedEngineImpl::getFeedEngine(feedEngine));
121 
122  forceLog(ONIXS_LOG_INFO[this] << "Bound FeedEngine");
123  forceLog(ONIXS_LOG_INFO[this] << feedEngine.settings().toString());
124  forceLog(ONIXS_LOG_INFO[this] << feedEngine.info());
125 }
126 
128 {
129  dataRepository_->preBuildProductDataRepositories();
130 
131  base::start();
132 }
133 
135 {
136  dataRepository_->preBuildProductDataRepositories();
137 
138  incrementalPacketProcessor_->activateReplayMode();
139 
140  LogPlayerLinks links;
141  links.push_back(LogPlayerLink("Incremental", incrementalPacketProcessor_.get()));
142 
143  BaseHandlerImp::start(options, links, "MDI");
144 }
145 
147 {
148  base::stop();
149 }
150 
151 void MdiHandlerImpl::onStarting()
152 {
153  incrementalPacketProcessor_->reset();
154 
155  FE::MulticastFeedLayout::Enum layout = FE::MulticastFeedLayout::Both;
156  if (!settings_.useFeedA)
157  layout = FE::MulticastFeedLayout::BOnly;
158  else if (!settings_.useFeedB)
159  layout = FE::MulticastFeedLayout::AOnly;
160 
161  FE::NifsByFeedRole nifs;
162  nifs.A(settings_.networkInterfaceA.empty() ? settings_.networkInterface : settings_.networkInterfaceA);
163  nifs.B(settings_.networkInterfaceB.empty() ? settings_.networkInterface : settings_.networkInterfaceB);
164 
165 
166  {
167  FE::ConnectionInfo connectionA;
168  FE::ConnectionInfo connectionB;
169 
170  connectionA.role(FE::NetFeedRole::A);
171  connectionB.role(FE::NetFeedRole::B);
172 
173  ///
174  connectionA.type(FE::NetFeedType::Incremental);
175  connectionB.type(FE::NetFeedType::Incremental);
176 
177  connectionA.id("IncrementalA");
178  connectionB.id("IncrementalB");
179 
180  connectionA.ip(settings_.interfaceDescriptor.incrementalFeed.serviceA.address);
181  connectionB.ip(settings_.interfaceDescriptor.incrementalFeed.serviceB.address);
182 
183  connectionA.port(settings_.interfaceDescriptor.incrementalFeed.serviceA.port);
184  connectionB.port(settings_.interfaceDescriptor.incrementalFeed.serviceB.port);
185 
186  FE::ConnectionInfoList incrementalConnections;
187  incrementalConnections.push_back(connectionA);
188  incrementalConnections.push_back(connectionB);
189 
190  BOOST_ASSERT(incrementalPacketProcessor_.get());
191 
192  incrementalFeeds_.reset(
193  constructCluster(
194  FE::NetFeedType::Incremental,
195  layout,
196  incrementalConnections,
197  nifs,
198  FE::TimeSpan(0, 0, 0, settings_.lostPacketWaitTime * FE::TimeTraits::nanosecondsPerMillisecond()),
199  *incrementalPacketProcessor_.get()
200  )
201  );
202  }
203 
204  incrementalFeeds_->connect(FE::CanThrowNow());
205 }
206 
207 void MdiHandlerImpl::onStopping()
208 {
209  if(incrementalFeeds_)
210  incrementalFeeds_->disconnect();
211 }
212 
213 void MdiHandlerImpl::onPacketData(DataSource& dataSource, const char* data, size_t length, size_t headerSize)
214 {
215  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet " << dataSource.packetSeqNum);
216 
217  if (settings_.logSettings& LogSettings::LogPackets)
218  log(ONIXS_LOG_INFO[&logFacilityForIncrementals_] << dataSource.packetSeqNum << " Packet received (" << Base64Wrapper(data, length) << ").");
219 
220 
221  try
222  {
223  size_t offset = headerSize;
224  size_t chunkLength;
225  TemplateId templateId = 0;
226  bool result = false;
227 
228  const PacketHeaderForMDI* const packetHeader = (PacketHeaderForMDI*)(data);
229  const UInt32 packetHeaderTemplateId = PacketHeaderHelper::getTemplateId(packetHeader);
230 
231  if (packetHeaderTemplateId != PacketHeaderForMDI::TemplateIdValue)
232  {
233  throw OperationException(
234  BOOST_CURRENT_FUNCTION,
235  format("Wrong packet header id: expected %d, but received %d",
236  PacketHeaderForMDI::TemplateIdValue, packetHeaderTemplateId).c_str()
237  );
238  }
239 
240  while (offset < length)
241  {
242  CORE::Message msg(messageContentAllocator_);
243 
244  try
245  {
246  result = incrementalFixDecoder_->decode(
247  reinterpret_cast<const uint8*>(data), offset, length - offset, &msg, &chunkLength, &templateId);
248  }
249  catch (const std::exception& ex)
250  {
251  reportWarning(
252  TextBuilder()
253  << "Exception while message decoding: "
254  << ex.what()
255  << ", Packet ("
256  << Base64Wrapper(data, length) << ")."
257  );
258 
259  break;
260  }
261  catch (...)
262  {
263  reportWarning(
264  TextBuilder()
265  << "Unknown exception while message decoding"
266  << ", Packet ("
267  << Base64Wrapper(data, length) << ")."
268  );
269 
270  break;
271  }
272 
273  if (chunkLength == 0)
274  break;
275 
276  if (!result)
277  break;
278 
279  offset += chunkLength;
280  ++dataSource.packetMessageSeqNum;
281  dataSource.isLastInPacket = offset >= length;
282 
283  if (templateId == ResetTemplateId)
284  continue;
285 
286  dataRepository_->onMessage(dataSource, msg, templateId);
287  }
288  }
289  catch (const std::exception& ex)
290  {
291  reportWarning(
292  TextBuilder()
293  << "Exception while message processing: "
294  << ex.what()
295  << ", Packet ("
296  << Base64Wrapper(data, length) << ")."
297  );
298  }
299  catch (...)
300  {
301  reportWarning(
302  TextBuilder()
303  << "Unknown exception while message processing"
304  << ", Packet ("
305  << Base64Wrapper(data, length) << ")."
306  );
307  }
308 }
309 
310 void MdiHandlerImpl::onPacketGap()
311 {
312  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "Packet gap");
313  dataRepository_->onPacketGap();
314 }
315 
316 void MdiHandlerImpl::onSenderCompIdChange(unsigned int lastSenderCompId, unsigned int newSenderCompId, SequenceNumber lastSeqNum, SequenceNumber newSeqNum)
317 {
318  log(ONIXS_LOG_DEBUG[&logFacilityForIncrementals_] << "SenderCompIdChange: " << lastSenderCompId << "->" << newSenderCompId << ", packetSeqNum " << lastSeqNum << "->" << newSeqNum);
319  dataRepository_->onSenderCompIdChange();
320 }
321 
322 void MdiHandlerImpl::onInactivity()
323 {
324  reportWarning(
325  TextBuilder()
326  << "No " << LogFacility::name() << " incremental data"
327  );
328 
329  dataRepository_->noDataOnFeeds();
330 }
331 
332 void MdiHandlerImpl::onFeedInactivity(FE::NetFeedRole::Enum id)
333 {
334  reportWarning(
335  TextBuilder()
336  << "No " << LogFacility::name() << " incremental data on " << FE::toStr(id)
337  );
338 }
339 
341 {
342  dataRepository_->setMarketSegmentId2Depth(map);
343 
344  if (map.size() > 0)
345  {
346  TextBuilder tb;
347  tb << "Market SegmentId to Depth Map: ";
348  for (MarketSegmentId2Depth::const_iterator it = map.begin(), end = map.end(); it != end; ++it)
349  tb << '\n' << it->first << " => " << it->second;
350  log(ONIXS_LOG_INFO[this] << tb.toString());
351  }
352 }
353 
355 {
356  BOOST_ASSERT(dataRepository_.get() != nullptr);
357  //BOOST_ASSERT(!filters.empty());
358 
359  dataRepository_->setMarketSegmentIdFilters(filters);
360 
361  if (filters.size() > 0)
362  {
363  TextBuilder tb;
364  tb << "Market Segment Id Filters: ";
365  for (MarketSegmentIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
366  tb << '\n' << *it;
367  log(ONIXS_LOG_INFO[this] << tb.toString());
368  }
369 }
370 
372 {
373  BOOST_ASSERT(dataRepository_.get() != nullptr);
374  dataRepository_->clearMarketSegmentIdFilters();
375  log(ONIXS_LOG_INFO[this] << "removeAllMarketSegmentIdFilters");
376 }
377 
379 {
380  dataRepository_->setSecurityIdFilters(filters);
381 
382  if (filters.size() > 0)
383  {
384  TextBuilder tb;
385  tb << "Security Id Filters: ";
386  for (SecurityIdFilters::const_iterator it = filters.begin(), end = filters.end(); it != end; ++it)
387  tb << '\n' << *it;
388  log(ONIXS_LOG_INFO[this] << tb.toString());
389  }
390 }
391 
393 {
394  dataRepository_->removeAllSecurityIdFilters();
395  log(ONIXS_LOG_INFO[this] << "removeAllSecurityIdFilters");
396 }
397 
398 void MdiHandlerImpl::initLogger(const HandlerSettings &settings)
399 {
400  logger_.reset(new HandlerLogger(convertLogSettings(settings)));
401 
402  setLogger(&logger_->getLogger());
403 }
404 
405 void MdiHandlerImpl::initListenerHolder()
406 {
407  try
408  {
409  listenerHolder_.reset(new ListenerHolder(this, logger_.get()));
410  }
411  catch (const std::exception& ex)
412  {
413  log(ONIXS_LOG_ERROR[this] << "Exception while constructing ListenerHolder: " << ex.what());
414  throw;
415  }
416  catch (...)
417  {
418  log(ONIXS_LOG_ERROR[this] << "Unknown exception while constructing ListenerHolder");
419  throw;
420  }
421 }
422 
423 void MdiHandlerImpl::onStateChanged(HandlerCore::MarketData::MulticastFeedHandler::State::Enum newState)
424 {
425  listenerHolder_->invokeStateChanged(static_cast<HandlerState::Enum>(newState));
426 }
427 
428 void MdiHandlerImpl::invokeError(const std::string& description)
429 {
431 }
432 
433 void MdiHandlerImpl::invokeWarning(const std::string& description)
434 {
435  listenerHolder().invokeWarning(description);
436 }
437 
438 }
439 }
440 }
441 }
442 
unsigned int SequenceNumber
Alias for sequence numbers.
std::string toString() const
Returns the string representation.
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:287
MarketDepthTraits::MarketSegmentId2Depth MarketSegmentId2Depth
void invokeWarning(const std::string &description) override
MDI handler configuration settings.
ServiceDescriptor serviceA
Service A.
std::string info() const
Feed engine info.
Definition: FeedEngine.cpp:64
void setMarketSegmentId2Depth(const MarketSegmentId2Depth &map)
ListenerHolder & listenerHolder()
Returns commons services as shared object.
MdiHandlerImpl(const MdiHandlerSettings &settings)
Initialize new instance.
ServiceDescriptor serviceB
Service B.
static UInt32 getTemplateId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:117
Handler base configuration settings.
Util::TextBuilder TextBuilder
Definition: Formatting.h:43
void invokeError(ErrorCode::Enum code, const std::string &description) ONIXS_NOEXCEPT
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
void setMarketSegmentIdFilters(const MarketSegmentIdFilters &filters)
FilteringTraits::MarketSegmentIdFilters MarketSegmentIdFilters
Defines params which affect logs replay.
Definition: Replay.h:77
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
void setSecurityIdFilters(const SecurityIdFilters &filters)
static HandlerCore::MarketData::FE::MulticastFeedEngine * getFeedEngine(FeedEngine &feedEngine)
void invokeError(const std::string &description) override
std::string toString() const
Returns the string representation.
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
const FeedEngineSettings & settings() const
Settings used to define the behavior of the given instance.
Definition: FeedEngine.cpp:59
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:75
FilteringTraits::SecurityIdFilters SecurityIdFilters
bool checkXmlVersion(const std::string &filename, std::string &detectedVersion, UInt32 expectedXmlMajorVersion, UInt32 expectedEmlMinorVersion)
Definition: Utils.cpp:37
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
Identifies errors of a generic nature.
Definition: ErrorListener.h:38
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
OnixS::FIX::Core::FAST::TemplateId TemplateId
void invokeWarning(const std::string &description) ONIXS_NOEXCEPT
std::string removeInvalidTemplateNodes(std::string xml, const std::string &node)
Definition: Utils.cpp:120