Async Processing Sample
Source code
using System;
using System.Collections.Concurrent;
using OnixS.Fix;
using System.Globalization;
using System.IO;
using OnixS.Fix.Fix44;
using System.Net;
using System.Threading;
using NLog.Extensions.Logging;
namespace AsyncProcessing
{
internal static class AsyncProcessing
{
/// <summary>
/// Locates the folder that is handed to the engine as its license store.
/// </summary>
/// <returns>
/// The "license" folder five levels above the application base directory when that
/// folder exists; otherwise the path six levels above (returned without an existence check).
/// </returns>
private static string GetLicenseStoreFolder()
{
    string baseDirectory = AppContext.BaseDirectory;
    string nearCandidate = Path.Join(baseDirectory, "../../../../../license");

    // The deeper fallback is expected to match the layout produced by
    // "dotnet publish" with default output paths.
    return Directory.Exists(nearCandidate)
        ? nearCandidate
        : Path.Join(baseDirectory, "../../../../../../license");
}
// FIX protocol version used when constructing both sessions and every message in this sample.
// NOTE(review): the file imports OnixS.Fix.Fix44 while this selects FIX 4.2 - confirm the mix is intended.
const ProtocolVersion FixVersion = ProtocolVersion.Fix42;
// Port added to EngineSettings.ListenPorts in Main.
const int ListenPort = 4500;
// Host the initiator session logs on to; both sessions run in this same process.
const string CounterpartyHost = "localhost";
// The initiator connects to the very port this process listens on.
const int CounterpartyPort = ListenPort;
// Heartbeat interval passed to LogonAsInitiator (presumably seconds, per FIX HeartBtInt - confirm).
const int HeartBtInt = 30;
// First constructor argument of the initiator's Session, second of the acceptor's; also used in console output.
const string SenderCompId = "Initiator";
// First constructor argument of the acceptor's Session, second of the initiator's; also used in console output.
const string TargetCompId = "Acceptor";
// Number of orders Main sends, and the number of Order Cancel Requests the acceptor counts before signalling completion.
const int MsgCount = 1000;
// Number of Message objects created up front by Initiator.WarmUpMsgPool.
const int PreferredPoolSize = 32;
/// <summary>
/// Prints "Press any key " followed by <paramref name="waitText"/> and blocks until a key is pressed.
/// </summary>
/// <param name="waitText">Text appended to the prompt, describing what the key press will trigger.</param>
static void WaitUntilAnyKey(string waitText)
{
    string prompt = $"Press any key {waitText}";
    Console.Write(prompt);

    // intercept: true keeps the pressed key from being echoed to the console.
    Console.ReadKey(intercept: true);
}
/// <summary>
/// Builds an Execution Report message populated with placeholder values.
/// </summary>
/// <remarks>
/// The acceptor overwrites the fields with the real order values before replying, and the
/// initiator uses the same pattern as the source message when pre-filling its message pool.
/// </remarks>
/// <returns>A new Execution Report message for <c>FixVersion</c>.</returns>
static Message CreateExecutionReportPattern()
{
    var pattern = new Message(MsgType.ExecutionReport, FixVersion);

    // "ClOrdID" is only a placeholder string for OrderID here.
    pattern
        .Set(Tag.OrderID, "ClOrdID")
        .Set(Tag.Symbol, "IBM")
        .Set(Tag.Side, Side.Buy)
        .Set(Tag.OrderQty, 1000)
        .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

    return pattern;
}
/// <summary>
/// Acceptor side of the sample: replies to every New Order Single with an Execution Report
/// and counts incoming Order Cancel Requests until <c>MsgCount</c> of them have arrived.
/// </summary>
class Acceptor
{
    // Set once the MsgCount-th Order Cancel Request has been received.
    private readonly AutoResetEvent processingCompletion = new(false);

    // Number of Order Cancel Requests seen so far.
    private int orderCancelRequestNumber;

    private readonly Session session = new(TargetCompId, SenderCompId, FixVersion);

    /// <summary>Creates the acceptor and subscribes to inbound application-level messages.</summary>
    public Acceptor()
    {
        session.InboundApplicationMessage += OnInboundApplicationMessage;
    }

    /// <summary>The FIX session owned by this acceptor.</summary>
    public Session Session => session;

    /// <summary>Blocks the caller until the expected number of Order Cancel Requests has arrived.</summary>
    public void WaitProcessingCompletion() => processingCompletion.WaitOne();

    /// <summary>
    /// Handles an inbound application-level message directly inside the session callback.
    /// </summary>
    /// <param name="sender">The session that raised the event; the reply is sent through it.</param>
    /// <param name="e">Event data carrying the inbound message.</param>
    void OnInboundApplicationMessage(object sender, InboundMessageEventArgs e)
    {
        Console.WriteLine($"\n[{TargetCompId}] - Synchronous processing of the message: {e.Message.ToString('|')}");

        if (e.Message.Type == MsgType.NewOrderSingle)
        {
            // Start from the placeholder pattern and overwrite it with the order's own values.
            Message reply = CreateExecutionReportPattern();
            reply
                .Set(Tag.OrderID, e.Message.Get(Tag.ClOrdID))
                .Set(Tag.Symbol, e.Message.Get(Tag.Symbol))
                .Set(Tag.Side, e.Message.Get(Tag.Side))
                .Set(Tag.OrderQty, e.Message.Get(Tag.OrderQty))
                .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

            ((Session)sender).Send(reply);
            return;
        }

        if (e.Message.Type == MsgType.OrderCancelRequest)
        {
            orderCancelRequestNumber++;

            // Signal only on the exact MsgCount-th request.
            if (orderCancelRequestNumber == MsgCount)
            {
                processingCompletion.Set();
            }
        }
    }
}
/// <summary>
/// A simple pool of reusable <c>Message</c> objects backed by a <c>ConcurrentBag</c>.
/// </summary>
class MessagePool
{
    private readonly ConcurrentBag<Message> messages = new ConcurrentBag<Message>();

    /// <summary>
    /// Returns a message initialized from <paramref name="sourceMsg"/>.
    /// </summary>
    /// <param name="sourceMsg">The message whose content the returned message is built from.</param>
    /// <returns>
    /// A pooled message re-initialized via <c>Init(sourceMsg)</c> when the pool has one,
    /// otherwise a newly constructed <c>Message(sourceMsg)</c>.
    /// </returns>
    public Message GetObject(Message sourceMsg)
    {
        if (!messages.TryTake(out Message recycled))
        {
            // Pool is empty: fall back to allocating a fresh message.
            return new Message(sourceMsg);
        }

        recycled.Init(sourceMsg);
        return recycled;
    }

    /// <summary>Puts a message into the pool so a later <c>GetObject</c> call can reuse it.</summary>
    /// <param name="msg">The message to store.</param>
    public void PutObject(Message msg) => messages.Add(msg);
}
/// <summary>
/// Initiator side of the sample: copies each inbound Execution Report into a pooled message,
/// hands it to a dedicated thread through a blocking queue, and answers it there with an
/// Order Cancel Request.
/// </summary>
class Initiator
{
    private readonly MessagePool pool = new();
    private readonly BlockingCollection<Message> queue = new();
    private readonly Thread thread;
    private readonly Session session = new(SenderCompId, TargetCompId, FixVersion);

    /// <summary>
    /// Pre-fills the message pool, subscribes to inbound messages and starts the processing thread.
    /// </summary>
    public Initiator()
    {
        // Create the pooled Message objects up front so the allocations happen before traffic starts.
        WarmUpMsgPool();

        session.InboundApplicationMessage += OnInboundApplicationMessage;

        thread = new Thread(ThreadProc);
        thread.Start();
    }

    /// <summary>The FIX session owned by this initiator.</summary>
    public Session Session => session;

    /// <summary>Blocks until the processing thread has finished.</summary>
    public void JoinThread() => thread.Join();

    /// <summary>
    /// Marks the queue as complete for adding, which lets the processing thread's loop end
    /// once the remaining items have been consumed.
    /// </summary>
    public void StopThread() => queue.CompleteAdding();

    /// <summary>
    /// Creates <c>PreferredPoolSize</c> messages through the pool and then moves them all into the pool.
    /// </summary>
    void WarmUpMsgPool()
    {
        Message template = CreateExecutionReportPattern();

        // The pool is empty at this point, so each GetObject call yields a new message;
        // the queue is used only as temporary storage.
        int remaining = PreferredPoolSize;
        while (remaining-- > 0)
        {
            queue.Add(pool.GetObject(template));
        }

        while (queue.TryTake(out Message warmed))
        {
            pool.PutObject(warmed);
        }
    }

    /// <summary>
    /// Session callback: logs the inbound message and, for Execution Reports, queues a copy
    /// for the processing thread.
    /// </summary>
    /// <param name="sender">The session that raised the event (not used).</param>
    /// <param name="e">Event data carrying the inbound message.</param>
    void OnInboundApplicationMessage(object sender, InboundMessageEventArgs e)
    {
        Console.WriteLine($"\n[{SenderCompId}] - Inbound application-level message: {e.Message.ToString('|')}");

        bool isExecutionReport = e.Message.Type == MsgType.ExecutionReport;
        if (!isExecutionReport)
        {
            return;
        }

        // Copy the inbound message into an object obtained from the pool
        // (a pooled object is re-initialized rather than newly allocated) ...
        Message copy = pool.GetObject(e.Message);

        // ... and hand the copy over to the processing thread via the shared queue.
        queue.Add(copy);
    }

    /// <summary>
    /// Body of the processing thread: consumes queued messages until the queue is completed,
    /// sends an Order Cancel Request for each one and returns the message to the pool.
    /// </summary>
    void ThreadProc()
    {
        foreach (Message report in queue.GetConsumingEnumerable())
        {
            Console.WriteLine($"\n[{SenderCompId}] - Asynchronous processing of the message: {report.ToString('|')}");

            var cancel = new Message(MsgType.OrderCancelRequest, FixVersion);
            cancel
                .Set(Tag.OrigClOrdID, report.Get(Tag.OrderID))
                .Set(Tag.Symbol, report.Get(Tag.Symbol))
                .Set(Tag.Side, report.Get(Tag.Side))
                .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

            Session.Send(cancel);

            // Processing is done; make the object available for reuse.
            pool.PutObject(report);
        }
    }
}
/// <summary>
/// The main entry point for the application.
/// </summary>
/// <remarks>
/// Initializes the engine, logs an acceptor and an initiator session on to each other,
/// sends <c>MsgCount</c> orders, waits until the acceptor has received the matching
/// Order Cancel Requests, then logs out and shuts down.
/// </remarks>
/// <returns>0 when the sample completes; 1 when an exception was caught.</returns>
private static int Main()
{
    try
    {
        Console.WriteLine("Asynchronous Processing Sample.");

        EngineSettings settings = new EngineSettings
        {
            ListenPorts = { ListenPort },
            LicenseStore = GetLicenseStoreFolder(),
            LoggerProvider = new NLogLoggerProvider()
        };
        Engine.Init(settings);

        Acceptor acceptor = new Acceptor();
        acceptor.Session.LogonAsAcceptor();

        // The Initiator constructor starts a foreground processing thread that blocks on its queue.
        Initiator initiator = new Initiator();
        try
        {
            initiator.Session.LogonAsInitiator(CounterpartyHost, CounterpartyPort, HeartBtInt);

            WaitUntilAnyKey("to start sending orders and process response messages asynchronously.");

            Message order = new Message(MsgType.NewOrderSingle, FixVersion);
            order.Set(Tag.Symbol, "IBM")
                .Set(Tag.Side, Side.Buy)
                .Set(Tag.OrderQty, 1000)
                .Set(Tag.OrdType, OrdType.Market);

            // The same Message object is reused; only ClOrdID and TransactTime change per order.
            for (int counter = 1; counter <= MsgCount; ++counter)
            {
                order.Set(Tag.ClOrdID, counter)
                    .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);
                initiator.Session.Send(order);
            }

            acceptor.WaitProcessingCompletion();

            WaitUntilAnyKey("to close the sample.");

            initiator.Session.Logout("The session is disconnected by Initiator");
            acceptor.Session.Logout();
        }
        finally
        {
            // Always stop and join the processing thread, even when something above throws
            // (e.g. the logon fails). Otherwise the foreground thread stays blocked on the
            // queue and the process never exits although Main has returned.
            initiator.StopThread();
            initiator.JoinThread();
        }

        Console.WriteLine("\nClosing sample...\n");
        Engine.Shutdown();
    }
    catch (Exception ex)
    {
        Console.WriteLine("Exception: " + ex);
        return 1;
    }
    finally
    {
        // From https://github.com/NLog/NLog/wiki/Tutorial:
        // .NET applications running on Mono / Linux are required to stop threads / timers before
        // entering the application shutdown phase. Failing to do this will cause unhandled
        // exceptions, segmentation faults and other unpredictable behavior.
        NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
    }
    return 0;
}
}
}