OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.4.3
API documentation
PacketProcessor.h
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #pragma once
21 
22 #include <boost/function.hpp>
23 
24 #include <OnixS/HandlerCore/FeedEngine/SharedReceiver.h>
25 #include <OnixS/HandlerCore/FeedEngine/PacketContainer.h>
26 #include <OnixS/HandlerCore/FeedEngine/SortedPackets.h>
27 #include <OnixS/HandlerCore/MarketData/MulticastFeedHandler.h>
28 
30 
31 #include "PacketHeader.h"
32 #include "Utils.h"
33 
34 using namespace OnixS::HandlerCore::MarketData;
35 
36 namespace OnixS {
37 namespace Eurex {
38 namespace MarketData {
39 namespace Implementation {
40 
41 
42  template<typename TPacketHeader>
43  struct PacketHelper
44  {
46 
47  static
48  const TPacketHeader* getPacketHeader(const HandlerCore::MarketData::FE::PacketContainer& container)
49  {
50  const TPacketHeader* const header = reinterpret_cast<const TPacketHeader*>(container.data());
51  BOOST_ASSERT(header);
52 
53  return header;
54  }
55 
56  static
57  SequenceNumber_ seqNumber(const HandlerCore::MarketData::FE::PacketContainer& container)
58  {
59  return PacketHeaderHelper::getSeqNum(getPacketHeader(container));
60  }
61 
62  static
63  SequenceNumber_ nextSeqNumber(const HandlerCore::MarketData::FE::PacketContainer& container)
64  {
65  return seqNumber(container) + 1;
66  }
67 
68  static
69  bool isHeartbeat(const HandlerCore::MarketData::FE::PacketContainer&)
70  {
71  return false;
72  }
73 
74  static
75  unsigned int senderCompId(const HandlerCore::MarketData::FE::PacketContainer& container)
76  {
77  return PacketHeaderHelper::getSenderCompId(getPacketHeader(container));
78  }
79 
80  static
81  std::string toString(const HandlerCore::MarketData::FE::PacketContainer& container)
82  {
83  std::stringstream ss;
84  ss << "Packet [seqNum = " << seqNumber(container) << ", senderCompId = " << senderCompId(container) << " ]";
85 
86  return ss.str();
87  }
88 
89  static
90  size_t headerSize()
91  {
92  return sizeof(TPacketHeader);
93  }
94  };
95 
96  template<class TPacketHeader>
97  class PacketProcessor : public LegacyLogPlayerPacketProcessor
98  {
99  public:
100  PacketProcessor(unsigned int, unsigned int, unsigned int, Concurrency::RecursiveMutex*);
101  virtual ~PacketProcessor() {}
102  void reset() override;
103 
104  bool tryAcquire() override;
105  void release() override;
106  void onPacket(FE::PacketContainer&) override;
107  void onTimeout(const FE::NetFeed&) override;
108  void onFailure(const FE::NetFeed&, const char*) override {}
109 
110 
111  typedef boost::function<void(DataSource&, const char*, size_t, size_t)> OnPacketData;
112  typedef boost::function<void()> OnGap;
113  typedef boost::function<void(unsigned int, unsigned int, SequenceNumber, SequenceNumber)> OnSenderCompIdChange;
114  typedef boost::function<void()> OnInactivity;
115  typedef boost::function<void(FE::NetFeedRole::Enum)> OnFeedInactivity;
116 
117  void subscribeOnPacketData(OnPacketData callback)
118  {
119  onPacketData_ = callback;
120  }
121 
122  void subscribeOnGap(OnGap callback)
123  {
124  onGap_ = callback;
125  }
126 
127  void subscribeOnSenderCompIdChange(OnSenderCompIdChange callback)
128  {
129  onSenderCompIdChange_ = callback;
130  }
131 
132  void subscribeOnInactivity(OnInactivity callback)
133  {
134  onInactivity_ = callback;
135  }
136 
137  void subscribeOnFeedInactivity(OnFeedInactivity callback)
138  {
139  onFeedInactivity_ = callback;
140  }
141 
142  protected:
143  virtual bool filterPacket(const TPacketHeader* /*header*/) { return true; };
144  virtual void fillDataSource(FE::PacketContainer& packetContainer, DataSource& dataSource);
145 
146 
147  private:
148  void processHoldOnPackets(const FE::Timestamp& now);
149 
150  private:
151  const FE::TimeSpan maxPacketWaitingTimeSpan_;
152  const FE::TimeSpan inactivityWaitingTimeSpan_;
153  const UInt32 outOfOrderMaxInterval_;
154 
155  unsigned int senderCompId_;
156  unsigned int prevSenderCompId_;
157  unsigned int nextSeqNum_;
158 
159  FE::Timestamp lastReceptionTimeFe_;
160  FE::Timestamp lastReceptionTimeFeA_;
161  FE::Timestamp lastReceptionTimeFeB_;
162 
163  bool inactivityFlag_;
164  bool inactivityFlagA_;
165  bool inactivityFlagB_;
166 
167  FE::SortedPackets<PacketHelper<TPacketHeader> > outOfOrder_;
168  Concurrency::RecursiveMutex* lock_;
169 
170  OnPacketData onPacketData_;
171  OnGap onGap_;
172  OnSenderCompIdChange onSenderCompIdChange_;
173  OnFeedInactivity onFeedInactivity_;
174  OnInactivity onInactivity_;
175  };
176 
177  template<class TPacketHeader>
179  unsigned int maxPacketWaitingTimeInMs,
180  unsigned int inactivityWaitingTimeInSec,
181  unsigned int outOfOrderMaxInterval,
182  Concurrency::RecursiveMutex* lock)
183  : maxPacketWaitingTimeSpan_((Int64)maxPacketWaitingTimeInMs * FE::TimeTraits::nanosecondsPerMillisecond())
184  , inactivityWaitingTimeSpan_((Int64)inactivityWaitingTimeInSec * FE::TimeTraits::nanosecondsPerSecond())
185  , outOfOrderMaxInterval_(outOfOrderMaxInterval)
186  , lock_(lock)
187  {
188  BOOST_ASSERT(lock_);
190  }
191 
192  template<class TPacketHeader>
194  {
195  senderCompId_ = 0;
196  prevSenderCompId_ = 0;
197  nextSeqNum_ = 1;
198 
199  outOfOrder_.clear();
200 
201  lastReceptionTimeFe_ = FE::UtcWatch::now();
202  lastReceptionTimeFeA_ = lastReceptionTimeFe_;
203  lastReceptionTimeFeB_ = lastReceptionTimeFe_;
204 
205  inactivityFlag_ = false;
206  inactivityFlagA_ = false;
207  inactivityFlagB_ = false;
208  }
209 
210  template<class TPacketHeader>
212  {
213  BOOST_ASSERT(lock_);
214  return lock_->tryAcquire();
215  }
216 
217  template<class TPacketHeader>
219  {
220  BOOST_ASSERT(lock_);
221  return lock_->release();
222  }
223 
224  template<class TPacketHeader>
225  void PacketProcessor<TPacketHeader>::fillDataSource(FE::PacketContainer& packetContainer, DataSource& dataSource)
226  {
227  const TPacketHeader* const packetHeader = reinterpret_cast<TPacketHeader*>(packetContainer.data());
228  BOOST_ASSERT(packetHeader != nullptr);
229 
230  dataSource.packetMessageSeqNum = 0;
231  dataSource.isLastInPacket = false;
232  dataSource.sendingTime = PacketHeaderHelper::getSendingTime(packetHeader);
233  dataSource.performanceIndicator = 0;
234  dataSource.partitionId = 0;
235  dataSource.senderCompID = PacketHeaderHelper::getSenderCompId(packetHeader);
236  dataSource.completionIndicator = true;
237  dataSource.marketSegmentId = 0;
238  dataSource.cached = false;
239 
240  if (packetContainer.source().role() == FE::NetFeedRole::A)
241  dataSource.origin = DataSource::FeedA;
242  else if (packetContainer.source().role() == FE::NetFeedRole::B)
243  dataSource.origin = DataSource::FeedB;
244  else
245  dataSource.origin = DataSource::Undefined;
246 
247  dataSource.packetReceptionTime = TimeManager::convert(packetContainer.receiveTime());
248 
249  if (packetContainer.auxiliaryReceiveTime() != FE::Timestamp())
250  dataSource.originalPacketReceptionTime = TimeManager::convert(packetContainer.auxiliaryReceiveTime());
251  else
252  dataSource.originalPacketReceptionTime = dataSource.packetReceptionTime;
253  };
254 
255 
256  template<class TPacketHeader>
257  void PacketProcessor<TPacketHeader>::onPacket(FE::PacketContainer& packetContainer)
258  {
259  static const size_t HEADER_SIZE = sizeof(TPacketHeader);
260  if (packetContainer.size() < HEADER_SIZE)
261  throw OperationException(BOOST_CURRENT_FUNCTION, "Wrong packet: size less than header");
262 
263  const TPacketHeader* const packetHeader = reinterpret_cast<const TPacketHeader*>(packetContainer.data());
264  BOOST_ASSERT(packetHeader);
265 
266  SequenceNumber seqNum = PacketHeaderHelper::getSeqNum(packetHeader);
267 
268  lastReceptionTimeFe_ = packetContainer.receiveTime();
269  inactivityFlag_ = false;
270 
271  if (packetContainer.source().role() == FE::NetFeedRole::A)
272  {
273  lastReceptionTimeFeA_ = packetContainer.receiveTime();
274  inactivityFlagA_ = false;
275  }
276  else if (packetContainer.source().role() == FE::NetFeedRole::B)
277  {
278  lastReceptionTimeFeB_ = packetContainer.receiveTime();
279  inactivityFlagB_ = false;
280  }
281 
282  const unsigned int senderCompId = PacketHeaderHelper::getSenderCompId(packetHeader);
283  if (senderCompId_ == 0)
284  {
285  senderCompId_ = senderCompId;
286  nextSeqNum_ = seqNum;
287  }
288  else if (senderCompId_ != senderCompId)
289  {
290  if (prevSenderCompId_ != 0 && prevSenderCompId_ == senderCompId)
291  {
292  processHoldOnPackets(lastReceptionTimeFe_);
293  return;
294  }
295 
296  processHoldOnPackets(lastReceptionTimeFe_);
297  outOfOrder_.clear();
298 
299  const unsigned int oldSenderCompId = senderCompId_;
300  const SequenceNumber oldSeqNum = nextSeqNum_;
301 
302  senderCompId_ = senderCompId;
303  nextSeqNum_ = 1;
304  onSenderCompIdChange_(oldSenderCompId, senderCompId_, oldSeqNum, seqNum);
305  prevSenderCompId_ = oldSenderCompId;
306  }
307 
308  if (seqNum == nextSeqNum_)
309  {
310  if (filterPacket(packetHeader))
311  {
312  DataSource dataSource;
313 
314  fillDataSource(packetContainer, dataSource);
315  dataSource.packetSeqNum = seqNum;
316 
317  onPacketData_(dataSource, static_cast<char*>(packetContainer.data()), packetContainer.size(), HEADER_SIZE);
318  }
319 
320  ++nextSeqNum_;
321  }
322  else if (seqNum > nextSeqNum_)
323  outOfOrder_.push(packetContainer);
324 
325  processHoldOnPackets(lastReceptionTimeFe_);
326  }
327 
328  template< class TPacketHeader >
329  void PacketProcessor<TPacketHeader>::onTimeout(const FE::NetFeed& feed)
330  {
331  const FE::Timestamp now = HandlerCore::MarketData::FE::UtcWatch::now();
332 
333  if (!inactivityFlag_ && onInactivity_)
334  {
335  const FE::TimeSpan span = now - lastReceptionTimeFe_;
336  if (span > inactivityWaitingTimeSpan_)
337  {
338  inactivityFlag_ = true;
339  onInactivity_();
340  }
341  }
342 
343  if (onFeedInactivity_)
344  {
345  if (!inactivityFlagA_ && feed.role() == FE::NetFeedRole::A)
346  {
347  const FE::TimeSpan span = now - lastReceptionTimeFeA_;
348  if (span > inactivityWaitingTimeSpan_)
349  {
350  inactivityFlagA_ = true;
351  onFeedInactivity_(feed.role());
352  }
353  }
354 
355  if (!inactivityFlagB_ && feed.role() == FE::NetFeedRole::B)
356  {
357  const FE::TimeSpan span = now - lastReceptionTimeFeB_;
358  if (span > inactivityWaitingTimeSpan_)
359  {
360  inactivityFlagB_ = true;
361  onFeedInactivity_(feed.role());
362  }
363  }
364  }
365  }
366 
367  template< class TPacketHeader >
368  void PacketProcessor<TPacketHeader>::processHoldOnPackets(const FE::Timestamp& now)
369  {
370  while (!outOfOrder_.empty())
371  {
372  FE::PacketContainer& packetContainer = outOfOrder_.lowest();
373  const TPacketHeader* packetHeader = reinterpret_cast<TPacketHeader*>(packetContainer.data());
374  SequenceNumber seqNum = PacketHeaderHelper::getSeqNum(packetHeader);
375 
376  if (seqNum == nextSeqNum_)
377  {
378  outOfOrder_.popLowest();
379 
380  if (filterPacket(packetHeader))
381  {
382  DataSource dataSource;
383 
384  fillDataSource(packetContainer, dataSource);
385  dataSource.packetSeqNum = seqNum;
386  dataSource.cached = true;
387 
388  onPacketData_(dataSource, static_cast<char*>(packetContainer.data()), packetContainer.size(), sizeof(TPacketHeader));
389  }
390 
391  packetContainer.release();
392  ++nextSeqNum_;
393  }
394  else if (seqNum > nextSeqNum_)
395  {
396  const bool outOfOrderOverflow = (outOfOrderMaxInterval_ < (seqNum - nextSeqNum_));
397  const bool waitingTimeElapsed = (maxPacketWaitingTimeSpan_ < (now - packetContainer.receiveTime()));
398 
399  if (waitingTimeElapsed || outOfOrderOverflow || !orderPackets())
400  {
401  onGap_();
402  nextSeqNum_ = seqNum;
403  }
404  else
405  return;
406  }
407  }
408  }
409 }
410 
411 
412 }
413 }
414 }
415 
unsigned int SequenceNumber
Alias for sequence numbers.
static SequenceNumber getSeqNum(const TPacketHeader *packetHeader)
Definition: PacketHeader.h:134
boost::function< void(FE::NetFeedRole::Enum)> OnFeedInactivity
UInt32 senderCompID
Unique id for a sender.
Definition: Defines.h:85
Timestamp packetReceptionTime
Time when the packet was received by the Handler from UDP, in system ticks.
Definition: Defines.h:59
static std::string toString(const HandlerCore::MarketData::FE::PacketContainer &container)
static SequenceNumber_ nextSeqNumber(const HandlerCore::MarketData::FE::PacketContainer &container)
static unsigned int getSenderCompId(TPacketHeader *packetHeader)
Definition: PacketHeader.h:106
static const TPacketHeader * getPacketHeader(const HandlerCore::MarketData::FE::PacketContainer &container)
static bool isHeartbeat(const HandlerCore::MarketData::FE::PacketContainer &)
UInt64 sendingTime
Time when market data feed handler writes packet on the wire.
Definition: Defines.h:75
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
static unsigned int senderCompId(const HandlerCore::MarketData::FE::PacketContainer &container)
static UInt64 getSendingTime(TPacketHeader *packetHeader)
Definition: PacketHeader.h:153
boost::function< void(unsigned int, unsigned int, SequenceNumber, SequenceNumber)> OnSenderCompIdChange
static SequenceNumber_ seqNumber(const HandlerCore::MarketData::FE::PacketContainer &container)
void subscribeOnSenderCompIdChange(OnSenderCompIdChange callback)
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
static Timestamp convert(const OnixS::Time::DateTime &time)
Definition: Utils.cpp:169
boost::function< void(DataSource &, const char *, size_t, size_t)> OnPacketData
void onFailure(const FE::NetFeed &, const char *) override
virtual void fillDataSource(FE::PacketContainer &packetContainer, DataSource &dataSource)