• Version 1.16.0
Show / Hide Table of Contents

Throughput Many Sessions Benchmark Sample Project

This sample demonstrates how to measure the throughput on sending and receiving sides with many sessions.

© Onix Solutions

Source code


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using OnixS.Fix;

namespace Benchmark.ThroughputManySessions
{
    using static Console;

    internal static class ThroughputManySessions
    {
        class InitiatorSessionInfo
        {
            public readonly ManualResetEventSlim TestingIsFinished = new(false);
            public readonly ManualResetEventSlim WarmupIsFinished = new(false);
        }

        class AcceptorSessionInfo
        {
            public long ReceiveStart { get; set; }
            public long ReceiveFinish { get; set; }
        }

        private static Dictionary<Session, InitiatorSessionInfo> initiators = new();
        private static Dictionary<Session, AcceptorSessionInfo> acceptors = new();

#if DEBUG
        private const int WarmupNumberOfMessages = 100;
        private const int NumberOfMessages = 100;
        private const int NumberOfSessions = 10;
#else
        private const int WarmupNumberOfMessages = 3000;
        private const int NumberOfMessages = 10000;
        private const int NumberOfSessions = 500;
#endif
        /// <summary>
        ///     The main entry point for the application.
        /// </summary>
        [STAThread]
        private static void Main()
        {
            WriteLine(GetProductName());

            try
            {
                Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.RealTime;
            }
            catch (Exception ex)
            {
                WriteLine("\nWarning: unable to set real-time priority to this process - " + ex.Message);

                WriteLine(RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
                              ? "To enable process priority control run this benchmark under administrator"
                              : "To enable process priority control run this benchmark using 'sudo dotnet Throughput.dll'");
                WriteLine();
            }

            try
            {
                var settings = new EngineSettings()
                {
                    Dictionary = "ThroughputFixDictionary.xml",
                    LicenseStore = GetLicenseStoreFolder(),
                    LogBeforeSending = false,
                    LogInboundMessages = false,
                    LogOutboundMessages = false,
                    ReceiveSpinningTimeout = 10000,
                    ReceiveBufferSize = 65536,
                    ResendingQueueSize = 0,
                    SendBufferSize = 65536,
                    TcpNoDelay = false,
                    ValidateRepeatingGroupEntryCount = false,
                    ValidateCheckSum = false
                };
                settings.ListenPorts.Add(10450);

                Engine.Init(settings);

                var rawMessage =
                    "8=FIX.4.2\u00019=3\u000135=D\u000149=OnixS\u000156=CME\u000134=01\u000152=20120709-10:10:54\u0001" +
                    "11=90001008\u000121=1\u000155=ABC\u000154=1\u000138=100\u000140=1\u000160=20120709-10:10:54\u000110=000\u0001";

                var dictionary = Engine.Instance.MessageInfoDictionaryManager["ThroughputDictionaryId"];

                var message = Message.Parse(rawMessage, dictionary, MessageValidationFlags.StrictCreated);
                message.Validate();
                var order = new SerializedMessage(message);

                const string SenderCompId = "Acceptor";
                const string TargetCompId = "Initiator";

                for (int i = 0; i < NumberOfSessions; i++)
                {
                    Session acceptor = new(SenderCompId + i, TargetCompId + i, dictionary, false, SessionStorageType.MemoryBasedStorage);
                    acceptor.InboundApplicationMessage += AcceptorInboundApplicationMsgEvent;
                    acceptor.Error += OnError;
                    acceptor.Warning += OnWarning;
                    acceptor.ReuseInboundMessage = true;
                    acceptor.ReuseEventArguments = true;
                    acceptor.MessageMode = MessageMode.FlatMessage; // need to set this before ReuseInboundMessage to decrease memory consumption
                    acceptor.MemoryPoolSettings.NumberOfGroupInstances = 0;
                    acceptor.MemoryPoolSettings.NumberOfGroups = 0;
                    acceptor.MemoryPoolSettings.LengthOfGroupInstanceArray = 0;
                    acceptor.ReuseInboundMessage = true;
                    acceptor.ReuseEventArguments = true;
                    acceptor.ResendingQueueSize = 0;

                    acceptor.LogonAsAcceptor();
                    acceptors.Add(acceptor, new AcceptorSessionInfo());

                    Session initiator = new(TargetCompId + i, SenderCompId + i, dictionary, false, SessionStorageType.MemoryBasedStorage);
                    initiator.InboundApplicationMessage += InitiatorInboundApplicationMsgEvent;
                    initiator.Error += OnError;
                    initiator.Warning += OnWarning;
                    initiator.MessageGrouping = 200;
                    initiator.MessageMode = MessageMode.FlatMessage; // need to set this before ReuseInboundMessage to decrease memory consumption
                    initiator.MemoryPoolSettings.NumberOfGroupInstances = 0;
                    initiator.MemoryPoolSettings.NumberOfGroups = 0;
                    initiator.MemoryPoolSettings.LengthOfGroupInstanceArray = 0;
                    initiator.ReuseInboundMessage = true;
                    initiator.ReuseEventArguments = true;
                    initiator.ResendingQueueSize = 0;

                    initiator.LogonAsInitiator(IPAddress.Loopback, settings.ListenPorts[0], 30);
                    initiators.Add(initiator, new InitiatorSessionInfo());
                }

                for (var i = 0; i < WarmupNumberOfMessages; ++i)
                    foreach (var item in initiators)
                        item.Key.Send(order);

                var email = new Message(OnixS.Fix.Fix42.MsgType.News, dictionary);
                email.Set(OnixS.Fix.Fix42.Tag.Headline, "Warmup is finished");

                foreach (var item in initiators)
                    item.Key.Send(email);

                foreach (var item in initiators)
                    item.Value.WarmupIsFinished.Wait();


                var sendStart = Stopwatch.GetTimestamp();

                for (var i = 0; i < NumberOfMessages; ++i)
                    foreach (var item in initiators)
                        item.Key.Send(order);

                var sendFinish = Stopwatch.GetTimestamp();

                email.Set(OnixS.Fix.Fix42.Tag.Headline, "Testing is finished");
                foreach (var item in initiators)
                    item.Key.Send(email);

                foreach (var item in initiators)
                    item.Value.TestingIsFinished.Wait();

                foreach (var item in initiators)
                {
                    item.Key.Logout();
                    item.Key.Dispose();
                }

                foreach (var item in acceptors)
                {
                    item.Key.Logout();
                    item.Key.Dispose();
                }

                var sendMessagesPerSec = Stopwatch.Frequency * NumberOfSessions * NumberOfMessages / (sendFinish - sendStart);

                long receiveStart = acceptors.Values.Min(info => info.ReceiveStart);
                long receiveFinish = acceptors.Values.Max(info => info.ReceiveFinish);

                var receiveMessagesPerSec = Stopwatch.Frequency * NumberOfSessions * NumberOfMessages / (receiveFinish - receiveStart);

                WriteLine($"Throughput on the sending side: {string.Format("{0:n0}", sendMessagesPerSec)} msg/sec");
                WriteLine($"Throughput on the receiving side: {string.Format("{0:n0}", receiveMessagesPerSec)} msg/sec");

                Engine.Shutdown();
            }
            catch (Exception ex)
            {
                WriteLine("Exception: " + ex);
            }
        }

        private static void InitiatorInboundApplicationMsgEvent(object sender, InboundMessageEventArgs args)
        {
            IMessage message = args.FlatMessage;

            if (message.CompareType(OnixS.Fix.Fix42.MsgType.News))
            {
                if (message.Get(OnixS.Fix.Fix42.Tag.Headline) == "Warmup is finished")
                {
                    initiators[(Session)sender].WarmupIsFinished.Set();
                }
                else if (message.Get(OnixS.Fix.Fix42.Tag.Headline) == "Testing is finished")
                {
                    initiators[(Session)sender].TestingIsFinished.Set();
                }
            }

        }

        private static void AcceptorInboundApplicationMsgEvent(object sender, InboundMessageEventArgs args)
        {
            var info = acceptors[(Session)sender];

            IMessage message = args.FlatMessage;

            if (message.CompareType(OnixS.Fix.Fix42.MsgType.News))
            {
                if (message.Get(OnixS.Fix.Fix42.Tag.Headline) == "Warmup is finished")
                {
                    ((Session)sender).Send(message);
                    info.ReceiveStart = Stopwatch.GetTimestamp();
                }
                else if (message.Get(OnixS.Fix.Fix42.Tag.Headline) == "Testing is finished")
                {
                    info.ReceiveFinish = Stopwatch.GetTimestamp();
                    ((Session)sender).Send(message);
                }
            }
        }

        private static void OnError(object sender, SessionErrorEventArgs e)
        {
            WriteLine("Session Error: " + e);
        }

        private static void OnWarning(object sender, SessionWarningEventArgs e)
        {
            WriteLine("Session Warning: " + e);
        }

        private static string GetLicenseStoreFolder()
        {
            string path = Path.Join(AppContext.BaseDirectory, "../../../../../../license");

            if (Directory.Exists(path))
                return path;

            // We assume to run after `dotnet publish` using the default path.
            return Path.Join(AppContext.BaseDirectory, "../../../../../../../license");
        }

        private static string GetProductName()
        {
            var assemblyName = typeof(Engine).Assembly.GetName();
            return $"Throughput Benchmark Sample\n\n{assemblyName.Name} version {assemblyName.Version}\n";
        }
    }
}

In this article
Back to top Copyright © Onix Solutions.
Generated by DocFX