42int main(
int argc,
char* argv[])
53 > cfg{
"Benchmark", argc, argv};
57 const auto numberOfMessages = cfg.numberOfMessages();
58 const auto warmupInterval = cfg.warmupInterval();
59 const auto intervalBetweenSending = (std::max)(cfg.intervalBetweenSending(), warmupInterval);
60 const auto storageType = cfg.storage();
61 const auto receivingThreadAffinity = cfg.receivingThreadCpu();
73 .receiveSpinningTimeout(1000 * 1000)
75 .logBeforeSending(
false);
80 explicit BenchmarkListener(
size_t numberOfMessages)
90 receivedAllMessages_.set_exception(std::make_exception_ptr(std::runtime_error(description)));
95 size_t numberOfMessages,
size_t sendPeriodUsec,
size_t warmupPeriodUsec)
97 receivedAllMessages_ = std::promise<void>{};
103 for(uint64_t messageCounter = 0; messageCounter < numberOfMessages; ++messageCounter)
105 order->orderQty(messageCounter + 1);
106 order->clOrdId(messageCounter);
109 session.
send(order, sendingTime);
121 session.
warmUp(order, sendingTime);
125 else if(sendPeriodUsec)
131 waitUntilReceivedAllMessages();
138 receivedAllMessages_.set_value();
141 void waitUntilReceivedAllMessages()
143 auto future = receivedAllMessages_.get_future();
145 if(future.wait_for(std::chrono::seconds{120}) != std::future_status::ready)
146 receivedAllMessages_.set_exception(std::make_exception_ptr(std::runtime_error(
"Operation timed out.")));
151 std::promise<void> receivedAllMessages_;
153 listener{numberOfMessages};
157 const auto bgwSession= std::make_unique<BgwSession>(settings, &listener, cfg.storage());
161 listener.measureOverhead();
164 receivingThreadAffinity(receivingThreadAffinity)
166 .connect(bgwCredentials.host(), bgwCredentials.port(), bgwCredentials.ipSessionToken());
170 std::clog <<
"\nWarm-up phase to make first calls faster..." << std::endl;
171 listener.collectSessionTimeMarks(*bgwSession, order, numberOfMessages, intervalBetweenSending, warmupInterval);
173 std::clog <<
"\nMeasurement phase..." << std::endl;
174 listener.collectSessionTimeMarks(*bgwSession, order, numberOfMessages, intervalBetweenSending, warmupInterval);
177 const bool hasTcpInfo = bgwSession->getTcpInfo(tcpInfo);
179 bgwSession->disconnect();
181 if(listener.packetGroupingDetected())
182 std::clog <<
"Attention: packet grouping detected! Increase the interval between sending!" << std::endl;
187 std::clog <<
"\n\n" << tcpInfo.
toString();
189 catch (
const std::exception& ex)
191 std::cerr <<
"\nEXCEPTION: " << ex.what() << std::endl;
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
std::pair< std::unique_ptr< GatewayEmulatorThread< BusSessionGatewayListener > >, std::unique_ptr< GatewayEmulatorThread< GatewayListener > > > createEmulator(const SessionSettings &settings, const ConnectivityConfiguration &cfg, bool tcpDirect=false)