OnixS C++ FIX Engine  4.12.0
API Documentation
Latency Sample

This sample shows how to measure latency and use the warm-up feature.

Using OpenOnload® Network Stack

OpenOnload is part of Solarflare’s suite of network acceleration technologies.

To achieve better results, use the latency profile. For example:

$ ./run-under-onload.sh 10000 100 100

TCP Loopback Acceleration

The TCP loopback acceleration is turned off by default. It is configured via the environment variables EF_TCP_CLIENT_LOOPBACK and EF_TCP_SERVER_LOOPBACK.

Activate

$ export EF_TCP_CLIENT_LOOPBACK=1
$ export EF_TCP_SERVER_LOOPBACK=1

Verify

$ echo $EF_TCP_CLIENT_LOOPBACK
$ echo $EF_TCP_SERVER_LOOPBACK


Source code:


Listener.h:

#include "../PerformanceCounter.h"
#include "../SessionTimeMarks.h"
// Session listener for the latency sample. It owns an array of
// OnixS::Sample::SessionTimeMarks (one slot per message) and its callbacks are
// written to stamp PerformanceCounter values into the slot selected by counter_.
//
// NOTE(review): this listing appears to have lost lines during extraction:
//  - `ready_` is used (release/tryAcquire here, acquire/release in Latency.cpp)
//    but is never declared or initialized in this class;
//  - the body at "counter_ = 0; ..." has no function signature, although
//    Latency.cpp calls listener.clear();
//  - three callbacks begin with an unconditional `return;`, while `active_` is
//    written by active() and never read.
// Confirm every such spot against the shipped sample before using this code.
class MySessionListener ONIXS_FIXENGINE_FINAL : public OnixS::FIX::ISessionListener
{
public:
// Allocates numberOfMessages time-mark slots and zero-fills them with memset.
explicit MySessionListener(size_t numberOfMessages)
: marks_(new OnixS::Sample::SessionTimeMarks[numberOfMessages])
, active_(false)
, counter_(0)
, numberOfMessages_(numberOfMessages) {
std::memset(marks_, 0, sizeof(OnixS::Sample::SessionTimeMarks) * numberOfMessages_);
}
// Releases the time-mark array allocated in the constructor.
~MySessionListener() ONIXS_FIXENGINE_OVERRIDE {
delete[] marks_;
}
// Written to record recvFinish for the current slot, advance counter_ and
// release ready_ (which spinWaitReceivedMsg() polls).
// NOTE(review): as listed, the unconditional `return;` makes all of that
// unreachable; presumably a guard such as `if(!active_)` preceded it — confirm.
// NOTE(review): counter_ is not checked against numberOfMessages_ before
// indexing marks_ in any of the callbacks.
void onInboundApplicationMsg(Message & /*msg*/, Session * /*sn*/) ONIXS_FIXENGINE_OVERRIDE {
return;
OnixS::Sample::PerformanceCounter::current(&marks_[counter_].recvFinish);
++counter_;
ready_.release();
}
// Written to record recvStart for the current slot.
// NOTE(review): unreachable as listed because of the unconditional `return;`.
void onReceivedBytes(const char * /*bytes*/, size_t /*size*/,
Session * /*session*/) ONIXS_FIXENGINE_NOTHROW ONIXS_FIXENGINE_OVERRIDE {
return;
OnixS::Sample::PerformanceCounter::current(&marks_[counter_].recvStart);
}
// Written to record sendFinish for the current slot.
// NOTE(review): unreachable as listed because of the unconditional `return;`.
void onMessageSending(SequenceNumber /*msgSeqNum*/, char * /*bytes*/, size_t /*size*/,
Session * /*session*/) ONIXS_FIXENGINE_NOTHROW ONIXS_FIXENGINE_OVERRIDE {
return;
OnixS::Sample::PerformanceCounter::current(&marks_[counter_].sendFinish);
}
// Stores the flag in active_. Nothing in this listing reads active_.
void active(bool value) ONIXS_FIXENGINE_NOTHROW {
active_ = value;
}
// Returns the slot at index counter_, i.e. the one the callbacks above index.
OnixS::Sample::SessionTimeMarks & currentMark() ONIXS_FIXENGINE_NOTHROW {
return marks_[counter_];
}
// NOTE(review): the next two statements and the closing brace have no function
// header in this listing. They reset counter_ and zero-fill the marks array,
// which matches the clear() call made from Latency.cpp — presumably the
// `void clear() ... {` line was lost; confirm.
counter_ = 0;
std::memset(marks_, 0, sizeof(OnixS::Sample::SessionTimeMarks) * numberOfMessages_);
}
// Polls ready_.tryAcquire() until it succeeds.
// NOTE(review): the `while` has no statement in this listing (not even `;`),
// so the loop body appears to have been lost — confirm.
void spinWaitReceivedMsg() {
while(!ready_.tryAcquire())
}
// Declared before `private:`, so it is public; Latency.cpp reads it directly
// when reporting statistics.
OnixS::Sample::SessionTimeMarks * marks_;
private:
bool active_;
// Index of the slot currently being filled.
size_t counter_;
// Number of slots allocated in marks_.
const size_t numberOfMessages_;
};

Latency.cpp:

#include "Listener.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
using namespace std;
using namespace OnixS::Sample;
namespace {
// Sends `numberOfMessages` copies of `order` through `initiator`, stamping
// sendStart / overallSendFinish into the listener's current time-mark slot
// around each send.
//
// initiator        - session used for send() and warmUp().
// order            - message to send; its OrderQty field is set to i + 1 on
//                    every iteration.
// listener         - provides the time-mark slots and the ready_ handshake.
// numberOfMessages - number of loop iterations (messages sent).
// sendPeriodUsec   - upper time bound (microseconds) of the warm-up loop below.
// warmupPeriodUsec - in the visible code only tested for non-zero.
//
// NOTE(review): this listing appears to have lost lines during extraction —
// see the notes at the warm-up loop and at `else if(sendPeriodUsec)`.
void getSessionTimeMarksStatistics(
Session & initiator,
FlatMessage & order,
MySessionListener & listener,
unsigned int numberOfMessages,
unsigned int sendPeriodUsec,
unsigned int warmupPeriodUsec)
{
// Reset the listener's marks while holding ready_.
listener.ready_.acquire();
listener.clear();
listener.ready_.release();
// Look up OrderQty once and allocate a key so the loop can set it by key.
const FlatFieldRef valueRef = order.find(FIX42::Tags::OrderQty);
assert(valueRef);
const FlatFieldKey valueKey = order.allocateKey(valueRef);
for(unsigned int i = 0; i < numberOfMessages; ++i) {
order.set(valueKey, i + 1);
// Wait until ready_ can be acquired before touching the current slot.
listener.spinWaitReceivedMsg();
OnixS::Sample::SessionTimeMarks & currentMark = listener.currentMark();
PerformanceCounter::current(&currentMark.sendStart);
initiator.send(&order);
PerformanceCounter::current(&currentMark.overallSendFinish);
// Between sends, call warmUp() repeatedly until sendPeriodUsec has elapsed.
// NOTE(review): warmupPeriodUsec only gates this branch here; the loop is
// timed against sendPeriodUsec. Presumably a wait of warmupPeriodUsec inside
// the loop was lost from the listing — confirm.
if(warmupPeriodUsec) {
const PerformanceCounter::Count start = PerformanceCounter::current();
do {
initiator.warmUp(&order);
}
while(PerformanceCounter::elapsedMicroseconds(start) < sendPeriodUsec);
}
// NOTE(review): this `else if` has no statement in the listing, which is a
// syntax error as shown; presumably a plain wait of sendPeriodUsec belonged
// here — confirm.
else if(sendPeriodUsec)
}
}
}
// CPU indexes used when pinning threads, listed in ascending order.

// CPU index for threads that are not on the measured path.
const OnixS::Threading::CpuIndex OtherThreadAffinity = 0;

// CPU index intended for the sending side.
const OnixS::Threading::CpuIndex SendingThreadAffinity = 2;

// CPU index for the receiving thread.
const OnixS::Threading::CpuIndex ReceivingThreadAffinity = 4;
int main(int argc, char * argv[])
{
clog << "OnixS Latency Benchmark Sample." << endl;
clog << "Advance usage: Latency.exe numberOfMessages sendPeriodUsec warmupPeriodUsec" << endl;
#if _DEBUG
cerr << "Please use RELEASE build to measure latency." << endl;
return 1;
#endif
#ifdef _DEBUG
const unsigned int NumberOfMessages = (argc > 1 ? std::atoi(argv[1]) : 1000);
#else
const unsigned int NumberOfMessages = (argc > 1 ? std::atoi(argv[1]) : 10000);
#endif
const unsigned int SendPeriodUsec = (argc > 2 ? (std::atoi(argv[2])) : 100);
const unsigned int WarmupPeriodUsec = (argc > 3 ? std::atoi(argv[3]) : 100);
clog << "Current parameters: numberOfMessages=" << NumberOfMessages << "; SendPeriodUsec=" <<
SendPeriodUsec << "; warmupPeriodUsec=" << WarmupPeriodUsec << endl << endl;
try {
EngineSettings settings;
settings.licenseStore(LicenseStore)
.listenPort(ListenPort)
.dictionaryFile("../LowLatencyDictionary.xml")
.validateRequiredFields(false)
.validateUnknownFields(false)
.validateUnknownMessages(false)
.receiveSpinningTimeout(10000)
.logBeforeSending(false)
.useSpinLock(true);
Engine::init(settings);
const Dictionary dictionary("LowLatencyDictionaryId");
const string rawMessage =
"8=FIX.4.2\0019=3\00135=D\00149=OnixS\00156=CME\00134=01\00152=20120709-10:10:54\00111=90001008\00121=1\00155=ABC\00154=1\00138=100\00140=1\00160=20120709-10:10:54\00110=000\001";
Message msg;
Message::parse(rawMessage.data(), rawMessage.size(), msg);
msg.validate();
FlatMessage order(msg);
MySessionListener listener(NumberOfMessages);
Session acceptor(TargetCompId, SenderCompId, dictionary, &listener,
SessionStorageType::MemoryBased);
acceptor.sendingThreadAffinity(OtherThreadAffinity)
.receivingThreadAffinity(ReceivingThreadAffinity)
.logonAsAcceptor();
Session initiator(SenderCompId, TargetCompId, dictionary, &listener,
SessionStorageType::MemoryBased);
initiator.sendingThreadAffinity(OtherThreadAffinity)
.receivingThreadAffinity(OtherThreadAffinity)
.logonAsInitiator("localhost", ListenPort, 0);
listener.active(true);
const unsigned int NumberOfWarmupMessages = NumberOfMessages > 1000 ? 1000 : NumberOfMessages;
// Warm-up phase to make the first call fast.
getSessionTimeMarksStatistics(initiator, order, listener, NumberOfWarmupMessages,
SendPeriodUsec, WarmupPeriodUsec);
// Resulting measurement phase.
getSessionTimeMarksStatistics(initiator, order, listener, NumberOfMessages, SendPeriodUsec,
WarmupPeriodUsec);
listener.ready_.acquire();
listener.active(false);
acceptor.logout();
initiator.logout();
acceptor.shutdown();
initiator.shutdown();
SessionTimeMarks::reportStatistics("Latency", listener.marks_, NumberOfMessages, false);
Engine::shutdown();
}
catch(const std::exception & ex) {
processSampleException(ex.what());
}
return 0;
}