OnixS C++ FIX Engine  4.12.0
API Documentation
Throttling SellSide Sample

This sample demonstrates the throttling of incoming messages on the acceptor's side.


Source code:


Listener.h:

class Listener : public OnixS::FIX::ISessionListener
{
public:
Listener();
/// Is called when the application-level message is received from the counterparty.
void onInboundApplicationMsg(Message & msg, Session * sn) ONIXS_FIXENGINE_FINAL;
/// Is called when the session-level message is received from the counterparty.
void onInboundSessionMsg(Message & msg, Session * sn) ONIXS_FIXENGINE_FINAL;
void onStateChange(SessionState::Enum newState, SessionState::Enum prevState,Session * sn) ONIXS_FIXENGINE_FINAL;
void onError(ErrorReason::Enum reason, const std::string & description, Session * sn) ONIXS_FIXENGINE_FINAL;
void onWarning(WarningReason::Enum reason, const std::string & description, Session * sn) ONIXS_FIXENGINE_FINAL;
Throttler throttler_;
size_t totalOrdersHandled_;
size_t totalMessagesRejected_;
private:
int ordersCounter_;
};

Listener.cpp:

#include "Listener.h"
#include "../../Common/Settings.h"
#include "../../Common/Helpers.h"
using namespace Settings;
Listener::Listener() : throttler_(1), totalOrdersHandled_(0), totalMessagesRejected_(0), ordersCounter_(0) {}
void Listener::onInboundApplicationMsg(Message & msg, Session * sn)
{
std::clog << "\nIncoming application-level message:\n" << msg << std::endl;
try
{
if(throttler_.tryThrottle())
{
// Reject the message
sn->sendReject(msg.seqNum(), "The message is rejected due to throttling limit exhausting");
++totalMessagesRejected_;
std::clog << "Rejection " << totalMessagesRejected_ << ": message " << msg.seqNum() << " rejected due to throttling limit exhausting." << std::endl;
return;
}
if(msg.type() == MsgType::Order_Single)
{
const Message & order = msg;
Message execReport(MsgType::Execution_Report, FixProtocolVersion);
execReport.set(Tags::OrderID, order.get(Tags::ClOrdID).toString());
const int EXEC_ID_SIZE = 128;
char execID[EXEC_ID_SIZE];
xsnprintf(execID, EXEC_ID_SIZE - 1, "ExecID_%d", ++ordersCounter_);
execReport.set(Tags::ExecID, execID)
.set(Tags::ExecType, "0")
.set(Tags::ExecTransType, "0") // New
.set(Tags::OrdStatus, "0") // New
.set(Tags::Symbol, order.get(Tags::Symbol).toString())
.set(Tags::Side, order.get(Tags::Side).toString())
.set(Tags::OrderQty, order.get(Tags::OrderQty).toString())
.set(Tags::CumQty, order.get(Tags::OrderQty).toString())
.set(Tags::AvgPx, 100.0)
.set(Tags::LastShares, 0)
.set(Tags::LastPx, 0)
.set(Tags::LeavesQty, order.get(Tags::OrderQty).toString());
sn->send(&execReport);
++totalOrdersHandled_;
}
}
catch(const std::exception & ex)
{
std::clog << "Exception during the processing of incoming message: " << ex.what() << std::endl;
}
}
void Listener::onInboundSessionMsg(Message & msg, Session * /*sn*/)
{
std::clog << "\nIncoming session-level message:\n" << msg << std::endl;
}
void Listener::onStateChange(SessionState::Enum newState, SessionState::Enum prevState,
Session * /*sn*/)
{
std::clog << "\nSession's state is changed, prevState=" << SessionState::toString(prevState)
<< ", newState=" << SessionState::toString(newState) << std::endl;
}
void Listener::onError(ErrorReason::Enum /*reason*/, const std::string & description, Session * /*sn*/)
{
std::cerr << "\nSession-level error:" << description << std::endl;
}
void Listener::onWarning(WarningReason::Enum /*reason*/, const std::string & description,
Session * /*sn*/)
{
std::cerr << "\nSession-level warning:" << description << std::endl;
}

ThrottlingSellSide.cpp:

#include "Listener.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
int main()
{
std::clog << "ThrottlingSellSide sample." << std::endl << std::endl;
try
{
EngineSettings settings;
settings.listenPort(ListenPort)
.licenseStore(LicenseStore);
Engine::init(settings);
Listener listener;
Session session(TargetCompId, SenderCompId, FixProtocolVersion, &listener);
const size_t MessagesPerSecondLimit = 10;
const size_t ThrottlingIntervalMillisecond = 1000;
listener.throttler_.reset(MessagesPerSecondLimit, ThrottlingIntervalMillisecond);
session.logonAsAcceptor();
std::clog << "Awaiting session-initiator on port " << settings.listenPort() << "..." << std::endl;
std::clog << "Press any key to disconnect the session and terminate the application." << std::endl;
waitUntilEnterKey();
session.logout().shutdown();
std::clog << "Orders handled: " << listener.totalOrdersHandled_ << ", messages rejected: " << listener.totalMessagesRejected_ << std::endl;
Engine::shutdown();
}
catch(const std::exception & ex)
{
processSampleException(ex.what());
return 1;
}
return 0;
}