This sample demonstrates an asynchronous processing of incoming messages.
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
namespace {
const std::string CounterpartyHost = "localhost";
const int CounterpartyPort = ListenPort;
const size_t PreferredPoolSize = 32;
void log(const std::string & str)
{
std::clog << std::endl << str << std::endl;
}
{
public:
MessageItem(
Session & session) :msg_(), session_(session)
{}
void process() ONIXS_FIXENGINE_OVERRIDE
{
log("[Initiator] - Asynchronous processing of the message: " + msg_.toString('|'));
Message cancelRequest(Values::MsgType::Order_Cancel_Request, FixProtocolVersion);
cancelRequest.set(Tags::OrigClOrdID, msg_.get(Tags::OrderID))
.set(Tags::Symbol, msg_.get(Tags::Symbol))
.set(Tags::Side, msg_.get(Tags::Side))
.set(Tags::TransactTime, Timestamp::utc(), TimestampFormat::YYYYMMDDHHMMSSMsec);
session_.send(&cancelRequest);
}
void clone(
const Message & incomingMsg)
{
msg_ = incomingMsg;
}
private:
};
{
public:
Initiator():
initiatorSession_(SenderCompId, TargetCompId, FixProtocolVersion, false, this),
pool_(PreferredPoolSize, *this)
{
initiatorSession_.logonAsInitiator(CounterpartyHost, CounterpartyPort);
warmUpMsgPool();
}
~Initiator() ONIXS_FIXENGINE_OVERRIDE
{
join();
}
{
return initiatorSession_;
}
void onInboundApplicationMsg(
Message & msg,
Session *) ONIXS_FIXENGINE_FINAL
{
log(
"[Initiator] - Incoming application-level message: " + msg.
toString(
'|'));
if(msg.
type() == Values::MsgType::Execution_Report)
{
pool_.get(msgPtr);
static_cast<MessageItem*>(msgPtr.get())->clone(msg);
queue_.enqueue(msgPtr);
}
}
MessageItem * alloc() ONIXS_FIXENGINE_FINAL
{
return new MessageItem(initiatorSession_);
}
private:
void run() ONIXS_FIXENGINE_FINAL
{
const int DequeueTimeoutMs = 100;
while(!stopRequested())
{
result = queue_.dequeue(msgPtr, DequeueTimeoutMs);
{
msgPtr->process();
pool_.put(msgPtr);
}
}
{
msgPtr->process();
pool_.put(msgPtr);
}
}
void warmUpMsgPool()
{
for(size_t counter = 0; counter < pool_.preferredSize(); ++counter)
{
pool_.get(msgPtr);
queue_.enqueue(msgPtr);
}
pool_.put(msgPtr);
}
};
{
public:
ApplicationMessageListener() :orderCancelRequestNumber(0) {}
void onInboundApplicationMsg(
Message & msg,
Session * session) ONIXS_FIXENGINE_FINAL
{
log(
"[Acceptor] - Synchronous processing of the message: " + msg.
toString(
'|'));
if(msg.
type() == Values::MsgType::Order_Single)
{
Message execReport(Values::MsgType::Execution_Report, FixProtocolVersion);
execReport.set(Tags::OrderID, msg.
get(Tags::ClOrdID))
.
set(Tags::Symbol, msg.
get(Tags::Symbol))
.
set(Tags::Side, msg.
get(Tags::Side))
.
set(Tags::OrderQty, msg.
get(Tags::OrderQty))
.set(Tags::TransactTime, Timestamp::utc(), TimestampFormat::YYYYMMDDHHMMSSMsec);
session->
send(&execReport);
}
else if(msg.
type() == Values::MsgType::Order_Cancel_Request)
{
if(++orderCancelRequestNumber == MsgCount)
processingCompletion_.set();
}
}
void waitProcessingCompletion()
{
processingCompletion_.getFuture().get();
}
private:
UInt32 orderCancelRequestNumber;
};
}
int main()
{
std::clog << "Asynchronous Processing Sample.\n";
try
{
Engine::init(settings);
ApplicationMessageListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion,
false, &acceptorListener);
acceptor.logonAsAcceptor();
Initiator initiator;
initiator.start();
waitUntilEnterKey("Press any key to start sending orders and process response messages asynchronously.");
Message order(Values::MsgType::Order_Single, FixProtocolVersion);
order.
set(Tags::Symbol,
"IBM")
.
set(Tags::Side, Values::Side::Buy)
.
set(Tags::OrderQty, 1000)
.
set(Tags::OrdType, Values::OrdType::Market);
for(
UInt32 counter = 1; counter <= MsgCount; ++counter)
{
order.
set(Tags::ClOrdID, counter).
set(Tags::TransactTime, Timestamp::utc(), TimestampFormat::YYYYMMDDHHMMSSMsec);
initiator.session().send(&order);
}
acceptorListener.waitProcessingCompletion();
waitUntilEnterKey("Press any key to close the sample.");
initiator.session().logout();
acceptor.logout();
initiator.stopAsync();
initiator.join();
std::clog << "\nClosing sample...\n" << std::endl;
Engine::shutdown();
}
catch(const std::exception & ex)
{
processSampleException(ex.what());
}
return 0;
}