OnixS C++ FIX Engine  4.11.0
API Documentation
Throttling BuySide Sample

This sample demonstrates the throttling of outgoing messages on the initiator's side.


Source code:


Listener.h:

class Listener : public OnixS::FIX::ISessionListener
{
public:
Listener() :orderHandlingComplete_(0), totalOrdersHandled_(0), totalMessagesRejected_(0) {}
/// Is called when the application-level message is received from the counterparty.
void onInboundApplicationMsg(Message & msg, Session * sn) ONIXS_FIXENGINE_FINAL;
/// Is called when the session-level message is received from the counterparty.
void onInboundSessionMsg(Message & msg, Session * sn) ONIXS_FIXENGINE_FINAL;
void onStateChange(SessionState::Enum newState, SessionState::Enum prevState, Session * sn) ONIXS_FIXENGINE_FINAL;
void onError(ErrorReason::Enum reason, const std::string & description, Session * sn) ONIXS_FIXENGINE_FINAL;
void onWarning(WarningReason::Enum reason, const std::string & description, Session * sn) ONIXS_FIXENGINE_FINAL;
OnixS::Threading::Semaphore orderHandlingComplete_;
size_t totalOrdersHandled_;
size_t totalMessagesRejected_;
};

Listener.cpp:

#include "Listener.h"
void Listener::onInboundApplicationMsg(Message & msg, Session * /*sn*/)
{
std::clog << "\nIncoming application-level message:\n" << msg << std::endl;
if(msg.type() == MsgType::Execution_Report)
{
++totalOrdersHandled_;
orderHandlingComplete_.release();
}
}
void Listener::onInboundSessionMsg(Message & msg, Session * /*sn*/)
{
std::clog << "\nIncoming session-level message:\n" << msg << std::endl;
if(msg.type() == MsgType::Reject)
{
++totalMessagesRejected_;
std::clog << "Rejection " << totalMessagesRejected_ << ": message " << msg.get(Tags::RefSeqNum).toString() << " rejected." << msg << std::endl;
orderHandlingComplete_.release();
}
}
void Listener::onStateChange(SessionState::Enum newState, SessionState::Enum prevState,
Session * /*sn*/)
{
std::clog
<< "\nSession's state is changed, prevState="
<< SessionState::toString(prevState)
<< ", newState="
<< SessionState::toString(newState)
<< std::endl;
}
void Listener::onError(ErrorReason::Enum /*reason*/, const std::string & description, Session * /*sn*/)
{
std::cerr << "\nSession-level error:" << description << std::endl;
}
void Listener::onWarning(WarningReason::Enum /*reason*/, const std::string & description,
Session * /*sn*/)
{
std::cerr << "\nSession-level warning:" << description << std::endl;
}

ThrottlingBuySide.cpp:

#include "Listener.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
int main()
{
std::clog << "ThrottlingBuySide sample." << std::endl << std::endl;
try
{
EngineSettings settings;
settings.licenseStore(LicenseStore);
Engine::init(settings);
Listener listener;
Session session(SenderCompId, TargetCompId, FixProtocolVersion, &listener);
const std::string CounterpartyHost = "localhost";
const int CounterpartyPort = ListenPort;
const int HeartBtInt = 5;
const size_t MessagesPerSecondLimitDelta = 3;
const size_t MessagesPerSecondLimit = 10 - MessagesPerSecondLimitDelta;
const size_t ThrottlingIntervalMillisecond = 1000;
session.throttlingLimit(MessagesPerSecondLimit, ThrottlingIntervalMillisecond);
session.logonAsInitiator(CounterpartyHost, CounterpartyPort, HeartBtInt);
std::clog << "Press any key to send throttled orders..." << std::endl;
waitUntilEnterKey();
Message order(MsgType::Order_Single, FixProtocolVersion);
setOrderFields(&order);
const int NumOfMessages = 100;
for(int msgCounter = 0; msgCounter < NumOfMessages; ++msgCounter)
{
session.throttle();
session.send(&order);
listener.orderHandlingComplete_.acquire();
}
std::clog << "Orders handled: " << listener.totalOrdersHandled_ << ", messages rejected: " << listener.totalMessagesRejected_ << std::endl;
std::clog << "Press any key to disconnect the session and terminate the application." << std::endl;
waitUntilEnterKey();
session.logout("The session is disconnected by ThrottlingBuySide").shutdown();
Engine::shutdown();
}
catch(const std::exception & ex)
{
processSampleException(ex.what());
return 1;
}
return 0;
}