OnixS C++ FIX Engine  4.12.0
API Documentation
Throughput Sample

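This sample shows how to measure message throughput (messages per second) on both the sending and the receiving side, using an initiator and an acceptor session connected within the same process.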


Source code:


Listener.h:

#include "Defines.h"
#include "../PerformanceCounter.h"
/// Session listener that times the receive side of the throughput benchmark.
///
/// Every inbound application message bumps an internal counter. When the
/// counter reaches WarmupNumberOfMessages the listener releases
/// `warmupReady_` and records `receiveStart_`; when it reaches
/// WarmupNumberOfMessages + NumberOfMessages it records `receiveFinish_`
/// and releases `ready_`.
class MySessionListener : public OnixS::FIX::ISessionListener
{
public:
    /// Starts with both signals at a count of zero, the message counter at
    /// zero and both timestamps marked as undefined.
    MySessionListener()
        : ready_(0), warmupReady_(0), counter_(0) {
        OnixS::Sample::PerformanceCounter::setToUndefinedValue(&receiveStart_);
        OnixS::Sample::PerformanceCounter::setToUndefinedValue(&receiveFinish_);
    }

    /// Counts one inbound application message and handles the two
    /// milestones (end of warm-up, end of measurement). The message and
    /// session arguments are not inspected.
    void onInboundApplicationMsg(Message &, Session *) ONIXS_FIXENGINE_FINAL {
        size_t counter = ++counter_;

        // Last warm-up message: let the sender proceed and start the
        // receive-side clock.
        if(counter == WarmupNumberOfMessages) {
            warmupReady_.release();
            OnixS::Sample::PerformanceCounter::current(&receiveStart_);
            return;
        }

        // Last measured message: stop the clock first, then signal, so the
        // timestamp is already written when the waiter reads it.
        if(counter == WarmupNumberOfMessages + NumberOfMessages) {
            OnixS::Sample::PerformanceCounter::current(&receiveFinish_);
            ready_.release();
            return;
        }
    }

    // NOTE(review): `ready_` and `warmupReady_` are initialized by the
    // constructor and used via release()/acquire(), but the original listing
    // did not declare them. The type is assumed to be
    // OnixS::Threading::Semaphore (constructed from an initial count) --
    // confirm against the SDK headers. They are declared first so the
    // declaration order matches the constructor's initializer list.

    /// Released once the final measured message has been received.
    OnixS::Threading::Semaphore ready_;
    /// Released once the final warm-up message has been received.
    OnixS::Threading::Semaphore warmupReady_;

    /// Timestamp taken when the last warm-up message arrives.
    OnixS::Sample::PerformanceCounter::Count receiveStart_;
    /// Timestamp taken when the last measured message arrives.
    OnixS::Sample::PerformanceCounter::Count receiveFinish_;

private:
    /// Number of inbound application messages seen so far.
    size_t counter_;
};

Throughput.cpp:

#include "Defines.h"
#include "Listener.h"
#include "../PerformanceCounter.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
using namespace std;
using namespace OnixS::Sample;
// CPU indices used to pin the benchmark threads; each role gets its own index.

// Index reserved for the main thread (not referenced elsewhere in the visible code).
const OnixS::Threading::CpuIndex MainThreadAffinity{0};
// Index passed to the acceptor session's sendingThreadAffinity().
const OnixS::Threading::CpuIndex AcceptorSendingThreadAffinity{1};
// Index passed to the acceptor session's receivingThreadAffinity().
const OnixS::Threading::CpuIndex AcceptorReceivingThreadAffinity{2};
// Index passed to the initiator session's sendingThreadAffinity().
const OnixS::Threading::CpuIndex InitiatorSendingThreadAffinity{3};
// Index passed to the initiator session's receivingThreadAffinity().
const OnixS::Threading::CpuIndex InitiatorReceivingThreadAffinity{4};
/// Entry point of the throughput benchmark.
///
/// Creates an acceptor and an initiator session that share one listener,
/// sends WarmupNumberOfMessages orders as a warm-up, then sends
/// NumberOfMessages orders while timing the send loop. The receive side is
/// timed by the listener. Both rates are printed in messages per second.
///
/// @return 0 when the benchmark completes; 1 when built without NDEBUG or
///         when an exception interrupts the run.
int main()
{
    clog << "OnixS Throughput Benchmark Sample." << endl << endl;

#ifndef NDEBUG
    // Measurements from a non-release build are not meaningful, so refuse to run.
    cerr << "Please use RELEASE build to measure throughput." << endl;
    return 1;
#endif

    try {
        // Engine configuration: field and message validation is switched off.
        EngineSettings settings;
        settings.licenseStore(LicenseStore)
        .listenPort(ListenPort)
        .dictionaryFile("ThroughputFixDictionary.xml")
        .validateRequiredFields(false)
        .validateUnknownFields(false)
        .validateUnknownMessages(false);

        Engine::init(settings);

        Dictionary dictionary("ThroughputDictionaryId");

        // Pre-built FIX 4.2 order (35=D) with \001-delimited fields; the same
        // message object is sent repeatedly.
        const string rawMessage =
            "8=FIX.4.2\0019=3\00135=D\00149=OnixS\00156=CME\00134=01\00152=20120709-10:10:54\001"
            "11=90001008\00121=1\00155=ABC\00154=1\00138=100\00140=1\00160=20120709-10:10:54\00110=000\001";

        FlatMessage order(rawMessage.data(), rawMessage.size());

        MySessionListener listener;

        // Acceptor side: comp ids are swapped relative to the initiator below.
        Session acceptor(TargetCompId, SenderCompId, dictionary, &listener,
                         SessionStorageType::MemoryBased);
        acceptor.sendingThreadAffinity(AcceptorSendingThreadAffinity).receivingThreadAffinity(AcceptorReceivingThreadAffinity);
        acceptor.logonAsAcceptor();

        // Initiator side: connects to the acceptor over localhost.
        Session initiator(SenderCompId, TargetCompId, dictionary, &listener,
                          SessionStorageType::MemoryBased);
        initiator.sendingThreadAffinity(InitiatorSendingThreadAffinity).receivingThreadAffinity(InitiatorReceivingThreadAffinity);
        // NOTE(review): 200 is presumably the outgoing batch size and 30 the
        // heartbeat interval -- confirm against the Session API reference.
        initiator.messageGrouping(200)
        .logonAsInitiator("localhost", ListenPort, 30);

        cout << "Warm-up stage..." << endl;

        for(size_t i = 0; i < WarmupNumberOfMessages; ++i)
            initiator.send(&order);

        PerformanceCounter::Count sendStart;
        PerformanceCounter::Count sendFinish;

        // Wait until the listener has seen the last warm-up message so the
        // measured run starts from a drained pipeline.
        listener.warmupReady_.acquire();

        cout << "Measurement stage..." << endl;

        PerformanceCounter::current(&sendStart);

        for(size_t i = 0; i < NumberOfMessages; ++i)
            initiator.send(&order);

        PerformanceCounter::current(&sendFinish);

        // Wait until the listener has seen the last measured message; its
        // receive timestamps are read only after this point.
        listener.ready_.acquire();

        acceptor.logout();
        initiator.logout();

        acceptor.shutdown();
        initiator.shutdown();

        // usSpan() presumably yields elapsed microseconds, hence the 1e6
        // scale factor to get messages per second -- confirm.
        double sendMessagesPerSec = 1000000.0 * NumberOfMessages / PerformanceCounter::usSpan(
                                        sendFinish, sendStart);
        double receiveMessagesPerSec = 1000000.0 * NumberOfMessages / PerformanceCounter::usSpan(
                                           listener.receiveFinish_, listener.receiveStart_);

        clog << endl
             << "Throughput on send side: " << sendMessagesPerSec << " (msg/sec)" << endl
             << "Throughput on receive side: " << receiveMessagesPerSec << " (msg/sec)" << endl;

        Engine::shutdown();
    }
    catch(const std::exception & ex) {
        processSampleException(ex.what());
        // The run did not complete: report failure instead of falling
        // through to the success exit code.
        return 1;
    }

    return 0;
}