OnixS C++ Eurex T7 Market and Reference Data Interface (EMDI, RDI, EOBI) Handlers  7.4.2
API documentation
EobiPacketProcessor.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
21 
22 #include "EobiPacketProcessor.h"
23 
24 
25 namespace OnixS {
26 namespace Eurex {
27 namespace MarketData {
28 namespace EOBI {
29 namespace Implementation {
30 
31 using namespace HandlerCore::MarketData;
32 
// Returns a pointer to the packet header located at the start of the container's buffer.
// The pointer is obtained by reinterpreting container.data(); nothing is copied.
// This function does not compare container.size() with sizeof(PacketHeader).
// NOTE(review): the return-type line of this definition is missing from this listing.
34 PacketHelper::getPacketHeader(const FE::PacketContainer& container)
35 {
36  const PacketHelper::PacketHeader* const packetHeader =
37  reinterpret_cast<const PacketHelper::PacketHeader*>(container.data());
38 
 // Assert that the container exposed a non-null buffer.
39  BOOST_ASSERT(packetHeader != nullptr);
40 
41  return packetHeader;
42 }
43 
44 PacketHelper::SequenceNumber_ PacketHelper::seqNumber(const HandlerCore::MarketData::FE::PacketContainer& container)
45 {
46  return getPacketHeader(container)->ApplSeqNum;
47 }
48 
49 PacketHelper::SequenceNumber_ PacketHelper::nextSeqNumber(const FE::PacketContainer& container)
50 {
51  return seqNumber(container) + 1;
52 }
53 
54 bool PacketHelper::isHeartbeat(const FE::PacketContainer&)
55 {
56  return false;
57 }
58 
// Returns the size in bytes of the PacketHeader structure.
// NOTE(review): the signature line of this definition is missing from this listing.
60 {
61  return sizeof(PacketHeader);
62 }
63 
64 std::string PacketHelper::toString(const FE::PacketContainer& container)
65 {
66  std::stringstream ss;
67  ss << "Packet [seqNum = " << seqNumber(container) << " ]";
68 
69  return ss.str();
70 }
71 
 // Constructor parameter list and initializers.
 // NOTE(review): the opening line of this definition is missing from this listing.
 //
 // maxPacketWaitingTimeInMs   - multiplied by nanosecondsPerMillisecond() and stored in maxPacketWaitingTimeSpan_.
 // inactivityWaitingTimeInSec - multiplied by nanosecondsPerSecond() and stored in inactivityWaitingTimeSpan_.
 // outOfOrderMaxInterval      - stored as-is in outOfOrderMaxInterval_.
 // lock                       - pointer stored in lock_; asserted to be non-null.
73  unsigned int maxPacketWaitingTimeInMs,
74  unsigned int inactivityWaitingTimeInSec,
75  unsigned int outOfOrderMaxInterval,
76  OnixS::Concurrency::RecursiveMutex* lock)
 // The expected sequence number starts at 1.
77  : nextPacketSeqNum_(1)
 // The operands are cast to Int64 before multiplying, so the product is computed in Int64 rather than unsigned int.
78  , maxPacketWaitingTimeSpan_((Int64)maxPacketWaitingTimeInMs * FE::TimeTraits::nanosecondsPerMillisecond())
79  , inactivityWaitingTimeSpan_((Int64)inactivityWaitingTimeInSec * FE::TimeTraits::nanosecondsPerSecond())
80  , outOfOrderMaxInterval_(outOfOrderMaxInterval)
 // All inactivity flags and the sequence-reset flag start cleared.
81  , inactivityFlag_(false)
82  , inactivityFlagA_(false)
83  , inactivityFlagB_(false)
84  , applSeqResetting_(false)
85  , lock_(lock)
86 {
87  BOOST_ASSERT(lock_);
89 }
90 
// Forwards to lock_->tryAcquire() and returns its result.
// NOTE(review): the signature line of this definition is missing from this listing.
92 {
93  BOOST_ASSERT(lock_);
94  return lock_->tryAcquire();
95 }
96 
// Forwards to lock_->release() and returns whatever that call returns.
// NOTE(review): the signature line of this definition is missing from this listing.
98 {
99  BOOST_ASSERT(lock_);
100  return lock_->release();
101 }
102 
// Handles one received packet: updates reception timestamps and inactivity flags,
// reacts to the header's ApplSeqResetIndicator, then either delivers the packet
// (sequence number equals nextPacketSeqNum_) or stores it in outOfOrder_
// (sequence number is greater than expected).
//
// Throws OperationException when the packet is smaller than PacketHeader.
//
// NOTE(review): listing lines 146 and 151 are missing from this listing, so some
// statements of this function are not visible here.
103 void EobiPacketProcessor::onPacket(FE::PacketContainer& packetContainer)
104 {
 // Reject packets that are too short to contain a header before reading any field.
105  if (packetContainer.size() < sizeof(PacketHeader))
106  throw OperationException(BOOST_CURRENT_FUNCTION, "Wrong packet: size less than header");
107 
108  const PacketHeader* const packetHeader = reinterpret_cast<const PacketHeader*>(packetContainer.data());
109  const SequenceNumber packetSeqNum = packetHeader->ApplSeqNum;
110 
 // Any packet, from any feed, counts as activity for the combined inactivity check.
111  lastReceptionTimeFe_ = packetContainer.receiveTime();
112  inactivityFlag_ = false;
113 
 // Track activity separately for feed A and feed B.
114  if (packetContainer.source().role() == FE::NetFeedRole::A)
115  {
116  lastReceptionTimeFeA_ = packetContainer.receiveTime();
117  inactivityFlagA_ = false;
118  }
119  else if (packetContainer.source().role() == FE::NetFeedRole::B)
120  {
121  lastReceptionTimeFeB_ = packetContainer.receiveTime();
122  inactivityFlagB_ = false;
123  }
124 
 // applSeqResetting_ ensures reset() and the callback run once per run of
 // consecutive packets carrying the reset indicator; the flag is cleared by the
 // first packet without the indicator.
 // NOTE(review): onApplSeqReset_ is invoked without an emptiness check, unlike
 // onGap_ and onFeedInactivity_ elsewhere in this file - confirm it is always set.
125  if (packetHeader->ApplSeqResetIndicator)
126  {
127  if (!applSeqResetting_)
128  {
129  reset();
130  onApplSeqReset_();
131  applSeqResetting_ = true;
132  }
133  }
134  else
135  applSeqResetting_ = false;
136 
 // In-sequence packet: build its DataSource and hand the raw bytes to the data callback.
 // NOTE(review): onPacketData_ is also invoked without an emptiness check.
137  if (packetSeqNum == nextPacketSeqNum_)
138  {
139  DataSource dataSource(createDataSource(packetContainer, packetSeqNum));
140 
141  const char* const packetData = static_cast<const char*>(packetContainer.data());
142  const size_t size = packetContainer.size();
143 
144  onPacketData_(dataSource, packetData, size);
145 
147  }
 // Packet is ahead of the expected sequence number (possible packet loss): keep it in outOfOrder_.
 // Packets with a sequence number below nextPacketSeqNum_ match neither branch.
148  else if (packetSeqNum > nextPacketSeqNum_)
149  outOfOrder_.push(packetContainer);
150 
152 }
153 
// Timeout handler: compares the current time with the last reception timestamps
// and raises the inactivity callbacks once the configured
// inactivityWaitingTimeSpan_ has been exceeded.
//
// feed - the feed the timeout is reported for; its role() selects which per-feed
//        timestamp and flag (A or B) are examined.
//
// NOTE(review): listing line 158, which holds the condition guarding the first
// block, is missing from this listing.
154 void EobiPacketProcessor::onTimeout(const FE::NetFeed& feed)
155 {
156  const FE::Timestamp now = HandlerCore::MarketData::FE::UtcWatch::now();
157 
 // Combined check based on lastReceptionTimeFe_ (updated by packets from any feed).
159  {
160  const FE::TimeSpan span = now - lastReceptionTimeFe_;
161  if (span > inactivityWaitingTimeSpan_)
162  {
 // The flag is set before the callback is invoked.
163  inactivityFlag_ = true;
164  onInactivity_();
165  }
166  }
167 
 // Per-feed checks run only when a feed-inactivity callback is installed.
168  if (onFeedInactivity_)
169  {
 // Feed A: skipped if inactivity was already flagged for this feed.
170  if (!inactivityFlagA_ && feed.role() == FE::NetFeedRole::A)
171  {
172  const FE::TimeSpan span = now - lastReceptionTimeFeA_;
173  if (span > inactivityWaitingTimeSpan_)
174  {
175  inactivityFlagA_ = true;
176  onFeedInactivity_(feed.role());
177  }
178  }
179 
 // Feed B: same logic as for feed A.
180  if (!inactivityFlagB_ && feed.role() == FE::NetFeedRole::B)
181  {
182  const FE::TimeSpan span = now - lastReceptionTimeFeB_;
183  if (span > inactivityWaitingTimeSpan_)
184  {
185  inactivityFlagB_ = true;
186  onFeedInactivity_(feed.role());
187  }
188  }
189  }
190 }
191 
// Drains the out-of-order store: repeatedly inspects the packet returned by
// outOfOrder_.lowest() and either delivers it (when it is the expected one)
// or decides whether to report a gap and skip ahead.
//
// now - timestamp used to measure how long the inspected packet has been held.
//
// NOTE(review): listing line 214 is missing from this listing; the update of
// nextPacketSeqNum_ after a delivered packet is presumably there - confirm.
// NOTE(review): in the visible code, if the lowest packet has
// packetSeqNum < nextPacketSeqNum_, or packetSeqNum > nextPacketSeqNum_ while
// onGap_ is empty, neither branch modifies outOfOrder_ or nextPacketSeqNum_,
// so the loop would not make progress. Verify that these states cannot occur.
192 void EobiPacketProcessor::processHoldOnPackets(const FE::Timestamp& now)
193 {
194  while (!outOfOrder_.empty())
195  {
196  FE::PacketContainer& packetContainer = outOfOrder_.lowest();
197  PacketHeader* packetHeader = reinterpret_cast<PacketHeader*>(packetContainer.data());
198  SequenceNumber packetSeqNum = packetHeader->ApplSeqNum;
199 
 // The held packet is the next expected one: remove it from the store and deliver it.
200  if (packetSeqNum == nextPacketSeqNum_)
201  {
 // NOTE(review): packetContainer is a reference obtained from lowest() and is
 // still used after popLowest(); confirm the store keeps it valid until release().
202  outOfOrder_.popLowest();
203 
204  DataSource dataSource(createDataSource(packetContainer, packetSeqNum));
205 
 // Mark the data as delivered from the out-of-order store rather than directly on receipt.
206  dataSource.cached = true;
207 
208  const char* const packetData = static_cast<const char*>(packetContainer.data());
209  const size_t size = packetContainer.size();
210  BOOST_ASSERT(packetData);
211 
212  onPacketData_(dataSource, packetData, size);
213 
215 
216  packetContainer.release();
217  }
 // There is a gap before the lowest held packet; handled only when a gap callback is installed.
218  else if ((packetSeqNum > nextPacketSeqNum_) && onGap_)
219  {
 // Gap is wider than the configured maximum interval.
220  const bool outOfOrderOverflow = (outOfOrderMaxInterval_ < (packetSeqNum - nextPacketSeqNum_));
 // The held packet has waited longer than the configured maximum waiting time.
221  const bool waitingTimeElapsed = (maxPacketWaitingTimeSpan_ < (now - packetContainer.receiveTime()));
222 
 // Give up waiting: report the gap and continue from the lowest held packet,
 // which the next loop iteration then delivers.
223  if (waitingTimeElapsed || outOfOrderOverflow || !orderPackets())
224  {
225  onGap_();
226  nextPacketSeqNum_ = packetSeqNum;
227  }
 // Otherwise keep waiting for the missing packets.
228  else
229  return;
230  }
231  }
232 }
233 
// Returns the sequencing state to its initial values: the expected sequence
// number goes back to 1, all held out-of-order packets are discarded, the
// last-reception timestamp is set to the current time and the combined
// inactivity flag is cleared.
// NOTE(review): the signature line and listing lines 241-242 are missing from
// this listing; this is presumably the reset() called from onPacket - confirm.
235 {
236  nextPacketSeqNum_ = 1;
237 
238  outOfOrder_.clear();
239 
240  lastReceptionTimeFe_ = FE::UtcWatch::now();
243 
244  inactivityFlag_ = false;
245 }
246 
// Stores the given callback in onPacketData_, which is invoked with each delivered packet's data.
// NOTE(review): the signature line of this definition is missing from this listing.
248 {
249  onPacketData_ = callback;
250 }
251 
// Stores the given callback in onGap_, which is invoked when a sequence gap is reported.
// NOTE(review): the signature line of this definition is missing from this listing.
253 {
254  onGap_ = callback;
255 }
256 
// Stores the given callback in onApplSeqReset_, which is invoked when a packet signals a sequence reset.
// NOTE(review): the signature line of this definition is missing from this listing.
258 {
259  onApplSeqReset_ = callback;
260 }
261 
// Stores the given callback in onInactivity_, which is invoked from onTimeout on overall inactivity.
// NOTE(review): the signature line of this definition is missing from this listing.
263 {
264  onInactivity_ = callback;
265 }
266 
// Stores the given callback in onFeedInactivity_, which is invoked from onTimeout with the role of an inactive feed.
// NOTE(review): the signature line of this definition is missing from this listing.
268 {
269  onFeedInactivity_ = callback;
270 }
271 
272 
 // Builds the DataSource describing a packet from its header fields and reception metadata.
 // NOTE(review): the opening line of this definition is missing from this listing.
 //
 // packetContainer - packet whose header, source feed and receive times are read.
 // packetSeqNum    - sequence number stored into the result; not re-read from the header here.
 // Returns the populated DataSource by value.
274  FE::PacketContainer& packetContainer,
275  SequenceNumber packetSeqNum)
276 {
277  DataSource dataSource;
278 
279  const PacketHeader* packetHeader = reinterpret_cast<const PacketHeader*>(packetContainer.data());
280 
281  BOOST_ASSERT(packetHeader != nullptr);
282 
 // Per-message fields start at fixed defaults here.
283  dataSource.packetSeqNum = packetSeqNum;
284  dataSource.packetMessageSeqNum = 0;
285  dataSource.isLastInPacket = false;
286  dataSource.sendingTime = packetHeader->TransactTime;
287 
288  dataSource.performanceIndicator = 0;
289  dataSource.partitionId = packetHeader->PartitionID;
 // The header value 1 maps to true; any other value maps to false.
290  dataSource.completionIndicator = (packetHeader->CompletionIndicator == 1);
291  dataSource.marketSegmentId = packetHeader->MarketSegmentID;
292 
293  dataSource.origin = feedRole2Origin(packetContainer.source().role());
294 
 // Defaults to false; processHoldOnPackets overrides it for packets taken from the out-of-order store.
295  dataSource.cached = false;
296 
297  dataSource.packetReceptionTime = TimeManager::convert(packetContainer.receiveTime());
298 
 // Use the auxiliary receive time when it differs from a default-constructed
 // Timestamp; otherwise fall back to the regular reception time.
299  if (packetContainer.auxiliaryReceiveTime() != FE::Timestamp())
300  dataSource.originalPacketReceptionTime = TimeManager::convert(packetContainer.auxiliaryReceiveTime());
301  else
302  dataSource.originalPacketReceptionTime = dataSource.packetReceptionTime;
303 
304  return dataSource;
305 }
306 
 // Constructor parameter list: every argument is forwarded unchanged to the
 // EobiPacketProcessor base constructor; the body adds nothing.
 // NOTE(review): the opening line of this definition is missing from this listing.
308  unsigned int maxPacketWaitingTimeInMs,
309  unsigned int inactivityWaitingTimeInSec,
310  unsigned int outOfOrderMaxInterval,
311  OnixS::Concurrency::RecursiveMutex* lock)
312  : EobiPacketProcessor(maxPacketWaitingTimeInMs, inactivityWaitingTimeInSec, outOfOrderMaxInterval, lock)
313 {
314 }
315 
// Snapshot variant of packet handling: updates reception timestamps, clears the
// combined inactivity flag, then either delivers the packet (sequence number
// equals nextPacketSeqNum_) or stores it in outOfOrder_ (sequence number is
// greater than expected).
//
// In the visible code this variant does not examine ApplSeqResetIndicator and
// does not clear inactivityFlagA_ / inactivityFlagB_, unlike
// EobiPacketProcessor::onPacket.
//
// Throws OperationException when the packet is smaller than PacketHeader.
//
// NOTE(review): listing lines 344 and 349 are missing from this listing, so some
// statements of this function are not visible here.
316 void EobiSnapshotPacketProcessor::onPacket(FE::PacketContainer& packetContainer)
317 {
 // Reject packets that are too short to contain a header before reading any field.
318  if (packetContainer.size() < sizeof(PacketHeader))
319  throw OperationException(BOOST_CURRENT_FUNCTION, "Wrong packet: size less than header");
320 
321  const PacketHeader* const packetHeader = reinterpret_cast<const PacketHeader*>(packetContainer.data());
322  BOOST_ASSERT(packetHeader);
323  const SequenceNumber packetSeqNum = packetHeader->ApplSeqNum;
324 
325  lastReceptionTimeFe_ = packetContainer.receiveTime();
326 
 // Record the reception time for the feed (A or B) the packet arrived on.
327  if (packetContainer.source().role() == FE::NetFeedRole::A)
328  lastReceptionTimeFeA_ = packetContainer.receiveTime();
329  else if (packetContainer.source().role() == FE::NetFeedRole::B)
330  lastReceptionTimeFeB_ = packetContainer.receiveTime();
331 
332  inactivityFlag_ = false;
333 
 // In-sequence packet: build its DataSource and hand the raw bytes to the data callback.
334  if (packetSeqNum == nextPacketSeqNum_)
335  {
336  DataSource dataSource(createDataSource(packetContainer, packetSeqNum));
337 
338  const char* const packetData = static_cast<const char*>(packetContainer.data());
339  BOOST_ASSERT(packetData);
340  const size_t size = packetContainer.size();
341 
342  onPacketData_(dataSource, packetData, size);
343 
345  }
 // Packet is ahead of the expected sequence number (possible packet loss): keep it in outOfOrder_.
 // Packets with a sequence number below nextPacketSeqNum_ match neither branch.
346  else if (packetSeqNum > nextPacketSeqNum_)
347  outOfOrder_.push(packetContainer);
348 
350 }
351 
352 }}}}}
unsigned int SequenceNumber
Alias for sequence numbers.
EobiPacketProcessor(unsigned int maxPacketWaitingTimeInMs, unsigned int inactivityWaitingTimeInSec, unsigned int outOfOrderMaxInterval, Concurrency::RecursiveMutex *lock)
DataSource::Origin feedRole2Origin(OnixS::HandlerCore::MarketData::FE::NetFeedRole::Enum value)
Definition: Utils.cpp:159
DataSource createDataSource(FE::PacketContainer &packetContainer, SequenceNumber packetSeqNum)
EobiSnapshotPacketProcessor(unsigned int maxPacketWaitingTimeInMs, unsigned int inactivityWaitingTimeInSec, unsigned int outOfOrderMaxInterval, Concurrency::RecursiveMutex *lock)
Timestamp packetReceptionTime
Time when the packet was received by the Handler from UDP, in system ticks.
Definition: Defines.h:59
static std::string toString(const HandlerCore::MarketData::FE::PacketContainer &container)
PacketHeaderForEobi PacketHeader
static SequenceNumber_ nextSeqNumber(const HandlerCore::MarketData::FE::PacketContainer &container)
static const TPacketHeader * getPacketHeader(const HandlerCore::MarketData::FE::PacketContainer &container)
static bool isHeartbeat(const HandlerCore::MarketData::FE::PacketContainer &)
UInt64 sendingTime
Time when market data feed handler writes packet on the wire.
Definition: Defines.h:75
Definition: Defines.h:30
SequenceNumber packetSeqNum
Packet sequence number.
Definition: Defines.h:66
boost::function< void(FE::NetFeedRole::Enum)> OnFeedInactivity
static SequenceNumber_ seqNumber(const HandlerCore::MarketData::FE::PacketContainer &container)
bool isLastInPacket
Indicates whether a message is last in the packet.
Definition: Defines.h:72
SequenceNumber packetMessageSeqNum
Packet message number.
Definition: Defines.h:69
static Timestamp convert(const OnixS::Time::DateTime &time)
Definition: Utils.cpp:169
boost::function< void(DataSource &dataSource, const char *data, size_t length)> OnPacketData