OnixS C++ CME iLink 3 Binary Order Entry Handler  1.18.9
API Documentation
TCPDirect Benchmark Sample

This sample shows how to measure latency on TCPDirect sessions and use the warm-up feature.

Using TCPDirect Network Stack

TCPDirect is part of Solarflare’s suite of network acceleration technologies.

TCP loopback is not supported by the TCPDirect technology, so the emulator should be started on a remote host (or on a different, physically connected network card with a configured route to it).

For development and debugging purposes, the OnixS iLink 3 Handler provides TCPDirect-mode emulation on Windows over regular sockets, so TCPDirectBenchmark can be run in loopback mode under Windows.

Source code

#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
#include "../Common/BenchmarkGatewayListener.h"
#endif
#include "../Common/BenchmarkSessionListener.h"
#include "../Common/Helpers.h"
#include "../Common/PerformanceCounter.h"
#include "../Settings/Defaults.h"
#ifndef _WIN32
#include <sched.h> /* header file for POSIX scheduling */
#endif
#include <algorithm>
#include <cstdlib>
#include <limits>
// #define DEBUGGING
using namespace Samples;
/// Writes the command-line synopsis of the sample to the standard log stream.
void usage()
{
    static const char synopsis[] =
        "Usage: Benchmark [MarketSegmentId] [Host] [Port] (MainThreadAffinity)"
        " (NumberOfMessages) (IntervalBetweenSendingUsec) (WarmupIntervalUsec)";

    clog << synopsis << endl;
}
int main(int argc, char* argv[])
{
clog << "CME iLink 3 TCPDirectBenchmark Sample, version " << Session::version() << "." << endl << endl;
#if !defined(NDEBUG) && !defined(DEBUGGING)
cerr << "Please use the RELEASE build to measure latency." << endl;
return 1;
#endif
int marketSegmentId = 99;
// use the IP address belonging to the TcpDirectStack interface
string host = "192.168.0.50";
Port port = 49152;
bool useEmulator = false;
if (argc < 4)
{
#if defined(_WIN32)
useEmulator = true;
#else
usage();
return 1;
#endif
}
else
{
marketSegmentId = atoi(argv[1]);
host = argv[2];
port = atoi(argv[3]);
}
const Threading::CpuIndex MainThreadAffinity = argc > 4 ? atoi(argv[4]) : 1u;
const unsigned int NumberOfMessages = argc > 5 ? atoi(argv[5]) : 300u;
const unsigned int WarmupIntervalUsec = argc > 7 ? atoi(argv[7]) : 10u;
// See https://www.cmegroup.com/confluence/display/EPICSANDBOX/Messaging+Controls
const unsigned int IntervalBetweenSendingUsec = std::max(argc > 6 ? atoi(argv[6]) : 10000u, WarmupIntervalUsec);
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
BenchmarkGatewayListener gatewayListener;
std::unique_ptr<GatewayEmulatorThread> gateway;
#endif
int result = 0;
try
{
SignalHelper::manageLinuxSignals();
SessionSettings settings = fillSettings(useEmulator);
settings
.receiveSpinningTimeout(1000*1000)
.logBeforeSending(false);
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
if(useEmulator)
gateway.reset(new GatewayEmulatorThread(settings.licenseStores(), host, port, &gatewayListener));
#endif
ThisThread::affinity(MainThreadAffinity);
setPriorityAndPolicy();
TcpDirectAttr attr;
// The default interface name could be provided by the environment variable
// ZF_ATTR="interface=<interface-name>", see TCPDirect specs,
//
// or could be set manually:
// attr.set("interface", "interface-name");
//
// Interface names on Windows could be found using 'ipconfig /all' and 'getmac' commands.
// On Windows, it looks like:
// ZF_ATTR="interface={89790CD9-E625-48C7-91DB-4111A6218CA5}"
TcpDirectStack stack(attr);
ReactorBenchmarkListener listener(NumberOfMessages);
listener.measureOverhead();
try {
#if defined(DEBUGGING)
const SessionStorageType::Enum storageType = SessionStorageType::FileBased;
#else
const SessionStorageType::Enum storageType = SessionStorageType::MemoryBased;
#endif
clog << "Parameters: NumberOfMessages=" << NumberOfMessages << "; IntervalBetweenSendingUsec=" << IntervalBetweenSendingUsec
<< "; warmupIntervalUsec=" << WarmupIntervalUsec
<< "; StorageType=" << SessionStorageType::toString(storageType) << '.' << endl;
Session session(stack, settings, marketSegmentId, &listener, storageType);
if(session.negotiated())
session.reset(true);
SharedFuture<void> connected = session.connectAsync(host, port);
while(!connected.is_ready())
stack.dispatchEvents();
if(connected.has_exception())
connected.get();
NewOrderSingle order;
Helper::setOrderFields(order, PartyDetailsListReqID, DefaultSecurityId, DefaultPriceMantissa);
// The TCPDirect technology is NOT thread-safe.
// Therefore, the event dispatching and message sending should be performed from the same thread.
clog << "\nWarm-up phase to make first calls faster..." << endl;
listener.collectSessionTimeMarks(
stack, session, order, NumberOfMessages, IntervalBetweenSendingUsec, WarmupIntervalUsec);
if(listener.receivedAllMessages()) {
clog << "\nMeasurement phase..." << endl;
listener.collectSessionTimeMarks(
stack, session, order, NumberOfMessages, IntervalBetweenSendingUsec, WarmupIntervalUsec);
}
SharedFuture<void> disconnected = session.disconnectAsync();
while(!disconnected.is_ready())
stack.dispatchEvents();
if(disconnected.has_exception())
disconnected.get();
} catch(const exception& ex) {
cerr << "\nEXCEPTION: " << ex.what() << endl;
result = 1;
}
while(!stack.isQuiescent())
stack.dispatchEvents();
if(listener.receivedAllMessages()) {
if(listener.packetGroupingDetected())
clog << "Attention: packet grouping detected! Increase the interval between sending!" << endl;
BenchmarkData::reportResults("Latency", listener.sendMarks(), listener.receiveMarks(), NumberOfMessages, listener.overhead());
}
#ifndef _WIN32
ThisThread::policy(SCHED_OTHER);
#endif
}
catch (const exception& ex) {
cerr << "\nEXCEPTION: " << ex.what() << endl;
result = 1;
}
#if defined(ONIXS_ILINK3_HAS_GATEWAY_EMULATOR)
if(gateway) {
try {
gateway->wait();
}
catch(...) {
// Emulator exception (if any) is already reported from Emulator's thread
}
}
#endif
return result;
}
#include "../Common/PerformanceCounter.inc.h"