OnixS C++ ICE Binary Order Entry Handler 1.1.1
API Documentation
Loading...
Searching...
No Matches
Benchmark.cpp File Reference

Go to the source code of this file.

Functions

int main (int argc, char *argv[])

Function Documentation

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 42 of file Benchmark.cpp.

43{
44 // Run with `--help` to show the available options.
45 const AppConfiguration<
53 > cfg{"Benchmark", argc, argv};
54
55 try
56 {
57 const auto numberOfMessages = cfg.numberOfMessages();
58 const auto warmupInterval = cfg.warmupInterval();
59 const auto intervalBetweenSending = (std::max)(cfg.intervalBetweenSending(), warmupInterval);
60 const auto storageType = cfg.storage();
61 const auto receivingThreadAffinity = cfg.receivingThreadCpu();
62
63 ThisThread::affinity(cfg.mainThreadCpu());
65
66 auto settings = fillSettings(cfg);
67 const auto emulator = createEmulator(settings, cfg);
68
69 // Request the Binary Order Gateway credentials from the Binary Utility Service Gateway.
70 const auto bgwCredentials = receiveBgwCredentials(settings, cfg.host(), cfg.port());
71
72 settings
73 .receiveSpinningTimeout(1000 * 1000)
74 .useSpinLock(true)
75 .logBeforeSending(false);
76
77 class BenchmarkListener final : public BenchmarkSessionListener
78 {
79 public:
80 explicit BenchmarkListener(size_t numberOfMessages)
81 : BenchmarkSessionListener(numberOfMessages)
82 {}
83
84 using SessionListener::onError;
85
86 void onError(
87 SessionErrorReason::Enum reason, const std::string & description, Session * session, Messaging::SbeMessage msg) override
88 {
89 BenchmarkSessionListener::onError(reason, description, session, msg);
90 receivedAllMessages_.set_exception(std::make_exception_ptr(std::runtime_error(description)));
91 }
92
94 void collectSessionTimeMarks(Session & session, MessageHolder<NewOrderRequest> & order,
95 size_t numberOfMessages, size_t sendPeriodUsec, size_t warmupPeriodUsec)
96 {
97 receivedAllMessages_ = std::promise<void>{};
98
100
101 auto sendingTime = UtcWatch::now();
102
103 for(uint64_t messageCounter = 0; messageCounter < numberOfMessages; ++messageCounter)
104 {
105 order->orderQty(messageCounter + 1);
106 order->clOrdId(messageCounter);
107
109 session.send(order, sendingTime);
110 afterSending();
111
112 sendingTime = UtcWatch::now();
113
114 if(warmupPeriodUsec)
115 {
116 const auto start = PerformanceCounter::current();
117
118 do
119 {
120 ThisThread::spinWait(warmupPeriodUsec);
121 session.warmUp(order, sendingTime);
122 }
123 while(PerformanceCounter::elapsedMicroseconds(start) < static_cast<PerformanceCounter::Span>(sendPeriodUsec));
124 }
125 else if(sendPeriodUsec)
126 {
127 ThisThread::spinWait(sendPeriodUsec);
128 }
129 }
130
131 waitUntilReceivedAllMessages();
132 }
133
134 private:
135 void onReceivedAllMessages() override
136 {
138 receivedAllMessages_.set_value();
139 }
140
141 void waitUntilReceivedAllMessages()
142 {
143 auto future = receivedAllMessages_.get_future();
144
145 if(future.wait_for(std::chrono::seconds{120}) != std::future_status::ready)
146 receivedAllMessages_.set_exception(std::make_exception_ptr(std::runtime_error("Operation timed out.")));
147
148 future.get();
149 }
150
151 std::promise<void> receivedAllMessages_;
152 }
153 listener{numberOfMessages};
154
155 printBenchmarkSettings(std::clog, numberOfMessages, intervalBetweenSending, warmupInterval, storageType);
156
157 const auto bgwSession= std::make_unique<BgwSession>(settings, &listener, cfg.storage());
158
159 const auto guard = setPriorityAndPolicy(bgwSession.get());
160
161 listener.measureOverhead();
162
163 bgwSession->
164 receivingThreadAffinity(receivingThreadAffinity)
165 .sendingThreadAffinity(Default::AuxiliarySendingThreadAffinity)
166 .connect(bgwCredentials.host(), bgwCredentials.port(), bgwCredentials.ipSessionToken());
167
168 auto order = Helper::createOrder(cfg.traderId());
169
170 std::clog << "\nWarm-up phase to make first calls faster..." << std::endl;
171 listener.collectSessionTimeMarks(*bgwSession, order, numberOfMessages, intervalBetweenSending, warmupInterval);
172
173 std::clog << "\nMeasurement phase..." << std::endl;
174 listener.collectSessionTimeMarks(*bgwSession, order, numberOfMessages, intervalBetweenSending, warmupInterval);
175
176 TcpInfo tcpInfo;
177 const bool hasTcpInfo = bgwSession->getTcpInfo(tcpInfo);
178
179 bgwSession->disconnect();
180
181 if(listener.packetGroupingDetected())
182 std::clog << "Attention: packet grouping detected! Increase the interval between sending!" << std::endl;
183
184 BenchmarkData::reportResults("Latency", listener.sendMarks(), listener.receiveMarks(), numberOfMessages, listener.overhead());
185
186 if(hasTcpInfo)
187 std::clog << "\n\n" << tcpInfo.toString();
188 }
189 catch (const std::exception& ex)
190 {
191 std::cerr << "\nEXCEPTION: " << ex.what() << std::endl;
192 return 1;
193 }
194
195 return 0;
196}
#define ONIXS_ICEBOE_HOTPATH
Definition Compiler.h:157
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void afterSending() noexcept
void onError(SessionErrorReason::Enum, const std::string &description, Session *, Messaging::SbeMessage) override
virtual void resetBenchmark() noexcept
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void beforeSending() noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Definition Session.h:674
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
Definition Session.h:659
static void affinity(const CpuIndexes &cpuIndexes)
Sets the processor affinity mask for the current thread. The thread is rescheduled after this call.
static void spinWait(size_t microseconds)
Spins for the given number of microseconds, executing a single instruction repeatedly.
static Span elapsedMicroseconds(const PerformanceCounter::Count &start)
static Count current()
Retrieves the current value of the high-resolution performance counter.
static MessageHolder< NewOrderRequest > createOrder(const std::string &traderId)
Definition Helpers.h:104
constexpr Threading::CpuIndex AuxiliarySendingThreadAffinity
Definition Defaults.h:42
std::shared_ptr< void > setPriorityAndPolicy(Session *session=nullptr)
Definition Helpers.h:150
std::pair< std::unique_ptr< GatewayEmulatorThread< BusSessionGatewayListener > >, std::unique_ptr< GatewayEmulatorThread< GatewayListener > > > createEmulator(const SessionSettings &settings, const ConnectivityConfiguration &cfg, bool tcpDirect=false)
Definition Emulator.h:157
BgwCredentials receiveBgwCredentials(SessionSettings settings, std::string host, Port port)
Definition Listener.h:264
SessionSettings fillSettings(const LogonConfiguration &logonCfg, const ConnectivityConfiguration &connCfg, const SettingsConfiguration &settingsCfg)
Definition Settings.h:32
void printBenchmarkSettings(std::ostream &o, size_t numberOfMessages, size_t intervalBetweenSending, size_t warmupInterval, SessionStorageType::Enum storageType)
Definition Helpers.h:182
static void reportResults(const std::string &name, const SendMarks *sendMarksArray, const ReceiveMarks *receiveMarksArray, std::size_t count, const Overhead &overhead)
TCP state information.
Definition TcpInfo.h:30
std::string toString() const
static void manageSignals() noexcept
Definition Signal.h:46