// This sample demonstrates all aspects of the resend functionality.
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
namespace {
// Host name the Initiator session connects to (passed to logonAsInitiator()).
const std::string CounterpartyHost = "localhost";
// Port the Initiator connects to; the same ListenPort value that main() hands
// to the engine as its listen port.
const int CounterpartyPort = ListenPort;
// Heartbeat interval passed to logonAsInitiator(); presumably in seconds -
// confirm against the engine API.
const int HeartBtInt = 30;
// Number of orders sent in each of the two batches of simulateSequenceGap().
const size_t OrderCount = 5;
// Amount by which the Acceptor's incoming sequence number is decreased to
// provoke a sequence gap; some cases also use (SequenceGapValue - 1) as a
// range / queue-size limit.
const size_t SequenceGapValue = 3;
// Signature shared by the caseN functions; the argument is the menu text of the case.
typedef void(*MenuFunc)(const std::string &);
// One menu entry: the case function paired with its description.
typedef std::pair<MenuFunc, std::string> MenuFuncItem;
// The whole menu, in display order.
typedef std::vector<MenuFuncItem> MenuFuncItems;
{
public:
// Logs an inbound application-level message under the [TargetCompId] prefix,
// pointing out when the message carries PossDupFlag.
void onInboundApplicationMsg(Message & msg, Session *) ONIXS_FIXENGINE_FINAL {
    const char * const headerTail =
        msg.hasFlag(Tags::PossDupFlag) ? " with PossDupFlag=Y:\n" : ":\n";

    std::clog << "\n[" << TargetCompId << "] - Inbound application-level message" << headerTail;
    std::clog << msg << std::endl;
}
// Logs an outbound application-level message under the [TargetCompId] prefix.
void onOutboundApplicationMsg(Message & msg, Session *) ONIXS_FIXENGINE_FINAL {
    std::clog << "\n[" << TargetCompId << "] - Outbound application-level message:\n";
    std::clog << msg << std::endl;
}
// Logs an inbound session-level message. A Sequence Reset gets a dedicated
// header showing its MsgSeqNum and NewSeqNo; any other message gets a generic
// header that points out PossDupFlag when it is set.
void onInboundSessionMsg(Message & msg, Session *) ONIXS_FIXENGINE_FINAL {
    if(msg.type() == Values::MsgType::Sequence_Reset) {
        std::clog << "\n[" << TargetCompId << "] - Inbound <Sequence Reset> message, MsgSeqNum="
                  << msg.get(Tags::MsgSeqNum).toString()
                  << ", NewSeqNo=" << msg.get(Tags::NewSeqNo).toString()
                  << ":\n" << msg << std::endl;
        return;
    }

    const char * const headerTail =
        msg.hasFlag(Tags::PossDupFlag) ? " with PossDupFlag=Y:\n" : ":\n";
    std::clog << "\n[" << TargetCompId << "] - Inbound session-level message" << headerTail
              << msg << std::endl;
}
// Logs an outbound session-level message. A Resend Request gets a dedicated
// header showing the requested BeginSeqNo..EndSeqNo range; every other message
// gets a generic header.
void onOutboundSessionMsg(Message & msg, Session *) ONIXS_FIXENGINE_FINAL {
    if(msg.type() == Values::MsgType::Resend_Request) {
        std::clog << "\n[" << TargetCompId << "] - Outbound <Resend Request> message, BeginSeqNo="
                  << msg.get(Tags::BeginSeqNo).toString()
                  << ", EndSeqNo=" << msg.get(Tags::EndSeqNo).toString()
                  << ":\n" << msg << std::endl;
        return;
    }

    std::clog << "\n[" << TargetCompId << "] - Outbound session-level message:\n"
              << msg << std::endl;
}
// Logs a session state transition (previous state first, then the new one)
// under the [TargetCompId] prefix.
void onStateChange(SessionState::Enum newState, SessionState::Enum prevState,
                   Session *) ONIXS_FIXENGINE_FINAL {
    std::clog << "\n[" << TargetCompId << "] - Session's state is changed, prevState=";
    std::clog << SessionState::toString(prevState) << ", newState=";
    std::clog << SessionState::toString(newState) << std::endl;
}
// Logs the description of a session warning; the warning reason code is ignored.
void onWarning(WarningReason::Enum /*reason*/, const std::string & description,
               Session *) ONIXS_FIXENGINE_FINAL {
    std::clog << "\n[" << TargetCompId << "] - Session warning:";
    std::clog << description << std::endl;
}
};
{
public:
// Starts with resend mode off, so onResendRequest() returns false until
// resendMode(true) is called.
InitiatorListener(): resendMode_(false) {}
// Inbound application-level messages are deliberately ignored by this listener.
void onInboundApplicationMsg(Message &, Session *) ONIXS_FIXENGINE_FINAL {}
// Logs the MsgSeqNum of the message the callback was invoked for, together
// with the answer being given, and returns the configured resend mode.
// NOTE(review): presumably the engine uses the returned flag to decide whether
// the message is resent - confirm against the engine documentation.
bool onResendRequest(Message & msg, Session *) ONIXS_FIXENGINE_FINAL {
    const bool answer = resendMode_;

    std::clog << "\n[" << SenderCompId << "] - ResendRequest is called for message with MsgSeqNum=";
    std::clog << msg.get(Tags::MsgSeqNum).toString() << ", " << std::boolalpha << answer;
    std::clog << " is returned." << std::endl;

    return answer;
}
void resendMode(bool value) {
resendMode_ = value;
}
private:
// Value returned by onResendRequest(); false until changed through resendMode(bool).
bool resendMode_;
};
void simulateSequenceGap(Session & acceptor, Session & initiator, const std::string & caseName)
{
acceptor
.resetLocalSequenceNumbers()
.logonAsAcceptor();
initiator.resetLocalSequenceNumbers();
std::clog << "\nConnecting to Acceptor on host=" << CounterpartyHost << " port=" <<
CounterpartyPort << "...\n";
initiator.logonAsInitiator(CounterpartyHost, CounterpartyPort, HeartBtInt);
std::clog << "\nSending " << OrderCount << " regular orders...\n";
Message order(Values::MsgType::Order_Single, FixProtocolVersion);
for(size_t orderCounter = 0; orderCounter < OrderCount; ++orderCounter) {
order.set(Tags::ClOrdID, ++orderId);
initiator.send(&order);
}
waitUntilEnterKey("Press any key to check [" + caseName + "] ...");
std::clog << "\nLet's decrease the incoming sequence number of Acceptor by " << SequenceGapValue
<< " and send new " << OrderCount << " orders to simulate a sequence gap.\n";
acceptor.inSeqNum(acceptor.inSeqNum() - SequenceGapValue);
for(size_t orderCounter = 0; orderCounter < OrderCount; ++orderCounter) {
order.set(Tags::ClOrdID, ++orderId);
initiator.send(&order);
}
waitUntilEnterKey("Press any key to disconnect Initiator and complete the case...");
initiator.logout("The session is disconnected by Initiator");
acceptor.logout();
}
void case1(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, ONIXS_FIXENGINE_NULLPTR);
simulateSequenceGap(acceptor, initiator, caseName);
}
void case2(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
InitiatorListener initiatorListener;
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, &initiatorListener);
simulateSequenceGap(acceptor, initiator, caseName);
}
void case3(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
InitiatorListener initiatorListener;
initiatorListener.resendMode(true);
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, &initiatorListener);
simulateSequenceGap(acceptor, initiator, caseName);
}
void case4(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
acceptor.resendRequestMaximumRange(SequenceGapValue - 1);
InitiatorListener initiatorListener;
initiatorListener.resendMode(true);
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, &initiatorListener);
simulateSequenceGap(acceptor, initiator, caseName);
}
void case5(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
InitiatorListener initiatorListener;
initiatorListener.resendMode(true);
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, &initiatorListener);
initiator.resendingQueueSize(SequenceGapValue - 1);
simulateSequenceGap(acceptor, initiator, caseName);
}
void case6(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
acceptor.requestOnlyMissedMessages(true);
InitiatorListener initiatorListener;
initiatorListener.resendMode(true);
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, &initiatorListener);
simulateSequenceGap(acceptor, initiator, caseName);
}
void case7(const std::string & caseName)
{
AcceptorListener acceptorListener;
Session acceptor(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener);
acceptor.requestOnlyMissedMessages(true);
acceptor.resendRequestMaximumRange(SequenceGapValue - 1);
InitiatorListener initiatorListener;
initiatorListener.resendMode(true);
Session initiator(SenderCompId, TargetCompId, FixProtocolVersion, &initiatorListener);
simulateSequenceGap(acceptor, initiator, caseName);
}
};
// Converts the text typed at the menu prompt into a zero-based menu index.
//
// The whole input must be a decimal number in the range [1, itemCount]
// (leading whitespace and an optional sign are tolerated, as strtol allows).
// Returns true and stores the index in `index` on success; returns false and
// leaves `index` untouched otherwise.
//
// std::strtol is used instead of std::atoi because atoi has undefined behavior
// when the number does not fit into an int, and it cannot report trailing
// garbage such as "2abc". On overflow strtol returns LONG_MAX / LONG_MIN,
// which the range check below rejects.
static bool parseMenuChoice(const std::string & input, size_t itemCount, size_t & index)
{
    const char * const begin = input.c_str();
    char * end = 0;
    const long number = std::strtol(begin, &end, 10);

    // No digits consumed, or something other than the number was typed.
    if(end == begin || *end != '\0')
        return false;

    if(number < 1 || static_cast<unsigned long>(number) > itemCount)
        return false;

    index = static_cast<size_t>(number - 1);
    return true;
}

// Entry point of the sample.
//
// Initializes the engine with the listen port and license store from Settings,
// builds the menu of resend cases and then loops: prints the menu, reads one
// line from stdin and runs the selected case. An empty line (or end of input)
// leaves the loop, after which the engine is shut down. Any std::exception is
// reported through processSampleException(). Always returns 0.
int main()
{
    std::clog << "ResendingMessages sample.\n";
    try {
        EngineSettings settings;
        settings.listenPort(ListenPort)
                .licenseStore(LicenseStore);
        Engine::init(settings);

        MenuFuncItems menuItems;
        menuItems.push_back(std::make_pair(&case1,
            "Default resend functionality without using any specific settings and listeners in the Initiator"));
        menuItems.push_back(std::make_pair(&case2,
            "Default resend functionality with using the onResendRequest callback which returns false"));
        menuItems.push_back(std::make_pair(&case3,
            "Default resend functionality with using the onResendRequest callback which returns true"));
        menuItems.push_back(std::make_pair(&case4,
            "Default resend functionality with a limitation of the resend request range"));
        menuItems.push_back(std::make_pair(&case5,
            "Default resend functionality with a limitation of the resending queue size"));
        menuItems.push_back(std::make_pair(&case6,
            "Second option of the resend functionality (requestOnlyMissedMessages=true) with using the onResendRequest callback which returns true"));
        menuItems.push_back(std::make_pair(&case7,
            "Second option of the resend functionality (requestOnlyMissedMessages=true) with a limitation of the resend request range"));

        while(true) {
            std::clog <<
                "\nSelect resend functionality case you want to perform or press <Enter> to exit: \n" <<
                std::endl;
            for(size_t index = 0; index < menuItems.size(); ++index)
                std::cout << (index + 1) << ") " << menuItems[index].second << std::endl;

            std::string userInput;
            std::getline(std::cin, userInput);
            // An empty line means "exit"; a failed read also leaves the string empty.
            if(userInput.empty()) break;

            size_t selectedItemNumber = 0;
            if(parseMenuChoice(userInput, menuItems.size(), selectedItemNumber))
                (menuItems[selectedItemNumber].first)(menuItems[selectedItemNumber].second);
            else
                std::clog << "\nInvalid command requested. Repeat your choice.\n";
        }

        std::clog << "\nClosing sample...\n" << std::endl;
        Engine::shutdown();
    }
    catch(const std::exception & ex) {
        processSampleException(ex.what());
    }
    return 0;
}