This sample shows how to measure latency and how to use the warm-up feature with TCPDirect® technology.
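The initiator session sends orders over a TCPDirect stack, the acceptor (an in-process emulator where real TCPDirect is unavailable) answers each one with an execution report, and time marks captured in the send and receive callbacks are turned into latency statistics at the end of the run.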
#include "../SessionTimeMarks.h"
#include "../PerformanceCounter.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
#ifdef _WIN32
# define ONIXS_USE_TCP_DIRECT_EMULATION 1
#else
# define ONIXS_USE_TCP_DIRECT_EMULATION 0
#endif
using namespace Settings;
using namespace OnixS::Sample;
namespace {
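// When the emulator is enabled, this in-process acceptor answers every incoming order so the benchmark can run end to end.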
#if ONIXS_USE_TCP_DIRECT_EMULATION
class AcceptorListener : public ISessionListener // listener interface name assumed
{
public:
AcceptorListener()
: orderMessageType_(StringRef("D"))
, execReport_("8", FixProtocolVersion)
{
execReport_.set(Tags::OrderID, std::string("90001008"));
}
void onInboundApplicationMsg(
Message & msg,
Session * sn) ONIXS_FIXENGINE_OVERRIDE
{
if(isOrder(msg))
{
execReport_.set(Tags::OrderQty, msg.get(Tags::OrderQty));
sn->send(&execReport_);
}
}
bool isOrder(const Message & msg) const
{
return msg.type() == orderMessageType_;
}
private:
const StringRef orderMessageType_;
Message execReport_;
};
#endif
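// Initiator-side listener: records a high-resolution time mark in each send/receive callback so per-message latencies can be derived afterwards.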
class Listener : public ISessionListener // listener interface name assumed
{
public:
explicit Listener(size_t numberOfMessages)
: marks_(new SessionTimeMarks[numberOfMessages])
, active_(false)
, counter_(0)
, numberOfMessages_(numberOfMessages)
{
}
~Listener() ONIXS_FIXENGINE_OVERRIDE
{
delete[] marks_;
}
void onInboundApplicationMsg(Message & /*msg*/, Session * /*sn*/) ONIXS_FIXENGINE_OVERRIDE
{
if(!active_)
return;
assert(counter_ < numberOfMessages_);
PerformanceCounter::current(&marks_[counter_].recvFinish);
++counter_;
}
void onReceivedBytes(const char * /*bytes*/, size_t /*size*/, Session * /*sn*/) ONIXS_FIXENGINE_OVERRIDE // parameter types assumed
{
if(!active_)
return;
assert(counter_ < numberOfMessages_);
PerformanceCounter::current(&marks_[counter_].recvStart);
}
void onMessageSending(SequenceNumber /*seqNum*/, char * /*bytes*/, size_t /*size*/, Session * /*sn*/) ONIXS_FIXENGINE_OVERRIDE // parameter types assumed
{
if(!active_)
return;
assert(counter_ < numberOfMessages_);
PerformanceCounter::current(&marks_[counter_].sendFinish);
}
void beforeSending()
{
assert(active_);
assert(counter_ < numberOfMessages_);
PerformanceCounter::current(&marks_[counter_].sendStart);
}
void afterSending()
{
assert(active_);
assert(counter_ < numberOfMessages_);
PerformanceCounter::current(&marks_[counter_].overallSendFinish);
}
void clear()
{
counter_ = 0;
std::memset(marks_, 0, sizeof(SessionTimeMarks) * numberOfMessages_);
}
size_t counter() const
{
return counter_;
}
void active(bool state)
{
active_ = state;
}
const SessionTimeMarks * marks() const
{
return marks_;
}
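// Estimates the cost of the time stamping itself by driving the callbacks without any network I/O; the medians are kept as the measurement overhead reported together with the statistics.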
void measureOverhead()
{
active_ = true;
const size_t iterations = numberOfMessages_;
Message dummyMessage("D", FixProtocolVersion); // content is irrelevant, it only drives the inbound callback
for (size_t i = 0; i < iterations; ++i)
{
beforeSending();
onMessageSending(0, ONIXS_FIXENGINE_NULLPTR, 0, ONIXS_FIXENGINE_NULLPTR);
afterSending();
onReceivedBytes(ONIXS_FIXENGINE_NULLPTR, 0, ONIXS_FIXENGINE_NULLPTR);
onInboundApplicationMsg(dummyMessage, ONIXS_FIXENGINE_NULLPTR);
}
Durations sendDurations, overallSendDurations, receiveDurations, sendAndReceiveDurations, sendToReceiveDurations;
for (size_t i = 0; i < iterations; ++i)
{
const SessionTimeDurations durations = SessionTimeMarks::toDurations(marks()[i]);
receiveDurations.push_back(durations.receive);
sendDurations.push_back(durations.send);
overallSendDurations.push_back(durations.overallSend);
sendAndReceiveDurations.push_back(durations.sendAndReceive);
sendToReceiveDurations.push_back(durations.sendToReceive);
}
std::sort(sendDurations.begin(), sendDurations.end());
std::sort(overallSendDurations.begin(), overallSendDurations.end());
std::sort(receiveDurations.begin(), receiveDurations.end());
std::sort(sendAndReceiveDurations.begin(), sendAndReceiveDurations.end());
std::sort(sendToReceiveDurations.begin(), sendToReceiveDurations.end());
overhead_.send = SessionTimeMarks::median(sendDurations);
overhead_.overallSend = SessionTimeMarks::median(overallSendDurations);
overhead_.receive = SessionTimeMarks::median(receiveDurations);
overhead_.sendAndReceive = SessionTimeMarks::median(sendAndReceiveDurations);
overhead_.sendToReceive = SessionTimeMarks::median(sendToReceiveDurations);
active_ = false;
}
const SessionTimeDurations & overhead() const
{
return overhead_;
}
private:
SessionTimeMarks* marks_;
bool active_;
size_t counter_;
const size_t numberOfMessages_;
SessionTimeDurations overhead_;
};
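// Sends numberOfMessages orders, pacing them sendPeriodUsec apart, and dispatches stack events until a response to each one has been received.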
void getSessionTimeMarksStatistics(
TCPDirect::Stack& stack,
Session & initiator,
FlatMessage & order, Listener & listener,
unsigned int numberOfMessages,
unsigned int sendPeriodUsec, unsigned int warmupPeriodUsec)
{
listener.clear();
const FlatFieldRef valueRef = order.find(Tags::OrderQty); // the field stamped with the counter; the exact tag is assumed here
assert(valueRef);
const FlatFieldKey valueKey = order.allocateKey(valueRef);
listener.active(true);
for(unsigned int messageCounter = 1; messageCounter <= numberOfMessages; ++messageCounter)
{
order.set(valueKey, messageCounter);
listener.beforeSending();
initiator.send(&order); // the timed send; the marks are taken in the listener callbacks
listener.afterSending();
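// Pace the sends: keep dispatching stack events in warm-up-sized intervals until sendPeriodUsec has elapsed since the send above.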
if(warmupPeriodUsec || sendPeriodUsec)
{
const PerformanceCounter::Count start = PerformanceCounter::current();
PerformanceCounter::Span elapsedMicroseconds = 0;
do
{
do
{
stack.dispatchEvents();
}
while((elapsedMicroseconds = PerformanceCounter::elapsedMicroseconds(start)) < warmupPeriodUsec);
stack.dispatchEvents();
elapsedMicroseconds = PerformanceCounter::elapsedMicroseconds(start);
}
while(elapsedMicroseconds < sendPeriodUsec);
}
while(listener.counter() < messageCounter)
stack.dispatchEvents();
}
listener.active(false);
}
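// Builds the NewOrderSingle (35=D) used for the benchmark from a raw FIX 4.2 string.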
FlatMessage createOrder() // helper name assumed
{
const std::string rawMessage =
"8=FIX.4.2\0019=3\00135=D\00149=OnixS\00156=CME\00134=01\00152=20120709-10:10:54\00111=90001008\00121=1"
"\00155=ABC\00154=1\00138=100\00140=1\00160=20120709-10:10:54\00110=000\001";
Message msg("D", FixProtocolVersion);
Message::parse(rawMessage.data(), rawMessage.size(), msg);
FlatMessage order(msg);
return order;
}
}
int main(int argc, char * argv[])
{
const std::string NetworkInterfaceName = "";
const std::string CounterpartyHost = "";
clog << "OnixS TCPDirect Latency Benchmark Sample." << endl;
#if ONIXS_USE_TCP_DIRECT_EMULATION
clog << "Attention! Emulator is used." << endl;
#else
# ifndef NDEBUG
cerr << "Please use RELEASE build to measure latency." << endl;
return 1;
# endif
#endif
if(CounterpartyHost.empty() || NetworkInterfaceName.empty())
{
cerr << "Please setup `networkInterfaceName` and/or `counterpartyHost`." << endl;
return 1;
}
clog << "Advance usage: TCPDirectLatency numberOfMessages sendPeriodUsec warmupPeriodUsec" << endl;
const unsigned int NumberOfMessages = (argc > 1 ? std::atoi(argv[1]) : 10000);
const unsigned int SendPeriodUsec = (argc > 2 ? (std::atoi(argv[2])) : 100);
const unsigned int WarmupPeriodUsec = (argc > 3 ? std::atoi(argv[3]) : 100);
clog << "Current parameters: numberOfMessages=" << NumberOfMessages << "; SendPeriodUsec=" <<
SendPeriodUsec << "; warmupPeriodUsec=" << WarmupPeriodUsec << endl << endl;
try
{
EngineSettings settings; // license, dictionary and listen-port settings are configured here in the full sample
settings
.validateRequiredFields(false)
.validateUnknownFields(false)
.validateUnknownMessages(false)
.receiveSpinningTimeout(10000)
.logBeforeSending(false);
Engine::init(settings);
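// Create a TCPDirect stack bound to the configured network interface; all session I/O is driven by dispatching its events.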
TCPDirect::Attributes attr;
attr.networkInterface(NetworkInterfaceName);
TCPDirect::Stack reactor(attr);
const Dictionary dictionary("LowLatencyDictionaryId");
#if ONIXS_USE_TCP_DIRECT_EMULATION
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, dictionary, &acceptorListener, SessionStorageType::MemoryBased);
acceptor.sendingThreadAffinity(EmulatorThreadsAffinity)
.receivingThreadAffinity(EmulatorThreadsAffinity)
.logonAsAcceptor();
#endif
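// The initiator session is attached to the TCPDirect stack and uses memory-based session storage.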
Listener listener(NumberOfMessages);
Session initiator(&reactor, SenderCompId, TargetCompId, dictionary, &listener, SessionStorageType::MemoryBased);
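// In the complete sample the initiator logs on to CounterpartyHost at this point, and stack events are dispatched until the session is established.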
reactor.dispatchEvents();
FlatMessage order = createOrder(); // the order template reused for every send
const unsigned int NumberOfWarmupMessages = NumberOfMessages > 1000 ? 1000 : NumberOfMessages;
listener.measureOverhead();
clog << "\nWarm-up phase to make first calls faster..." << endl;
getSessionTimeMarksStatistics(reactor, initiator, order, listener, NumberOfWarmupMessages, SendPeriodUsec, WarmupPeriodUsec);
clog << "\nMeasurement phase..." << endl;
getSessionTimeMarksStatistics(reactor, initiator, order, listener, NumberOfMessages, SendPeriodUsec, WarmupPeriodUsec);
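// In the complete sample the initiator session is logged out here before the stack is drained below.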
reactor.dispatchEvents();
while(!reactor.isQuiescent())
reactor.dispatchEvents();
#if ONIXS_USE_TCP_DIRECT_EMULATION
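// (emulation only) The acceptor session is shut down here in the complete sample.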
#endif
Engine::shutdown();
SessionTimeMarks::reportStatistics("Latency", listener.marks(), NumberOfMessages, listener.overhead(), true);
}
catch(const std::exception & ex)
{
processSampleException(ex.what());
}
return 0;
}
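For example, `TCPDirectLatency 100000 50 10` sends 100,000 messages with a 50 µs send period and a 10 µs warm-up period; running the sample without arguments uses the defaults shown above (10000, 100, 100).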