• Version 1.16.0
Show / Hide Table of Contents

Advanced Async Processing Sample Project

This sample demonstrates how to process incoming messages in a separate thread.

Initiator sends 'NewOrderSingle' messages to Acceptor, and the Acceptor responds with an 'ExecutionReport' message. All incoming Initiator messages are stored in a message pool. The helper thread takes them from the pool, generates an 'OrderCancelRequest' message, and sends it to Acceptor. When all messages are processed, the sample will be closed.

© Onix Solutions

Source code


using System;
using System.Collections.Concurrent;
using OnixS.Fix;
using System.Globalization;
using System.IO;
using OnixS.Fix.Fix44;
using System.Net;
using System.Threading;
using NLog.Extensions.Logging;

namespace AsyncProcessing
{
    internal static class AsyncProcessing
    {
        private static string GetLicenseStoreFolder()
        {
            string path = Path.Join(AppContext.BaseDirectory, "../../../../../license");

            if (Directory.Exists(path))
                return path;

            // expecting it run after dotnet publish using default paths
            return Path.Join(AppContext.BaseDirectory, "../../../../../../license");
        }

        const ProtocolVersion FixVersion = ProtocolVersion.Fix42;
        const int ListenPort = 4500;
        const string CounterpartyHost = "localhost";
        const int CounterpartyPort = ListenPort;
        const int HeartBtInt = 30;
        const string SenderCompId = "Initiator";
        const string TargetCompId = "Acceptor";
        const int MsgCount = 1000;
        const int PreferredPoolSize = 32;

        static void WaitUntilAnyKey(string waitText)
        {
            Console.Write("Press any key " + waitText);

            Console.ReadKey(true);
        }

        static Message CreateExecutionReportPattern()
        {
            Message execReport = new Message(MsgType.ExecutionReport, FixVersion);
            execReport.Set(Tag.OrderID, "ClOrdID")
                      .Set(Tag.Symbol, "IBM")
                      .Set(Tag.Side, Side.Buy)
                      .Set(Tag.OrderQty, 1000)
                      .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

            return execReport;
        }

        class Acceptor
        {
            private readonly AutoResetEvent processingCompletion = new AutoResetEvent(false);
            private int orderCancelRequestNumber = 0;
            private readonly Session session = new Session(TargetCompId, SenderCompId, FixVersion);

            public Acceptor()
            {
                Session.InboundApplicationMessage += OnInboundApplicationMessage;
            }

            public Session Session { get { return session; } }

            public void WaitProcessingCompletion() { processingCompletion.WaitOne(); }

            void OnInboundApplicationMessage(Object sender, InboundMessageEventArgs e)
            {
                Console.WriteLine("\n[" + TargetCompId + "] - Synchronous processing of the message: " + e.Message.ToString('|'));

                if (e.Message.Type == MsgType.NewOrderSingle)
                {
                    Message execReport = CreateExecutionReportPattern();

                    execReport.Set(Tag.OrderID, e.Message.Get(Tag.ClOrdID))
                              .Set(Tag.Symbol, e.Message.Get(Tag.Symbol))
                              .Set(Tag.Side, e.Message.Get(Tag.Side))
                              .Set(Tag.OrderQty, e.Message.Get(Tag.OrderQty))
                              .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

                    ((Session)sender).Send(execReport);
                }
                else if (e.Message.Type == MsgType.OrderCancelRequest)
                {
                    if (++orderCancelRequestNumber == MsgCount)
                        processingCompletion.Set();
                }
            }
        }

        class MessagePool
        {
            private readonly ConcurrentBag<Message> messages = new();

            public Message GetObject(Message sourceMsg)
            {
                Message item;

                if (messages.TryTake(out item))
                {
                    item.Init(sourceMsg);
                    return item;
                }

                return new Message(sourceMsg);
            }

            public void PutObject(Message msg)
            {
                messages.Add(msg);
            }
        }

        class Initiator
        {
            private readonly MessagePool pool = new MessagePool();
            private readonly BlockingCollection<Message> queue = new BlockingCollection<Message>();
            private readonly Thread thread = null;
            private readonly Session session = new Session(SenderCompId, TargetCompId, FixVersion);

            public Initiator()
            {
                // Prepare a pool of Message object in advance and perform all necessary memory allocations.
                WarmUpMsgPool();

                Session.InboundApplicationMessage += OnInboundApplicationMessage;
                thread = new Thread(ThreadProc);
                thread.Start();
            }

            public Session Session { get { return session; } }

            public void JoinThread()
            {
                thread.Join();
            }

            public void StopThread()
            {
                queue.CompleteAdding();
            }

            void WarmUpMsgPool()
            {
                Message incomingMsgPattern = CreateExecutionReportPattern();

                for (int counter = 0; counter < PreferredPoolSize; ++counter)
                {
                    Message poolMsg = pool.GetObject(incomingMsgPattern);
                    queue.Add(poolMsg);
                }

                Message msg;
                while (queue.TryTake(out msg))
                    pool.PutObject(msg);
            }

            void OnInboundApplicationMessage(Object sender, InboundMessageEventArgs e)
            {
                Console.WriteLine("\n[" + SenderCompId + "] - Inbound application-level message: " + e.Message.ToString('|'));

                if (e.Message.Type == MsgType.ExecutionReport)
                {
                    // When an inbound callback is called take an object from the pool,
                    // and clone the incoming Message object to the Message object taken from the pool (no managed memory allocation just copying).
                    Message msg = pool.GetObject(e.Message);
                    // Enqueue the copied Message object to a shared queue.
                    queue.Add(msg);
                }
            }

            void ThreadProc()
            {
                // In the processing thread, get Message objects from the shared queue.
                foreach (Message msg in queue.GetConsumingEnumerable())
                {
                    // Process the Message object asynchronously.
                    Console.WriteLine("\n[" + SenderCompId + "] - Asynchronous processing of the message: " + msg.ToString('|'));

                    Message cancelRequest = new Message(MsgType.OrderCancelRequest, FixVersion);

                    cancelRequest.Set(Tag.OrigClOrdID, msg.Get(Tag.OrderID))
                                 .Set(Tag.Symbol, msg.Get(Tag.Symbol))
                                 .Set(Tag.Side, msg.Get(Tag.Side))
                                 .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);

                    Session.Send(cancelRequest);

                    // Return the processed object to the pool.
                    pool.PutObject(msg);
                }
            }
        }

        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        private static int Main()
        {
            try
            {
                Console.WriteLine("Asynchronous Processing Sample.");

                EngineSettings settings = new EngineSettings
                {
                    ListenPorts = { ListenPort },
                    LicenseStore = GetLicenseStoreFolder(),
                    LoggerProvider = new NLogLoggerProvider()
                };

                Engine.Init(settings);

                Acceptor acceptor = new Acceptor();
                acceptor.Session.LogonAsAcceptor();

                Initiator initiator = new Initiator();
                initiator.Session.LogonAsInitiator(CounterpartyHost, CounterpartyPort, HeartBtInt);

                WaitUntilAnyKey("to start sending orders and process response messages asynchronously.");

                Message order = new Message(MsgType.NewOrderSingle, FixVersion);

                order.Set(Tag.Symbol, "IBM")
                     .Set(Tag.Side, Side.Buy)
                     .Set(Tag.OrderQty, 1000)
                     .Set(Tag.OrdType, OrdType.Market);

                for (int counter = 1; counter <= MsgCount; ++counter)
                {
                    order.Set(Tag.ClOrdID, counter)
                         .Set(Tag.TransactTime, DateTime.UtcNow, TimestampFormat.YYYYMMDDHHMMSSMsec);
                    initiator.Session.Send(order);
                }

                acceptor.WaitProcessingCompletion();

                WaitUntilAnyKey("to close the sample.");

                initiator.Session.Logout("The session is disconnected by Initiator");
                acceptor.Session.Logout();

                initiator.StopThread();
                initiator.JoinThread();

                Console.WriteLine("\nClosing sample...\n");

                Engine.Shutdown();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception: " + ex);

                return 1;
            }
            finally
            {
                // From https://github.com/NLog/NLog/wiki/Tutorial:
                // NET Application running on Mono / Linux are required to stop threads / timers before entering application shutdown phase.
                // Failing to do this will cause unhandled exceptions and segmentation faults, and other unpredictable behavior.
                NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
            }

            return 0;
        }
    }
}

In this article
Back to top Copyright © Onix Solutions.
Generated by DocFX