43{
44
53 > cfg{"Benchmark", argc, argv};
54
55 try
56 {
57 const auto numberOfMessages = cfg.numberOfMessages();
58 const auto warmupInterval = cfg.warmupInterval();
59 const auto intervalBetweenSending = (std::max)(cfg.intervalBetweenSending(), warmupInterval);
60 const auto storageType = cfg.storage();
61 const auto receivingThreadAffinity = cfg.receivingThreadCpu();
62
65
68
69
71
72 settings
73 .receiveSpinningTimeout(1000 * 1000)
74 .useSpinLock(true)
75 .logBeforeSending(false);
76
78 {
79 public:
80 explicit BenchmarkListener(size_t numberOfMessages)
81 : BenchmarkSessionListener(numberOfMessages)
82 {}
83
84 using SessionListener::onError;
85
87 SessionErrorReason::Enum reason,
const std::string & description, Session * session, Messaging::SbeMessage msg)
override
88 {
90 receivedAllMessages_.set_exception(std::make_exception_ptr(std::runtime_error(description)));
91 }
92
94 void collectSessionTimeMarks(Session & session, MessageHolder<NewOrderRequest> & order,
95 size_t numberOfMessages, size_t sendPeriodUsec, size_t warmupPeriodUsec)
96 {
97 receivedAllMessages_ = std::promise<void>{};
98
100
102
103 for(uint64_t messageCounter = 0; messageCounter < numberOfMessages; ++messageCounter)
104 {
105 order->orderQty(messageCounter + 1);
106 order->clOrdId(messageCounter);
107
109 session.
send(order, sendingTime);
111
113
114 if(warmupPeriodUsec)
115 {
117
118 do
119 {
121 session.
warmUp(order, sendingTime);
122 }
124 }
125 else if(sendPeriodUsec)
126 {
128 }
129 }
130
131 waitUntilReceivedAllMessages();
132 }
133
134 private:
136 {
138 receivedAllMessages_.set_value();
139 }
140
141 void waitUntilReceivedAllMessages()
142 {
143 auto future = receivedAllMessages_.get_future();
144
145 if(future.wait_for(std::chrono::seconds{120}) != std::future_status::ready)
146 receivedAllMessages_.set_exception(std::make_exception_ptr(std::runtime_error("Operation timed out.")));
147
148 future.get();
149 }
150
151 std::promise<void> receivedAllMessages_;
152 }
153 listener{numberOfMessages};
154
156
157 const auto bgwSession= std::make_unique<BgwSession>(settings, &listener, cfg.storage());
158
160
161 listener.measureOverhead();
162
163 bgwSession->
164 receivingThreadAffinity(receivingThreadAffinity)
166 .connect(bgwCredentials.host(), bgwCredentials.port(), bgwCredentials.ipSessionToken());
167
169
170 std::clog << "\nWarm-up phase to make first calls faster..." << std::endl;
171 listener.collectSessionTimeMarks(*bgwSession, order, numberOfMessages, intervalBetweenSending, warmupInterval);
172
173 std::clog << "\nMeasurement phase..." << std::endl;
174 listener.collectSessionTimeMarks(*bgwSession, order, numberOfMessages, intervalBetweenSending, warmupInterval);
175
177 const bool hasTcpInfo = bgwSession->getTcpInfo(tcpInfo);
178
179 bgwSession->disconnect();
180
181 if(listener.packetGroupingDetected())
182 std::clog << "Attention: packet grouping detected! Increase the interval between sending!" << std::endl;
183
185
186 if(hasTcpInfo)
187 std::clog <<
"\n\n" << tcpInfo.
toString();
188 }
189 catch (const std::exception& ex)
190 {
191 std::cerr << "\nEXCEPTION: " << ex.what() << std::endl;
192 return 1;
193 }
194
195 return 0;
196}
#define ONIXS_ICEBOE_HOTPATH
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void afterSending() noexcept
void onError(SessionErrorReason::Enum, const std::string &description, Session *, Messaging::SbeMessage) override
virtual void resetBenchmark() noexcept
virtual void onReceivedAllMessages()
ONIXS_ICEBOE_HOTPATH ONIXS_ICEBOE_FORCEINLINE void beforeSending() noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
static void affinity(const CpuIndexes &cpuIndexes)
Sets the processor affinity mask for the current thread. The thread is rescheduled after this call.
static void spinWait(size_t microseconds)
Executes a single instruction during the given number of microseconds.
static MessageHolder< NewOrderRequest > createOrder(const std::string &traderId)
constexpr Threading::CpuIndex AuxiliarySendingThreadAffinity
std::shared_ptr< void > setPriorityAndPolicy(Session *session=nullptr)
std::pair< std::unique_ptr< GatewayEmulatorThread< BusSessionGatewayListener > >, std::unique_ptr< GatewayEmulatorThread< GatewayListener > > > createEmulator(const SessionSettings &settings, const ConnectivityConfiguration &cfg, bool tcpDirect=false)
BgwCredentials receiveBgwCredentials(SessionSettings settings, std::string host, Port port)
SessionSettings fillSettings(const LogonConfiguration &logonCfg, const ConnectivityConfiguration &connCfg, const SettingsConfiguration &settingsCfg)
void printBenchmarkSettings(std::ostream &o, size_t numberOfMessages, size_t intervalBetweenSending, size_t warmupInterval, SessionStorageType::Enum storageType)
static void reportResults(const std::string &name, const SendMarks *sendMarksArray, const ReceiveMarks *receiveMarksArray, std::size_t count, const Overhead &overhead)
std::string toString() const
static void manageSignals() noexcept