This sample demonstetes how to benchmark the Handler.
namespace {
struct MeasurementTarget
{
enum Enum
{
Messaging,
MboBooks,
DirectBooks,
ImpliedBooks,
ConsolidatedBooks
};
};
{
const StrRef serialized(str, length);
if (serialized == "Messaging")
{
value = MeasurementTarget::Messaging;
return true;
}
if (serialized == "MboBooks")
{
value = MeasurementTarget::MboBooks;
return true;
}
if (serialized == "DirectBooks")
{
value = MeasurementTarget::DirectBooks;
return true;
}
if (serialized == "ImpliedBooks")
{
value = MeasurementTarget::ImpliedBooks;
return true;
}
if (serialized == "ConsolidatedBooks")
{
value = MeasurementTarget::ConsolidatedBooks;
return true;
}
return false;
}
void apply(
BookManagement& management, MeasurementTarget::Enum target)
{
switch (target)
{
case MeasurementTarget::MboBooks:
break;
case MeasurementTarget::DirectBooks:
break;
case MeasurementTarget::ImpliedBooks:
break;
case MeasurementTarget::ConsolidatedBooks:
break;
default:
break;
}
}
typedef std::vector<TimeDiff> TimeDiffs;
typedef std::vector<Timestamp> Timestamps;
struct DeltaT
{
TimeDiff operator()(const Timestamp& l, const Timestamp& r) const
{
return (l - r).ticks();
}
};
TimeDiffs calculateTimeDiffs(const Timestamps& timestamps)
{
TimeDiffs diffs;
diffs.reserve(timestamps.size() - 1);
std::transform(timestamps.begin() + 1, timestamps.end(), timestamps.begin(), std::back_inserter(diffs), DeltaT());
return diffs;
}
TimeDiff average(const TimeDiffs& diffs)
{
const size_t sampleQty = diffs.size();
assert(0 < sampleQty);
return (std::accumulate(diffs.begin(), diffs.end(), TimeDiff(0)) / sampleQty);
}
TimeDiff percentile(const TimeDiffs& diffs, unsigned percent)
{
const size_t sampleQty = diffs.size();
assert(0 < sampleQty);
assert(50 <= percent && 100 > percent);
const size_t multipliedIndex = sampleQty * percent;
const size_t index = multipliedIndex / 100;
return (0 == (multipliedIndex % 100) && 1 < sampleQty) ? (diffs[index] + diffs[index - 1]) / 2
: (diffs[index]);
}
class Summary
{
public:
explicit Summary(const TimeDiffs& timeDiffs)
: length_(timeDiffs.size())
, min_(timeDiffs[0])
, max_(*timeDiffs.rbegin())
, average_(average(timeDiffs))
, p50_(percentile(timeDiffs, 50))
, p95_(percentile(timeDiffs, 95))
, p99_(percentile(timeDiffs, 99))
{
}
void dump(std::ostream& output, unsigned unitScale, const char* unitName) const
{
assert(0 < unitScale);
const double realScale = static_cast<double>(unitScale);
output << std::fixed << std::setprecision(3)
<< "Data processing result (in " << unitName << "): " << std::endl << std::endl
<< "Minimal: " << (min_ / realScale) << std::endl
<< "Median: " << (p50_ / realScale) << std::endl
<< "Mean: " << (average_ / realScale) << std::endl
<< "95%: " << (p95_ / realScale) << std::endl
<< "99%: " << (p99_ / realScale) << std::endl
<< "Maximal: " << (max_ / realScale) << std::endl << std::endl
<< "Samples: " << length_ << ". " << std::endl;
}
private:
size_t length_;
TimeDiff min_;
TimeDiff max_;
TimeDiff average_;
TimeDiff p50_;
TimeDiff p95_;
TimeDiff p99_;
};
struct FilenameFilter
{
bool operator()(
char symbol)
const {
}
};
std::string statisticsFilename(
UInt32 channel)
{
std::string filename("Benchmark-Statistics-Channel");
filename +=
toStr(channel);
filename += "-";
timestamp.erase(std::remove_if(timestamp.begin(), timestamp.end(), FilenameFilter()), timestamp.end());
filename += ".csv";
return filename;
}
class StatisticsSerializer
{
public:
explicit StatisticsSerializer(std::ostream& output)
: output_(&output)
{
}
void operator()(TimeDiff latency) const
{
*output_ << latency << std::endl;
}
private:
std::ostream* output_;
};
void outputStatistics(std::ostream& output, const TimeDiffs& latencies)
{
std::for_each(latencies.begin(), latencies.end(), StatisticsSerializer(output));
}
void updateThreadPriority()
{
#if defined(_WIN32)
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
#endif
}
}
class Configuration
: public ChannelConfiguration
, public SessionConfiguration
, public FeedEngineConfiguration
, public ConnectivityConfiguration
, public FeedConfiguration
{
public:
Configuration(
size_t qty,
Char** args)
: ConfigurationBase(qty, args)
, ChannelConfiguration(qty, args, 310)
, SessionConfiguration(qty, args)
, FeedEngineConfiguration(qty, args, 0)
, ConnectivityConfiguration(qty, args)
, FeedConfiguration(qty, args)
{
}
MeasurementTarget::Enum measurementTarget() const
{
return argOrDefault(options(), "measure", MeasurementTarget::DirectBooks);
}
private:
{
ChannelConfiguration::showOptions(out);
out << " --measure <target>" << std::endl
<< "\tDefines benchmarking target. " << std::endl
<< "\tSupported values are: Messaging, MboBooks, DirectBooks, "
"ImpliedBooks, ConsolidatedBooks. The first one measures market "
"data processing only. Others measure order book maintenance. "
<< std::endl
<< std::endl;
SessionConfiguration::showOptions(out);
FeedEngineConfiguration::showOptions(out);
ConnectivityConfiguration::showOptions(out);
FeedConfiguration::showOptions(out);
}
};
class Application
: SecurityListener
, MarketDataListener
{
public:
Application(const Configuration& configuration)
: configuration_(configuration)
, watch_(configuration.watch())
, target_(configuration.measurementTarget())
, state_(Suspended)
{
}
~Application() {}
void run()
{
prepare();
const ScopedPtr<NetFeedEngine> feedEngine(constructFeedEngine(configuration_));
const ScopedPtr<TcpRecoveryService> tcpRecovery(constructTcpRecovery(configuration_));
const ScopedPtr<NullLogger> logger(
new NullLogger());
Handler handler;
handler.bindLogger(*logger);
setLicense(settings);
settings.
channel(configuration_.channel()).connectivityConfigurationFile(configuration_.connectivityFile());
apply(settings.
feeds(), configuration_);
apply(settings.
session(), configuration_, tcpRecovery.get());
handler.settings().listeners().handler(this);
handler.settings().listeners().marketData(this);
if (MeasurementTarget::Messaging != target_)
handler.settings().listeners().security(this);
std::cout << "Starting processing market data.. " << std::endl
<< std::endl
<< "Press Ctrl+C to stop processing market data, "
"output benchmarking results and exit. "
<< std::endl;
handler.start();
while (!InterruptDetector::instance().detected())
{
try
{
}
catch (const std::exception& ex)
{
std::cerr << std::endl
<< "WARNING! Feed engine raised an "
"issue while processing incoming data. "
<< ex.what() << std::endl;
}
}
std::cout << "Stopping processing market data.. " << std::endl;
handler.stop();
saveMeasurements();
std::cout << std::endl << "Done. " << std::endl << std::endl;
}
static void identify()
{
std::cout <<
"Benchmarking for the CME MDP Premium Market Data Handler, v" <<
toStr(
Version::current()) <<
"." << std::endl
<< std::endl;
}
private:
enum State
{
Suspended,
WaitingForBegin,
WaitingForProcessingResults,
FinalizingMeasuring
};
static const unsigned int MaxProcessedPackets = 1000000;
const Configuration& configuration_;
const MeasurementTarget::Enum target_;
State state_;
Timestamps receivedTimestamps_;
TimeDiffs latencies_;
Application(const Application&);
Application& operator=(const Application&);
{
if (WaitingForBegin == state_)
state_ = WaitingForProcessingResults;
if(MaxProcessedPackets < receivedTimestamps_.size())
return;
if (WaitingForProcessingResults == state_)
receivedTimestamps_.push_back(args.receiveTime());
}
void measureLatency()
{
if (WaitingForProcessingResults == state_)
{
state_ = FinalizingMeasuring;
if((MaxProcessedPackets < receivedTimestamps_.size()) || receivedTimestamps_.empty())
return;
latencies_.push_back(DeltaT()(watch_.
now(), receivedTimestamps_.back()));
}
}
void measureMessagingLatency()
{
if (MeasurementTarget::Messaging == target_)
{
measureLatency();
}
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureMessagingLatency();
}
{
measureLatency();
}
{
measureLatency();
}
{
measureLatency();
}
{
measureLatency();
}
{
if (Suspended != state_)
if (indicator.endOfEvent())
state_ = WaitingForBegin;
}
{
handleEndOfEvent(args.message().matchEventIndicator());
}
{
handleEndOfEvent(args.message().matchEventIndicator());
}
{
if (MeasurementTarget::Messaging == target_)
{
if (Suspended != state_)
state_ = WaitingForBegin;
}
}
void onRealtimeProcessing(Handler&) ONIXS_CMEMDH_OVERRIDE
{
state_ = WaitingForBegin;
std::cout << std::endl << "Handler switched processing real-time data. " << std::endl;
}
void onMarketRecovery(Handler&) ONIXS_CMEMDH_OVERRIDE
{
state_ = Suspended;
std::cout << std::endl << "Handler recovering market state. " << std::endl;
}
void onInstrumentRecovery(Handler&) ONIXS_CMEMDH_OVERRIDE
{
state_ = Suspended;
std::cout << std::endl << "Handler recovering instruments. " << std::endl;
}
void prepare()
{
latencies_.clear();
latencies_.reserve(MaxProcessedPackets);
receivedTimestamps_.clear();
receivedTimestamps_.reserve(MaxProcessedPackets);
}
void saveMeasurements()
{
if (latencies_.empty())
{
std::cout << "No latencies were recorded. " << std::endl;
}
else
{
const std::string outputFilename = statisticsFilename(configuration_.channel());
std::ofstream output(outputFilename.c_str());
if (output.good())
{
outputStatistics(output, latencies_);
}
else
{
std::string errorReason;
errorReason += "Cannot open file to output "
"benchmarking statistics [filename=";
errorReason += outputFilename;
errorReason += "]. ";
throw std::domain_error(errorReason);
}
std::sort(latencies_.begin(), latencies_.end());
TimeDiffs interpacketDelay = calculateTimeDiffs(receivedTimestamps_);
std::sort(interpacketDelay.begin(), interpacketDelay.end());
std::cout << "Latencies:" << std::endl;
Summary(latencies_).dump(std::cout, 1000, "microseconds");
std::cout << std::endl;
std::cout << "Delay between packets:" << std::endl;
Summary(interpacketDelay).dump(std::cout, 1000, "microseconds");
}
}
void onWarning(Handler&,
const WarningArgs& warning) ONIXS_CMEMDH_OVERRIDE
{
std::cout << std::endl << "WARNING: " << warning << std::endl << std::endl;
}
void onError(Handler&,
const ErrorArgs& error) ONIXS_CMEMDH_OVERRIDE
{
std::cout << std::endl << "ERROR: " << error << std::endl << std::endl;
}
};
int main(int qty, char** args)
{
try
{
Application::identify();
const Configuration configuration(qty, args);
if (configuration.show())
{
configuration.show(std::cout);
}
else
{
checkNetworkSettings(configuration);
updateThreadPriority();
Application(configuration).run();
}
return 0;
}
catch (const std::exception& ex)
{
std::cerr << std::endl << "ERROR: " << ex.what() << std::endl;
return 1;
}
}