// This sample demonstrates all aspects of the session scheduler functionality.
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
namespace {
{
public:
};
};
// Groups the time-span helper types of this sample. Other code in this file
// refers to a nested type as SchedulingTimeSpan::Seconds(<int>).
// NOTE(review): the nested class header and its constructor signature are not
// present in the visible code (only the initializer list survives) — restore
// them before building.
struct SchedulingTimeSpan {
{
public:
// Stores the span value passed to the constructor.
: span_(span) {
}
// Returns the stored span value.
int span() const {
return span_;
}
private:
// Span length as given at construction; presumably seconds — confirm.
int span_;
};
};
{
const int SecondsPerMinute = 60;
const int MinutesPerHour = 60;
const int SecondsPerHour = SecondsPerMinute * MinutesPerHour;
sinceMidnight / SecondsPerHour,
(sinceMidnight / SecondsPerMinute) % MinutesPerHour,
sinceMidnight % SecondsPerMinute);
}
{
// Prints the session (converted to std::string) and the name of the state it
// has switched to. The second state argument is unnamed and not used.
// NOTE(review): the parameter list ends with a dangling comma and `session`
// is not declared in the visible signature — the rest of the signature and
// the opening brace appear to be missing.
void onStateChange(SessionState::Enum newState, SessionState::Enum ,
std::cout << static_cast<const std::string &>(*session) << " changed its state to " <<
SessionState::toString(newState) << '.' << std::endl;
}
;
}
};
{
public:
{}
/// Scheduler callback for the initiator-connecting event.
///
/// Deliberately empty: this sample takes no action here and ignores all
/// three arguments.
void onInitiatorConnecting(const SessionScheduler & /*scheduler*/,
                           Session * /*session*/,
                           const Counterparty & /*counterparty*/)
    ONIXS_FIXENGINE_FINAL
{
}
/// Scheduler callback invoked with a warning for a scheduled session.
///
/// Writes one line to standard output containing the session (converted to
/// std::string) and the warning text.
///
/// @param session        Session the warning refers to; dereferenced, so it
///                       must not be null.
/// @param warningReason  Human-readable warning text.
void onWarning(const SessionScheduler & /*scheduler*/,
               Session * session,
               const std::string & warningReason)
    ONIXS_FIXENGINE_FINAL
{
    // Terminate and flush the line so that consecutive reports do not run
    // together and are visible immediately, like every other message here.
    std::cout << "Scheduler reported a warning for the session "
              << static_cast<const std::string &>(*session)
              << ": " << warningReason << std::endl;
}
/// Scheduler callback invoked with an error for a scheduled session.
///
/// Writes one line to standard output containing the session (converted to
/// std::string) and the error text.
///
/// @param session      Session the error refers to; dereferenced, so it must
///                     not be null.
/// @param errorReason  Human-readable error text.
void onError(const SessionScheduler & /*scheduler*/,
             Session * session,
             const std::string & errorReason)
    ONIXS_FIXENGINE_FINAL
{
    // Terminate and flush the line so that consecutive reports do not run
    // together and are visible immediately, like every other message here.
    std::cout << "Error occurred while scheduling session "
              << static_cast<const std::string &>(*session)
              << ": " << errorReason << std::endl;
}
};
class Sample
{
public:
/// Builds the sample for the given scheduler threading model.
///
/// A TCPStandard::Stack reactor is allocated only for the ExternalThread
/// model; otherwise reactor_ is initialized with a null pointer. The body then
/// prints the banner, initializes the FIX Engine and creates the sessions and
/// the scheduler, in that order.
///
/// @param model  Threading model the scheduler of this sample should use.
explicit Sample(SchedulerThreadingModel::Enum model)
    : threadingModel_(model)
    // Decide from the constructor argument, not from threadingModel_: members
    // are initialized in declaration order, so reading another member here
    // would only be correct if threadingModel_ is declared before reactor_.
    , reactor_(model == SchedulerThreadingModel::ExternalThread
                   ? new TCPStandard::Stack()
                   : ONIXS_FIXENGINE_NULLPTR)
{
    representYourself();
    initializeFixEngine();
    constructSessions();
    constructScheduler();
}
/// Tears the sample down: shuts down and releases the initiator, then the
/// acceptor, releases the scheduler and shuts the Engine down. For the
/// ExternalThread model the reactor is then pumped until it reports itself
/// quiescent.
~Sample() {
    // Sessions go first, initiator before acceptor.
    initiator_->shutdown();
    initiator_.reset();

    acceptor_->shutdown();
    acceptor_.reset();

    scheduler_.reset();

    Engine::shutdown();

    // A reactor is only used with the ExternalThread model.
    if (threadingModel_ != SchedulerThreadingModel::ExternalThread)
        return;

    // Keep dispatching until the reactor reports it is quiescent.
    while (!reactor_->isQuiescent()) {
        reactor_->dispatchEvents();
    }
}
void run() {
acceptor_->resetLocalSequenceNumbers();
initiator_->resetLocalSequenceNumbers();
acceptor_->logonAsAcceptor();
std::cout << "Scheduling Initiator session " << static_cast<const std::string &>
(*initiator_) << " for automatic connection." << std::endl;
SessionSchedule initiatorSchedule = constructShortTimeActivitySchedule();
InitiatorConnectionSettings initiatorConnectivity("localhost", ListenPort);
scheduler_->add(initiator_.get(), initiatorSchedule, initiatorConnectivity);
std::cout << "Waiting for activity on scheduled session " << static_cast<const std::string &>
(*initiator_) << '.' << std::endl << std::endl;
waitUntilState(initiator_.get(), SessionState::Active);
waitUntilState(initiator_.get(), SessionState::Disconnected);
std::cout << std::endl << "Removing Initiator session " << static_cast<const std::string &>
(*initiator_) << " from scheduling service." << std::endl;
scheduler_->remove(initiator_.get());
acceptor_->logout();
}
/// Registers the initiator with a separate, locally created scheduler using
/// the schedule and connection identified by "ScheduleId" and "ConnectionId",
/// then removes it again.
///
/// NOTE(review): no configuration file is assigned to schedulerOptions in the
/// visible code — confirm where the "ScheduleId"/"ConnectionId" presets are
/// expected to come from.
void useConfigurationFile() {
    initiator_->resetLocalSequenceNumbers();

    SessionSchedulerOptions schedulerOptions;
    SessionScheduler scheduler(schedulerOptions);

    scheduler.add(initiator_.get(), "ScheduleId", "ConnectionId");

    // Remove the session from the scheduler it was added to. The previous code
    // called remove() on the member scheduler_, which never had this
    // registration, and left the session registered with the local scheduler
    // when it was destroyed at the end of this function.
    scheduler.remove(initiator_.get());
}
private:
SchedulerThreadingModel::Enum threadingModel_;
MySessionListener acceptorListener_;
SessionStateChangeTracer initiatorStateChangeTracer_;
SchedulingIssueDetector schedulingIssueDetector_;
/// Creates the scheduler owned by this sample from default-constructed options.
///
/// NOTE(review): the options are left at their defaults in the visible code;
/// neither schedulingIssueDetector_ nor threadingModel_ is applied here —
/// confirm whether that is intended.
void constructScheduler() {
    SessionSchedulerOptions defaultOptions;

    scheduler_.reset(new SessionScheduler(defaultOptions));
}
/// Creates the two sessions used by the sample.
///
/// The acceptor is built with (TargetCompId, SenderCompId) and the initiator
/// with the same identifiers swapped. Only the initiator is given the reactor.
void constructSessions() {
    acceptor_.reset(
        new Session(
            TargetCompId,
            SenderCompId,
            FixProtocolVersion,
            &acceptorListener_));

    initiator_.reset(
        new Session(
            reactor_.get(),
            SenderCompId,
            TargetCompId,
            FixProtocolVersion,
            &initiatorStateChangeTracer_));
}
/// Builds a schedule whose logon time is five seconds after the current time
/// of day and whose logout time follows sessionActivityTimeInSeconds() later.
///
/// The schedule spans Monday through Sunday, uses SessionDuration::Day and
/// SequenceNumberResetPolicy::Never.
static SessionSchedule constructShortTimeActivitySchedule() {
    // Delay between "now" and the scheduled logon.
    const int logonDelayInSeconds = 5;

    TimeOfDay currentTime = TimeOfDay::now();

    TimeOfDay activityStart =
        currentTime + SchedulingTimeSpan::Seconds(logonDelayInSeconds);

    TimeOfDay activityEnd =
        activityStart + SchedulingTimeSpan::Seconds(sessionActivityTimeInSeconds());

    return SessionSchedule(
        DayOfWeek::Monday,
        DayOfWeek::Sunday,
        activityStart,
        activityEnd,
        SessionDuration::Day,
        SequenceNumberResetPolicy::Never);
}
/// Loops until `session` reports the requested `state`.
///
/// Each iteration re-reads session->state(). When the threading model is not
/// DedicatedThreads, the loop also calls dispatchEvents() on scheduler_ and
/// reactor_.
///
/// NOTE(review): the `if` branch below has no statement before `else`, so the
/// block does not compile as shown, and both pause constants are unused.
/// Presumably a pause/sleep call was lost from each branch — restore them from
/// the upstream sample rather than guessing the API.
///
/// @param session  Session whose state is polled.
/// @param state    State to wait for.
void waitUntilState(Session * session, SessionState::Enum state) {
// Presumably a pause length in milliseconds (unused in the visible code) — confirm.
const unsigned oneSecondPause = 1000;
// Presumably a spin-wait length in microseconds (unused in the visible code) — confirm.
const int spinWaitMicrosecondPause = 10;
while(session->
state() != state)
{
if(threadingModel_ == SchedulerThreadingModel::DedicatedThreads)
else
{
// Events are dispatched explicitly by this loop for the other threading model(s).
scheduler_->dispatchEvents();
reactor_->dispatchEvents();
}
}
}
static void initializeFixEngine() {
EngineSettings settings;
Engine::init(settings);
}
/// Returns how long, in seconds, the scheduled session is kept between its
/// logon and logout times.
static int sessionActivityTimeInSeconds() {
    const int activityTimeInSeconds = 30;

    return activityTimeInSeconds;
}
/// Prints the sample banner, including the name of the selected threading
/// model, followed by a usage hint. Each is followed by an empty line.
void representYourself() {
    // Banner.
    std::cout << "OnixS C++ FIX Engine Session Scheduling Sample with "
              << SchedulerThreadingModel::toString(threadingModel_)
              << "."
              << std::endl
              << std::endl;

    // Command-line usage.
    std::cout << "Usage: SessionScheduler threadingModel[DedicatedThreads][ExternalThread]"
              << std::endl
              << std::endl;
}
};
/// Program entry point: runs the sample and reports failures.
///
/// Returns 0 after a successful run. Any std::exception is passed to
/// processSampleException() and turned into exit code 1.
///
/// NOTE(review): `sample` is used below but never declared in the visible
/// code, and `ThreadingModel` is never read — the statement constructing the
/// Sample from the chosen threading model appears to be missing.
int main(int argc, char * argv[])
{
try {
// Threading model name: the first command-line argument, or
// "DedicatedThreads" when none is given.
const std::string
ThreadingModel = (argc > 1 ? argv[1] :
"DedicatedThreads");
sample.run();
std::cout << std::endl << "Done." << std::endl;
return 0;
}
catch(const std::exception & ex) {
processSampleException(ex.what());
return 1;
}
}