Async Processing Sample
Source code
using System;
using System.Collections.Concurrent;
using OnixS.Fix;
using System.Globalization;
using System.IO;
using OnixS.Fix.Fix44;
using System.Net;
using System.Threading;
using NLog.Extensions.Logging;
namespace AsyncProcessing
internal static class AsyncProcessing
/// <summary>
/// Locates the folder holding the license files, relative to the application base directory.
/// </summary>
/// <returns>
/// The "license" folder five levels above <see cref="AppContext.BaseDirectory"/> when it exists;
/// otherwise the "license" folder six levels above, returned without checking that it exists.
/// </returns>
private static string GetLicenseStoreFolder()
{
    string path = Path.Join(AppContext.BaseDirectory, "../../../../../license");
    if (Directory.Exists(path))
    {
        return path;
    }

    // Fallback for a run from the default `dotnet publish` output, which needs one more level up.
    return Path.Join(AppContext.BaseDirectory, "../../../../../../license");
}
// FIX protocol version passed to every Session and Message constructed in this sample.
// NOTE(review): the file imports OnixS.Fix.Fix44 while this selects Fix42 — confirm the combination is intended.
const ProtocolVersion FixVersion = ProtocolVersion.Fix42;
// Port placed into EngineSettings.ListenPorts in Main.
const int ListenPort = 4500;
// Host passed to LogonAsInitiator in Main.
const string CounterpartyHost = "localhost";
// The initiator connects back to this sample's own listen port.
const int CounterpartyPort = ListenPort;
// Heartbeat interval passed to LogonAsInitiator; presumably in seconds — confirm against the engine documentation.
const int HeartBtInt = 30;
// Comp IDs used to construct both sessions and as prefixes in the console output.
const string SenderCompId = "Initiator";
const string TargetCompId = "Acceptor";
// Upper bound of the order loop in Main, and the cancel-request count the Acceptor compares against.
const int MsgCount = 1000;
// Number of iterations of the warm-up loop in Initiator.WarmUpMsgPool.
const int PreferredPoolSize = 32;
/// <summary>
/// Prints a "Press any key ..." prompt and blocks until a key is pressed.
/// </summary>
/// <param name="waitText">Text appended to the "Press any key " prompt.</param>
static void WaitUntilAnyKey(string waitText)
{
    Console.Write("Press any key " + waitText);

    // Console.ReadKey throws InvalidOperationException when standard input is redirected,
    // so only wait when a key press can actually be read.
    if (!Console.IsInputRedirected)
    {
        Console.ReadKey(intercept: true);
    }
}
/// <summary>
/// Builds an Execution Report message for <c>FixVersion</c> filled with placeholder values.
/// </summary>
/// <returns>
/// A new message with OrderID, Symbol, Side, OrderQty and TransactTime set; callers in this file
/// overwrite these fields or use the message as a source for copies.
/// </returns>
static Message CreateExecutionReportPattern()
{
    Message execReport = new Message(MsgType.ExecutionReport, FixVersion);

    // "ClOrdID" is a literal placeholder value here, not the tag of the same name.
    execReport.Set(Tag.OrderID, "ClOrdID")
        .Set(Tag.Symbol, "IBM")
        .Set(Tag.Side, Side.Buy)
        .Set(Tag.OrderQty, 1000)
        .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

    return execReport;
}
// Accepting side of the sample. Owns a Session built from (TargetCompId, SenderCompId, FixVersion) and
// reacts to inbound application-level messages on it.
// NOTE(review): this listing shows no braces and the handler looks truncated (nothing sends the built report,
// nothing signals processingCompletion) — confirm against the complete sample before relying on it.
class Acceptor
// Event that WaitProcessingCompletion blocks on.
private readonly AutoResetEvent processingCompletion = new AutoResetEvent(false);
// Number of OrderCancelRequest messages counted by the handler so far.
private int orderCancelRequestNumber = 0;
// Session whose comp IDs are swapped relative to the Initiator's session.
private readonly Session session = new Session(TargetCompId, SenderCompId, FixVersion);
// Subscribes the handler below to the session's inbound application messages.
public Acceptor()
Session.InboundApplicationMessage += OnInboundApplicationMessage;
// The session owned by this acceptor.
public Session Session { get { return session; } }
// Blocks the calling thread until processingCompletion is signalled.
public void WaitProcessingCompletion() { processingCompletion.WaitOne(); }
// Handles an inbound application message: logs it, then branches on the message type.
void OnInboundApplicationMessage(Object sender, InboundMessageEventArgs e)
Console.WriteLine("\n[" + TargetCompId + "] - Synchronous processing of the message: " + e.Message.ToString('|'));
if (e.Message.Type == MsgType.NewOrderSingle)
// Start from the report pattern and overwrite its fields with the values carried by the order.
Message execReport = CreateExecutionReportPattern();
execReport.Set(Tag.OrderID, e.Message.Get(Tag.ClOrdID))
.Set(Tag.Symbol, e.Message.Get(Tag.Symbol))
.Set(Tag.Side, e.Message.Get(Tag.Side))
.Set(Tag.OrderQty, e.Message.Get(Tag.OrderQty))
.Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);
// NOTE(review): execReport is not used after this point in the visible listing — presumably it is sent back on the session; confirm.
else if (e.Message.Type == MsgType.OrderCancelRequest)
if (++orderCancelRequestNumber == MsgCount)
// NOTE(review): no statement follows this condition here — presumably processingCompletion is signalled once MsgCount cancel requests have arrived; confirm.
/// <summary>
/// Pool of reusable <see cref="Message"/> objects backed by a <see cref="ConcurrentBag{T}"/>.
/// </summary>
class MessagePool
{
    private readonly ConcurrentBag<Message> messages = new();

    /// <summary>
    /// Takes a message from the pool, or creates a new copy of <paramref name="sourceMsg"/> when the pool is empty.
    /// </summary>
    /// <param name="sourceMsg">Message used to construct a new instance when no pooled one is available.</param>
    /// <returns>A pooled message if one could be taken; otherwise a new message constructed from <paramref name="sourceMsg"/>.</returns>
    public Message GetObject(Message sourceMsg)
    {
        // NOTE(review): a pooled instance is returned as-is and is not refreshed from sourceMsg here,
        // so it still carries whatever content it had when it was put back — confirm whether a copy step is expected.
        if (messages.TryTake(out Message item))
        {
            return item;
        }

        return new Message(sourceMsg);
    }

    /// <summary>
    /// Returns a message to the pool so that a later <see cref="GetObject"/> call can reuse it.
    /// </summary>
    /// <param name="msg">The message to put back.</param>
    public void PutObject(Message msg)
    {
        messages.Add(msg);
    }
}
// Initiating side of the sample. Owns a Session built from (SenderCompId, TargetCompId, FixVersion), a message
// pool, a blocking queue of messages and a thread whose entry point is ThreadProc.
// NOTE(review): this listing shows no braces and several bodies appear to be missing (JoinThread, StopThread,
// the warm-up loop bodies, the enqueue/send/return-to-pool steps) — comments describe only what is visible.
class Initiator
// Source of reusable Message objects for the inbound handler.
private readonly MessagePool pool = new MessagePool();
// Queue consumed by ThreadProc through GetConsumingEnumerable.
private readonly BlockingCollection<Message> queue = new BlockingCollection<Message>();
// Processing thread; assigned in the constructor.
private readonly Thread thread = null;
private readonly Session session = new Session(SenderCompId, TargetCompId, FixVersion);
// Subscribes the inbound handler and creates the processing thread.
// NOTE(review): no visible statement calls WarmUpMsgPool or starts the thread — confirm.
public Initiator()
// Prepare a pool of Message objects in advance and perform all necessary memory allocations.
Session.InboundApplicationMessage += OnInboundApplicationMessage;
thread = new Thread(ThreadProc);
// The session owned by this initiator.
public Session Session { get { return session; } }
// NOTE(review): body not visible — presumably joins the processing thread; confirm.
public void JoinThread()
// NOTE(review): body not visible — presumably ends the consuming loop in ThreadProc; confirm.
public void StopThread()
// Requests PreferredPoolSize objects from the pool using an Execution Report pattern as the source message,
// then drains the queue.
// NOTE(review): neither loop has a visible body — presumably the objects are queued and then put back into the pool; confirm.
void WarmUpMsgPool()
Message incomingMsgPattern = CreateExecutionReportPattern();
for (int counter = 0; counter < PreferredPoolSize; ++counter)
Message poolMsg = pool.GetObject(incomingMsgPattern);
Message msg;
while (queue.TryTake(out msg))
// Handles an inbound application message: logs it and, for Execution Reports, obtains a pool object for it.
void OnInboundApplicationMessage(Object sender, InboundMessageEventArgs e)
Console.WriteLine("\n[" + SenderCompId + "] - Inbound application-level message: " + e.Message.ToString('|'));
if (e.Message.Type == MsgType.ExecutionReport)
// When an inbound callback is called, take an object from the pool
// and clone the incoming Message object into the Message object taken from the pool (no managed memory allocation, just copying).
Message msg = pool.GetObject(e.Message);
// Enqueue the copied Message object to a shared queue.
// NOTE(review): the enqueue statement itself is not visible in this listing — confirm.
// Entry point of the processing thread: consumes queued messages until the queue's consuming enumerable ends.
void ThreadProc()
// In the processing thread, get Message objects from the shared queue.
foreach (Message msg in queue.GetConsumingEnumerable())
// Process the Message object asynchronously.
Console.WriteLine("\n[" + SenderCompId + "] - Asynchronous processing of the message: " + msg.ToString('|'));
// Build an Order Cancel Request from the OrderID, Symbol and Side of the processed message.
Message cancelRequest = new Message(MsgType.OrderCancelRequest, FixVersion);
cancelRequest.Set(Tag.OrigClOrdID, msg.Get(Tag.OrderID))
.Set(Tag.Symbol, msg.Get(Tag.Symbol))
.Set(Tag.Side, msg.Get(Tag.Side))
.Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);
// NOTE(review): cancelRequest is not used after this point in the visible listing — presumably it is sent on the session; confirm.
// Return the processed object to the pool.
// NOTE(review): the statement that returns msg to the pool is not visible in this listing — confirm.
/// <summary>
/// The main entry point for the application.
/// </summary>
/// <returns>1 from the exception handler; 0 otherwise.</returns>
// NOTE(review): this listing shows no braces and several statements appear to be missing (engine start-up,
// acceptor logon, sending the orders, the try that pairs with the catch below) — comments describe only what is visible.
private static int Main()
Console.WriteLine("Asynchronous Processing Sample.");
// Engine configuration: listen port, license folder and NLog-based logging.
// NOTE(review): no visible statement hands these settings to an engine — confirm.
EngineSettings settings = new EngineSettings
ListenPorts = { ListenPort },
LicenseStore = GetLicenseStoreFolder(),
LoggerProvider = new NLogLoggerProvider()
Acceptor acceptor = new Acceptor();
Initiator initiator = new Initiator();
// Connect the initiator's session to CounterpartyHost:CounterpartyPort, which is this sample's own listen port.
initiator.Session.LogonAsInitiator(CounterpartyHost, CounterpartyPort, HeartBtInt);
WaitUntilAnyKey("to start sending orders and process response messages asynchronously.");
// Order template; only ClOrdID and TransactTime change per iteration below.
Message order = new Message(MsgType.NewOrderSingle, FixVersion);
order.Set(Tag.Symbol, "IBM")
.Set(Tag.Side, Side.Buy)
.Set(Tag.OrderQty, 1000)
.Set(Tag.OrdType, OrdType.Market);
// The loop counter doubles as the ClOrdID, running from 1 to MsgCount.
// NOTE(review): no visible statement sends the order inside the loop — confirm.
for (int counter = 1; counter <= MsgCount; ++counter)
order.Set(Tag.ClOrdID, counter)
.Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);
WaitUntilAnyKey("to close the sample.");
initiator.Session.Logout("The session is disconnected by Initiator");
Console.WriteLine("\nClosing sample...\n");
// Any exception is printed and turned into exit code 1.
catch (Exception ex)
Console.WriteLine("Exception: " + ex);
return 1;
// NOTE(review): the source reference that originally followed "From" is missing from this listing.
// .NET applications running on Mono / Linux are required to stop threads / timers before entering the application shutdown phase.
// Failing to do this will cause unhandled exceptions, segmentation faults and other unpredictable behavior.
NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
return 0;