OnixS C++ FIX Engine  4.11.0
API Documentation
ExternalThread Mode

Sometimes, it is more effective to perform the event dispatching in a loop in the main application thread. For this purpose, one can use the OnixS::FIX::ThreadingModel::ExternalThread mode.

Stack

Before an externally-managed session is created, the application must create an OnixS::FIX::ISessionReactor instance. There are two reactor classes, which can be used:

TCPStandard::Stack stack;

The pointer to the stack is provided in Session's constructor, for example:

SessionListener listener;
Session session(&stack, SenderCompId, TargetCompId, FixProtocolVersion, &listener);

Asynchronous Logon and Logout

Applications must call asynchronous Session's methods to logon (OnixS::FIX::Session::logonAsInitiatorAsync) and logout (OnixS::FIX::Session::logoutAsync).

session.logonAsInitiatorAsync(CounterpartyHost, CounterpartyPort, HeartBtInt, ONIXS_FIXENGINE_NULLPTR, true);
// Dispatching events..
session.logoutAsync("End of connection");

Dispatching Network Events

Applications must call OnixS::FIX::ISessionReactor::dispatchEvents frequently for each stack that is in use.

bool finished = false;
while (!finished)
stack.dispatchEvents();

Thread Safety

Warning
Access to the stack instance at each moment is possible from only one thread. The TCPStandard/TCPDirect middlewares does not use inter-threaded synchronization, does not serialize calls, and does not check the correctness of access to its API by the user. Therefore, the event dispatching and message sending should be performed from the same thread.

Shutdown

The TCPStandard/TCPDirect stack requires finishing all outstanding work and handling all outstanding events. Before destroying the stack instance, the following code should be executed:

while (!stack.isQuiescent())
stack.dispatchEvents();

Method OnixS::FIX::ISessionReactor::isQuiescent returns a boolean value indicating whether a stack is quiescent.

This can be used to ensure that all connections have been closed gracefully before destroying a stack (or exiting the application). Destroying a stack while it is not quiescent is permitted by the API, but when doing so there is no guarantee that sent data has been acknowledged by the peer or even transmitted, and there is the possibility that peers' connections will be reset.

ExternalThread Mode for Acceptor Sessions

There is an ability to use the ExternalThread mode for acceptor sessions too. However, to dispatch listening events and accept incoming TCP connections, one must pass the pointer to the stack to the OnixS::FIX::Engine::init() method. The pointer to the same stack should also be passed to the acceptor session's constructor. In this case, the FIX Engine does not create an additional thread to listen for incoming connections and uses the corresponding stack to dispatch these events:

TCPStandard::Stack stack;
EngineSettings engineSettings;
// Invoke the Engine::init with the given stack class instance.
Engine::init(engineSettings, &stack);
SessionListener listener;
Session acceptor(&stack, SenderCompId, TargetCompId, FixProtocolVersion, &listener);
acceptor.logonAsAcceptor();
bool finished = false;
// Dispatching events..
while (!finished)
stack.dispatchEvents();
acceptor.shutdown();
Engine::shutdown();
while(!stack.isQuiescent())
stack.dispatchEvents();

Split the Sessions Processing by Different Stacks in Separate Threads

When the amount of high-loaded sessions is large, splitting sessions into groups and performing the event dispatching in separate threads makes sense and can improve performance. In this case, considering that the external thread reactors are not thread safe, each stack with corresponding sessions should be created in its thread:

class SessionReactorThread : public OnixS::Threading::Thread
{
typedef std::vector<SessionPtr> Sessions;
size_t sessionNumber_;
public:
explicit SessionReactorThread(size_t sessionNumber) :
OnixS::Threading::Thread("SessionReactorThread"),
sessionNumber_(sessionNumber)
{}
~SessionReactorThread() ONIXS_FIXENGINE_OVERRIDE
{
join();
}
bool isCompleted()
{
return isCompleted_.getFuture().isReady();
}
protected:
void run() ONIXS_FIXENGINE_OVERRIDE
{
// Initialize stack.
TCPStandard::Stack stack;
// Initialize sessions.
Sessions sessions;
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions.push_back(createSession());
OnixS::Threading::SharedFuture<void> logonTask = sessions.back()->logonAsInitiatorAsync(CounterpartyHost, CounterpartyPort, HeartBtInt, ONIXS_FIXENGINE_NULLPTR, true);
while (!logonTask.isReady())
stack.dispatchEvents();
}
bool finished = false;
// Useful work and dispatching events...
while (!finished)
stack.dispatchEvents();
// Shutdown sessions.
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
OnixS::Threading::SharedFuture<void> logoutTask = sessions[sessionCounter]->logoutAsync("The session is disconnected");
while (!logoutTask.isReady())
stack.dispatchEvents();
sessions[sessionCounter]->shutdown();
}
isCompleted_.set();
while (!stack.isQuiescent())
stack.dispatchEvents();
}
};
// Initialize engine in the main thread.
EngineSettings engineSettings;
Engine::init(engineSettings);
// Initialize and start the required amount of threads.
size_t SessionNumber = 25;
size_t SessionPerThread = 5;
size_t ThreadNumber = SessionNumber / SessionPerThread;
typedef std::vector<SessionReactorThreadPtr> ReactorThreads;
ReactorThreads reactorThreads;
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads.push_back(new SessionReactorThread(SessionPerThread));
reactorThreads.back()->start();
}
// Waiting for threads completion.
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads[reactorThreadCounter]->join();
}
// Shutdown the FIX Engine.
Engine::shutdown();

Using Different Sessions with Different Threading Models

It is possible to use initiator sessions with different threading models. For example, if the Engine is configured to use the default threading model (OnixS::FIX::ThreadingModel::DedicatedThreads), and the reactor instance is passed to the session's constructor, then this particular session will use the OnixS::FIX::ThreadingModel::ExternalThread threading model. All other sessions will use the threading model configured for the whole FIX Engine. However, acceptor sessions can be used in the same threading mode only. This is because an acceptor connection is created when no corresponding acceptor session is identified. Therefore, the threading model is determined by whether the reactor instance is passed or not to the OnixS::FIX::Engine::init() method. If you pass the reactor instance to the OnixS::FIX::Engine::init() method, all acceptor connections and sessions will use the OnixS::FIX::ThreadingModel::ExternalThread threading model. If you do not pass the reactor instance to the OnixS::FIX::Engine::init() method, all acceptor connections and sessions will use the configured for the whole FIX Engine threading model.

See Also