// Huge-page backing is on by default; define ONIXS_USE_HUGE_PAGE beforehand to override.
32#ifndef ONIXS_USE_HUGE_PAGE
33#define ONIXS_USE_HUGE_PAGE 1
// Section that applies only while huge pages are requested.
36#if ONIXS_USE_HUGE_PAGE
// Defines MAP_FAILED as (void*)-1.
// NOTE(review): the guard around this definition is not visible in this excerpt -
// presumably a fallback for headers that do not provide it; confirm.
41# define MAP_FAILED reinterpret_cast<void*>(-1)
// Switches the feature back off by redefining the flag to 0.
// NOTE(review): the condition that selects this branch is not visible in this excerpt.
46# undef ONIXS_USE_HUGE_PAGE
47# define ONIXS_USE_HUGE_PAGE 0
// Prefetch hint. Per the GCC builtin, the second argument (1) requests the cache line
// for writing and the third argument (1) marks low temporal locality.
// The expansion already ends in ';', so a call site written with its own ';' yields an
// extra empty statement.
52# define ONIXS_ICEBOE_PREFETCH(ptr) __builtin_prefetch(ptr, 1, 1);
// Alternative definition that expands to nothing.
54# define ONIXS_ICEBOE_PREFETCH(ptr)
63 , available_(HugePageSize)
77 throwAllocatorExhausted();
79 void*
const ptr = ptr_;
81 ptr_ = Messaging::advanceByBytes(ptr_, size);
84 ::madvise(ptr, size, MADV_SEQUENTIAL);
85 ::madvise(ptr, size, MADV_WILLNEED);
94 return static_cast<T*
>(
allocate(
sizeof(T) * n));
98 enum { HugePageSize = 1 << 21 };
102 const std::size_t
size = HugePageSize;
106 ptr = _aligned_malloc(size, 4096);
109# if ONIXS_USE_HUGE_PAGE
110 const int flags = MAP_PRIVATE | MAP_ANONYMOUS | MAP_LOCKED | MAP_POPULATE | MAP_HUGETLB;
112 ptr = mmap(
nullptr, size, PROT_READ | PROT_WRITE, flags, -1, 0);
117 if (0 != posix_memalign(&ptr, 4096, size))
123 throw std::bad_alloc();
128 void fini(
void* ptr)
noexcept
133# if ONIXS_USE_HUGE_PAGE
134 munmap(ptr, HugePageSize);
/// Rounds a byte count up to the next multiple of the 4096-byte granularity.
///
/// @param n  Size in bytes.
/// @return   The smallest multiple of 4096 that is >= n. Zero maps to zero.
///           Values greater than SIZE_MAX - 4095 wrap to zero because the exact
///           result is not representable in size_t.
static size_t round(size_t n) noexcept
{
    // One named constant instead of repeating the literal three times.
    constexpr size_t Granularity = 4096;

    // Make the zero case explicit rather than relying on unsigned wraparound of (n - 1).
    if (n == 0)
        return 0;

    return (((n - 1) / Granularity) + 1) * Granularity;
}
/// Raises the error reported when a huge page cannot be obtained.
///
/// @throws A function-local exception type derived from std::bad_alloc, so
///         handlers written as `catch (const std::bad_alloc&)` still match.
///         Its what() text names the two remedies: reserve huge pages through
///         vm.nr_hugepages, or turn ONIXS_USE_HUGE_PAGE off.
static void throwNoHugePage()
{
    struct Exception : public std::bad_alloc
    {
        // `noexcept` replaces the dynamic exception specification `throw()`,
        // which was deprecated in C++11 and removed in C++20.
        const char* what() const noexcept override
        {
            return
                "Unable to allocate a huge page. "
                "Please enable it on your system (sudo sysctl -w vm.nr_hugepages=N), "
                "or disable it's usage in the application (ONIXS_USE_HUGE_PAGE).";
        }
    };

    throw Exception();
}
/// Raises the error reported when the allocator has no room left for a request.
///
/// @throws A function-local exception type derived from std::bad_alloc, so
///         handlers written as `catch (const std::bad_alloc&)` still match.
static void throwAllocatorExhausted()
{
    struct Exception : public std::bad_alloc
    {
        // `noexcept` replaces the dynamic exception specification `throw()`,
        // which was deprecated in C++11 and removed in C++20.
        const char* what() const noexcept override
        {
            return "The allocator is exhausted.";
        }
    };

    throw Exception();
}
188 , packetGroupingDetected_()
189 , numberOfMessages_(numberOfMessages)
190 , receiveMarks_(allocator_.allocate<
BenchmarkData::ReceiveMarks>(numberOfMessages))
192 , sendMarks_(allocator_.allocate<
BenchmarkData::SendMarks>(numberOfMessages))
195 for(
size_t i = 0; i < numberOfMessages; ++i)
198 for(
size_t i = 0; i < numberOfMessages; ++i)
211 return packetGroupingDetected_;
221 return receiveMarks_;
232 Threading::ThisThread::spinWait(100000);
234 ReceivedDataTimestamp dummy;
237 const size_t iterations = numberOfMessages_ - 1;
239 for(
size_t i = 0; i < iterations; i++)
245 onReceivedApplicationMessage();
248 const size_t median = iterations / 2;
250 std::vector<PerformanceCounter::Span> data;
251 data.reserve(iterations);
253 for(
size_t i = 0; i < iterations; i++)
254 data.push_back(receiveMarks_[i].recvSpanNano());
256 std::sort(data.begin(), data.end());
257 overhead_.receive = data[median];
260 for(
size_t i = 0; i < iterations; i++)
261 data.push_back(sendMarks_[i].sendSpanNano());
263 std::sort(data.begin(), data.end());
264 overhead_.send = data[median];
267 for(
size_t i = 0; i < iterations; i++)
268 data.push_back(sendMarks_[i].overallSendSpanNano());
270 std::sort(data.begin(), data.end());
271 overhead_.overallSend = data[median];
274 for(
size_t i = 0; i < iterations; i++)
277 std::sort(data.begin(), data.end());
278 overhead_.oneWay = data[median];
286 for(
size_t i = 0; i < numberOfMessages_; ++i)
294 packetGroupingDetected_ =
false;
303 assert(sendCounter_ < numberOfMessages_);
310 assert(sendCounter_ < numberOfMessages_);
316 void onStateChange(SessionStateId::Enum newState, SessionStateId::Enum, Session *)
override
320 case SessionStateId::Established:
325 case SessionStateId::Disconnected:
337 onReceivedApplicationMessage();
343 onReceivedApplicationMessage();
349 onReceivedApplicationMessage();
355 onReceivedApplicationMessage();
359 void onReceivedBytes(
const char * ,
size_t ,
const ReceivedDataTimestamp&, Session * )
noexcept override
361 if ONIXS_ICEBOE_UNLIKELY(!active_)
365 assert(receiveCounter_ < numberOfMessages_);
372 if ONIXS_ICEBOE_UNLIKELY(!active_)
375 assert(sendCounter_ < numberOfMessages_);
379 using SessionListener::onError;
382 SessionErrorReason::Enum ,
const std::string & description, Session * , Messaging::SbeMessage)
override
384 std::cerr <<
"\nSession-level error: " << description << std::endl;
389 SessionWarningReason::Enum,
const std::string & description, Session *, Messaging::SbeMessage)
override
391 std::cerr <<
"\nSession-level warning: " << description << std::endl;
402 void onReceivedApplicationMessage() noexcept
404 if ONIXS_ICEBOE_UNLIKELY(!active_)
414 packetGroupingDetected_ =
true;
417 if ONIXS_ICEBOE_UNLIKELY(numberOfMessages_ == ++receiveCounter_)
// Must stay ahead of receiveMarks_ and sendMarks_: non-static members are initialized
// in declaration order, and the constructor's initializers for both pointers call
// allocator_.allocate<>().
423 Allocator allocator_;
// Assigned true inside onReceivedApplicationMessage() and assigned false elsewhere in
// the class (the enclosing conditions are not visible in this excerpt).
425 bool packetGroupingDetected_;
// Message count for one run; the send and receive counters are asserted to stay below
// it, and the receive counter is compared against it after each increment.
427 const size_t numberOfMessages_;
// Holds the receive, send, overallSend and oneWay figures, each taken as the median
// element of a sorted sample vector.
429 BenchmarkData::Overhead overhead_;
// Per-message receive marks, obtained from allocator_ in the constructor.
// NOTE(review): the alignas presumably keeps the receive-side and send-side state on
// separate cache lines - confirm against the macro's definition.
431 alignas(ONIXS_ICEBOE_HARDWARE_DESTRUCTIVE_INTERFACE_SIZE) BenchmarkData::ReceiveMarks *
const receiveMarks_;
// Pre-incremented in onReceivedApplicationMessage() and compared with numberOfMessages_.
432 size_t receiveCounter_;
// Per-message send marks, obtained from allocator_ in the constructor.
434 alignas(ONIXS_ICEBOE_HARDWARE_DESTRUCTIVE_INTERFACE_SIZE) BenchmarkData::SendMarks *
const sendMarks_;
443 , receivedAllMessages_()
448 return receivedAllMessages_;
454 receivedAllMessages_ =
false;
457 template <
typename Stack>
459 Stack& stack, Session & session, MessageHolder<NewOrderRequest>& order,
460 size_t numberOfMessages,
size_t sendPeriod,
size_t warmupPeriod);
466 receivedAllMessages_ =
true;
470 bool receivedAllMessages_;
474template <
typename Stack>
477 Stack& stack, Session & session, MessageHolder<NewOrderRequest>& order,
478 size_t numberOfMessages,
size_t sendPeriod,
size_t warmupPeriod)
484 Messaging::Timestamp sendingTime = UtcWatch::now();
486 for(uint64_t messageCounter = 0; messageCounter < numberOfMessages; ++messageCounter)
488 order->orderQty(messageCounter + 1);
489 order->clOrdId(messageCounter);
492 session.send(order, sendingTime);
495 stack.dispatchEvents();
497 sendingTime = UtcWatch::now();
499 if(warmupPeriod || sendPeriod)
508 stack.dispatchEvents();
509 session.warmUp(order, sendingTime);
514 stack.dispatchEvents();
522 stack.dispatchEvents();
#define ONIXS_ICEBOE_PREFETCH(ptr)
#define ONIXS_ICEBOE_HOTPATH
void * allocate(size_t size)
ONIXS_ICEBOE_HOTPATH void onMessageSending(char *, size_t, Session *) noexcept override
ONIXS_ICEBOE_HOTPATH void onExecutionReport_Cancel(const Messaging::ExecutionReport_Cancel, Session *) override
void onStateChange(SessionStateId::Enum newState, SessionStateId::Enum, Session *) override
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void afterSending() noexcept
void onWarning(SessionWarningReason::Enum, const std::string &description, Session *, Messaging::SbeMessage) override
const BenchmarkData::SendMarks * sendMarks() const noexcept
bool active() const noexcept
ONIXS_ICEBOE_HOTPATH void onReceivedBytes(const char *, size_t, const ReceivedDataTimestamp &, Session *) noexcept override
void onError(SessionErrorReason::Enum, const std::string &description, Session *, Messaging::SbeMessage) override
const BenchmarkData::ReceiveMarks * receiveMarks() const noexcept
virtual void resetBenchmark() noexcept
virtual void onReceivedAllMessages()
const BenchmarkData::Overhead & overhead() const noexcept
ONIXS_ICEBOE_HOTPATH void onExecutionReport_Reject(const Messaging::ExecutionReport_Reject, Session *) override
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void beforeSending() noexcept
BenchmarkSessionListener(size_t numberOfMessages)
~BenchmarkSessionListener() override=default
ONIXS_ICEBOE_HOTPATH void onExecutionReport_New(const Messaging::ExecutionReport_New, Session *) override
ONIXS_ICEBOE_HOTPATH void onExecutionReport_Modify(const Messaging::ExecutionReport_Modify, Session *) override
bool packetGroupingDetected() const noexcept
ReactorBenchmarkListener(size_t numberOfMessages)
void resetBenchmark() noexcept override
void onReceivedAllMessages() noexcept override
bool receivedAllMessages() const noexcept
void collectSessionTimeMarks(Stack &stack, Session &session, MessageHolder< NewOrderRequest > &order, size_t numberOfMessages, size_t sendPeriod, size_t warmupPeriod)
constexpr std::enable_if<!details::HasMemberTraits< Value >::value, size_t >::type size() noexcept
PerformanceCounter::Count recvStart
PerformanceCounter::Count recvFinish
static PerformanceCounter::Span oneWaySpanNano(const SendMarks &sm, const ReceiveMarks &rm)