OnixS C++ FIX Engine  4.12.0
API Documentation
Solarflare Specific Features

Solarflare Onload

Solarflare Onload is a sockets acceleration technology for the transparent acceleration of sockets-based applications.

On Linux, the FIX Engine uses Solarflare Onload Extensions API to support additional features of Solarflare network cards. If you use such network cards, in your system, you can get an additional benefit from advanced features of such network cards.

FIX Engine automatically detects if the corresponding Onload library installed in your system and if so, then loads it in run-time and provides an ability to use additional features. In this case, there should be the record, in the FIX Engine log, that the corresponding version of the Onload extension library is used.

Note
If you want to test Onload features on the loopback network interface, then EF_TCP_CLIENT_LOOPBACK and EF_TCP_SERVER_LOOPBACK parameters should be used, e.g. :
EF_TCP_CLIENT_LOOPBACK=1 EF_TCP_SERVER_LOOPBACK=1 onload ..

Please see the "Onload User Guide".

Currently, the FIX Engine supports the following Onload features:

When you use the OnixS::FIX::Session::warmUp method, the FIX Engine automatically enables this feature if possible. As a result, you can get the maximal warmup effect including the complete warm-up of the sending path of the TCP stack.

The onload_stackdump utility could be used to monitor the usage of this feature.

For example:

onload_stackdump lots | grep warm

In order to use this feature you need to use the overloaded OnixS::FIX::Session::send(FlatMessage * msg, SessionSendMode::Enum mode) method with the OnixS::FIX::SessionSendMode::OnloadZeroCopy mode. As a result, you can avoid the additional data copying from an application allocated buffer to the network adapter buffer and can decrease the sending latency.

This feature also is supported when one is sending messages in a batch. In this case, you need to use the overloaded OnixS::FIX::Session::send(FlatMessageBatch & msgs, SessionSendMode::Enum mode, size_t maxPacketSize) method.

Note
If a Solarflare network card is not present in the system and you use this feature then the usual socket send call will be performed.
The Zero-Copy feature requires the allocation of specific buffers from NIC. After each successful send, the corresponding buffers become invalid, so it needs to reallocate them. This reallocation is performed right after the send method is completed. This adds additional latency to the OnixS::FIX::Session::send overall latency.

Solarflare TCPDirect

Solarflare's TCPDirect is a highly accelerated network middleware. It is similar to Onload but provides lower latency.

Stack

Before a TCPDirect-enabled session is created, the application must create an OnixS::FIX::TCPDirect::Stack instance.

TCPDirect::Attributes attributes;
// The default interface name could be provided by environment variable
// ZF_ATTR="interface=<interface-name>", see TCPDirect specs,
//
// or could be set manually:
// attr.set("interface", "interface-name");
TCPDirect::Stack stack(attributes);

Attributes (OnixS::FIX::TCPDirect::Attributes) control the configuration of the stack and its behavior. Please see the "Solarflare TCPDirect User Guide" for a complete list of available attributes and their description.

Note
The application should create TCPDirect-enabled sessions in the same thread where it has created the corresponding OnixS::FIX::TCPDirect::Stack instance.

The pointer to the stack is provided in Session's constructor, for example:

SessionListener listener;
Session session(&stack, SenderCompId, TargetCompId, FixProtocolVersion, &listener);

By default, each TCPDirect stack can handle up to 64 TCP endpoints, so this is the maximum number of sessions that could be created using the same stack instance.

The max_tcp_endpoints attribute can set the maximum number of TCP endpoints. Please refer to Solarflare TCPDirect documentation.

See also max_tcp_listen_endpoints, max_tcp_syn_backlog, max_udp_rx_endpoints, max_udp_tx_endpoints attributes.

These attributes could be set to a minimum value to save hardware and software resources.

Note
The TCPDirect does not support loopback connections, so it needs to test it between different machines or NICs.
The TCPDirect requires the allocation of huge pages. Please see the "Solarflare TCPDirect User Guide" for details. Huge pages are generally enabled by default. One can use, e.g., the following command to check the current state of huge pages:
cat /sys/kernel/mm/transparent_hugepage/enabled

Asynchronous Logon and Logout

Applications must call asynchronous Session's methods to logon (OnixS::FIX::Session::logonAsInitiatorAsync) and logout (OnixS::FIX::Session::logoutAsync).

session.logonAsInitiatorAsync(CounterpartyHost, CounterpartyPort, HeartBtInt, ONIXS_FIXENGINE_NULLPTR, true);
// Dispatching events..
session.logoutAsync("End of connection");

Dispatching Network Events

Applications must call OnixS::FIX::TCPDirect::Stack::dispatchEvents frequently for each stack that is in use.

bool finished = false;
while (!finished)
stack.dispatchEvents();

Thread Safety

Warning
Access to the stack instance at each moment is possible from only one thread. The TCPDirect middleware does not use inter-threaded synchronization, does not serialize calls, and does not check the correctness of access to its API by the user. Therefore, the event dispatching and message sending should be performed from the same thread.

Shutdown

The TCPDirect stack requires finishing all outstanding work and handling all outstanding events. Before destroying the OnixS::FIX::TCPDirect::Stack instance, the following code should be executed:

while (!stack.isQuiescent())
stack.dispatchEvents();

Method OnixS::FIX::TCPDirect::Stack::isQuiescent returns a boolean value indicating whether a stack is quiescent.

This can be used to ensure that all connections have been closed gracefully before destroying a stack (or exiting the application). Destroying a stack while it is not quiescent is permitted by the API, but when doing so there is no guarantee that sent data has been acknowledged by the peer or even transmitted, and there is the possibility that peers' connections will be reset.

TCPDirect for Acceptor Sessions

There is an ability to use the TCPDirect mode for acceptor sessions too. However, to dispatch listening events and accept incoming TCP connections, one must pass the pointer(s) to the OnixS::FIX::TCPDirect::Stack object(s) to the OnixS::FIX::Engine::init() or OnixS::FIX::Engine::addListenPort() methods. The pointer to the same stack (if a few stack pointers are passed, any of them) should also be passed to the acceptor session's constructor. In this case, the FIX Engine does not create an additional thread to listen for incoming connections and uses the corresponding stack(s) to dispatch these events:

ifaces.push_back(ListenAddress);
EngineSettings engineSettings;
// Set listen local network interface, which corresponds to Solarflare's NIC.
engineSettings.localNetworkInterface(ifaces);
// Set the listen port.
engineSettings.listenPort(ListenPort);
TCPDirect::Attributes attributes;
// Set the interface name, which corresponds to Solarflare's NIC.
attributes.networkInterface(NetworkInterfaceName);
TCPDirect::Stack stack(attributes);
// Invoke the Engine::init with the given stack class instance.
Engine::init(engineSettings, &stack);
SessionListener listener;
Session acceptor(&stack, SenderCompId, TargetCompId, FixProtocolVersion, &listener);
acceptor.logonAsAcceptor();
bool finished = false;
// Dispatching events..
while (!finished)
stack.dispatchEvents();
acceptor.shutdown();
Engine::shutdown();
while (!stack.isQuiescent())
stack.dispatchEvents();

The TDPDirect limits the number of listen ports by the max_tcp_listen_endpoints attribute and acceptor sessions by the max_tcp_endpoints attribute per one stack. To overcome this limit, one can use more OnixS::FIX::TCPDirect::Stack objects. However, different reactors cannot process the same listen port(s). Therefore, each reactor and corresponding acceptor session(s) should use different listen port(s):

ifaces.push_back(ListenAddress);
EngineSettings engineSettings;
// Set listen local network interface, which corresponds to Solarflare's NIC.
engineSettings.localNetworkInterface(ifaces);
// Set listen ports.
ListenPorts listenPorts;
listenPorts.push_back(4200);
listenPorts.push_back(4201);
engineSettings.listenPorts(listenPorts);
TCPDirect::Attributes attributes;
// Set the interface name, which corresponds to Solarflare's NIC.
attributes.networkInterface(NetworkInterfaceName);
TCPDirect::Stack stack1(attributes);
TCPDirect::Stack stack2(attributes);
ISessionReactors reactors;
reactors.push_back(&stack1);
reactors.push_back(&stack2);
// Invoke the Engine::init with given stack class instances.
Engine::init(engineSettings, reactors);
SessionListener listener;
// Pass any of current stacks to the acceptor session's constructor.
Session acceptor(&stack1, SenderCompId, TargetCompId, FixProtocolVersion, &listener);
acceptor.logonAsAcceptor();
bool finished = false;
// Dispatching events..
while (!finished)
{
stack1.dispatchEvents();
stack2.dispatchEvents();
}
acceptor.shutdown();
Engine::shutdown();
while (!stack1.isQuiescent() || !stack2.isQuiescent())
{
stack1.dispatchEvents();
stack2.dispatchEvents();
}

Split the Sessions Processing by Different Stacks in Separate Threads

When the amount of high-loaded sessions is large, splitting sessions into groups and performing the event dispatching in separate threads makes sense and can improve performance. In this case, considering that the external thread reactors are not thread safe, each stack with corresponding sessions should be created in its thread.

Split the Initiator Sessions Processing by Different Stacks in Separate Threads

class InitiatorSessionReactorThread : public OnixS::Threading::Thread
{
typedef std::vector<SessionPtr> Sessions;
size_t sessionNumber_;
public:
explicit InitiatorSessionReactorThread(size_t sessionNumber) :
OnixS::Threading::Thread("SessionReactorThread"),
sessionNumber_(sessionNumber)
{}
~InitiatorSessionReactorThread() ONIXS_FIXENGINE_OVERRIDE
{
join();
}
protected:
void run() ONIXS_FIXENGINE_OVERRIDE
{
// Initialize stack.
TCPDirect::Attributes attributes;
// Set the interface name, which corresponds to Solarflare's NIC.
attributes.networkInterface(NetworkInterfaceName);
TCPDirect::Stack stack(attributes);
// Initialize sessions.
Sessions sessions;
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions.push_back(createSession());
OnixS::Threading::SharedFuture<void> logonTask = sessions.back()->logonAsInitiatorAsync(CounterpartyHost, CounterpartyPort, HeartBtInt, ONIXS_FIXENGINE_NULLPTR, true);
while (!logonTask.isReady())
stack.dispatchEvents();
}
bool finished = false;
// Useful work and dispatching events...
while (!finished)
stack.dispatchEvents();
// Shutdown sessions.
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
OnixS::Threading::SharedFuture<void> logoutTask = sessions[sessionCounter]->logoutAsync("The session is disconnected");
while (!logoutTask.isReady())
stack.dispatchEvents();
sessions[sessionCounter]->shutdown();
}
while (!stack.isQuiescent())
stack.dispatchEvents();
}
};
ifaces.push_back(ListenAddress);
// Initialize engine in the main thread.
EngineSettings engineSettings;
// Set listen local network interface, which corresponds to Solarflare's NIC.
engineSettings.localNetworkInterface(ifaces);
Engine::init(engineSettings);
// Initialize and start the required amount of threads.
size_t SessionNumber = 25;
size_t SessionPerThread = 5;
size_t ThreadNumber = SessionNumber / SessionPerThread;
typedef std::vector<InitiatorSessionReactorThreadPtr> ReactorThreads;
ReactorThreads reactorThreads;
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads.push_back(new InitiatorSessionReactorThread(SessionPerThread));
reactorThreads.back()->start();
}
// Waiting for threads completion.
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads[reactorThreadCounter]->join();
}
// Shutdown the FIX Engine.
Engine::shutdown();

Split the Acceptor Sessions Processing by Different Stacks in Separate Threads

To split acceptor Sessions by separate threads, the OnixS::FIX::Engine::addListenPort() method needs to be used to pass the corresponding listen port(s) and reactor(s). Each thread should use a separate listen port(s) because the same listen port(s) cannot be processed by different reactors. When the thread is completed, the corresponding listen port(s) and reactor(s) must be removed using the OnixS::FIX::Engine::removeListenPort() method.

class AcceptorSessionReactorThread : public OnixS::Threading::Thread
{
typedef std::vector<SessionPtr> Sessions;
size_t sessionNumber_;
public:
explicit AcceptorSessionReactorThread(size_t sessionNumber) :
OnixS::Threading::Thread("SessionReactorThread"),
sessionNumber_(sessionNumber)
{}
~AcceptorSessionReactorThread() ONIXS_FIXENGINE_OVERRIDE
{
join();
}
protected:
void run() ONIXS_FIXENGINE_OVERRIDE
{
// Initialize stack.
TCPDirect::Attributes attributes;
// Set the interface name, which corresponds to Solarflare's NIC.
attributes.networkInterface(NetworkInterfaceName);
TCPDirect::Stack stack(attributes);
// Initialize the listen port(s) for the current reactor.
// Each thread should use a separate listen port(s).
// The stack will process IO events only for the given port(s).
Engine::instance()->addListenPort(CurrentThreadListenPort, &stack);
// Initialize sessions.
Sessions sessions;
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions.push_back(createSession());
sessions.back()->logonAsAcceptor();
}
bool finished = false;
// Useful work and dispatching events...
while (!finished)
stack.dispatchEvents();
// Disconnect sessions.
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
OnixS::Threading::SharedFuture<void> logoutTask = sessions[sessionCounter]->logoutAsync("The session is disconnected");
while (!logoutTask.isReady())
stack.dispatchEvents();
}
// It needs to remove the current thread listen ports and associated reactor.
// It is necessary because access to the stack should only be done from the same thread.
Engine::instance()->removeListenPort(CurrentThreadListenPort);
// Shutdown sessions.
for (size_t sessionCounter = 0; sessionCounter < sessionNumber_; ++sessionCounter)
{
sessions[sessionCounter]->shutdown();
}
while (!stack.isQuiescent())
stack.dispatchEvents();
}
};
ifaces.push_back(ListenAddress);
// Initialize engine in the main thread.
EngineSettings engineSettings;
// Set listen local network interface, which corresponds to Solarflare's NIC.
engineSettings.localNetworkInterface(ifaces);
Engine::init(engineSettings);
// Initialize and start the required amount of threads.
size_t SessionNumber = 25;
size_t SessionPerThread = 5;
size_t ThreadNumber = SessionNumber / SessionPerThread;
typedef std::vector<AcceptorSessionReactorThreadPtr> ReactorThreads;
ReactorThreads reactorThreads;
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads.push_back(new AcceptorSessionReactorThread(SessionPerThread));
reactorThreads.back()->start();
}
// Waiting for threads completion.
for(size_t reactorThreadCounter = 0; reactorThreadCounter < ThreadNumber; ++reactorThreadCounter)
{
reactorThreads[reactorThreadCounter]->join();
}
// Shutdown the FIX Engine.
Engine::shutdown();
Note
It needs explicitly configure the listen local network interface, which corresponds to Solarflare's NIC. This can be done by the "OnixS::FIX::EngineSettings::localNetworkInterface() setting.

See Also