OnixS C++ CME iLink 3 Binary Order Entry Handler 1.19.4
Users' manual and API documentation
Loading...
Searching...
No Matches
TCPDirect Benchmark Sample

This sample shows how to measure latency on TCPDirect sessions and use the warm-up feature.

TCPDirect is part of Solarflare’s suite of network acceleration technologies.

TCPDirect does not support TCP loopback, so the emulator should be started on a remote host (or on a different, physically connected network card with a route configured to it).

For development and debugging purposes, OnixS iLink 3 provides TCPDirect-mode emulation on Windows over regular sockets, so the TCPDirectBenchmark sample can be run in loopback mode under Windows.

#include "../Common/BenchmarkGatewayListener.h"
#include "../Common/BenchmarkSessionListener.h"
#include "../Common/Helpers.h"
#include "../Common/PerformanceCounter.h"
#include "../Settings/Defaults.h"
#ifndef _WIN32
#include <sched.h> /* header file for POSIX scheduling */
#endif
#include <algorithm>
#include <climits>
#include <cstdlib>
// #define DEBUGGING
using namespace Samples;
/// Writes the command-line synopsis of the sample to the log stream.
/// Square brackets mark the three leading arguments; parentheses mark the trailing optional ones.
void usage()
{
    static const char* const synopsis =
        "Usage: Benchmark [MarketSegmentId] [Host] [Port] (MainThreadAffinity)"
        " (NumberOfMessages) (IntervalBetweenSendingUsec) (WarmupIntervalUsec)";

    clog << synopsis << endl;
}
namespace
{

/// Parses a whole command-line token as a base-10 integer within [minValue, maxValue].
///
/// @param name      Argument name used in the diagnostic message.
/// @param text      The raw command-line token.
/// @param minValue  Smallest accepted value (inclusive).
/// @param maxValue  Largest accepted value (inclusive).
/// @param value     Receives the parsed number on success; untouched on failure.
/// @return true on success; false (after reporting to cerr) when the token is empty,
///         has trailing non-numeric characters, or is out of range.
bool tryParseCommandLineInteger(
    const char* name, const char* text, long long minValue, long long maxValue, long long& value)
{
    char* end = nullptr;
    const long long parsed = std::strtoll(text, &end, 10);

    // On overflow strtoll saturates at LLONG_MIN/LLONG_MAX; every range used by this
    // sample is narrower than long long, so the range check rejects that case as well.
    if (end == text || *end != '\0' || parsed < minValue || parsed > maxValue)
    {
        cerr << "Invalid " << name << " value '" << text << "' (expected an integer in ["
             << minValue << ", " << maxValue << "])." << endl;
        return false;
    }

    value = parsed;
    return true;
}

/// Reads the optional unsigned argument argv[index], falling back to a default when it is absent.
///
/// @return false (after reporting to cerr) when the argument is present but is not a
///         non-negative integer that fits into unsigned int.
bool tryReadOptionalUnsignedArgument(
    int argc, char* argv[], int index, const char* name, unsigned int fallback, unsigned int& value)
{
    value = fallback;

    if (argc <= index)
        return true;

    long long parsed = 0;
    if (!tryParseCommandLineInteger(name, argv[index], 0, UINT_MAX, parsed))
        return false;

    value = static_cast<unsigned int>(parsed);
    return true;
}

}

/// Entry point of the TCPDirect benchmark sample.
///
/// Parses the command line, optionally starts the gateway emulator (Windows, when fewer than
/// three arguments are given), connects a session over a TcpDirectStack, runs a warm-up pass
/// followed by a measurement pass and reports the collected latency.
///
/// @return 0 on success; 1 when the arguments are missing or malformed, when a non-release
///         build is used without DEBUGGING, or when an exception was reported.
int main(int argc, char* argv[])
{
    clog << "CME iLink 3 TCPDirectBenchmark Sample, version " << Session::version() << "." << endl << endl;

#if !defined(NDEBUG) && !defined(DEBUGGING)
    cerr << "Please use the RELEASE build to measure latency." << endl;
    return 1;
#endif

    int marketSegmentId = 99;

    // use the IP address belonging to the TcpDirectStack interface
    string host = "192.168.0.50";
    Port port = 49152;

    bool useEmulator = false;

    if (argc < 4)
    {
#if defined(_WIN32)
        useEmulator = true;
#else
        usage();
        return 1;
#endif
    }
    else
    {
        // Validate instead of atoi(): atoi silently maps garbage to 0.
        long long segmentArg = 0;
        long long portArg = 0;

        if (!tryParseCommandLineInteger("MarketSegmentId", argv[1], INT_MIN, INT_MAX, segmentArg) ||
            !tryParseCommandLineInteger("Port", argv[3], 1, 65535, portArg))
        {
            usage();
            return 1;
        }

        marketSegmentId = static_cast<int>(segmentArg);
        host = argv[2];
        port = static_cast<int>(portArg);
    }

    // Optional arguments: a negative number would otherwise wrap to a huge unsigned value.
    unsigned int affinityArg = 0;
    unsigned int messagesArg = 0;
    unsigned int sendIntervalArg = 0;
    unsigned int warmupIntervalArg = 0;

    if (!tryReadOptionalUnsignedArgument(argc, argv, 4, "MainThreadAffinity", 1u, affinityArg) ||
        !tryReadOptionalUnsignedArgument(argc, argv, 5, "NumberOfMessages", 300u, messagesArg) ||
        !tryReadOptionalUnsignedArgument(argc, argv, 6, "IntervalBetweenSendingUsec", 10000u, sendIntervalArg) ||
        !tryReadOptionalUnsignedArgument(argc, argv, 7, "WarmupIntervalUsec", 10u, warmupIntervalArg))
    {
        usage();
        return 1;
    }

    const Threading::CpuIndex MainThreadAffinity = affinityArg;
    const unsigned int NumberOfMessages = messagesArg;
    const unsigned int WarmupIntervalUsec = warmupIntervalArg;

    // See https://www.cmegroup.com/confluence/display/EPICSANDBOX/Messaging+Controls
    // The sending interval is never allowed to be shorter than the warm-up interval.
    const unsigned int IntervalBetweenSendingUsec = std::max(sendIntervalArg, WarmupIntervalUsec);

    BenchmarkGatewayListener gatewayListener;
    std::unique_ptr<GatewayEmulatorThread> gateway;

    int result = 0;

    try
    {
        SignalHelper::manageLinuxSignals();

        SessionSettings settings = fillSettings(useEmulator);

        settings
            .receiveSpinningTimeout(1000*1000)
            .logBeforeSending(false);

        if(useEmulator)
            gateway.reset(new GatewayEmulatorThread(settings.licenseStores(), host, port, &gatewayListener));

        ThisThread::affinity(MainThreadAffinity);
        setPriorityAndPolicy();

        TcpDirectAttr attr;

        // The default interface name could be provided by the environment variable
        // ZF_ATTR="interface=<interface-name>", see TCPDirect specs,
        //
        // or could be set manually:
        // attr.set("interface", "interface-name");
        //
        // Interface names on Windows could be found using 'ipconfig /all' and 'getmac' commands.
        // On Windows, it looks like:
        // ZF_ATTR="interface={89790CD9-E625-48C7-91DB-4111A6218CA5}"
        TcpDirectStack stack(attr);

        ReactorBenchmarkListener listener(NumberOfMessages);
        listener.measureOverhead();

        try {
#if defined(DEBUGGING)
            const SessionStorageType::Enum storageType = SessionStorageType::FileBased;
#else
            const SessionStorageType::Enum storageType = SessionStorageType::MemoryBased;
#endif

            clog << "Parameters: NumberOfMessages=" << NumberOfMessages << "; IntervalBetweenSendingUsec=" << IntervalBetweenSendingUsec
                << "; warmupIntervalUsec=" << WarmupIntervalUsec
                << "; StorageType=" << SessionStorageType::toString(storageType) << '.' << endl;

            Session session(stack, settings, marketSegmentId, &listener, storageType);

            if(session.negotiated())
                session.reset(true);

            // Drive the stack from this thread until the asynchronous connect completes.
            SharedFuture<void> connected = session.connectAsync(host, port);

            while(!connected.is_ready())
                stack.dispatchEvents();

            // get() is called only to rethrow the stored exception, if any.
            if(connected.has_exception())
                connected.get();

            NewOrderSingle order;
            Helper::setOrderFields(order, PartyDetailsListReqID, DefaultSecurityId, DefaultPriceMantissa);

            // The TCPDirect technology is NOT thread-safe.
            // Therefore, the event dispatching and message sending should be performed from the same thread.

            clog << "\nWarm-up phase to make first calls faster..." << endl;

            listener.collectSessionTimeMarks(
                stack, session, order, NumberOfMessages, IntervalBetweenSendingUsec, WarmupIntervalUsec);

            // The measurement pass runs only if the warm-up pass received every message.
            if(listener.receivedAllMessages()) {
                clog << "\nMeasurement phase..." << endl;

                listener.collectSessionTimeMarks(
                    stack, session, order, NumberOfMessages, IntervalBetweenSendingUsec, WarmupIntervalUsec);
            }

            SharedFuture<void> disconnected = session.disconnectAsync();

            while(!disconnected.is_ready())
                stack.dispatchEvents();

            if(disconnected.has_exception())
                disconnected.get();
        } catch(const exception& ex) {
            cerr << "\nEXCEPTION: " << ex.what() << endl;
            result = 1;
        }

        // Keep dispatching until the stack reports it is quiescent, also after a failure above.
        while(!stack.isQuiescent())
            stack.dispatchEvents();

        if(listener.receivedAllMessages()) {
            if(listener.packetGroupingDetected())
                clog << "Attention: packet grouping detected! Increase the interval between sending!" << endl;

            BenchmarkData::reportResults("Latency", listener.sendMarks(), listener.receiveMarks(), NumberOfMessages, listener.overhead());
        }

#ifndef _WIN32
        ThisThread::policy(SCHED_OTHER);
#endif
    }
    catch (const exception& ex) {
        cerr << "\nEXCEPTION: " << ex.what() << endl;
        result = 1;
    }

    if(gateway)
    {
        try
        {
            gateway->wait();
        }
        catch(...)
        {
            // Emulator exception (if any) is already reported from Emulator's thread
        }
    }

    return result;
}
#include "../Common/PerformanceCounter.inc.h"