22 #include <util/TextBuilder.h> 24 #include "../PacketHeader.h" 26 #include <boost/function.hpp> 28 #include <OnixS/HandlerCore/FeedEngine/SharedReceiver.h> 29 #include <OnixS/HandlerCore/FeedEngine/PacketContainer.h> 30 #include <OnixS/HandlerCore/FeedEngine/SortedPackets.h> 31 #include <OnixS/HandlerCore/MarketData/MulticastFeedHandler.h> 38 namespace MarketData {
40 namespace Implementation {
42 using namespace HandlerCore::MarketData;
50 const PacketHeader* getPacketHeader(
const FE::PacketContainer& container);
53 SequenceNumber_ seqNumber(
const FE::PacketContainer& container);
56 SequenceNumber_ nextSeqNumber(
const FE::PacketContainer& container);
59 bool isHeartbeat(
const FE::PacketContainer&);
62 std::string toString(
const FE::PacketContainer& container);
74 typedef boost::function<void(DataSource& dataSource, const char* data, size_t length)>
OnPacketData;
75 typedef boost::function<void()>
OnGap;
81 unsigned int maxPacketWaitingTimeInMs,
82 unsigned int inactivityWaitingTimeInSec,
83 unsigned int outOfOrderMaxInterval,
84 Concurrency::RecursiveMutex* lock);
88 void reset()
override;
89 bool tryAcquire()
override;
90 void release()
override;
91 void onPacket(FE::PacketContainer&)
override;
92 void onTimeout(
const FE::NetFeed&)
override;
93 void onFailure(
const FE::NetFeed&,
const char*)
override {}
94 void processHoldOnPackets(
const FE::Timestamp& now);
96 void subscribeOnPacketData(
const OnPacketData callback);
97 void subscribeOnGap(
const OnGap callback);
98 void subscribeOnApplSeqReset(
const OnApplSeqReset callback);
99 void subscribeOnInactivity(
const OnInactivity callback);
100 void subscribeOnFeedInactivity(
const OnFeedInactivity callback);
129 OnixS::Concurrency::RecursiveMutex*
lock_;
138 unsigned int maxPacketWaitingTimeInMs,
139 unsigned int inactivityWaitingTimeInSec,
140 unsigned int outOfOrderMaxInterval,
141 Concurrency::RecursiveMutex* lock);
144 void onPacket(FE::PacketContainer&)
override;
const FE::TimeSpan maxPacketWaitingTimeSpan_
unsigned int SequenceNumber
Alias for sequence numbers.
virtual ~EobiPacketProcessor()
void onFailure(const FE::NetFeed &, const char *) override
SequenceNumber nextPacketSeqNum_
const UInt32 outOfOrderMaxInterval_
boost::function< void()> OnInactivity
FE::SortedPackets< PacketHelper > outOfOrder_
PacketHeaderForEobi PacketHeader
OnApplSeqReset onApplSeqReset_
FE::Timestamp lastReceptionTimeFe_
SequenceNumber SequenceNumber_
OnixS::Concurrency::RecursiveMutex * lock_
boost::function< void(FE::NetFeedRole::Enum)> OnFeedInactivity
FE::Timestamp lastReceptionTimeFeA_
boost::function< void()> OnGap
OnFeedInactivity onFeedInactivity_
OnPacketData onPacketData_
const FE::TimeSpan inactivityWaitingTimeSpan_
MarketData::Implementation::PacketHeaderForEobi PacketHeader
OnInactivity onInactivity_
boost::function< void(DataSource &dataSource, const char *data, size_t length)> OnPacketData
boost::function< void()> OnApplSeqReset
FE::Timestamp lastReceptionTimeFeB_