OnixS C++ CME iLink 3 Binary Order Entry Handler  1.18.9
API Documentation
Benchmark Sample

This sample shows how to measure latency and use the warm-up feature.

Using OpenOnload® Network Stack

OpenOnload is part of Solarflare’s suite of network acceleration technologies.

To achieve better results, use the latency profile. For example:

$ ./run-under-onload.sh 54 192.168.28.1 20054 1 2

TCP Loopback Acceleration

The TCP loopback acceleration is turned off by default. It is configured via the environment variables EF_TCP_CLIENT_LOOPBACK and EF_TCP_SERVER_LOOPBACK.

Activate

$ export EF_TCP_CLIENT_LOOPBACK=1
$ export EF_TCP_SERVER_LOOPBACK=1

Verify

$ echo $EF_TCP_CLIENT_LOOPBACK
$ echo $EF_TCP_SERVER_LOOPBACK

Source code

#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
#include "../Common/BenchmarkGatewayListener.h"
#endif
#include "../Common/BenchmarkSessionListener.h"
#include "../Common/Helpers.h"
#include "../Common/PerformanceCounter.h"
#include "../Settings/Defaults.h"
#ifndef _WIN32
#include <sched.h> /* header file for POSIX scheduling */
#endif
#include <algorithm>
// #define DEBUGGING
using namespace Samples;
/// Writes the command-line synopsis of the Benchmark sample to the log stream.
///
/// The synopsis lists the three connection arguments in square brackets,
/// followed by the tuning arguments in parentheses.
void usage()
{
    // Adjacent literals are merged by the compiler; they are split here by argument group.
    const char* const synopsis =
        "Usage: Benchmark [MarketSegmentId] [Host] [Port]"
        " (MainThreadAffinity) (ReceivingThreadAffinity) (AuxiliarySendingThreadAffinity)"
        " (NumberOfMessages) (IntervalBetweenSendingUsec) (WarmupIntervalUsec)";

    clog << synopsis << endl;
}
/// Entry point of the Benchmark sample.
///
/// Reads the positional command-line arguments, optionally starts the gateway
/// emulator, creates and connects a session, then runs two identical sending
/// passes (a warm-up pass followed by the measurement pass) and reports the
/// collected latency marks.
///
/// Positional arguments (see usage()):
///   argv[1..3]  MarketSegmentId, Host, Port
///   argv[4..6]  CPU indexes for the main, receiving and auxiliary sending threads
///   argv[7]     number of messages sent per pass
///   argv[8]     interval between sends, microseconds
///   argv[9]     warm-up interval, microseconds
///
/// @return 0 on success; 1 when run from a debug build without DEBUGGING defined,
///         when the connection arguments are missing and no gateway emulator is
///         compiled in, or when a std::exception-derived exception is caught.
int main(int argc, char* argv[])
{
clog << "CME iLink 3 Benchmark Sample, version " << Session::version() << "." << endl << endl;
// A build without NDEBUG refuses to run unless DEBUGGING is defined
// (see the commented-out #define DEBUGGING near the top of the file).
#if !defined(NDEBUG) && !defined(DEBUGGING)
cerr << "Please use the RELEASE build to measure latency." << endl;
return 1;
#endif
// Connection defaults; they stay in effect only when the emulator fallback below is taken.
int marketSegmentId = 99;
string host = "127.0.0.1";
Port port = 49152;
bool useEmulator = false;
// Fewer than three positional arguments: fall back to the built-in gateway emulator
// when it is compiled in, otherwise print the usage text and fail.
if (argc < 4)
{
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
useEmulator = true;
clog << "WARNING: gateway emulator is used!" << endl;
#else
usage();
return 1;
#endif
}
else
{
// NOTE(review): atoi() performs no validation; non-numeric input yields 0.
marketSegmentId = atoi(argv[1]);
host = argv[2];
port = atoi(argv[3]);
}
// Optional tuning arguments; each falls back to the literal default when absent.
const Threading::CpuIndex MainThreadAffinity = argc > 4 ? atoi(argv[4]) : 1u;
const Threading::CpuIndex ReceivingThreadAffinity = argc > 5 ? atoi(argv[5]) : 2u;
const Threading::CpuIndex AuxiliarySendingThreadAffinity = argc > 6 ? atoi(argv[6]) : 3u;
const unsigned int NumberOfMessages = argc > 7 ? atoi(argv[7]) : 300u;
// argv[9] is read before argv[8] because the sending interval below is clamped against it.
const unsigned int WarmupIntervalUsec = argc > 9 ? atoi(argv[9]) : 10u;
// See https://www.cmegroup.com/confluence/display/EPICSANDBOX/Messaging+Controls
// std::max keeps the sending interval from being shorter than the warm-up interval.
const unsigned int IntervalBetweenSendingUsec = std::max(argc > 8 ? atoi(argv[8]) : 10000u, WarmupIntervalUsec);
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
// Declared outside the try block so the emulator thread can be waited for after it.
BenchmarkGatewayListener gatewayListener;
std::unique_ptr<GatewayEmulatorThread> gateway;
#endif
// Process exit code; set to 1 by the exception handler below.
int result = 0;
try
{
SignalHelper::manageLinuxSignals();
SessionSettings settings = fillSettings(useEmulator);
// NOTE(review): the unit of the receiveSpinningTimeout value is not visible here — confirm.
settings
.receiveSpinningTimeout(1000*1000)
.useSpinLock(true)
.logBeforeSending(false);
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
// Start the emulator on the same host/port the session connects to further below.
if(useEmulator)
gateway.reset(new GatewayEmulatorThread(settings.licenseStores(), host, port, &gatewayListener));
#endif
// Session listener local to main(): extends BenchmarkSessionListener with a semaphore
// that the sending loop waits on at the end of each pass.
class BenchmarkListener ONIXS_ILINK3_FINAL : public BenchmarkSessionListener
{
public:
// The semaphore is constructed with 0.
// NOTE(review): presumably the initial count, so the first acquire() blocks — confirm.
explicit BenchmarkListener(size_t numberOfMessages)
: BenchmarkSessionListener(numberOfMessages)
, receivedAllMessages_(0)
{}
// Forwards the error to the base listener, then releases the semaphore;
// presumably this keeps a pending waitUntilReceivedAllMessages() from blocking forever.
void onError(SessionErrorReason::Enum reason, const std::string & description, Session * session, Messaging::SbeMessage msg) ONIXS_ILINK3_OVERRIDE {
BenchmarkSessionListener::onError(reason, description, session, msg);
receivedAllMessages_.release();
}
// Runs one sending pass: calls resetBenchmark(), sends `numberOfMessages` orders through
// `session`, pacing them by `sendPeriodUsec`, and then blocks until the semaphore is released.
// When `warmupPeriodUsec` is non-zero, the pause between sends is filled with
// session.warmUp() calls issued every `warmupPeriodUsec`.
void collectSessionTimeMarks(Session & session, Samples::NewOrderSingle & order,
unsigned int numberOfMessages, unsigned int sendPeriodUsec, unsigned int warmupPeriodUsec) {
resetBenchmark();
std::string clOrdId;
// digits10 + 1 is the longest decimal form of an unsigned integer type, so this capacity
// covers every UInt32 counter value with one character to spare.
clOrdId.reserve(std::numeric_limits<UInt32>::digits10 + 2);
// When the session is used with the OnixS C++ CME Market Data Handler,
// this value can be taken from the packet receiving time
Messaging::Timestamp sendingTime = UtcWatch::now();
for(UInt32 messageCounter = 0; messageCounter < numberOfMessages; ++messageCounter) {
// The same order object is reused for every send; only OrderQty and ClOrdId change.
clOrdId.clear();
order->setOrderQty(messageCounter + 1);
Messaging::toStr(clOrdId, messageCounter);
order->setClOrdId(clOrdId);
// NOTE(review): beforeSending()/afterSending() presumably record the time marks
// around the send — they are defined in the base listener, confirm there.
beforeSending();
session.send(order, sendingTime);
afterSending();
// Timestamp passed to the warm-up calls below and to the next send.
sendingTime = UtcWatch::now();
if(warmupPeriodUsec) {
const PerformanceCounter::Count start = PerformanceCounter::current();
// do/while: at least one spin-wait and one warmUp() call happen after every send,
// even when sendPeriodUsec has already elapsed.
do {
ThisThread::spinWait(warmupPeriodUsec);
session.warmUp(order, sendingTime);
} while(PerformanceCounter::elapsedMicroseconds(start) < sendPeriodUsec);
}
else if(sendPeriodUsec) {
// Warm-up disabled: spin for the whole sending interval.
ThisThread::spinWait(sendPeriodUsec);
}
}
waitUntilReceivedAllMessages();
}
private:
// Forwards to the base listener, then releases the semaphore that
// waitUntilReceivedAllMessages() acquires.
void onReceivedAllMessages() ONIXS_ILINK3_OVERRIDE {
BenchmarkSessionListener::onReceivedAllMessages();
receivedAllMessages_.release();
}
// Acquires the semaphore released by onReceivedAllMessages() or onError().
void waitUntilReceivedAllMessages() {
receivedAllMessages_.acquire();
}
Semaphore receivedAllMessages_;
}
// Declares the single instance of the local class defined above.
listener(NumberOfMessages);
// File-based storage when DEBUGGING is defined, memory-based storage otherwise.
#if defined(DEBUGGING)
const SessionStorageType::Enum storageType = SessionStorageType::FileBased;
#else
const SessionStorageType::Enum storageType = SessionStorageType::MemoryBased;
#endif
clog << "Parameters: NumberOfMessages=" << NumberOfMessages << "; IntervalBetweenSendingUsec=" << IntervalBetweenSendingUsec
<< "; warmupIntervalUsec=" << WarmupIntervalUsec
<< "; StorageType=" << SessionStorageType::toString(storageType) << '.' << endl;
Session session(settings, marketSegmentId, &listener, storageType);
// NOTE(review): presumably discards state restored from an earlier run — confirm what
// the `true` argument of reset() means.
if(session.negotiated())
session.reset(true);
// Pin the main (sending) thread, apply scheduling settings and measure the
// listener's own overhead before the connection is established.
ThisThread::affinity(MainThreadAffinity);
setPriorityAndPolicy(&session);
listener.measureOverhead();
// Thread affinities are configured on the session before connect() is called.
session
.receivingThreadAffinity(ReceivingThreadAffinity)
.sendingThreadAffinity(AuxiliarySendingThreadAffinity)
.connect(host, port);
NewOrderSingle order;
Helper::setOrderFields(order, PartyDetailsListReqID, DefaultSecurityId, DefaultPriceMantissa);
// Two identical passes with the same arguments. The second pass starts with
// resetBenchmark(), which presumably discards the marks collected by the first.
clog << "\nWarm-up phase to make first calls faster..." << endl;
listener.collectSessionTimeMarks(session, order, NumberOfMessages, IntervalBetweenSendingUsec, WarmupIntervalUsec);
clog << "\nMeasurement phase..." << endl;
listener.collectSessionTimeMarks(session, order, NumberOfMessages, IntervalBetweenSendingUsec, WarmupIntervalUsec);
// TCP information is queried before disconnect() and printed after the latency report.
TcpInfo tcpInfo;
const bool hasTcpInfo = session.getTcpInfo(tcpInfo);
session.disconnect();
if(listener.packetGroupingDetected())
clog << "Attention: packet grouping detected! Increase the interval between sending!" << endl;
BenchmarkData::reportResults("Latency", listener.sendMarks(), listener.receiveMarks(), NumberOfMessages, listener.overhead());
if(hasTcpInfo)
clog << "\n\n" << tcpInfo.toString();
#ifndef _WIN32
// Sets the SCHED_OTHER policy for this thread; being inside the try block,
// this runs only when no exception was thrown above.
ThisThread::policy(SCHED_OTHER);
#endif
}
catch (const exception& ex)
{
cerr << "\nEXCEPTION: " << ex.what() << endl;
result = 1;
}
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
// Wait for the emulator thread (if one was started) before returning from main().
if(gateway) {
try {
gateway->wait();
}
catch(...) {
// Emulator exception (if any) is already reported from Emulator's thread
}
}
#endif
return result;
}
#include "../Common/PerformanceCounter.inc.h"