OnixS C++ B3 Binary UMDF Market Data Handler  1.7.0
API documentation
Benchmark Sample

This sample measures the execution time of API functionality within a library. It provides two measurement modes:

  1. Packet Reception to The First Callback (Messaging): Measures the time elapsed from receiving a data packet through a multicast socket to invoking the first callback for a message within that packet.

    ./Benchmark --channel 80 --packetsCount 10000 --connectivity ../Settings/config.Prod.xml --ifA n1 --ifB n2

  2. Atomic Order Book Update Time: Measures the time required for an atomic update of the order book. Specifically, it tracks the duration from receiving an order change message within a packet to when the callback confirms that the message has been applied (OrderBookListener::onOrderBookChanged is called).

    ./Benchmark --channel 80 --measureBookChanges --chunksAmount 10000 --packetsCount 10000 --connectivity ../Settings/config.Prod.xml --ifA n1 --ifB n2

This sample helps analyze the performance of API calls related to market data processing.

To get all other command line parameters:

./Benchmark --help

Source code

#include <iostream>
#include <Common/Configuring.h>
#include <Common/Options.h>
#include <Common/Utils.h>
#include "MyListener.h"
#include <OnixS/B3/MarketData/UMDF/Handler.h>
#include <OnixS/B3/MarketData/UMDF/HandlerSettings.h>
#define ONIXS_USE_EF_VI 0
using namespace std;
using namespace OnixS::B3::MarketData::UMDF;
using Sample::MyListener;
typedef std::shared_ptr<OrderBookPool> OrderBookPoolPtr;
constexpr UInt32 MaxProcessedPackets = 1000000;
/// Handles sample input parameters.
///
/// Combines the channel, connectivity and feed option groups and adds the
/// benchmark-specific options (`packetsCount`, `instruments`,
/// `measureBookChanges`, `chunksLength`).
class Configuration
    : public ChannelConfiguration
    , public ConnectivityConfiguration
    , public FeedConfiguration
{
public:
    /// Parses the command line; channel 80 is passed as the default channel.
    Configuration(size_t qty, Char** args)
        : ConfigurationBase(qty, args)
        , ChannelConfiguration(qty, args, 80)
        , ConnectivityConfiguration(qty, args)
        , FeedConfiguration(qty, args)
    {
    }

    /// Number of incremental packets to process (`MaxProcessedPackets` by default).
    UInt32 packetsCount() const
    {
        return argOrDefault(options(), "packetsCount", MaxProcessedPackets);
    }

    /// Tells whether the `measureBookChanges` switch is present.
    bool measureBookChanges() const
    {
        return exist(options(), "measureBookChanges");
    }

    /// Value of the `chunksLength` option (2 MiB by default).
    UInt32 chunksLength() const
    {
        return argOrDefault<UInt32>(options(), "chunksLength", 2 * 1024 * 1024);
    }

    /// Security identifiers listed in the `instruments` option.
    ///
    /// The list may be separated by ',' or ';'; empty items are skipped.
    /// When nothing is specified, a single default instrument (146687) is returned.
    std::set<Messaging::SecurityID> instruments() const
    {
        // The option is looked up by the name it is documented under in showOptions().
        const std::string list = argOrDefault(options(), "instruments", std::string{});
        const char* const delimiters = ",;";

        std::set<Messaging::SecurityID> result;

        // Tokenize without strtok: the previous implementation modified the
        // buffer of a const string, which is undefined behaviour.
        std::string::size_type begin = list.find_first_not_of(delimiters);

        while (begin != std::string::npos)
        {
            const std::string::size_type end = list.find_first_of(delimiters, begin);
            const std::string token =
                list.substr(begin, end == std::string::npos ? std::string::npos : end - begin);

            result.insert(OptionArgConverter<Messaging::SecurityID>("instruments")(token.c_str()));

            begin = list.find_first_not_of(delimiters, end);
        }

        if (!result.empty())
            return result;

        return {146687};
    }

private:
    /// Prints the help for all supported options.
    void showOptions(std::ostream& out) const ONIXS_B3_UMDF_MD_OVERRIDE
    {
        ChannelConfiguration::showOptions(out);

        out << " --packetsCount <target>" << std::endl
            << "\tDefines the number of incremental packets to process. The MaxProcessedPackets value is used by default." << std::endl
            << std::endl;

        out << " --instruments <list>" << std::endl
            << "\tA comma-separated list of `SecurityID`'s of interest." << std::endl
            << std::endl;

        out << " --measureBookChanges" << std::endl
            << "\tMeasure book change latency." << std::endl
            << std::endl
            << " --chunksLength <number>" << std::endl
            << "\tDefines the amount of memory chunks used for book building, see OrderBookPoolSettings::chunkLength." << std::endl
            << std::endl;

        ConnectivityConfiguration::showOptions(out);
        FeedConfiguration::showOptions(out);
    }
};
/// Runs a single processing step on the given feed engine.
///
/// Returns whatever `process(engine)` reports; an `std::exception` raised
/// while processing is logged to `std::cerr` and reported as `false`.
template <typename T>
bool run(T & engine)
{
    bool processed = false;

    try
    {
        processed = process(engine);
    }
    catch (const std::exception& ex)
    {
        std::cerr
            << std::endl
            << "WARNING! Feed engine raised an issue while processing incoming data. "
            << ex.what()
            << std::endl;
    }

    return processed;
}
/// The main entry point.
///
/// Configures the Handler from the command line, runs the feed engine until
/// the interrupt detector fires, then prints the collected latency statistics.
///
/// Returns 0 on success (or after showing help) and 1 when an exception
/// aborted the run, so that calling scripts can detect a failed benchmark.
int main(int qty, char** args)
{
    clog << "OnixS C++ B3 Binary UMDF Market Data Handler Benchmark sample, version " << Handler::version() << '.' << endl << endl;

    const Configuration configuration(qty, args);

    if (configuration.show())
    {
        configuration.show(std::cout);
        return 0;
    }

    try
    {
        HandlerSettings handlerSettings;

        handlerSettings.loadFeeds(configuration.channel(), configuration.connectivityFile());
        handlerSettings.networkInterfaceA = configuration.ifA();
        handlerSettings.networkInterfaceB = configuration.ifB();
        handlerSettings.licenseDirectory = "../../license";

        // Logging is switched off for the measurement run.
        handlerSettings.loggerSettings.logDirectory = "logs";
        handlerSettings.loggerSettings.logLevel = LogLevel::Disabled;
        handlerSettings.loggerSettings.logSettings = LogSettings::TraceToFile;

        const auto instruments = configuration.instruments();

        // Declared outside the `if` so the pool outlives the Handler that uses it.
        OrderBookPoolPtr orderBookPool;

        if (configuration.measureBookChanges())
        {
            OrderBookPoolSettings orderBookPoolSettings;

            orderBookPoolSettings.orderBooksAmount = instruments.size();
            orderBookPoolSettings.chunkLength = configuration.chunksLength();

            static constexpr auto oneMb = 1024 * 1024;

#ifndef _WIN32
            if(configuration.chunksLength() > oneMb)
                orderBookPoolSettings.preferHugePage = true;
#endif

            // Chunks smaller than 1 MiB: four chunks per book; otherwise one per book.
            orderBookPoolSettings.chunksAmount =
                orderBookPoolSettings.chunkLength < oneMb ?
                    orderBookPoolSettings.orderBooksAmount * 4 :
                    orderBookPoolSettings.orderBooksAmount;

            orderBookPool = make_shared<OrderBookPool>(orderBookPoolSettings);

            handlerSettings.orderBookPool = orderBookPool.get();
            handlerSettings.buildOrderBooks = true;
        }

        // NOTE(review): presumably pins the current thread to CPU 1 - confirm the argument semantics.
        System::ThisThread::affinity(1);

        MyListener myListener(configuration.packetsCount());

        LoggerSettings feLoggerSettings = handlerSettings.loggerSettings;
        feLoggerSettings.logFileNamePrefix += "_feedEngine";

#if ONIXS_USE_EF_VI
#error "Please make sure the NIC time is synchronized with the host"
        SingleThreadedEfViFeedEngine feedEngine(feLoggerSettings, handlerSettings.networkInterfaceA);
#else
        checkNetworkSettings(configuration);
        SingleThreadedSocketFeedEngine feedEngine(feLoggerSettings);
#endif

        handlerSettings.feedEngine = &feedEngine;

        Handler handler(handlerSettings);

        handler.registerErrorListener(&myListener);
        handler.registerWarningListener(&myListener);
        handler.registerMessageListener(&myListener);

        if (configuration.measureBookChanges())
            handler.registerOrderBookListener(&myListener);

        clog << "Will start the Handler ..." << endl;

        handler.start();

        // The listener raises the interrupt flag once enough data is collected.
        while(!InterruptDetector::instance().detected())
            run(feedEngine);

        clog << "Stopping..." << endl;

        handler.stop(true);

        clog << "The Handler has been stopped." << endl;

        /// Print statistics
        myListener.processLatencies();
    }
    catch(const std::exception& ex)
    {
        cerr << "EXCEPTION: " << ex.what() << endl;
        return 1;
    }
    catch(...)
    {
        cerr << "UNKNOWN EXCEPTION" << endl;
        return 1;
    }

    return 0;
}

MyListener.h:

#include <vector>
#include <OnixS/B3/MarketData/UMDF/ErrorListener.h>
#include <OnixS/B3/MarketData/UMDF/Messaging.h>
#include <OnixS/B3/MarketData/UMDF/MessageListener.h>
#include <OnixS/B3/MarketData/UMDF/WarningListener.h>
#include <OnixS/B3/MarketData/UMDF/OrderBookListener.h>
#include <OnixS/B3/MarketData/UMDF/Handler.h>
#define ONIXS_USE_HUGE_PAGE 1
#if ONIXS_USE_HUGE_PAGE
# ifndef _WIN32
# include <sys/mman.h>
# ifdef MAP_FAILED
# undef MAP_FAILED
# endif
# define MAP_FAILED reinterpret_cast<void*>(-1)
# endif
#endif
#ifndef MAP_HUGETLB
# undef ONIXS_USE_HUGE_PAGE
# define ONIXS_USE_HUGE_PAGE 0
# undef MAP_FAILED
#endif
namespace Sample
{
using namespace std;
using namespace OnixS::B3::MarketData::UMDF;
/// Bump allocator over a single pre-allocated chunk of memory.
///
/// The chunk is obtained once in the constructor and released in the
/// destructor; individual allocations are never freed separately.
class Allocator
{
public:
    /// Obtains a chunk of `size` bytes; throws `std::bad_alloc` (or a class
    /// derived from it) when the memory cannot be obtained.
    explicit Allocator(size_t size);

    /// Releases the chunk.
    ~Allocator();

    // The allocator owns the chunk exclusively: a copy would release it twice.
    Allocator(const Allocator&) = delete;
    Allocator& operator=(const Allocator&) = delete;

    /// Hands out the next `size` bytes of the chunk.
    /// Throws a `std::bad_alloc`-derived exception when fewer bytes remain.
    void* allocate(size_t size);

    /// Hands out raw storage for `n` objects of type `T`.
    /// No object is constructed in the returned storage.
    template <typename T>
    T* allocate(size_t n)
    {
        return static_cast<T*>(allocate(sizeof(T) * n));
    }

private:
    void* init(size_t size);
    void fini(void* ptr);

    static void throwNoHugePage();
    static void throwAllocatorExhausted();

    const size_t size_;   // Total size of the chunk, in bytes.
    void* const chunk_;   // Start of the chunk.
    void* ptr_;           // Next free byte.
    size_t available_;    // Bytes left in the chunk.
};
/// Timestamps collected for a single message callback.
/// NOTE(review): the 64-byte alignment presumably matches a cache line - confirm.
struct alignas(64) Marks
{
// Packet reception time copied from the data source; left default-constructed
// for messages that are not selected for the packet-to-callback measurement.
Timestamp receiveTime;
// Time taken inside the message callback.
Timestamp messageTime;
// Template identifier recorded by the order book change notification (0 if none).
Messaging::MessageTemplateId messageType = 0;
// Time taken inside the order book change notification.
Timestamp bookTime;
};
/// A single latency sample, stored as the tick count of a time span.
/// NOTE(review): the reports label these values as nanoseconds - confirm the tick unit.
typedef unsigned long long Latency;
/// A collection of latency samples.
typedef std::vector<Latency> Latencies;
/// Listener that timestamps Handler callbacks and reports latency statistics.
class MyListener :
    public MessageListener,
    public OrderBookListener,
    public WarningListener,
    public ErrorListener
{
public:
    /// Constructor.
    /// `packetsToProcess` is the number of incremental packets after which
    /// the listener raises the interrupt flag.
    explicit MyListener(size_t packetsToProcess);

    /// Inherited from MessageListener.
    /// Every message callback records a reception mark.
    void onNews_5(const Messaging::News_5, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onClosingPrice_17(const Messaging::ClosingPrice_17, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onQuantityBand_21(const Messaging::QuantityBand_21, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onPriceBand_22(const Messaging::PriceBand_22, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onOpeningPrice_15(const Messaging::OpeningPrice_15, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onTheoreticalOpeningPrice_16(const Messaging::TheoreticalOpeningPrice_16, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onAuctionImbalance_19(const Messaging::AuctionImbalance_19, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onHighPrice_24(const Messaging::HighPrice_24, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onLowPrice_25(const Messaging::LowPrice_25, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onLastTradePrice_27(const Messaging::LastTradePrice_27, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSettlementPrice_28(const Messaging::SettlementPrice_28, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onOpenInterest_29(const Messaging::OpenInterest_29, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onOrder_MBO_50(const Messaging::Order_MBO_50, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onDeleteOrder_MBO_51(const Messaging::DeleteOrder_MBO_51, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onMassDeleteOrders_MBO_52(const Messaging::MassDeleteOrders_MBO_52, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onTrade_53(const Messaging::Trade_53, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onForwardTrade_54(const Messaging::ForwardTrade_54, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onExecutionSummary_55(const Messaging::ExecutionSummary_55, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onExecutionStatistics_56(const Messaging::ExecutionStatistics_56, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onTradeBust_57(const Messaging::TradeBust_57, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;

    /// Inherited from OrderBookListener.
    /// Is called when the book is changed within its depth, when Order_MBO_50,
    /// DeleteOrder_MBO_51 or MassDeleteOrders_MBO_52 are received.
    void onOrderBookChanged(const OrderBook& book, const Messaging::SbeMessage message) ONIXS_B3_UMDF_MD_FINAL;

    /// Inherited from OrderBookListener; intentionally does nothing.
    void onOrderBookUpdated(const OrderBook&) ONIXS_B3_UMDF_MD_FINAL {}

    /// Inherited from OrderBookListener; intentionally does nothing.
    void onOrderBookOutOfDate(const OrderBook&) ONIXS_B3_UMDF_MD_FINAL {}

    /// Stores the book-change time and the message template identifier
    /// in the mark of the most recently received message.
    void collectOrderBookChangeMark(Messaging::MessageTemplateId templateId);

    /// Inherited from WarningListener.
    /// Is called when a warning condition is detected.
    void onWarning(const std::string& reason) ONIXS_B3_UMDF_MD_FINAL;

    /// Inherited from ErrorListener.
    /// Notifications about errors.
    void onError(ErrorCode::Enum code, const std::string& description) ONIXS_B3_UMDF_MD_FINAL;

    /// Calculates and prints latency statistics.
    void processLatencies() const;

private:
    /// Capacity of the marks array.
    static constexpr size_t MaxLatenciesCount = 1000000;

    /// Records the callback time (and, for the first message of an incremental
    /// packet, the packet reception time) in the next mark.
    void collectMessageReceptionMark(const DataSource& dataSource);

    /// Estimates the cost of taking two consecutive timestamps.
    static Latency calculateAdjustment();

    // Declaration order matters: the marks array is carved out of the allocator.
    Allocator allocator_;
    Marks* const receiveMarks_;

    const size_t packetsToProcess_;
    size_t marksCounter_ = 0;
    size_t packetsCounter_ = 0;
};
}

MyListener.cpp:

#include <iostream>
#include <algorithm>
#include <iterator>
#include "MyListener.h"
#include <Common/Utils.h>
namespace Sample
{
/// Obtains the backing chunk of `size` bytes.
///
/// Windows: 4096-byte aligned heap block. Other platforms: a locked,
/// pre-populated huge-page mapping when ONIXS_USE_HUGE_PAGE is on, or a
/// 4096-byte aligned heap block otherwise.
/// Throws when the memory cannot be obtained.
ONIXS_B3_UMDF_MD_COLDPATH
void* Allocator::init(size_t size)
{
    void* memory = nullptr;

#if defined(_WIN32)
    memory = _aligned_malloc(size, 4096);
#elif ONIXS_USE_HUGE_PAGE
    const int mappingFlags = MAP_PRIVATE | MAP_ANONYMOUS | MAP_LOCKED | MAP_POPULATE | MAP_HUGETLB;

    memory = mmap(nullptr, size, PROT_READ | PROT_WRITE, mappingFlags, -1, 0);

    if (MAP_FAILED == memory)
        throwNoHugePage();
#else
    if (posix_memalign(&memory, 4096, size) != 0)
        memory = nullptr;
#endif

    if (nullptr == memory)
        throw std::bad_alloc();

#if !defined(_WIN32)
    // Hint that the chunk is going to be accessed sequentially.
    ::madvise(memory, size, MADV_SEQUENTIAL);
#endif

    return memory;
}
/// Releases a chunk previously obtained by init(), using the counterpart
/// of the allocation primitive selected there.
// NOTE(review): munmap() on a MAP_HUGETLB mapping expects a length that is a
// multiple of the huge page size - confirm `size_` satisfies that.
ONIXS_B3_UMDF_MD_COLDPATH
void Allocator::fini(void* ptr)
{
#if defined(_WIN32)
    _aligned_free(ptr);
#elif ONIXS_USE_HUGE_PAGE
    munmap(ptr, size_);
#else
    free(ptr);
#endif
}
/// Obtains the chunk and makes all of it available for allocation.
Allocator::Allocator(size_t size)
    : size_(size)
    , chunk_(init(size))
    , ptr_(chunk_)
    , available_(size)
{
}
/// Gives the whole chunk back to the system.
ONIXS_B3_UMDF_MD_COLDPATH
Allocator::~Allocator()
{
    this->fini(chunk_);
}
/// Throws a `std::bad_alloc`-derived exception explaining that
/// a huge page could not be allocated.
ONIXS_B3_UMDF_MD_COLDPATH
void Allocator::throwNoHugePage()
{
    struct HugePageUnavailable : public std::bad_alloc
    {
        const char* what() const noexcept override
        {
            static const char* const message =
                "Unable to allocate a huge page. "
                "Please enable it on your system (sudo sysctl -w vm.nr_hugepages=N), "
                "or disable it's usage in the application (ONIXS_USE_HUGE_PAGE).";

            return message;
        }
    };

    throw HugePageUnavailable();
}
/// Throws a `std::bad_alloc`-derived exception reporting that
/// the chunk has no room left.
ONIXS_B3_UMDF_MD_COLDPATH
void Allocator::throwAllocatorExhausted()
{
    struct AllocatorExhausted : public std::bad_alloc
    {
        const char* what() const noexcept override
        {
            static const char* const message = "The allocator is exhausted.";

            return message;
        }
    };

    throw AllocatorExhausted();
}
/// Hands out the next `size` bytes of the chunk and advances the cursor.
/// Throws when fewer than `size` bytes remain.
ONIXS_B3_UMDF_MD_COLDPATH
void* Allocator::allocate(size_t size)
{
    if (available_ < size)
        throwAllocatorExhausted();

    void* const block = ptr_;

    ptr_ = Messaging::advanceByBytes(ptr_, size);
    available_ -= size;

    return block;
}
/// Estimates the overhead of taking a timestamp.
///
/// Measures the time between two consecutive `Timestamp::utcNow()` calls
/// 10000 times and returns the median of the measured values.
Latency MyListener::calculateAdjustment()
{
    const int iterations = 10000;

    Latencies latencies;
    latencies.reserve(iterations);

    for (int i = 0; i < iterations; ++i)
    {
        // Take the timestamps in separate statements: in `utcNow() - utcNow()`
        // the order in which the two operands are evaluated is unspecified,
        // so the sign of the difference depended on the compiler.
        const Timestamp begin = Timestamp::utcNow();
        const Timestamp end = Timestamp::utcNow();

        const TimeSpan latency = end - begin;

        latencies.push_back(latency.ticks());
    }

    std::sort(latencies.begin(), latencies.end());

    return latencies[latencies.size() / 2];
}
/// Reserves storage for `MaxLatenciesCount` marks in the allocator and
/// default-constructs the marks array in that storage.
MyListener::MyListener(size_t packetsToProcess)
    : allocator_(sizeof(Marks) * MaxLatenciesCount)
    , receiveMarks_(
        new (allocator_.allocate<Marks>(MaxLatenciesCount)) Marks[MaxLatenciesCount])
    , packetsToProcess_(packetsToProcess)
{
}
// Message callbacks. Each of them only records a reception mark for the
// data source the message came from; the message content is not inspected.

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onNews_5(const Messaging::News_5, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onClosingPrice_17(const Messaging::ClosingPrice_17, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onQuantityBand_21(const Messaging::QuantityBand_21, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onPriceBand_22(const Messaging::PriceBand_22, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onOpeningPrice_15(const Messaging::OpeningPrice_15, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onTheoreticalOpeningPrice_16(const Messaging::TheoreticalOpeningPrice_16, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onAuctionImbalance_19(const Messaging::AuctionImbalance_19, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onHighPrice_24(const Messaging::HighPrice_24, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onLowPrice_25(const Messaging::LowPrice_25, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onLastTradePrice_27(const Messaging::LastTradePrice_27, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onSettlementPrice_28(const Messaging::SettlementPrice_28, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onOpenInterest_29(const Messaging::OpenInterest_29, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onOrder_MBO_50(const Messaging::Order_MBO_50, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onDeleteOrder_MBO_51(const Messaging::DeleteOrder_MBO_51, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onMassDeleteOrders_MBO_52(const Messaging::MassDeleteOrders_MBO_52, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onTrade_53(const Messaging::Trade_53, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onForwardTrade_54(const Messaging::ForwardTrade_54, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onExecutionSummary_55(const Messaging::ExecutionSummary_55, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onExecutionStatistics_56(const Messaging::ExecutionStatistics_56, const DataSource& source)
{
    collectMessageReceptionMark(source);
}

void ONIXS_B3_UMDF_MD_HOTPATH MyListener::onTradeBust_57(const Messaging::TradeBust_57, const DataSource& source)
{
    collectMessageReceptionMark(source);
}
/// Stores the current time and the message template identifier in the mark
/// of the most recently received message.
///
/// Does nothing when the marks counter points past the end of the marks
/// array, so a full array can never be overrun from here.
ONIXS_B3_UMDF_MD_HOTPATH
void MyListener::collectOrderBookChangeMark(Messaging::MessageTemplateId templateId)
{
    if (marksCounter_ >= MaxLatenciesCount)
        return;

    Marks& mark = receiveMarks_[marksCounter_];

    mark.bookTime = Timestamp::utcNow();
    mark.messageType = templateId;
}
/// Order book change notification: records the book-change mark for the
/// template of the message that caused the change. The book itself is not used.
ONIXS_B3_UMDF_MD_HOTPATH
void MyListener::onOrderBookChanged(const OrderBook& /*book*/, const Messaging::SbeMessage message)
{
    const Messaging::MessageTemplateId causedBy = message.templateId();

    collectOrderBookChangeMark(causedBy);
}
/// Reports a Handler warning to the log stream.
ONIXS_B3_UMDF_MD_COLDPATH
void MyListener::onWarning(const std::string& reason)
{
    std::clog
        << "Warning occurred. Description: '"
        << reason
        << "'"
        << std::endl;
}
/// Reports a Handler error (code and description) to the log stream.
ONIXS_B3_UMDF_MD_COLDPATH
void MyListener::onError(ErrorCode::Enum code, const std::string& description)
{
    std::clog
        << "Error occurred, errorCode = "
        << enumToString(code)
        << ". Description: '"
        << description
        << "'"
        << std::endl;
}
/// Records the callback time of a message in the next mark.
///
/// For the first message of a non-cached incremental packet the packet
/// reception time is stored in the same mark and the packet is counted.
/// Raises the interrupt flag once the requested number of packets has been
/// seen or the marks array is full.
ONIXS_B3_UMDF_MD_HOTPATH
void MyListener::collectMessageReceptionMark(const DataSource& dataSource)
{
    // Marks are stored in slots [1, MaxLatenciesCount - 1]; slot 0 is scratch.
    // Never advance past the last slot: callbacks may keep arriving after the
    // interrupt flag is raised, and the array must not be overrun.
    if (marksCounter_ + 1 >= MaxLatenciesCount)
    {
        InterruptDetector::instance().setDetected(true);
        return;
    }

    ++marksCounter_;

    receiveMarks_[marksCounter_].messageTime = Timestamp::utcNow();

    const bool store =
        (0 == dataSource.packetMessageNumber) &&
        (dataSource.origin == DataSource::Incremental) &&
        !dataSource.cached;

    packetsCounter_ += store;

    // Branch-free store: the reception time goes either to the current mark
    // or, when the message is not measured, to the scratch slot 0.
    receiveMarks_[marksCounter_ * store].receiveTime = dataSource.packetReceptionTime;

    InterruptDetector::instance().setDetected(
        packetsCounter_ >= packetsToProcess_ || marksCounter_ + 1 >= MaxLatenciesCount);
}
namespace
{
/// Arithmetic mean of the samples (integer division); `diffs` must not be empty.
ONIXS_B3_UMDF_MD_COLDPATH
Latency average(const Latencies& diffs)
{
    const size_t sampleQty = diffs.size();
    assert(0 < sampleQty);

    Latency total = 0;

    for (const Latency sample : diffs)
        total += sample;

    return total / sampleQty;
}
/// Returns the requested percentile (50..99) of the samples.
///
/// `diffs` must be non-empty; the element is picked by index, so the caller
/// is expected to pass the samples sorted in ascending order. When the
/// position falls exactly between two samples, their mean is returned.
ONIXS_B3_UMDF_MD_COLDPATH
Latency percentile(const Latencies& diffs, unsigned percent)
{
    const size_t sampleQty = diffs.size();

    assert(0 < sampleQty);
    assert(50 <= percent && 100 > percent);

    const size_t scaledPosition = sampleQty * percent;
    const size_t position = scaledPosition / 100;

    const bool betweenSamples = (scaledPosition % 100 == 0) && (sampleQty > 1);

    if (betweenSamples)
        return (diffs[position] + diffs[position - 1]) / 2;

    return diffs[position];
}
/// Statistics (min, max, mean, percentiles) over a set of latency samples,
/// with a constant measurement overhead subtracted from every value.
class Summary
{
public:
    /// Builds the statistics.
    /// `latencies` must be non-empty and sorted in ascending order;
    /// `adjustment` is subtracted from every sample and statistic.
    ONIXS_B3_UMDF_MD_COLDPATH
    explicit Summary(const Latencies& latencies, Latency adjustment)
        : latencies_(latencies)
        , length_(latencies.size())
        , min_(adjusted(latencies[0], adjustment))
        , max_(adjusted(*latencies.rbegin(), adjustment))
        , average_(adjusted(average(latencies), adjustment))
        , p50_(adjusted(percentile(latencies, 50), adjustment))
        , p95_(adjusted(percentile(latencies, 95), adjustment))
        , p99_(adjusted(percentile(latencies, 99), adjustment))
    {
        std::transform(
            latencies_.begin(), latencies_.end(), latencies_.begin(),
            [adjustment](Latency elem) { return adjusted(elem, adjustment); });
    }

    /// Prints the statistics divided by `unitScale`, labelled with `unitName`.
    ONIXS_B3_UMDF_MD_COLDPATH
    const Summary& dump(std::ostream& output, unsigned unitScale, const char* unitName) const
    {
        assert(0 < unitScale);

        const double realScale = static_cast<double>(unitScale);

        output << std::fixed << std::setprecision(3)
            << "Data processing result (in " << unitName << "): " << std::endl
            << "Minimal: " << (min_ / realScale) << std::endl
            << "Median: " << (p50_ / realScale) << std::endl
            << "Mean: " << (average_ / realScale) << std::endl
            << "95%: " << (p95_ / realScale) << std::endl
            << "99%: " << (p99_ / realScale) << std::endl
            << "Maximal: " << (max_ / realScale) << std::endl
            << "Samples: " << length_ << ". " << std::endl << std::endl;

        return *this;
    }

    /// Prints a text histogram of the samples: the [min, p99] range is split
    /// into about ten bins, the last bin also collects everything above it.
    ONIXS_B3_UMDF_MD_COLDPATH
    const Summary& histogram(std::ostream& output) const
    {
        // A zero stride (narrow value range) would make the loop below endless.
        const Latency stride = std::max<Latency>(1, (p99_ - min_) / 10);

        std::vector<Latency> bins;

        for (Latency lowerBound = min_; lowerBound <= p99_; lowerBound += stride)
            bins.push_back(lowerBound);

        std::vector<int> counts(bins.size(), 0);

        for (Latency value : latencies_)
        {
            // Every sample is >= min_ == bins[0], so the iterator can be stepped back.
            auto it = std::upper_bound(bins.begin(), bins.end(), value);
            const auto binIndex = std::distance(bins.begin(), --it);
            counts[binIndex]++;
        }

        const int maxCount = *std::max_element(counts.begin(), counts.end());

        output << "Histogram (nanoseconds):\n";

        const char dividerChar = '|';
        const char barChar = '*';
        const std::string inf = "inf";

        const auto width = std::max(std::to_string(p99_).size(), inf.size());

        output << std::fixed << std::setprecision(2);

        for (size_t i = 0; i < bins.size(); ++i)
        {
            const int percentage = (maxCount > 0) ? counts[i] * 100 / maxCount : 0;

            output << "[" << std::setw(width) << std::right << bins[i] << " - ";

            // The upper bound is right-aligned to the same width in both cases.
            if (i + 1 < bins.size())
                output << std::setw(width) << std::right << bins[i + 1];
            else
                output << std::setw(width) << std::right << inf;

            output
                << "): "
                << std::setw(8) << std::right << counts[i] << ' ' << dividerChar
                << std::string(percentage, barChar) << std::endl;
        }

        return *this;
    }

private:
    /// Subtracts the adjustment without wrapping around zero: a sample smaller
    /// than the measurement overhead is reported as 0 instead of a huge value.
    static Latency adjusted(Latency value, Latency adjustment)
    {
        return (value > adjustment) ? (value - adjustment) : Latency(0);
    }

    Latencies latencies_;
    size_t length_;
    Latency min_;
    Latency max_;
    Latency average_;
    Latency p50_;
    Latency p95_;
    Latency p99_;
};
}
/// Calculates and prints latency statistics.
///
/// Packet-to-callback latencies are written unsorted to `results.csv` and
/// summarized under "Messaging"; callback-to-book-change latencies are
/// summarized per message type. Sections without samples are skipped.
ONIXS_B3_UMDF_MD_COLDPATH
void MyListener::processLatencies() const
{
    if (marksCounter_ == 0)
    {
        clog << "Nothing to process (latencies list is empty)." << endl;
        return;
    }

    Latencies messagingLatencies;
    Latencies orderMessageLatencies;
    Latencies deleteOrderMessageLatencies;
    Latencies massDeleteOrderMessageLatencies;

    // Marks live in slots [1, marksCounter_] (slot 0 is scratch), so the last
    // collected mark is included; the bound is clamped to the array size.
    size_t marksEnd = marksCounter_ + 1;

    if (marksEnd > MaxLatenciesCount)
        marksEnd = MaxLatenciesCount;

    for (size_t i = 1; i < marksEnd; ++i)
    {
        const Marks& marks = receiveMarks_[i];

        if (marks.receiveTime != Timestamp())
            messagingLatencies.push_back((marks.messageTime - marks.receiveTime).ticks());

        Latencies* bookLatencies = nullptr;

        switch (marks.messageType)
        {
        case Messaging::Order_MBO_50::TemplateId:
            bookLatencies = &orderMessageLatencies;
            break;

        case Messaging::DeleteOrder_MBO_51::TemplateId:
            bookLatencies = &deleteOrderMessageLatencies;
            break;

        case Messaging::MassDeleteOrders_MBO_52::TemplateId:
            bookLatencies = &massDeleteOrderMessageLatencies;
            break;

        default:
            break;
        }

        if (bookLatencies && marks.bookTime != Timestamp())
            bookLatencies->push_back((marks.bookTime - marks.messageTime).ticks());
    }

    static constexpr unsigned int nanosecondsInMicroseconds = 1000;

    const Latency adjustment = calculateAdjustment();

    {
        std::ofstream csvFile("results.csv");

        if (csvFile)
            std::copy(messagingLatencies.begin(), messagingLatencies.end(), std::ostream_iterator<Latency>(csvFile, "\n"));
    }

    // Sorts the samples and prints one report section. Summary requires a
    // non-empty sample set, so empty sections are skipped.
    const auto report = [adjustment](const char* title, Latencies& latencies)
    {
        if (latencies.empty())
            return;

        std::sort(latencies.begin(), latencies.end());

        std::cout << title << std::endl;

        Summary(latencies, adjustment)
            .dump(std::cout, nanosecondsInMicroseconds, "microseconds")
            .histogram(std::cout);

        std::cout << std::endl;
    };

    if (messagingLatencies.empty())
        clog << "No packet reception latencies were collected." << endl;

    report("Messaging", messagingLatencies);
    report("Order_MBO_50", orderMessageLatencies);
    report("DeleteOrder_MBO_51", deleteOrderMessageLatencies);
    report("MassDeleteOrders_MBO_52", massDeleteOrderMessageLatencies);
}
}