OnixS C++ FIX Engine  4.12.0
API Documentation
Async Processing Sample

This sample demonstrates asynchronous processing of incoming messages.

Source code:

#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace OnixS::Threading;
using namespace Settings;
namespace {
// Host the initiator session connects to.
const std::string CounterpartyHost("localhost");

// Port the initiator session connects to; the same ListenPort value is passed to
// EngineSettings::listenPort() in main(), so the initiator dials the local acceptor.
const int CounterpartyPort(ListenPort);

// Size handed to the item pool at construction; warmUpMsgPool() cycles
// pool_.preferredSize() items through the pool.
const size_t PreferredPoolSize(32);

// Number of orders sent by main() and number of cancel requests the acceptor waits for.
const UInt32 MsgCount(1000);
/// Writes `str` to std::clog on a line of its own.
///
/// A line break (with flush) is emitted before the text and another one after it.
void log(const std::string & str)
{
    std::ostream & out = std::clog;

    out << std::endl;
    out << str;
    out << std::endl;
}
/// Pool item that stores a copy of an inbound message and, when processed, answers it
/// with an Order Cancel Request sent through the session supplied at construction.
class MessageItem : public ItemBase
{
public:
    /// @param session Session used by process() to send the cancel request. It is stored
    ///                by reference, so it must outlive this item.
    ///
    /// The constructor is explicit so that a Session is never silently converted into a
    /// MessageItem.
    explicit MessageItem(Session & session) : msg_(), session_(session)
    {}

    /// Logs the stored message, then builds an Order Cancel Request from its OrderID,
    /// Symbol and Side fields, stamps it with the current UTC time and sends it.
    void process() ONIXS_FIXENGINE_OVERRIDE
    {
        log("[Initiator] - Asynchronous processing of the message: " + msg_.toString('|'));

        Message cancelRequest(Values::MsgType::Order_Cancel_Request, FixProtocolVersion);

        // The OrderID of the stored message is used as the OrigClOrdID of the cancel request.
        cancelRequest.set(Tags::OrigClOrdID, msg_.get(Tags::OrderID))
            .set(Tags::Symbol, msg_.get(Tags::Symbol))
            .set(Tags::Side, msg_.get(Tags::Side))
            .set(Tags::TransactTime, Timestamp::utc(), TimestampFormat::YYYYMMDDHHMMSSMsec);

        session_.send(&cancelRequest);
    }

    /// Replaces the stored message with a copy of `incomingMsg`.
    void clone(const Message & incomingMsg)
    {
        msg_ = incomingMsg;
    }

private:
    Message msg_;        // Copy of the message to be processed.
    Session & session_;  // Session the cancel request is sent through (not owned).
};
/// FIX initiator that hands inbound Execution Reports over to its own worker thread.
///
/// The session callback (onInboundApplicationMsg) copies every Execution Report into a
/// pooled MessageItem and enqueues it. The worker thread (run) dequeues the items,
/// processes them and returns them to the pool.
///
/// NOTE(review): the declarations of `pool_`, `queue_`, `msgPtr` and `result` are not
/// visible in this listing (only `initiatorSession_` is declared) - confirm them against
/// the complete sample.
class Initiator : public OnixS::FIX::ISessionListener, public Thread, public PoolAllocatorBase
{
public:
    /// Creates the session, logs on to the counterparty and pre-populates the item pool.
    ///
    /// NOTE(review): the worker thread is named "AcceptorThread" although it belongs to
    /// the initiator - presumably a copy-paste leftover. The string is left unchanged
    /// because it is visible at runtime.
    Initiator():
        Thread("AcceptorThread"),
        initiatorSession_(SenderCompId, TargetCompId, FixProtocolVersion, false, this),
        pool_(PreferredPoolSize, *this)
    {
        initiatorSession_.logonAsInitiator(CounterpartyHost, CounterpartyPort);

        // Prepare a pool of MessageItem objects in advance and perform all necessary memory allocations.
        warmUpMsgPool();
    }

    /// Requests the worker thread to stop and waits for it to finish.
    ///
    /// The stop request is issued here as well as in main(): if the object is destroyed
    /// without a prior stopAsync() call (e.g. during stack unwinding after an exception),
    /// a bare join() would wait forever because run() only leaves its loop once a stop
    /// has been requested.
    ~Initiator() ONIXS_FIXENGINE_OVERRIDE
    {
        stopAsync();
        join();
    }

    /// @return The initiator session owned by this object.
    Session & session()
    {
        return initiatorSession_;
    }

    /// Session callback for inbound application-level messages.
    ///
    /// Logs every message; Execution Reports are additionally copied into a pooled item
    /// and queued for the worker thread.
    void onInboundApplicationMsg(Message & msg, Session *) ONIXS_FIXENGINE_FINAL
    {
        log("[Initiator] - Incoming application-level message: " + msg.toString('|'));

        if(msg.type() == Values::MsgType::Execution_Report)
        {
            // When an inbound callback is called, take an object from the pool (no memory allocation).
            pool_.get(msgPtr);

            // Copy the incoming Message object to the Message object taken from the pool (no memory allocation, just copying).
            static_cast<MessageItem*>(msgPtr.get())->clone(msg);

            // Enqueue the pointer to the copied Message object to a shared queue.
            queue_.enqueue(msgPtr);
        }
    }

    /// Allocator hook: creates a new pool item bound to the initiator session.
    /// The caller receives the raw pointer (presumably the pool, which is constructed with `*this`).
    MessageItem * alloc() ONIXS_FIXENGINE_FINAL
    {
        return new MessageItem(initiatorSession_);
    }

private:
    /// Worker thread procedure: processes queued items until a stop is requested, then
    /// drains whatever is still in the queue.
    void run() ONIXS_FIXENGINE_FINAL
    {
        const int DequeueTimeoutMs = 100;

        while(!stopRequested())
        {
            // In the processing thread, get the Message object from the shared queue.
            result = queue_.dequeue(msgPtr, DequeueTimeoutMs);

            // dequeue() takes a timeout and reports its outcome, so it can return without
            // an item. Only touch msgPtr when an item was actually delivered.
            if(result == OnixS::Threading::ThreadSafeQueueResult::Data)
            {
                // Process the Message object and return the processed object to the pool.
                msgPtr->process();
                pool_.put(msgPtr);
            }
        }

        // The processing thread is requested to stop, so let's process remaining messages and complete the thread procedure.
        while((result = queue_.dequeue(msgPtr, DequeueTimeoutMs)) == OnixS::Threading::ThreadSafeQueueResult::Data)
        {
            msgPtr->process();
            pool_.put(msgPtr);
        }
    }

    /// Takes pool_.preferredSize() items out of the pool, parks them in the queue and then
    /// moves them all back, so that the items exist before the first message arrives.
    void warmUpMsgPool()
    {
        for(size_t counter = 0; counter < pool_.preferredSize(); ++counter)
        {
            pool_.get(msgPtr);
            queue_.enqueue(msgPtr);
        }

        // A zero timeout is passed here: the loop ends as soon as dequeue() reports no item.
        while(queue_.dequeue(msgPtr, 0) == OnixS::Threading::ThreadSafeQueueResult::Data)
            pool_.put(msgPtr);
    }

    Session initiatorSession_;
};
class ApplicationMessageListener : public OnixS::FIX::ISessionListener
{
public:
ApplicationMessageListener() :orderCancelRequestNumber(0) {}
void onInboundApplicationMsg(Message & msg, Session * session) ONIXS_FIXENGINE_FINAL
{
log("[Acceptor] - Synchronous processing of the message: " + msg.toString('|'));
if(msg.type() == Values::MsgType::Order_Single)
{
Message execReport(Values::MsgType::Execution_Report, FixProtocolVersion);
execReport.set(Tags::OrderID, msg.get(Tags::ClOrdID))
.set(Tags::Symbol, msg.get(Tags::Symbol))
.set(Tags::Side, msg.get(Tags::Side))
.set(Tags::OrderQty, msg.get(Tags::OrderQty))
.set(Tags::TransactTime, Timestamp::utc(), TimestampFormat::YYYYMMDDHHMMSSMsec);
session->send(&execReport);
}
else if(msg.type() == Values::MsgType::Order_Cancel_Request)
{
if(++orderCancelRequestNumber == MsgCount)
processingCompletion_.set();
}
}
void waitProcessingCompletion()
{
processingCompletion_.getFuture().get();
}
private:
Promise<void> processingCompletion_;
UInt32 orderCancelRequestNumber;
};
}
int main()
{
std::clog << "Asynchronous Processing Sample.\n";
try
{
EngineSettings settings;
settings.listenPort(ListenPort)
.licenseStore("../../license|../../../license");
Engine::init(settings);
ApplicationMessageListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, false, &acceptorListener);
acceptor.logonAsAcceptor();
Initiator initiator;
initiator.start();
waitUntilEnterKey("Press any key to start sending orders and process response messages asynchronously.");
Message order(Values::MsgType::Order_Single, FixProtocolVersion);
order.set(Tags::Symbol, "IBM")
.set(Tags::Side, Values::Side::Buy)
.set(Tags::OrderQty, 1000)
.set(Tags::OrdType, Values::OrdType::Market);
for(UInt32 counter = 1; counter <= MsgCount; ++counter)
{
order.set(Tags::ClOrdID, counter).set(Tags::TransactTime, Timestamp::utc(), TimestampFormat::YYYYMMDDHHMMSSMsec);
initiator.session().send(&order);
}
acceptorListener.waitProcessingCompletion();
waitUntilEnterKey("Press any key to close the sample.");
initiator.session().logout();
acceptor.logout();
initiator.stopAsync();
initiator.join();
std::clog << "\nClosing sample...\n" << std::endl;
Engine::shutdown();
}
catch(const std::exception & ex)
{
processSampleException(ex.what());
}
return 0;
}