Sometimes, it is more effective to perform the event dispatching in a loop in the main application thread. For this purpose, one can use the OnixS::FIX::ThreadingModel::ExternalThread mode.
Stack
Before an externally-managed session is created, the application must create an OnixS::FIX::ISessionReactor instance. There are two reactor classes, which can be used:
TCPStandard::Stack stack;
The pointer to the stack is provided in Session's constructor, for example:
SessionListener listener;
Session session(&stack, SenderCompId, TargetCompId, FixProtocolVersion, &listener);
Asynchronous Logon and Logout
Applications must call asynchronous Session's methods to logon (OnixS::FIX::Session::logonAsInitiatorAsync) and logout (OnixS::FIX::Session::logoutAsync).
session.
logonAsInitiatorAsync(CounterpartyHost, CounterpartyPort, HeartBtInt, ONIXS_FIXENGINE_NULLPTR,
true);
Dispatching Network Events
Applications must call OnixS::FIX::ISessionReactor::dispatchEvents frequently for each stack that is in use.
bool finished = false;
while (!finished)
stack.dispatchEvents();
Thread Safety
- Warning
- Access to the stack instance at each moment is possible from only one thread. The TCPStandard/TCPDirect middlewares does not use inter-threaded synchronization, does not serialize calls, and does not check the correctness of access to its API by the user. Therefore, the event dispatching and message sending should be performed from the same thread.
Shutdown
The TCPStandard/TCPDirect stack requires finishing all outstanding work and handling all outstanding events. Before destroying the stack instance, the following code should be executed:
while (!stack.isQuiescent())
stack.dispatchEvents();
Method OnixS::FIX::ISessionReactor::isQuiescent returns a boolean value indicating whether a stack is quiescent.
This can be used to ensure that all connections have been closed gracefully before destroying a stack (or exiting the application). Destroying a stack while it is not quiescent is permitted by the API, but when doing so there is no guarantee that sent data has been acknowledged by the peer or even transmitted, and there is the possibility that peers' connections will be reset.
ExternalThread Mode for Acceptor Sessions
There is an ability to use the ExternalThread mode for acceptor sessions too. However, to dispatch listening events and accept incoming TCP connections, one must pass the pointer(s) to the stack(s) to the OnixS::FIX::Engine::init() or OnixS::FIX::Engine::addListenPort() methods. The pointer to the same stack (if a few stack pointers are passed, any of them) should also be passed to the acceptor session's constructor. In this case, the FIX Engine does not create an additional thread to listen for incoming connections and uses the corresponding stack(s) to dispatch these events:
TCPStandard::Stack stack;
EngineSettings engineSettings;
Engine::init(engineSettings, &stack);
SessionListener listener;
Session acceptor(&stack, SenderCompId, TargetCompId, FixProtocolVersion, &listener);
acceptor.logonAsAcceptor();
bool finished = false;
while (!finished)
stack.dispatchEvents();
acceptor.shutdown();
Engine::shutdown();
while(!stack.isQuiescent())
stack.dispatchEvents();
Split the Sessions Processing by Different Stacks in Separate Threads
When the amount of high-loaded sessions is large, splitting sessions into groups and performing the event dispatching in separate threads makes sense and can improve performance. In this case, considering that the external thread reactors are not thread safe, each stack with corresponding sessions should be created in its thread.
Split the Initiator Sessions Processing by Different Stacks in Separate Threads
{
typedef std::vector<SessionPtr> Sessions;
size_t sessionNumber_;
public:
explicit InitiatorSessionReactorThread(size_t sessionNumber) :
OnixS::Threading::Thread(
"SessionReactorThread"),
sessionNumber_(sessionNumber)
{}
~InitiatorSessionReactorThread() ONIXS_FIXENGINE_OVERRIDE
{
}
protected:
void run() ONIXS_FIXENGINE_OVERRIDE
{
TCPStandard::Stack stack;
Sessions sessions;
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions.push_back(createSession());
stack.dispatchEvents();
}
bool finished = false;
while (!finished)
stack.dispatchEvents();
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
stack.dispatchEvents();
sessions[sessionCounter]->shutdown();
}
while (!stack.isQuiescent())
stack.dispatchEvents();
}
};
EngineSettings engineSettings;
Engine::init(engineSettings);
size_t SessionNumber = 25;
size_t SessionPerThread = 5;
size_t ThreadNumber = SessionNumber / SessionPerThread;
typedef std::vector<InitiatorSessionReactorThreadPtr> ReactorThreads;
ReactorThreads reactorThreads;
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads.push_back(new InitiatorSessionReactorThread(SessionPerThread));
reactorThreads.back()->start();
}
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads[reactorThreadCounter]->join();
}
Engine::shutdown();
Split the Acceptor Sessions Processing by Different Stacks in Separate Threads
To split acceptor Sessions by separate threads, the OnixS::FIX::Engine::addListenPort() method needs to be used to pass the corresponding listen port(s) and reactor(s). Each thread should use a separate listen port(s) because the same listen port(s) cannot be processed by different reactors. When the thread is completed, the corresponding listen port(s) and reactor(s) must be removed using the OnixS::FIX::Engine::removeListenPort() method.
{
typedef std::vector<SessionPtr> Sessions;
size_t sessionNumber_;
public:
explicit AcceptorSessionReactorThread(size_t sessionNumber) :
OnixS::Threading::Thread(
"SessionReactorThread"),
sessionNumber_(sessionNumber)
{}
~AcceptorSessionReactorThread() ONIXS_FIXENGINE_OVERRIDE
{
}
protected:
void run() ONIXS_FIXENGINE_OVERRIDE
{
TCPStandard::Stack stack;
Engine::instance()->addListenPort(CurrentThreadListenPort, &stack);
Sessions sessions;
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions.push_back(createSession());
sessions.back()->logonAsAcceptor();
}
bool finished = false;
while (!finished)
stack.dispatchEvents();
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
stack.dispatchEvents();
}
Engine::instance()->removeListenPort(CurrentThreadListenPort);
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions[sessionCounter]->shutdown();
}
while (!stack.isQuiescent())
stack.dispatchEvents();
}
};
EngineSettings engineSettings;
Engine::init(engineSettings);
size_t SessionNumber = 25;
size_t SessionPerThread = 5;
size_t ThreadNumber = SessionNumber / SessionPerThread;
typedef std::vector<AcceptorSessionReactorThreadPtr> ReactorThreads;
ReactorThreads reactorThreads;
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads.push_back(new AcceptorSessionReactorThread(SessionPerThread));
reactorThreads.back()->start();
}
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads[reactorThreadCounter]->join();
}
Engine::shutdown();
Using Different Sessions with Different Threading Models
It is possible to use initiator sessions with different threading models. For example, if the Engine is configured to use the default threading model (OnixS::FIX::ThreadingModel::DedicatedThreads), and the reactor instance is passed to the session's constructor, then this particular session will use the OnixS::FIX::ThreadingModel::ExternalThread threading model. All other sessions will use the threading model configured for the whole FIX Engine. However, acceptor sessions can be used in the same threading mode only. This is because an acceptor connection is created when no corresponding acceptor session is identified. Therefore, the threading model is determined by whether the reactor instance(s) is passed or not to the OnixS::FIX::Engine::init() or OnixS::FIX::Engine::addListenPort() methods. If you pass the reactor instance(s) to the OnixS::FIX::Engine::init() or OnixS::FIX::Engine::addListenPort() methods, all acceptor connections and sessions will use the OnixS::FIX::ThreadingModel::ExternalThread threading model. If you do not pass the reactor instance(s) to the OnixS::FIX::Engine::init()or OnixS::FIX::Engine::addListenPort() methods, all acceptor connections and sessions will use the configured for the whole FIX Engine threading model.
See Also