OnixS C++ FIX Engine  4.12.0
API Documentation
TCPDirect Latency Sample

This sample shows how to measure latency and use the warm-up feature using TCPDirect® technology.

Source code:

#include "../SessionTimeMarks.h"
#include "../PerformanceCounter.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
#ifdef _WIN32
# define ONIXS_USE_TCP_DIRECT_EMULATION 1
#else
# define ONIXS_USE_TCP_DIRECT_EMULATION 0
#endif
using namespace OnixS;
using namespace Settings;
using namespace OnixS::Sample;
namespace {
#if ONIXS_USE_TCP_DIRECT_EMULATION
class AcceptorListener ONIXS_FIXENGINE_FINAL : public FIX::ISessionListener
{
public:
    AcceptorListener()
        : orderMessageType_(StringRef("D"))
        , execReport_("8", FixProtocolVersion)
    {
        execReport_.set(Tags::OrderID, std::string("90001008"));
    }

    /// Answers every incoming New Order Single with the prepared Execution Report,
    /// echoing back the order quantity. All other messages are ignored.
    void onInboundApplicationMsg(Message & msg, Session * sn) ONIXS_FIXENGINE_OVERRIDE
    {
        if(!isOrder(msg))
            return;

        execReport_.set(Tags::OrderQty, msg.get(Tags::OrderQty));
        sn->send(&execReport_);
    }

    /// Tells whether the message is a New Order Single (MsgType "D").
    bool isOrder(Message& msg) const
    {
        return msg.type() == orderMessageType_;
    }

private:
    const StringRef orderMessageType_; // MsgType of New Order Single.
    Message execReport_;               // Pre-built Execution Report template, reused per reply.
};
#endif
/// Records send/receive timestamps for each measured message and computes the
/// instrumentation overhead. Recording is gated by `active_` so that session-level
/// traffic (logon/logout) outside a measurement phase is ignored.
class Listener ONIXS_FIXENGINE_FINAL : public FIX::ISessionListener
{
public:
    /// @param numberOfMessages capacity of the marks array: one slot per measured message.
    explicit Listener(size_t numberOfMessages)
        : marks_(new SessionTimeMarks[numberOfMessages])
        , active_(false)
        , counter_(0)
        , numberOfMessages_(numberOfMessages)
    {
        clear();
    }

    ~Listener() ONIXS_FIXENGINE_OVERRIDE
    {
        delete[] marks_; // owns the raw array allocated in the constructor
    }

    /// Fired when a complete application message has been parsed; closes the
    /// receive span and advances to the next slot.
    void onInboundApplicationMsg(Message &, Session *) ONIXS_FIXENGINE_NOTHROW ONIXS_FIXENGINE_OVERRIDE
    {
        // FIX: was an unconditional `return;`, which disabled all timestamping and
        // left counter_ stuck at 0 (hanging the benchmark's wait loop). Only skip
        // recording while no measurement phase is active.
        if(!active_)
            return;
        assert(counter_ < numberOfMessages_);
        PerformanceCounter::current(&marks_[counter_].recvFinish);
        ++counter_;
    }

    /// Fired when raw bytes arrive from the wire; opens the receive span.
    void onReceivedBytes(const char *, size_t, Session *) ONIXS_FIXENGINE_NOTHROW ONIXS_FIXENGINE_OVERRIDE
    {
        if(!active_) // FIX: was an unconditional `return;` (see onInboundApplicationMsg)
            return;
        assert(counter_ < numberOfMessages_);
        PerformanceCounter::current(&marks_[counter_].recvStart);
    }

    /// Fired just before serialized bytes are handed to the socket; closes the send span.
    void onMessageSending(SequenceNumber, char *, size_t, Session *) ONIXS_FIXENGINE_NOTHROW ONIXS_FIXENGINE_OVERRIDE
    {
        if(!active_) // FIX: was an unconditional `return;` (see onInboundApplicationMsg)
            return;
        assert(counter_ < numberOfMessages_);
        PerformanceCounter::current(&marks_[counter_].sendFinish);
    }

    /// Invoked by the benchmark loop immediately before Session::send().
    void beforeSending() ONIXS_FIXENGINE_NOTHROW
    {
        assert(active_);
        assert(counter_ < numberOfMessages_);
        PerformanceCounter::current(&marks_[counter_].sendStart);
    }

    /// Invoked by the benchmark loop immediately after Session::send() returns.
    void afterSending() ONIXS_FIXENGINE_NOTHROW
    {
        assert(active_);
        assert(counter_ < numberOfMessages_);
        PerformanceCounter::current(&marks_[counter_].overallSendFinish);
    }

    /// Resets the slot counter and zeroes all recorded marks.
    /// FIX: this body had lost its signature line, which broke compilation
    /// (clear() is called from the constructor, measureOverhead() and
    /// getSessionTimeMarksStatistics()).
    void clear() ONIXS_FIXENGINE_NOTHROW
    {
        counter_ = 0;
        std::memset(marks_, 0, sizeof(SessionTimeMarks) * numberOfMessages_);
    }

    /// Number of fully received messages recorded so far.
    size_t counter() const ONIXS_FIXENGINE_NOTHROW
    {
        return counter_;
    }

    /// Enables/disables recording; callbacks ignore traffic while inactive.
    void active(bool state) ONIXS_FIXENGINE_NOTHROW
    {
        active_ = state;
    }

    const SessionTimeMarks* marks() const ONIXS_FIXENGINE_NOTHROW
    {
        return marks_;
    }

    /// Estimates the cost of the instrumentation itself by driving the full
    /// callback sequence with no real I/O, then storing the median of each
    /// duration kind. The medians are later subtracted from reported results.
    void measureOverhead()
    {
        clear();
        active_ = true;
        const size_t iterations = numberOfMessages_;
        Message dummyMessage = Message();
        for (size_t i = 0; i < iterations; ++i)
        {
            // Same call order as a real send/receive round trip.
            beforeSending();
            onMessageSending(0, ONIXS_FIXENGINE_NULLPTR, 0, ONIXS_FIXENGINE_NULLPTR);
            afterSending();
            onReceivedBytes(ONIXS_FIXENGINE_NULLPTR, 0, ONIXS_FIXENGINE_NULLPTR);
            onInboundApplicationMsg(dummyMessage, ONIXS_FIXENGINE_NULLPTR);
        }
        Durations sendDurations, overallSendDurations, receiveDurations, sendAndReceiveDurations, sendToReceiveDurations;
        for (size_t i = 0; i < iterations; ++i)
        {
            const SessionTimeDurations durations = SessionTimeMarks::toDurations(marks()[i]);
            receiveDurations.push_back(durations.receive);
            sendDurations.push_back(durations.send);
            overallSendDurations.push_back(durations.overallSend);
            sendAndReceiveDurations.push_back(durations.sendAndReceive);
            sendToReceiveDurations.push_back(durations.sendToReceive);
        }
        // Medians require sorted sequences.
        std::sort(sendDurations.begin(), sendDurations.end());
        std::sort(overallSendDurations.begin(), overallSendDurations.end());
        std::sort(receiveDurations.begin(), receiveDurations.end());
        std::sort(sendAndReceiveDurations.begin(), sendAndReceiveDurations.end());
        std::sort(sendToReceiveDurations.begin(), sendToReceiveDurations.end());
        overhead_.send = SessionTimeMarks::median(sendDurations);
        overhead_.overallSend = SessionTimeMarks::median(overallSendDurations);
        overhead_.receive = SessionTimeMarks::median(receiveDurations);
        overhead_.sendAndReceive = SessionTimeMarks::median(sendAndReceiveDurations);
        overhead_.sendToReceive = SessionTimeMarks::median(sendToReceiveDurations);
        clear();
        active_ = false;
    }

    const SessionTimeDurations& overhead() const ONIXS_FIXENGINE_NOTHROW
    {
        return overhead_;
    }

private:
    SessionTimeMarks* marks_;       // owned array of numberOfMessages_ slots
    bool active_;                   // true only during a measurement phase
    size_t counter_;                // index of the next slot to fill
    const size_t numberOfMessages_;
    SessionTimeDurations overhead_; // medians of the pure instrumentation cost
};
/// Sends `numberOfMessages` orders through `initiator` and records time marks
/// via `listener`. After each send it optionally spins: first warming up the
/// send path for `warmupPeriodUsec`, then idling until `sendPeriodUsec` has
/// elapsed, which paces the overall send rate. Finally it drains the stack
/// until the matching reply for the current message has been counted.
void getSessionTimeMarksStatistics(
TCPDirect::Stack& stack, Session & initiator, FlatMessage & order, Listener & listener, unsigned int numberOfMessages,
unsigned int sendPeriodUsec, unsigned int warmupPeriodUsec)
{
listener.clear();
// Locate the OrderQty field once; each iteration rewrites it in place by key.
const FlatFieldRef valueRef = order.find(FIX42::Tags::OrderQty);
assert(valueRef);
const FlatFieldKey valueKey = order.allocateKey(valueRef);
listener.active(true);
for(unsigned int messageCounter = 1; messageCounter <= numberOfMessages; ++messageCounter)
{
// Make each order unique so replies can be distinguished (quantity = counter).
order.set(valueKey, messageCounter);
listener.beforeSending();
initiator.send(&order);
listener.afterSending();
if(warmupPeriodUsec || sendPeriodUsec)
{
const PerformanceCounter::Count start = PerformanceCounter::current();
PerformanceCounter::Span elapsedMicroseconds = 0;
do
{
// Inner loop: keep the send path hot (warmUp) until warmupPeriodUsec passes.
do
{
stack.dispatchEvents();
initiator.warmUp(&order);
}
while((elapsedMicroseconds = PerformanceCounter::elapsedMicroseconds(start)) < warmupPeriodUsec);
stack.dispatchEvents();
elapsedMicroseconds = PerformanceCounter::elapsedMicroseconds(start);
}
// Outer loop: continue spinning until the full inter-send period has elapsed.
while(elapsedMicroseconds < sendPeriodUsec);
}
// Drain events until the reply to the current message has been received.
while(listener.counter() < messageCounter)
stack.dispatchEvents();
}
listener.active(false);
}
/// Builds the FlatMessage (New Order Single, FIX 4.2) that the benchmark
/// sends repeatedly. The raw tag=value string is parsed, validated and then
/// converted to the flat representation used by the low-latency send path.
FlatMessage prepareMessage()
{
    const std::string rawMessage =
        "8=FIX.4.2\0019=3\00135=D\00149=OnixS\00156=CME\00134=01\00152=20120709-10:10:54\00111=90001008\00121=1"
        "\00155=ABC\00154=1\00138=100\00140=1\00160=20120709-10:10:54\00110=000\001";

    Message parsed;
    Message::parse(rawMessage.data(), rawMessage.size(), parsed);
    parsed.validate();

    return FlatMessage(parsed);
}
}
// Preferred CPU index for the benchmark thread.
// NOTE(review): not referenced anywhere in the code shown here - confirm whether omitted code uses it.
const Threading::CpuIndex ThreadAffinity = 1;
/// Entry point: connects an initiator session over TCPDirect, runs a warm-up
/// phase and a measurement phase, then reports latency statistics.
/// Usage: TCPDirectLatency [numberOfMessages [sendPeriodUsec [warmupPeriodUsec]]]
int main(int argc, char * argv[])
{
    // Must be filled in before running; the sample refuses to start otherwise.
    const std::string NetworkInterfaceName = "";
    const std::string CounterpartyHost = "";
    clog << "OnixS TCPDirect Latency Benchmark Sample." << endl;
#if ONIXS_USE_TCP_DIRECT_EMULATION
    clog << "Attention! Emulator is used." << endl;
#else
# ifndef NDEBUG
    cerr << "Please use RELEASE build to measure latency." << endl;
    return 1;
# endif
#endif
    if(CounterpartyHost.empty() || NetworkInterfaceName.empty())
    {
        cerr << "Please setup `networkInterfaceName` and/or `counterpartyHost`." << endl;
        return 1;
    }
    clog << "Advance usage: TCPDirectLatency numberOfMessages sendPeriodUsec warmupPeriodUsec" << endl;
    const unsigned int NumberOfMessages = (argc > 1 ? std::atoi(argv[1]) : 10000);
    const unsigned int SendPeriodUsec = (argc > 2 ? (std::atoi(argv[2])) : 100);
    const unsigned int WarmupPeriodUsec = (argc > 3 ? std::atoi(argv[3]) : 100);
    clog << "Current parameters: numberOfMessages=" << NumberOfMessages << "; SendPeriodUsec=" <<
        SendPeriodUsec << "; warmupPeriodUsec=" << WarmupPeriodUsec << endl << endl;
    try
    {
        // Validation is disabled and memory-based storage used to minimize latency.
        EngineSettings settings;
        settings.licenseStore(LicenseStore)
            .listenPort(ListenPort)
            .dictionaryFile("../LowLatencyDictionary.xml")
            .validateRequiredFields(false)
            .validateUnknownFields(false)
            .validateUnknownMessages(false)
            .receiveSpinningTimeout(10000)
            .logBeforeSending(false);
        Engine::init(settings);
        TCPDirect::Attributes attr;
        attr.networkInterface(NetworkInterfaceName);
        TCPDirect::Stack reactor(attr);
        const Dictionary dictionary("LowLatencyDictionaryId");
#if ONIXS_USE_TCP_DIRECT_EMULATION
        // Local acceptor that echoes execution reports, so the sample is self-contained.
        AcceptorListener acceptorListener;
        Session acceptor(TargetCompId, SenderCompId, dictionary, &acceptorListener, SessionStorageType::MemoryBased);
        const Threading::CpuIndex EmulatorThreadsAffinity = 0;
        acceptor.sendingThreadAffinity(EmulatorThreadsAffinity)
            .receivingThreadAffinity(EmulatorThreadsAffinity)
            .logonAsAcceptor();
#endif
        Listener listener(NumberOfMessages);
        Session initiator(&reactor, SenderCompId, TargetCompId, dictionary, &listener, SessionStorageType::MemoryBased);
        // FIX: the future returned by logonAsInitiatorAsync was discarded although
        // `connected` was used below - capture it (mirrors `disconnected` for logout).
        const Threading::SharedFuture<void> connected =
            initiator.logonAsInitiatorAsync(CounterpartyHost, ListenPort, 60, ONIXS_FIXENGINE_NULLPTR, true);
        while(!connected.isReady())
            reactor.dispatchEvents();
        if(connected.hasException())
            connected.get(); // rethrows the stored logon failure
        FlatMessage order = prepareMessage();
        const unsigned int NumberOfWarmupMessages = NumberOfMessages > 1000 ? 1000 : NumberOfMessages;
        listener.measureOverhead();
        clog << "\nWarm-up phase to make first calls faster..." << endl;
        getSessionTimeMarksStatistics(reactor, initiator, order, listener, NumberOfWarmupMessages, SendPeriodUsec, WarmupPeriodUsec);
        clog << "\nMeasurement phase..." << endl;
        getSessionTimeMarksStatistics(reactor, initiator, order, listener, NumberOfMessages, SendPeriodUsec, WarmupPeriodUsec);
        const Threading::SharedFuture<void> disconnected = initiator.logoutAsync("");
        while(!disconnected.isReady())
            reactor.dispatchEvents();
        if(disconnected.hasException())
            disconnected.get();
        initiator.shutdown();
        // FIX: the condition was inverted (`while(reactor.isQuiescent())`), so pending
        // stack work was never drained. Dispatch until the stack becomes quiescent.
        while(!reactor.isQuiescent())
            reactor.dispatchEvents();
#if ONIXS_USE_TCP_DIRECT_EMULATION
        acceptor.logout();
        acceptor.shutdown();
#endif
        Engine::shutdown();
        SessionTimeMarks::reportStatistics("Latency", listener.marks(), NumberOfMessages, listener.overhead(), true);
    }
    catch(const std::exception & ex)
    {
        processSampleException(ex.what());
    }
    return 0;
}