OnixS C++ CME MDP Premium Market Data Handler  5.8.10
API Documentation
Benchmark Sample

This sample demonstetes how to benchmark the Handler.

Source code

using namespace OnixS::CME::MDH;
// Benchmarking parameters and settings.
namespace {
// Defines what is to be measured by the sample
// like messaging API or book maintenance machinery.
struct MeasurementTarget
{
enum Enum
{
// Sample will measure time starting from data
// reception till the moment it's passed through
// MarketDataListener interface.
Messaging,
// Sample will measure time starting from data
// reception till the moment updated MBO book is
// delivered through SecurityListener::onBookUpdate
// callback.
MboBooks,
// Sample will measure time starting from data reception
// till the moment updated direct book is delivered through
// SecurityListener::onBookUpdate callback.
DirectBooks,
// Sample will measure time starting from data reception
// till the moment updated implied book is delivered through
// SecurityListener::onBookUpdate callback.
ImpliedBooks,
// Sample will measure time starting from data reception
// till the moment updated consolidated book is delivered
// through SecurityListener::onBookUpdate callback.
ConsolidatedBooks
};
};
// Deserializes measurement target from text representation.
bool fromStr(MeasurementTarget::Enum& value, const Char* str, size_t length)
{
const StrRef serialized(str, length);
if (serialized == "Messaging")
{
value = MeasurementTarget::Messaging;
return true;
}
if (serialized == "MboBooks")
{
value = MeasurementTarget::MboBooks;
return true;
}
if (serialized == "DirectBooks")
{
value = MeasurementTarget::DirectBooks;
return true;
}
if (serialized == "ImpliedBooks")
{
value = MeasurementTarget::ImpliedBooks;
return true;
}
if (serialized == "ConsolidatedBooks")
{
value = MeasurementTarget::ConsolidatedBooks;
return true;
}
return false;
}
// Configures book management according to input parameters
// (if user wants to measure book maintenance, then it must
// be enabled for a book type of interest).
void apply(BookManagement& management, MeasurementTarget::Enum target)
{
switch (target)
{
case MeasurementTarget::MboBooks:
management.mboBooks().maintain(true);
break;
case MeasurementTarget::DirectBooks:
management.directBooks().maintain(true);
break;
case MeasurementTarget::ImpliedBooks:
management.impliedBooks().maintain(true);
break;
case MeasurementTarget::ConsolidatedBooks:
management.consolidatedBooks().maintain(true);
break;
default:
break;
}
}
//
typedef TimeSpan::Ticks TimeDiff;
typedef std::vector<TimeDiff> TimeDiffs;
typedef std::vector<Timestamp> Timestamps;
struct DeltaT
{
TimeDiff operator()(const Timestamp& l, const Timestamp& r) const
{
return (l - r).ticks();
}
};
TimeDiffs calculateTimeDiffs(const Timestamps& timestamps)
{
TimeDiffs diffs;
diffs.reserve(timestamps.size() - 1);
std::transform(timestamps.begin() + 1, timestamps.end(), timestamps.begin(), std::back_inserter(diffs), DeltaT());
return diffs;
}
TimeDiff average(const TimeDiffs& diffs)
{
const size_t sampleQty = diffs.size();
assert(0 < sampleQty);
return (std::accumulate(diffs.begin(), diffs.end(), TimeDiff(0)) / sampleQty);
}
TimeDiff percentile(const TimeDiffs& diffs, unsigned percent)
{
const size_t sampleQty = diffs.size();
assert(0 < sampleQty);
assert(50 <= percent && 100 > percent);
const size_t multipliedIndex = sampleQty * percent;
const size_t index = multipliedIndex / 100;
return (0 == (multipliedIndex % 100) && 1 < sampleQty) ? (diffs[index] + diffs[index - 1]) / 2
: (diffs[index]);
}
class Summary
{
public:
explicit Summary(const TimeDiffs& timeDiffs)
: length_(timeDiffs.size())
, min_(timeDiffs[0])
, max_(*timeDiffs.rbegin())
, average_(average(timeDiffs))
, p50_(percentile(timeDiffs, 50))
, p95_(percentile(timeDiffs, 95))
, p99_(percentile(timeDiffs, 99))
{
}
void dump(std::ostream& output, unsigned unitScale, const char* unitName) const
{
assert(0 < unitScale);
const double realScale = static_cast<double>(unitScale);
output << std::fixed << std::setprecision(3)
<< "Data processing result (in " << unitName << "): " << std::endl << std::endl
<< "Minimal: " << (min_ / realScale) << std::endl
<< "Median: " << (p50_ / realScale) << std::endl
<< "Mean: " << (average_ / realScale) << std::endl
<< "95%: " << (p95_ / realScale) << std::endl
<< "99%: " << (p99_ / realScale) << std::endl
<< "Maximal: " << (max_ / realScale) << std::endl << std::endl
<< "Samples: " << length_ << ". " << std::endl;
}
private:
size_t length_;
TimeDiff min_;
TimeDiff max_;
TimeDiff average_;
TimeDiff p50_;
TimeDiff p95_;
TimeDiff p99_;
};
struct FilenameFilter
{
bool operator()(char symbol) const
{
return (ONIXS_CMEMDH_NULLPTR != strchr("-:.", symbol));
}
};
std::string statisticsFilename(UInt32 channel)
{
std::string filename("Benchmark-Statistics-Channel");
filename += toStr(channel);
filename += "-";
std::string timestamp = toStr(UtcWatch::now());
timestamp.erase(std::remove_if(timestamp.begin(), timestamp.end(), FilenameFilter()), timestamp.end());
filename += timestamp;
filename += ".csv";
return filename;
}
class StatisticsSerializer
{
public:
explicit StatisticsSerializer(std::ostream& output)
: output_(&output)
{
}
void operator()(TimeDiff latency) const
{
*output_ << latency << std::endl;
}
private:
std::ostream* output_;
};
void outputStatistics(std::ostream& output, const TimeDiffs& latencies)
{
std::for_each(latencies.begin(), latencies.end(), StatisticsSerializer(output));
}
// Fine turning of working thread can
// be done in bounds of given callback.
void updateThreadPriority()
{
#if defined(_WIN32)
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
#endif
}
} // namespace
// Handles sample input parameters.
class Configuration
: public ChannelConfiguration
, public SessionConfiguration
, public FeedEngineConfiguration
, public ConnectivityConfiguration
, public FeedConfiguration
{
public:
Configuration(size_t qty, Char** args)
: ConfigurationBase(qty, args)
, ChannelConfiguration(qty, args, 310)
, SessionConfiguration(qty, args)
, FeedEngineConfiguration(qty, args, 0)
, ConnectivityConfiguration(qty, args)
, FeedConfiguration(qty, args)
{
}
MeasurementTarget::Enum measurementTarget() const
{
return argOrDefault(options(), "measure", MeasurementTarget::DirectBooks);
}
private:
void showOptions(std::ostream& out) const ONIXS_CMEMDH_OVERRIDE
{
ChannelConfiguration::showOptions(out);
out << " --measure <target>" << std::endl
<< "\tDefines benchmarking target. " << std::endl
<< "\tSupported values are: Messaging, MboBooks, DirectBooks, "
"ImpliedBooks, ConsolidatedBooks. The first one measures market "
"data processing only. Others measure order book maintenance. "
<< std::endl
<< std::endl;
SessionConfiguration::showOptions(out);
FeedEngineConfiguration::showOptions(out);
ConnectivityConfiguration::showOptions(out);
FeedConfiguration::showOptions(out);
}
};
// Measures latency of data processing since the moment data is
// received and till the moment updated book is delivered to the user.
class Application
: SecurityListener
, MarketDataListener
{
public:
// Configures sample.
Application(const Configuration& configuration)
: configuration_(configuration)
, watch_(configuration.watch())
, target_(configuration.measurementTarget())
, state_(Suspended)
{
}
// Allows instances to be destroyed.
~Application() {}
// Spawns latency measuring.
void run()
{
prepare();
const ScopedPtr<NetFeedEngine> feedEngine(constructFeedEngine(configuration_));
const ScopedPtr<TcpRecoveryService> tcpRecovery(constructTcpRecovery(configuration_));
const ScopedPtr<NullLogger> logger(new NullLogger());
// Constructs handler.
Handler handler;
handler.bindLogger(*logger);
HandlerSettings& settings = handler.settings();
setLicense(settings);
settings.channel(configuration_.channel()).connectivityConfigurationFile(configuration_.connectivityFile());
settings.feeds().engine(feedEngine.get());
apply(settings.feeds(), configuration_);
apply(settings.session(), configuration_, tcpRecovery.get());
apply(settings.bookManagement(), configuration_.measurementTarget());
// Associates events processing with handler.
handler.settings().listeners().handler(this);
handler.settings().listeners().marketData(this);
// Binds security-related events only if
// there's a particular interest in them to
// avoid security-related load onto processing.
if (MeasurementTarget::Messaging != target_)
handler.settings().listeners().security(this);
// Starting live market data processing.
std::cout << "Starting processing market data.. " << std::endl
<< std::endl
<< "Press Ctrl+C to stop processing market data, "
"output benchmarking results and exit. "
<< std::endl;
handler.start();
while (!InterruptDetector::instance().detected())
{
try
{
feedEngine->process();
}
catch (const std::exception& ex)
{
std::cerr << std::endl
<< "WARNING! Feed engine raised an "
"issue while processing incoming data. "
<< ex.what() << std::endl;
}
}
std::cout << "Stopping processing market data.. " << std::endl;
handler.stop();
saveMeasurements();
std::cout << std::endl << "Done. " << std::endl << std::endl;
}
static void identify()
{
std::cout << "Benchmarking for the CME MDP Premium Market Data Handler, v" << toStr(Version::current()) << "."
<< std::endl
<< std::endl;
}
private:
// Each measurement represents cycling over given states.
enum State
{
// Benchmarking is suspended due
// to recovery or issue handling.
Suspended,
// Latency measuring is started and
// application waits for incoming data.
WaitingForBegin,
// Application received market data packet
// and waits for data processing results.
WaitingForProcessingResults,
// Application obtained market data processing
// results and now waits till the beginning of
// next measuring iteration.
FinalizingMeasuring
};
static const unsigned int MaxProcessedPackets = 1000000;
// Benchmarking options.
const Configuration& configuration_;
// Watch to be used to measure latency.
WatchService& watch_;
// Measurement target (cached value).
const MeasurementTarget::Enum target_;
// Latency measuring state.
State state_;
// Received packets timestamps
Timestamps receivedTimestamps_;
// To gather statistics.
TimeDiffs latencies_;
// Prohibits copy construction.
Application(const Application&);
// No re-assignments for this time.
Application& operator=(const Application&);
// Detects and initializes measuring
// for single processing iteration.
void onPacket(Handler&, const PacketArgs& args) ONIXS_CMEMDH_OVERRIDE
{
if (WaitingForBegin == state_)
state_ = WaitingForProcessingResults;
if(MaxProcessedPackets < receivedTimestamps_.size())
return;
if (WaitingForProcessingResults == state_)
receivedTimestamps_.push_back(args.receiveTime());
}
// Measures latency for single processing iteration.
void measureLatency()
{
if (WaitingForProcessingResults == state_)
{
state_ = FinalizingMeasuring;
if((MaxProcessedPackets < receivedTimestamps_.size()) || receivedTimestamps_.empty())
return;
latencies_.push_back(DeltaT()(watch_.now(), receivedTimestamps_.back()));
}
}
// Measuring message processing machinery.
void measureMessagingLatency()
{
if (MeasurementTarget::Messaging == target_)
{
measureLatency();
}
}
void onMessage(Handler&, const ChannelReset4Args&) ONIXS_CMEMDH_OVERRIDE
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
void onMessage(Handler&, const SecurityStatus30Args&) ONIXS_CMEMDH_OVERRIDE
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
void onMessage(Handler&, const IncrementalRefreshVolume37Args&) ONIXS_CMEMDH_OVERRIDE
{
measureMessagingLatency();
}
void onMessage(Handler&, const IncrementalRefreshBook46Args&) ONIXS_CMEMDH_OVERRIDE
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
void onMessage(Handler&, const QuoteRequest39Args&) ONIXS_CMEMDH_OVERRIDE
{
measureMessagingLatency();
}
// Measuring book maintenance latency.
void onBookUpdate(Handler&, const Security&, const MboBook&) ONIXS_CMEMDH_OVERRIDE
{
measureLatency();
}
void onBookUpdate(Handler&, const Security&, const DirectBook&) ONIXS_CMEMDH_OVERRIDE
{
measureLatency();
}
void onBookUpdate(Handler&, const Security&, const ImpliedBook&) ONIXS_CMEMDH_OVERRIDE
{
measureLatency();
}
void onBookUpdate(Handler&, const Security&, const ConsolidatedBook&) ONIXS_CMEMDH_OVERRIDE
{
measureLatency();
}
// Handling End of Market Event. At given point latency
// measuring is restarted and prepared for the iteration.
void handleEndOfEvent(const MatchEventIndicator& indicator)
{
if (Suspended != state_)
if (indicator.endOfEvent())
state_ = WaitingForBegin;
}
void onEndOfMessage(Handler&, const IncrementalRefreshBook46Args& args) ONIXS_CMEMDH_OVERRIDE
{
handleEndOfEvent(args.message().matchEventIndicator());
}
void onEndOfMessage(Handler&, const IncrementalRefreshOrderBook47Args& args) ONIXS_CMEMDH_OVERRIDE
{
handleEndOfEvent(args.message().matchEventIndicator());
}
//
void onEndOfPacket(Handler&, const PacketArgs&) ONIXS_CMEMDH_OVERRIDE
{
if (MeasurementTarget::Messaging == target_)
{
if (Suspended != state_)
state_ = WaitingForBegin;
}
}
//
void onRealtimeProcessing(Handler&) ONIXS_CMEMDH_OVERRIDE
{
state_ = WaitingForBegin;
std::cout << std::endl << "Handler switched processing real-time data. " << std::endl;
}
void onMarketRecovery(Handler&) ONIXS_CMEMDH_OVERRIDE
{
state_ = Suspended;
std::cout << std::endl << "Handler recovering market state. " << std::endl;
}
void onInstrumentRecovery(Handler&) ONIXS_CMEMDH_OVERRIDE
{
state_ = Suspended;
std::cout << std::endl << "Handler recovering instruments. " << std::endl;
}
void prepare()
{
latencies_.clear();
latencies_.reserve(MaxProcessedPackets);
receivedTimestamps_.clear();
receivedTimestamps_.reserve(MaxProcessedPackets);
}
void saveMeasurements()
{
if (latencies_.empty())
{
std::cout << "No latencies were recorded. " << std::endl;
}
else
{
const std::string outputFilename = statisticsFilename(configuration_.channel());
std::ofstream output(outputFilename.c_str());
if (output.good())
{
outputStatistics(output, latencies_);
}
else
{
std::string errorReason;
errorReason += "Cannot open file to output "
"benchmarking statistics [filename=";
errorReason += outputFilename;
errorReason += "]. ";
throw std::domain_error(errorReason);
}
std::sort(latencies_.begin(), latencies_.end());
TimeDiffs interpacketDelay = calculateTimeDiffs(receivedTimestamps_);
std::sort(interpacketDelay.begin(), interpacketDelay.end());
std::cout << "Latencies:" << std::endl;
Summary(latencies_).dump(std::cout, 1000, "microseconds");
std::cout << std::endl;
std::cout << "Delay between packets:" << std::endl;
Summary(interpacketDelay).dump(std::cout, 1000, "microseconds");
}
}
void onWarning(Handler&, const WarningArgs& warning) ONIXS_CMEMDH_OVERRIDE
{
std::cout << std::endl << "WARNING: " << warning << std::endl << std::endl;
}
void onError(Handler&, const ErrorArgs& error) ONIXS_CMEMDH_OVERRIDE
{
std::cout << std::endl << "ERROR: " << error << std::endl << std::endl;
}
};
int main(int qty, char** args)
{
try
{
// Uncomment the given statement to snap working thread to a particular CPU.
// ThisThread::affinity(1);
Application::identify();
const Configuration configuration(qty, args);
if (configuration.show())
{
configuration.show(std::cout);
}
else
{
checkNetworkSettings(configuration);
updateThreadPriority();
Application(configuration).run();
}
return 0;
}
catch (const std::exception& ex)
{
std::cerr << std::endl << "ERROR: " << ex.what() << std::endl;
return 1;
}
}