OnixS C++ FIX Engine  4.10.1
API Documentation
Session Scheduler Sample

This sample demonstrates all aspects of the session scheduler functionality.

Source code:

#include <memory>
#include <fstream>
#include <iostream>
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
using namespace OnixS::FIX;
using namespace OnixS::FIX::Scheduling;
namespace {

// Session listener that ignores every inbound application-level message.
class MySessionListener : public ISessionListener
{
public:
    // Intentionally empty: the message and the session it arrived on are not inspected.
    void onInboundApplicationMsg(Message &, Session *) ONIXS_FIXENGINE_FINAL {}
};

} // namespace
// Groups the time-span helper types used when building schedules.
struct SchedulingTimeSpan
{
    // A span of time expressed as a whole number of seconds.
    class Seconds
    {
        int span_;

    public:
        // Wraps the given number of seconds (implicit conversion from int is kept).
        Seconds(int span) : span_(span) {}

        // Returns the wrapped number of seconds.
        int span() const { return span_; }
    };
};
{
const int SecondsPerMinute = 60;
const int MinutesPerHour = 60;
const int SecondsPerHour = SecondsPerMinute * MinutesPerHour;
int sinceMidnight = time.sinceMidnight() + seconds.span();
return TimeOfDay(
sinceMidnight / SecondsPerHour,
(sinceMidnight / SecondsPerMinute) % MinutesPerHour,
sinceMidnight % SecondsPerMinute);
}
// Session listener that prints each state transition of a session to standard output.
class SessionStateChangeTracer : public ISessionListener
{
    // Inbound application messages are deliberately ignored.
    void onInboundApplicationMsg(Message &, Session *) ONIXS_FIXENGINE_FINAL
    {
    }

    // Prints the session's textual representation together with the state it has just
    // entered; the previous state is not reported.
    void onStateChange(
        SessionState::Enum newState,
        SessionState::Enum /*prevState*/,
        Session * session) ONIXS_FIXENGINE_FINAL
    {
        std::cout
                << static_cast<const std::string &>(*session)
                << " changed its state to "
                << SessionState::toString(newState)
                << '.'
                << std::endl;
    }
};
// Scheduler listener that reports scheduler warnings and errors on standard output.
// The logout and initiator-connecting notifications are accepted without any action.
class SchedulingIssueDetector : public SessionSchedulerListener
{
public:
    // No-op: the `allowLogout` flag is left untouched.
    void onLoggingOut(const SessionScheduler & /*scheduler*/, Session * /*session*/, bool * /*allowLogout*/) ONIXS_FIXENGINE_FINAL
    {}

    // No-op: the connection attempt is neither logged nor altered.
    void onInitiatorConnecting(const SessionScheduler & /*scheduler*/, Session * /*session*/, const Counterparty & /*counterparty*/) ONIXS_FIXENGINE_FINAL
    {}

    // Prints the warning together with the textual representation of the affected session.
    // The line is terminated and flushed so that consecutive reports do not run together.
    void onWarning(const SessionScheduler & /*scheduler*/, Session * session, const std::string & warningReason) ONIXS_FIXENGINE_FINAL
    {
        std::cout << "Scheduler reported a warning for the session " << static_cast<const std::string &>(*session) << ": " << warningReason << std::endl;
    }

    // Prints the error together with the textual representation of the affected session.
    // The line is terminated and flushed so that consecutive reports do not run together.
    void onError(const SessionScheduler & /*scheduler*/, Session * session, const std::string & errorReason) ONIXS_FIXENGINE_FINAL
    {
        std::cout << "Error occurred while scheduling session " << static_cast<const std::string &>(*session) << ": " << errorReason << std::endl;
    }
};
class Sample
{
public:
explicit Sample(SchedulerThreadingModel::Enum model):
threadingModel_(model), reactor_(threadingModel_ == SchedulerThreadingModel::ExternalThread ? new TCPStandard::Stack() : ONIXS_FIXENGINE_NULLPTR)
{
representYourself();
initializeFixEngine();
constructSessions();
constructScheduler();
}
~Sample() {
initiator_->shutdown();
initiator_.reset();
acceptor_->shutdown();
acceptor_.reset();
scheduler_.reset();
Engine::shutdown();
if(threadingModel_ == SchedulerThreadingModel::ExternalThread)
{
while(!reactor_->isQuiescent())
reactor_->dispatchEvents();
}
}
void run() {
acceptor_->resetLocalSequenceNumbers();
initiator_->resetLocalSequenceNumbers();
acceptor_->logonAsAcceptor();
std::cout << "Scheduling Initiator session " << static_cast<const std::string &>
(*initiator_) << " for automatic connection." << std::endl;
SessionSchedule initiatorSchedule = constructShortTimeActivitySchedule();
InitiatorConnectionSettings initiatorConnectivity("localhost", ListenPort);
scheduler_->add(initiator_.get(), initiatorSchedule, initiatorConnectivity);
std::cout << "Waiting for activity on scheduled session " << static_cast<const std::string &>
(*initiator_) << '.' << std::endl << std::endl;
waitUntilState(initiator_.get(), SessionState::Active);
waitUntilState(initiator_.get(), SessionState::Disconnected);
std::cout << std::endl << "Removing Initiator session " << static_cast<const std::string &>
(*initiator_) << " from scheduling service." << std::endl;
// Session had to 'pulse' connection till this time, so scheduling can be destroyed.
scheduler_->remove(initiator_.get());
// Clean-up.
acceptor_->logout();
}
void useConfigurationFile() {
initiator_->resetLocalSequenceNumbers();
SessionSchedulerOptions schedulerOptions;
schedulerOptions.configurationFile("SchedulerConfiguration.xml");
SessionScheduler scheduler(schedulerOptions);
scheduler.add(initiator_.get(), "ScheduleId", "ConnectionId");
OnixS::Threading::ThisThread::sleep(1000 * sessionActivityTimeInSeconds());
scheduler_->remove(initiator_.get());
}
private:
MySessionListener acceptorListener_;
SessionStateChangeTracer initiatorStateChangeTracer_;
SchedulingIssueDetector schedulingIssueDetector_;
void constructScheduler() {
SessionSchedulerOptions schedulerOptions;
schedulerOptions.eventListener(&schedulingIssueDetector_);
schedulerOptions.threadingModel(threadingModel_);
scheduler_.reset(new SessionScheduler(schedulerOptions));
}
void constructSessions() {
acceptor_.reset(new Session(TargetCompId, SenderCompId, FixProtocolVersion, &acceptorListener_));
initiator_.reset(new Session(reactor_.get(), SenderCompId, TargetCompId, FixProtocolVersion, &initiatorStateChangeTracer_));
}
static SessionSchedule constructShortTimeActivitySchedule() {
TimeOfDay now = TimeOfDay::now();
TimeOfDay logonTime = now + SchedulingTimeSpan::Seconds(5);
TimeOfDay logoutTime = logonTime + SchedulingTimeSpan::Seconds(sessionActivityTimeInSeconds());
DayOfWeek::Monday,
DayOfWeek::Sunday,
logonTime,
logoutTime,
SequenceNumberResetPolicy::Never);
}
void waitUntilState(Session * session, SessionState::Enum state) {
const unsigned oneSecondPause = 1000;
const int spinWaitMicrosecondPause = 10;
while(session->state() != state)
{
if(threadingModel_ == SchedulerThreadingModel::DedicatedThreads)
else
{
scheduler_->dispatchEvents();
reactor_->dispatchEvents();
OnixS::Threading::ThisThread::spinWait(spinWaitMicrosecondPause);
}
}
}
static void initializeFixEngine() {
EngineSettings settings;
settings.licenseStore(LicenseStore)
.listenPort(ListenPort);
Engine::init(settings);
}
static int sessionActivityTimeInSeconds() {
return 30;
}
void representYourself() {
std::cout << "OnixS C++ FIX Engine Session Scheduling Sample with " << SchedulerThreadingModel::toString(threadingModel_) << "." << std::endl << std::endl;
std::cout << "Usage: SessionScheduler threadingModel[DedicatedThreads][ExternalThread]" << std::endl << std::endl;
}
};
int main(int argc, char * argv[])
{
try {
const std::string ThreadingModel = (argc > 1 ? argv[1] : "DedicatedThreads");
Sample sample(ThreadingModel == "ExternalThread" ? SchedulerThreadingModel::ExternalThread : SchedulerThreadingModel::DedicatedThreads);
sample.run();
std::cout << std::endl << "Done." << std::endl;
return 0;
}
catch(const std::exception & ex) {
processSampleException(ex.what());
return 1;
}
}