• Version 1.16.0
Show / Hide Table of Contents

Throughput Benchmark Sample Project

This sample demonstrates how to measure message throughput on both the sending and the receiving side.

© Onix Solutions

Source code


using OnixS.Fix;
using OnixS.Fix.Fix42;
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;

namespace Benchmark.Throughput
{
    using static Console;

    internal static class Throughput
    {
        // Signalled by the inbound-message handler when the "Testing is finished" News message arrives;
        // Main waits on it before reporting the receiving-side throughput.
        private static readonly ManualResetEventSlim testingIsFinished = new(false);
        // Signalled by the inbound-message handler when the "Warmup is finished" News message arrives;
        // Main waits on it (Localhost mode only) before starting the timed run.
        private static readonly ManualResetEventSlim warmupIsFinished = new(false);

        // Stopwatch timestamps taken by the inbound-message handler at the warmup and testing markers;
        // their difference is the receiving-side measurement interval.
        private static long receiveStart;
        private static long receiveFinish;

        // Message counts for the warmup phase and the timed phase; DEBUG builds use much smaller counts.
#if DEBUG
        private const int WarmupNumberOfMessages = 1000;
        private const int NumberOfMessages = 1000;
#else
        private const int WarmupNumberOfMessages = 300000;
        private const int NumberOfMessages = 1000000;
#endif
        /// <summary>
        ///     Selects which side(s) of the benchmark this process runs; parsed from the first command-line argument.
        /// </summary>
        enum Mode
        {
            // Default: both the acceptor and the initiator sessions are created in this process.
            Localhost,
            // Only the initiator (sending) session is created; the host to connect to is taken from the second argument.
            Initiator,
            // Only the acceptor (receiving) session is created; it listens for an incoming connection.
            Acceptor
        }

        /// <summary>
        ///     The main entry point for the application.
        /// </summary>
        /// <remarks>
        ///     Runs the acceptor, the initiator, or both (the default), sends a warmup batch followed by a timed
        ///     batch of orders, and prints the measured throughput for each side that ran in this process.
        ///     Any exception raised after the priority adjustment is caught and printed to the console.
        /// </remarks>
        /// <param name="argv">
        ///     Optional arguments: <c>argv[0]</c> is the mode name (<c>Localhost</c>, <c>Initiator</c> or
        ///     <c>Acceptor</c>, case-insensitive; defaults to <c>Localhost</c>); <c>argv[1]</c> is the host to
        ///     connect to and is required in <c>Initiator</c> mode.
        /// </param>
        [STAThread]
        private static void Main(string[] argv)
        {
            WriteLine(GetProductName());

            TryRaiseProcessPriority();

            try
            {
                ParseArguments(argv, out Mode mode, out string host);

                EngineSettings settings = CreateEngineSettings();

                const int Port = 10450;

                // Only a process that hosts the acceptor needs a listening port.
                if (mode != Mode.Initiator)
                {
                    settings.ListenPorts.Add(Port);
                }

                Engine.Init(settings);

                IMessageInfoDictionary dictionary = Engine.Instance.MessageInfoDictionaryManager["ThroughputDictionaryId"];

                const string SenderCompId = "Acceptor";
                const string TargetCompId = "Initiator";

                Session acceptor = null;

                if (mode != Mode.Initiator)
                {
                    acceptor = new(SenderCompId, TargetCompId, dictionary, false, SessionStorageType.MemoryBasedStorage);

                    acceptor.InboundApplicationMessage += InboundApplicationMsgEvent;
                    acceptor.Error += OnError;
                    acceptor.Warning += OnWarning;
                    acceptor.ReuseInboundMessage = true;
                    acceptor.ReuseEventArguments = true;
                    acceptor.ReceivingThreadAffinity = [1];
                    acceptor.SendingThreadAffinity = [2];

                    WriteLine($"Acceptor is waiting for the incoming connection on port {Port}..");

                    acceptor.LogonAsAcceptor();
                }

                Session initiator = null;

                if (mode != Mode.Acceptor)
                {
                    // The initiator swaps the CompIDs so that it is the counterparty of the acceptor session.
                    initiator = new(TargetCompId, SenderCompId, dictionary, false, SessionStorageType.MemoryBasedStorage);

                    initiator.Error += OnError;
                    initiator.Warning += OnWarning;
                    initiator.MessageGrouping = 200;
                    initiator.ReuseInboundMessage = true;
                    initiator.ReceivingThreadAffinity = [3];
                    initiator.SendingThreadAffinity = [4];

                    initiator.LogonAsInitiator(host, Port, 30);

                    SerializedMessage order = CreateOrder(dictionary);

                    for (var i = 0; i < WarmupNumberOfMessages; ++i)
                    {
                        initiator.Send(order);
                    }

                    // News messages act as phase markers; the acceptor-side handler keys off their headline text.
                    var news = new Message(MsgType.News, dictionary);
                    news.Set(Tag.Headline, "Warmup is finished");
                    initiator.Send(news);

                    // In Localhost mode the acceptor lives in this process, so wait until it has seen the
                    // warmup marker before starting the timed run.
                    if (mode == Mode.Localhost)
                    {
                        warmupIsFinished.Wait();
                    }

                    var sendStart = Stopwatch.GetTimestamp();

                    for (var i = 0; i < NumberOfMessages; ++i)
                    {
                        initiator.Send(order);
                    }

                    var sendFinish = Stopwatch.GetTimestamp();

                    news.Set(Tag.Headline, "Testing is finished");
                    initiator.Send(news);

                    WriteLine($"Throughput on the sending side: {MessagesPerSecond(sendStart, sendFinish):n0} msg/sec");
                }

                if (mode != Mode.Initiator)
                {
                    // The receive timestamps are written by the inbound-message handler; they are read only
                    // after it has signalled that the final marker arrived.
                    testingIsFinished.Wait();
                    WriteLine($"Throughput on the receiving side: {MessagesPerSecond(receiveStart, receiveFinish):n0} msg/sec");
                }

                acceptor?.Logout();
                initiator?.Logout();

                Engine.Shutdown();
            }
            catch (Exception ex)
            {
                WriteLine("Exception: " + ex);
            }
        }

        /// <summary>
        ///     Best-effort attempt to switch the current process to real-time priority; on failure prints a
        ///     warning with a platform-specific hint and lets the benchmark continue.
        /// </summary>
        private static void TryRaiseProcessPriority()
        {
            try
            {
                using var currentProcess = Process.GetCurrentProcess();
                currentProcess.PriorityClass = ProcessPriorityClass.RealTime;
            }
            catch (Exception ex)
            {
                WriteLine("\nWarning: unable to set real-time priority to this process - " + ex.Message);

                WriteLine(RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
                              ? "To enable process priority control run this benchmark under administrator"
                              : "To enable process priority control run this benchmark using 'sudo dotnet Throughput.dll'");
                WriteLine();
            }
        }

        /// <summary>
        ///     Reads the run mode and, for the initiator, the host to connect to from the command line.
        /// </summary>
        /// <param name="argv">The command-line arguments passed to <c>Main</c>.</param>
        /// <param name="mode">Receives the parsed mode, or <c>Localhost</c> when no argument is given.</param>
        /// <param name="host">Receives the host to connect to, or <c>127.0.0.1</c> when none is given.</param>
        /// <exception cref="ArgumentException">
        ///     The mode is not one of the named <c>Mode</c> members, or <c>Initiator</c> mode is requested
        ///     without a host argument.
        /// </exception>
        private static void ParseArguments(string[] argv, out Mode mode, out string host)
        {
            mode = Mode.Localhost;
            host = "127.0.0.1";

            if (argv.Length == 0)
            {
                return;
            }

            // Enum parsing also accepts numeric text such as "7" and yields an undefined Mode value, which
            // would then silently behave like Localhost; accept only the named members.
            if (!Enum.TryParse(argv[0], ignoreCase: true, out mode) || !Enum.IsDefined(typeof(Mode), mode))
            {
                throw new ArgumentException(
                    $"Unknown mode '{argv[0]}'. Expected one of: {string.Join(", ", Enum.GetNames(typeof(Mode)))}");
            }

            if (mode != Mode.Initiator)
            {
                return;
            }

            if (argv.Length < 2)
            {
                throw new ArgumentException("The Initiator needs the host name to connect to as the second argument");
            }

            host = argv[1];
        }

        /// <summary>
        ///     Builds the engine settings used by the benchmark: message logging and several validations are
        ///     switched off, and enlarged socket buffers are configured.
        /// </summary>
        /// <returns>The settings instance to pass to <c>Engine.Init</c>; listen ports are not yet added.</returns>
        private static EngineSettings CreateEngineSettings()
        {
            var settings = new EngineSettings()
            {
                Dictionary = "ThroughputFixDictionary.xml",
                LicenseStore = GetLicenseStoreFolder(),
                LogBeforeSending = false,
                LogInboundMessages = false,
                LogOutboundMessages = false,
                ReceiveSpinningTimeout = 10000,
                ReceiveBufferSize = 10 * 1024 * 1024,
                ResendingQueueSize = 0,
                SendBufferSize = 10 * 1024 * 1024,
                TcpNoDelay = false,
                ValidateRepeatingGroupEntryCount = false,
                ValidateCheckSum = false
            };

            // Also look for the license in the current directory.
            settings.LicenseStores.Add(".");

            return settings;
        }

        /// <summary>
        ///     Converts a pair of <see cref="Stopwatch"/> timestamps into a whole number of messages per second
        ///     for the timed batch of <c>NumberOfMessages</c> messages.
        /// </summary>
        /// <param name="startTimestamp">Timestamp taken when the timed batch started.</param>
        /// <param name="finishTimestamp">Timestamp taken when the timed batch finished.</param>
        /// <returns>The throughput in messages per second, truncated to an integer.</returns>
        private static long MessagesPerSecond(long startTimestamp, long finishTimestamp)
        {
            // Clamp to one tick so a degenerate (zero-length) interval cannot cause a division by zero.
            long elapsedTicks = Math.Max(1, finishTimestamp - startTimestamp);
            return Stopwatch.Frequency * NumberOfMessages / elapsedTicks;
        }

        /// <summary>
        ///     Builds the order message that the benchmark sends repeatedly: a fixed raw FIX 4.2 order
        ///     (MsgType D) is parsed against the given dictionary, validated, and wrapped in a
        ///     <c>SerializedMessage</c>.
        /// </summary>
        /// <param name="dictionary">The message dictionary used to parse the raw order.</param>
        /// <returns>The serialized form of the parsed and validated order.</returns>
        private static SerializedMessage CreateOrder(IMessageInfoDictionary dictionary)
        {
            // Header fields up to SendingTime, followed by the order body and trailer.
            const string HeaderFields =
                "8=FIX.4.2\u00019=3\u000135=D\u000149=OnixS\u000156=CME\u000134=01\u000152=20120709-10:10:54\u0001";
            const string BodyAndTrailerFields =
                "11=90001008\u000121=1\u000155=ABC\u000154=1\u000138=100\u000140=1\u000160=20120709-10:10:54\u000110=000\u0001";

            var parsedOrder = Message.Parse(
                HeaderFields + BodyAndTrailerFields,
                dictionary,
                MessageValidationFlags.StrictCreated);

            parsedOrder.Validate();

            return new SerializedMessage(parsedOrder);
        }

        /// <summary>
        ///     Handler for inbound application messages on the acceptor session. It reacts only to News
        ///     messages whose headline is one of the two phase markers: the warmup marker releases the
        ///     warmup wait and records the receive start timestamp; the testing marker records the receive
        ///     finish timestamp and releases the final wait.
        /// </summary>
        /// <param name="sender">The event source (not used).</param>
        /// <param name="args">Event data carrying the received message.</param>
        private static void InboundApplicationMsgEvent(object sender, InboundMessageEventArgs args)
        {
            Message inbound = args.Message;

            // Orders and any other message types carry no phase information.
            if (!inbound.CompareType(MsgType.News))
            {
                return;
            }

            string marker = inbound[Tag.Headline];

            switch (marker)
            {
                case "Warmup is finished":
                    warmupIsFinished.Set();
                    receiveStart = Stopwatch.GetTimestamp();
                    break;

                case "Testing is finished":
                    // Record the timestamp before signalling so the waiting thread reads the final value.
                    receiveFinish = Stopwatch.GetTimestamp();
                    testingIsFinished.Set();
                    break;
            }
        }

        /// <summary>
        ///     Session <c>Error</c> event handler: prints the event data to the console.
        /// </summary>
        /// <param name="sender">The event source (not used).</param>
        /// <param name="e">The error event data; its string form is printed.</param>
        private static void OnError(object sender, SessionErrorEventArgs e) =>
            WriteLine($"Session Error: {e}");

        /// <summary>
        ///     Session <c>Warning</c> event handler: prints the event data to the console.
        /// </summary>
        /// <param name="sender">The event source (not used).</param>
        /// <param name="e">The warning event data; its string form is printed.</param>
        private static void OnWarning(object sender, SessionWarningEventArgs e) =>
            WriteLine($"Session Warning: {e}");

        /// <summary>
        ///     Locates the <c>license</c> folder relative to the application base directory.
        /// </summary>
        /// <returns>
        ///     The path six levels above the base directory if that folder exists; otherwise the path seven
        ///     levels above, which is returned without checking that it exists.
        /// </returns>
        private static string GetLicenseStoreFolder()
        {
            string baseDirectory = AppContext.BaseDirectory;
            string primaryCandidate = Path.Join(baseDirectory, "../../../../../../license");

            // The fallback is one level deeper: it is the expected layout when running from the
            // default `dotnet publish` output folder.
            return Directory.Exists(primaryCandidate)
                ? primaryCandidate
                : Path.Join(baseDirectory, "../../../../../../../license");
        }

        /// <summary>
        ///     Builds the banner printed at start-up: the sample title followed by the name and version of
        ///     the assembly that defines <c>Engine</c>.
        /// </summary>
        /// <returns>The multi-line banner text, ending with a newline.</returns>
        private static string GetProductName()
        {
            var engineAssembly = typeof(Engine).Assembly.GetName();
            var productName = engineAssembly.Name;
            var productVersion = engineAssembly.Version;

            return $"Throughput Benchmark Sample\n\n{productName} version {productVersion}\n";
        }
    }
}

In this article
Back to top Copyright © Onix Solutions.
Generated by DocFX