OnixS C++ ICE Binary Order Entry Handler 1.1.1
API Documentation
Loading...
Searching...
No Matches
BenchmarkSessionListener.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
23
24#include "PerformanceCounter.h"
25#include "BenchmarkData.h"
26#include "Helpers.h"
27
28#ifdef __GNUC__
29# include <sys/mman.h>
30#endif
31
32#ifndef ONIXS_USE_HUGE_PAGE
33#define ONIXS_USE_HUGE_PAGE 1
34#endif
35
36#if ONIXS_USE_HUGE_PAGE
37# ifndef _WIN32
38# ifdef MAP_FAILED
39# undef MAP_FAILED
40# endif
41# define MAP_FAILED reinterpret_cast<void*>(-1)
42# endif
43#endif
44
45#ifndef MAP_HUGETLB
46# undef ONIXS_USE_HUGE_PAGE
47# define ONIXS_USE_HUGE_PAGE 0
48# undef MAP_FAILED
49#endif
50
51#ifdef __GNUC__
52# define ONIXS_ICEBOE_PREFETCH(ptr) __builtin_prefetch(ptr, 1, 1);
53#else
54# define ONIXS_ICEBOE_PREFETCH(ptr) /**/
55#endif
56
58{
59public:
61 : chunk_(init())
62 , ptr_(chunk_)
63 , available_(HugePageSize)
64 {
65 }
66
68 {
69 fini(chunk_);
70 }
71
72 void* allocate(size_t size)
73 {
74 size = round(size);
75
76 if(size > available_)
77 throwAllocatorExhausted();
78
79 void* const ptr = ptr_;
80 available_ -= size;
81 ptr_ = Messaging::advanceByBytes(ptr_, size);
82
83#ifdef __GNUC__
84 ::madvise(ptr, size, MADV_SEQUENTIAL);
85 ::madvise(ptr, size, MADV_WILLNEED);
86#endif
87
88 return ptr;
89 }
90
91 template <typename T>
92 T* allocate(size_t n)
93 {
94 return static_cast<T*>(allocate(sizeof(T) * n));
95 }
96
97private:
98 enum { HugePageSize = 1 << 21 };
99
100 void* init()
101 {
102 const std::size_t size = HugePageSize;
103 void* ptr = nullptr;
104
105#ifdef _WIN32
106 ptr = _aligned_malloc(size, 4096);
107#else
108
109# if ONIXS_USE_HUGE_PAGE
110 const int flags = MAP_PRIVATE | MAP_ANONYMOUS | MAP_LOCKED | MAP_POPULATE | MAP_HUGETLB;
111
112 ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, flags, -1, 0);
113
114 if (ptr == MAP_FAILED)
115 throwNoHugePage();
116# else
117 if (0 != posix_memalign(&ptr, 4096, size))
118 ptr = nullptr;
119# endif
120
121#endif
122 if (!ptr)
123 throw std::bad_alloc();
124
125 return ptr;
126 }
127
128 void fini(void* ptr) noexcept
129 {
130#ifdef _WIN32
131 _aligned_free(ptr);
132#else
133# if ONIXS_USE_HUGE_PAGE
134 munmap(ptr, HugePageSize);
135# else
136 free(ptr);
137# endif
138#endif
139 }
140
141 static size_t round(size_t n) noexcept
142 {
143 return (((n - 1) / 4096) + 1) * 4096;
144 }
145
146 [[noreturn]]
147 static void throwNoHugePage()
148 {
149 struct Exception : public std::bad_alloc
150 {
151 const char* what() const throw() override
152 {
153 return
154 "Unable to allocate a huge page. "
155 "Please enable it on your system (sudo sysctl -w vm.nr_hugepages=N), "
156 "or disable it's usage in the application (ONIXS_USE_HUGE_PAGE).";
157 }
158 };
159
160 throw Exception();
161 }
162
163 [[noreturn]]
164 static void throwAllocatorExhausted()
165 {
166 struct Exception : public std::bad_alloc
167 {
168 const char* what() const throw() override
169 {
170 return "The allocator is exhausted.";
171 }
172 };
173
174 throw Exception();
175 }
176
177 void * const chunk_;
178 void * ptr_;
179 size_t available_;
180};
181
182class BenchmarkSessionListener : public SessionListener
183{
184public:
185 explicit BenchmarkSessionListener(size_t numberOfMessages)
186 : allocator_()
187 , active_(false)
188 , packetGroupingDetected_()
189 , numberOfMessages_(numberOfMessages)
190 , receiveMarks_(allocator_.allocate<BenchmarkData::ReceiveMarks>(numberOfMessages))
191 , receiveCounter_(0)
192 , sendMarks_(allocator_.allocate<BenchmarkData::SendMarks>(numberOfMessages))
193 , sendCounter_(0)
194 {
195 for(size_t i = 0; i < numberOfMessages; ++i)
196 new (sendMarks_ + i) BenchmarkData::SendMarks();
197
198 for(size_t i = 0; i < numberOfMessages; ++i)
199 new (receiveMarks_ + i) BenchmarkData::ReceiveMarks();
200 }
201
202 ~BenchmarkSessionListener() override = default;
203
204 bool active() const noexcept
205 {
206 return active_;
207 }
208
209 bool packetGroupingDetected() const noexcept
210 {
211 return packetGroupingDetected_;
212 }
213
214 const BenchmarkData::SendMarks * sendMarks() const noexcept
215 {
216 return sendMarks_;
217 }
218
220 {
221 return receiveMarks_;
222 }
223
224 const BenchmarkData::Overhead & overhead() const noexcept
225 {
226 return overhead_;
227 }
228
230 {
232 Threading::ThisThread::spinWait(100000);
233
234 ReceivedDataTimestamp dummy;
236
237 const size_t iterations = numberOfMessages_ - 1;
238
239 for(size_t i = 0; i < iterations; i++)
240 {
242 onMessageSending(nullptr, 0, nullptr);
243 afterSending();
244 onReceivedBytes(nullptr, 0, dummy, nullptr);
245 onReceivedApplicationMessage();
246 }
247
248 const size_t median = iterations / 2;
249
250 std::vector<PerformanceCounter::Span> data;
251 data.reserve(iterations);
252
253 for(size_t i = 0; i < iterations; i++)
254 data.push_back(receiveMarks_[i].recvSpanNano());
255
256 std::sort(data.begin(), data.end());
257 overhead_.receive = data[median];
258
259 data.clear();
260 for(size_t i = 0; i < iterations; i++)
261 data.push_back(sendMarks_[i].sendSpanNano());
262
263 std::sort(data.begin(), data.end());
264 overhead_.send = data[median];
265
266 data.clear();
267 for(size_t i = 0; i < iterations; i++)
268 data.push_back(sendMarks_[i].overallSendSpanNano());
269
270 std::sort(data.begin(), data.end());
271 overhead_.overallSend = data[median];
272
273 data.clear();
274 for(size_t i = 0; i < iterations; i++)
275 data.push_back(BenchmarkData::oneWaySpanNano(sendMarks_[i], receiveMarks_[i]));
276
277 std::sort(data.begin(), data.end());
278 overhead_.oneWay = data[median];
279
281 active_ = false;
282 }
283
284 virtual void resetBenchmark() noexcept
285 {
286 for(size_t i = 0; i < numberOfMessages_; ++i)
287 {
288 sendMarks_[i] = BenchmarkData::SendMarks();
289 receiveMarks_[i] = BenchmarkData::ReceiveMarks();
290 }
291
292 sendCounter_ = 0;
293 receiveCounter_ = 0;
294 packetGroupingDetected_ = false;
295 active_ = true;
296
297 ONIXS_ICEBOE_PREFETCH(&sendMarks_[sendCounter_]);
298 }
299
300 ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE
301 void beforeSending() noexcept
302 {
303 assert(sendCounter_ < numberOfMessages_);
304 PerformanceCounter::current(&sendMarks_[sendCounter_].sendStart);
305 }
306
307 ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE
308 void afterSending() noexcept
309 {
310 assert(sendCounter_ < numberOfMessages_);
311 PerformanceCounter::current(&sendMarks_[sendCounter_++].overallSendFinish);
312
313 ONIXS_ICEBOE_PREFETCH(&sendMarks_[sendCounter_]);
314 }
315
316 void onStateChange(SessionStateId::Enum newState, SessionStateId::Enum, Session *) override
317 {
318 switch(newState)
319 {
320 case SessionStateId::Established:
321 active_ = true;
322 ONIXS_ICEBOE_PREFETCH(&receiveMarks_[receiveCounter_]);
323 break;
324
325 case SessionStateId::Disconnected:
326 active_ = false;
327 break;
328
329 default:
330 break;
331 }
332 }
333
335 void onExecutionReport_New(const Messaging::ExecutionReport_New, Session*) override
336 {
337 onReceivedApplicationMessage();
338 }
339
341 void onExecutionReport_Modify(const Messaging::ExecutionReport_Modify, Session*) override
342 {
343 onReceivedApplicationMessage();
344 }
345
347 void onExecutionReport_Cancel(const Messaging::ExecutionReport_Cancel, Session*) override
348 {
349 onReceivedApplicationMessage();
350 }
351
353 void onExecutionReport_Reject(const Messaging::ExecutionReport_Reject, Session*) override
354 {
355 onReceivedApplicationMessage();
356 }
357
359 void onReceivedBytes(const char * /*bytes*/, size_t /*size*/, const ReceivedDataTimestamp&, Session * /* sn*/) noexcept override
360 {
361 if ONIXS_ICEBOE_UNLIKELY(!active_)
362 return;
363
364 // More than one SBE message could be received in one IP packet.
365 assert(receiveCounter_ < numberOfMessages_);
366 PerformanceCounter::current(&receiveMarks_[receiveCounter_].recvStart);
367 }
368
370 void onMessageSending(char *, size_t, Session *) noexcept override
371 {
372 if ONIXS_ICEBOE_UNLIKELY(!active_)
373 return;
374
375 assert(sendCounter_ < numberOfMessages_);
376 PerformanceCounter::current(&sendMarks_[sendCounter_].sendFinish);
377 }
378
379 using SessionListener::onError;
380
382 SessionErrorReason::Enum /* reason */, const std::string & description, Session * /* sn*/, Messaging::SbeMessage) override
383 {
384 std::cerr << "\nSession-level error: " << description << std::endl;
385 active_ = false;
386 }
387
389 SessionWarningReason::Enum, const std::string & description, Session *, Messaging::SbeMessage) override
390 {
391 std::cerr << "\nSession-level warning: " << description << std::endl;
392 }
393
394protected:
396 {
397 active_ = false;
398 }
399
400private:
402 void onReceivedApplicationMessage() noexcept
403 {
404 if ONIXS_ICEBOE_UNLIKELY(!active_)
405 return;
406
407 BenchmarkData::ReceiveMarks & rm = receiveMarks_[receiveCounter_];
409
410 if ONIXS_ICEBOE_UNLIKELY(PerformanceCounter::isUndefinedValue(rm.recvStart))
411 {
412 // The received IP packet contained more than one message.
413 rm.recvStart = receiveMarks_[receiveCounter_ - 1].recvFinish;
414 packetGroupingDetected_ = true;
415 }
416
417 if ONIXS_ICEBOE_UNLIKELY(numberOfMessages_ == ++receiveCounter_)
419
420 ONIXS_ICEBOE_PREFETCH(&receiveMarks_[receiveCounter_]);
421 }
422
423 Allocator allocator_;
424 bool active_;
425 bool packetGroupingDetected_;
426
427 const size_t numberOfMessages_;
428
429 BenchmarkData::Overhead overhead_;
430
431 alignas(ONIXS_ICEBOE_HARDWARE_DESTRUCTIVE_INTERFACE_SIZE) BenchmarkData::ReceiveMarks * const receiveMarks_;
432 size_t receiveCounter_;
433
434 alignas(ONIXS_ICEBOE_HARDWARE_DESTRUCTIVE_INTERFACE_SIZE) BenchmarkData::SendMarks * const sendMarks_;
435 size_t sendCounter_;
436};
437
439{
440public:
441 explicit ReactorBenchmarkListener(size_t numberOfMessages)
442 : BenchmarkSessionListener(numberOfMessages)
443 , receivedAllMessages_()
444 {}
445
446 bool receivedAllMessages() const noexcept
447 {
448 return receivedAllMessages_;
449 }
450
451 void resetBenchmark() noexcept override
452 {
454 receivedAllMessages_ = false;
455 }
456
457 template <typename Stack>
459 Stack& stack, Session & session, MessageHolder<NewOrderRequest>& order,
460 size_t numberOfMessages, size_t sendPeriod, size_t warmupPeriod);
461
462protected:
463 void onReceivedAllMessages() noexcept override
464 {
466 receivedAllMessages_ = true;
467 }
468
469private:
470 bool receivedAllMessages_;
471};
472
473
474template <typename Stack>
477 Stack& stack, Session & session, MessageHolder<NewOrderRequest>& order,
478 size_t numberOfMessages, size_t sendPeriod, size_t warmupPeriod)
479{
481
482 // When the session is used with the OnixS C++ CME Market Data Handler,
483 // this value can be taken from the packet receiving time
484 Messaging::Timestamp sendingTime = UtcWatch::now();
485
486 for(uint64_t messageCounter = 0; messageCounter < numberOfMessages; ++messageCounter)
487 {
488 order->orderQty(messageCounter + 1);
489 order->clOrdId(messageCounter);
490
492 session.send(order, sendingTime);
493 afterSending();
494
495 stack.dispatchEvents();
496
497 sendingTime = UtcWatch::now();
498
499 if(warmupPeriod || sendPeriod)
500 {
501 const auto start = PerformanceCounter::current();
502
503 PerformanceCounter::Span elapsedMicroseconds = 0;
504 do
505 {
506 do
507 {
508 stack.dispatchEvents();
509 session.warmUp(order, sendingTime);
510 elapsedMicroseconds = PerformanceCounter::elapsedMicroseconds(start);
511 }
512 while(elapsedMicroseconds < static_cast<PerformanceCounter::Span>(warmupPeriod));
513
514 stack.dispatchEvents();
515 elapsedMicroseconds = PerformanceCounter::elapsedMicroseconds(start);
516 }
517 while(elapsedMicroseconds < static_cast<PerformanceCounter::Span>(sendPeriod));
518 }
519 }
520
521 while(active())
522 stack.dispatchEvents();
523}
#define MAP_FAILED
#define ONIXS_ICEBOE_PREFETCH(ptr)
#define ONIXS_ICEBOE_HOTPATH
Definition Compiler.h:157
T * allocate(size_t n)
void * allocate(size_t size)
ONIXS_ICEBOE_HOTPATH void onMessageSending(char *, size_t, Session *) noexcept override
ONIXS_ICEBOE_HOTPATH void onExecutionReport_Cancel(const Messaging::ExecutionReport_Cancel, Session *) override
void onStateChange(SessionStateId::Enum newState, SessionStateId::Enum, Session *) override
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void afterSending() noexcept
void onWarning(SessionWarningReason::Enum, const std::string &description, Session *, Messaging::SbeMessage) override
const BenchmarkData::SendMarks * sendMarks() const noexcept
ONIXS_ICEBOE_HOTPATH void onReceivedBytes(const char *, size_t, const ReceivedDataTimestamp &, Session *) noexcept override
void onError(SessionErrorReason::Enum, const std::string &description, Session *, Messaging::SbeMessage) override
const BenchmarkData::ReceiveMarks * receiveMarks() const noexcept
virtual void resetBenchmark() noexcept
const BenchmarkData::Overhead & overhead() const noexcept
ONIXS_ICEBOE_HOTPATH void onExecutionReport_Reject(const Messaging::ExecutionReport_Reject, Session *) override
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void beforeSending() noexcept
BenchmarkSessionListener(size_t numberOfMessages)
~BenchmarkSessionListener() override=default
ONIXS_ICEBOE_HOTPATH void onExecutionReport_New(const Messaging::ExecutionReport_New, Session *) override
ONIXS_ICEBOE_HOTPATH void onExecutionReport_Modify(const Messaging::ExecutionReport_Modify, Session *) override
bool packetGroupingDetected() const noexcept
static Span elapsedMicroseconds(const PerformanceCounter::Count &start)
static Count current()
Retrieves the current value of the high-resolution performance counter.
static bool isUndefinedValue(const Count &value)
ReactorBenchmarkListener(size_t numberOfMessages)
void resetBenchmark() noexcept override
void onReceivedAllMessages() noexcept override
bool receivedAllMessages() const noexcept
void collectSessionTimeMarks(Stack &stack, Session &session, MessageHolder< NewOrderRequest > &order, size_t numberOfMessages, size_t sendPeriod, size_t warmupPeriod)
constexpr std::enable_if<!details::HasMemberTraits< Value >::value, size_t >::type size() noexcept
Definition Memory.h:303
PerformanceCounter::Count recvStart
PerformanceCounter::Count recvFinish
static PerformanceCounter::Span oneWaySpanNano(const SendMarks &sm, const ReceiveMarks &rm)