OnixS C++ CME iLink 3 Binary Order Entry Handler 1.19.0
API Documentation
Loading...
Searching...
No Matches
Session Scheduler Sample

This sample demonstrates how to use the Session Scheduler to connect and disconnect a session automatically according to a schedule.

Source code

#include <cerrno>
#include <climits>
#include <cstdlib>

#include "../Common/Helpers.h"
using namespace OnixS::CME;
using namespace OnixS::CME::iLink3;
/// Returns the time of day that lies `seconds` after `time`.
///
/// The sum is wrapped at midnight, so the hour component of the result always
/// stays below 24 (for example, 23:59:58 plus 5 seconds yields 00:00:03).
TimeOfDay operator+ (TimeOfDay time, Scheduling::Seconds seconds)
{
    const Scheduling::Seconds SecondsPerMinute = 60;
    const int MinutesPerHour = 60;
    const int HoursPerDay = 24;
    const Scheduling::Seconds SecondsPerHour = SecondsPerMinute * MinutesPerHour;
    const Scheduling::Seconds SecondsPerDay = SecondsPerHour * HoursPerDay;

    const Scheduling::Seconds total = time.sinceMidnight() + seconds;

    // Without this wrap a sum that crosses midnight would produce an hour value
    // of 24 or more, which is not a valid hour of the day.
    const Scheduling::Seconds sinceMidnight = total % SecondsPerDay;

    return TimeOfDay(
        sinceMidnight / SecondsPerHour,
        (sinceMidnight / SecondsPerMinute) % MinutesPerHour,
        sinceMidnight % SecondsPerMinute);
}
/// Session listener that reports every state transition of a session on standard output.
class SessionStateChangeTracer : public SessionListener
{
    /// Prints the session's textual form followed by the name of the state it has just entered.
    /// The previous state is ignored.
    void onStateChange(SessionStateId::Enum newState, SessionStateId::Enum /*prevState*/, Session * session) override
    {
        std::ostream & out = std::cout;

        out << session->toString();
        out << " changed its state to ";
        out << SessionStateId::toString(newState);
        out << '.';
        out << std::endl;
    }
};
class SchedulingIssueDetector : public SessionSchedulerListener
{
public:
SchedulingIssueDetector() = default;
~SchedulingIssueDetector() override = default;
void onLoggingOut(const SessionScheduler & /*scheduler*/, Session * /*session*/, bool * /*allowLogout*/) override
{
}
void onWarning(const SessionScheduler & /*scheduler*/, Session * session, const std::string & warningReason) override
{
std::cout << "Scheduler reported a warning for the session " << session->toString() << ": " << warningReason;
}
void onError(const SessionScheduler & /*scheduler*/, Session * session, const std::string & errorReason) override
{
std::cout << "Error occurred while scheduling session " << session->toString() << ": " << errorReason;
}
};
/// Drives the sample: owns the session, the scheduler and, for the External
/// threading model, the TCP stack whose events are dispatched by the calling thread.
class Sample
{
public:
    /// Creates a TcpStandardStack only when the External threading model is requested,
    /// then prints the sample banner. Session and scheduler are created later by run().
    explicit Sample(SchedulerThreadingModel::Enum model)
        : session_(nullptr)
        , scheduler_(nullptr)
        , reactor_(model == SchedulerThreadingModel::External ? new TcpStandardStack() : nullptr)
    {
        about();
    }

    // The destructor deletes the owned raw pointers, so a copy would delete them twice.
    Sample(const Sample &) = delete;
    Sample & operator=(const Sample &) = delete;

    /// Disconnects the session (if any) and releases the session, the scheduler and the stack.
    ~Sample()
    {
        if(reactor_)
        {
            // With an external stack the disconnect is asynchronous: keep dispatching
            // events until the stack reports that it is quiescent.
            if(session_)
                session_->disconnectAsync();

            while(!reactor_->isQuiescent())
                reactor_->dispatchEvents();
        }
        else
        {
            if(session_)
                session_->disconnect();
        }

        delete session_;
        delete scheduler_;
        delete reactor_;
    }

    /// Creates the session and the scheduler, schedules the session for a short period of
    /// activity, waits for it to get established and then disconnected, and finally removes
    /// it from the scheduler.
    ///
    /// @param marketSegmentId Market segment the session is created for.
    /// @param host Host passed to the session connection settings.
    /// @param port Port passed to the session connection settings.
    void run(int marketSegmentId, const std::string& host, Scheduling::Port port)
    {
        constructSession(marketSegmentId);
        constructScheduler();

        std::cout << "Scheduling session " << session_->toString() << " for automatic connection." << std::endl;

        const SessionSchedule sessionSchedule = constructShortTimeActivitySchedule();
        const SessionConnectionSettings sessionConnectivity(host, port);

        scheduler_->add(session_, sessionSchedule, sessionConnectivity);

        std::cout << "Waiting for activity on scheduled session " << session_->toString() << '.' << std::endl << std::endl;

        // Wait until the session gets established, then until it gets disconnected again.
        waitUntilState(session_, SessionStateId::Established);
        waitUntilState(session_, SessionStateId::Disconnected);

        std::cout << std::endl << "Removing session " << session_->toString() << " from scheduling service." << std::endl;

        // The session has gone through a full connect/disconnect cycle by now,
        // so it can be removed from the scheduler.
        scheduler_->remove(session_);
    }

    /// Schedules the session through a scheduler configured from "SchedulerConfiguration.xml",
    /// sleeps for the session activity time and removes the session again.
    ///
    /// NOTE(review): uses session_ as is; it is only created by run(), so this presumably
    /// must be called after run() - confirm against the intended usage.
    void useConfigurationFile()
    {
        SessionSchedulerOptions schedulerOptions;
        schedulerOptions.configurationFile("SchedulerConfiguration.xml");

        SessionScheduler scheduler(schedulerOptions);
        scheduler.add(session_, "ScheduleId", "ConnectionId");

        Threading::ThisThread::sleep(1000 * sessionActivityTimeInSeconds());

        // Remove the session from the scheduler it was added to. The member scheduler_
        // is a different object and is null unless run() has been called.
        scheduler.remove(session_);
    }

private:
    Session* session_;
    SessionStateChangeTracer sessionStateChangeTracer_;
    SessionScheduler* scheduler_;
    SchedulingIssueDetector schedulingIssueDetector_;
    TcpStandardStack* reactor_;

    /// Creates the scheduler with schedulingIssueDetector_ as its event listener;
    /// selects the External threading model when a stack is present.
    void constructScheduler()
    {
        SessionSchedulerOptions schedulerOptions;
        schedulerOptions.eventListener(&schedulingIssueDetector_);

        if(reactor_)
            schedulerOptions.threadingModel(SchedulerThreadingModel::External);

        scheduler_ = new SessionScheduler(schedulerOptions);
    }

    /// Creates the session, attaching it to the stack when one is present.
    void constructSession(int marketSegmentId)
    {
        const SessionSettings settings = Samples::fillSettings(true);

        session_ = reactor_
            ? new Session(*reactor_, settings, marketSegmentId, &sessionStateChangeTracer_)
            : new Session(settings, marketSegmentId, &sessionStateChangeTracer_);
    }

    /// Builds a schedule for the current day of week: logon 5 seconds from now,
    /// logout sessionActivityTimeInSeconds() after the logon.
    static SessionSchedule constructShortTimeActivitySchedule()
    {
        const TimeOfDay now = TimeOfDay::now();
        const TimeOfDay logonTime = now + Scheduling::Seconds(5);
        const TimeOfDay logoutTime = logonTime + Scheduling::Seconds(sessionActivityTimeInSeconds());

        return SessionSchedule(DayOfWeek::now(), DayOfWeek::now(), logonTime, logoutTime);
    }

    /// Blocks until `session` reports `state`. With a stack present the scheduler and the
    /// stack are dispatched in a spin loop; otherwise the thread sleeps between checks.
    void waitUntilState(Session* session, SessionStateId::Enum state) {
        const unsigned oneSecondPause = 1000;
        const int spinWaitMicrosecondPause = 10;

        while(session->state() != state)
        {
            if(reactor_)
            {
                scheduler_->dispatchEvents();
                reactor_->dispatchEvents();
                Threading::ThisThread::spinWait(spinWaitMicrosecondPause);
            }
            else
                Threading::ThisThread::sleep(oneSecondPause);
        }
    }

    /// Length of the scheduled activity window, in seconds.
    static int sessionActivityTimeInSeconds()
    {
        return 30;
    }

    /// Prints the sample banner including the threading model in use.
    void about()
    {
        std::cout << "OnixS C++ CME iLink 3 Session Scheduling Sample with " << SchedulerThreadingModel::toString(reactor_ ? SchedulerThreadingModel::External : SchedulerThreadingModel::Dedicated) << " Threading Model." << std::endl << std::endl;
    }
};
/// Prints the command-line synopsis to standard error.
/// The first three arguments are required by main(); only the threading model is optional.
void usage()
{
    std::cerr << "usage: <MarketSegmentId> <Host> <Port> [ThreadingModel]" << std::endl;
}
int main(int argc, char * argv[])
{
try
{
int marketSegmentId = 0;
std::string host;
iLink3::Port port = 0;
std::string threadingModel;
if(argc >= 2)
marketSegmentId = atoi(argv[1]);
if(argc >= 3)
host = argv[2];
if(argc >= 4)
port = atoi(argv[3]);
if(argc >= 5)
threadingModel = argv[4];
if(marketSegmentId == 0 || host.empty() || port == 0)
{
usage();
return 1;
}
Sample sample(threadingModel == "External" ? SchedulerThreadingModel::External : SchedulerThreadingModel::Dedicated);
sample.run(marketSegmentId, host, port);
std::cout << std::endl << "Done." << std::endl;
}
catch(const std::exception & ex)
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}