Benchmark Throughput Sample
Source code
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using OnixS.Fix;
namespace Benchmark.ThroughputManySessions
{
using static Console;
internal static class ThroughputManySessions
{
/// <summary>
/// Per-initiator synchronization state used by the main thread to wait for the end of
/// the warm-up phase and the end of the measured phase.
/// </summary>
/// <remarks>
/// Sealed because it is a private helper that is never derived from.
/// </remarks>
sealed class InitiatorSessionInfo
{
    /// <summary>
    /// Signalled by the initiator's inbound handler when it receives a News message
    /// whose headline is "Testing is finished".
    /// </summary>
    public readonly ManualResetEventSlim TestingIsFinished = new(false);

    /// <summary>
    /// Signalled by the initiator's inbound handler when it receives a News message
    /// whose headline is "Warmup is finished".
    /// </summary>
    public readonly ManualResetEventSlim WarmupIsFinished = new(false);
}
/// <summary>
/// Per-acceptor timing state: the <see cref="System.Diagnostics.Stopwatch"/> timestamps that
/// delimit the measured receive window of one acceptor session.
/// </summary>
/// <remarks>
/// Sealed because it is a private helper that is never derived from.
/// </remarks>
sealed class AcceptorSessionInfo
{
    /// <summary>
    /// Stopwatch timestamp recorded by the acceptor's inbound handler when it processes
    /// the "Warmup is finished" News message. Stays 0 until then.
    /// </summary>
    public long ReceiveStart { get; set; }

    /// <summary>
    /// Stopwatch timestamp recorded by the acceptor's inbound handler when it processes
    /// the "Testing is finished" News message. Stays 0 until then.
    /// </summary>
    public long ReceiveFinish { get; set; }
}
// Benchmark state of every initiator session, keyed by the session object itself.
// The field is never reassigned, hence readonly; only the dictionary contents change.
private static readonly Dictionary<Session, InitiatorSessionInfo> initiators = new();

// Timing state of every acceptor session, keyed by the session object itself.
// The field is never reassigned, hence readonly; only the dictionary contents change.
private static readonly Dictionary<Session, AcceptorSessionInfo> acceptors = new();

#if DEBUG
// Reduced workload for DEBUG builds.
private const int WarmupNumberOfMessages = 100; // orders sent per session before measuring
private const int NumberOfMessages = 100;       // measured orders sent per session
private const int NumberOfSessions = 10;        // acceptor/initiator pairs
#else
// Full workload for non-DEBUG builds.
private const int WarmupNumberOfMessages = 3000; // orders sent per session before measuring
private const int NumberOfMessages = 10000;      // measured orders sent per session
private const int NumberOfSessions = 500;        // acceptor/initiator pairs
#endif
/// <summary>
/// The main entry point for the application.
/// </summary>
/// <remarks>
/// Initializes the engine, connects <c>NumberOfSessions</c> acceptor/initiator pairs over
/// loopback, sends a warm-up batch followed by the measured batch through every initiator,
/// and prints the sending-side and receiving-side throughput in messages per second.
/// Any exception raised by these steps is written to the console.
/// </remarks>
[STAThread]
private static void Main()
{
    WriteLine(GetProductName());
    TrySetRealTimePriority();

    try
    {
        var settings = new EngineSettings()
        {
            Dictionary = "ThroughputFixDictionary.xml",
            LicenseStore = GetLicenseStoreFolder(),
            LogBeforeSending = false,
            LogInboundMessages = false,
            LogOutboundMessages = false,
            ReceiveSpinningTimeout = 10000,
            ReceiveBufferSize = 65536,
            ResendingQueueSize = 0,
            SendBufferSize = 65536,
            TcpNoDelay = false,
            ValidateRepeatingGroupEntryCount = false,
            ValidateCheckSum = false
        };
        settings.ListenPorts.Add(10450);
        Engine.Init(settings);

        var rawMessage =
            "8=FIX.4.2\u00019=3\u000135=D\u000149=OnixS\u000156=CME\u000134=01\u000152=20120709-10:10:54\u0001" +
            "11=90001008\u000121=1\u000155=ABC\u000154=1\u000138=100\u000140=1\u000160=20120709-10:10:54\u000110=000\u0001";
        var dictionary = Engine.Instance.MessageInfoDictionaryManager["ThroughputDictionaryId"];

        // The order is parsed and validated once; the same serialized instance is sent for every order below.
        var message = Message.Parse(rawMessage, dictionary, MessageValidationFlags.StrictCreated);
        message.Validate();
        var order = new SerializedMessage(message);

        const string SenderCompId = "Acceptor";
        const string TargetCompId = "Initiator";

        // Create and connect one acceptor/initiator pair per index; the CompIDs carry the index as a suffix.
        for (int i = 0; i < NumberOfSessions; i++)
        {
            Session acceptor = new(SenderCompId + i, TargetCompId + i, dictionary, false, SessionStorageType.MemoryBasedStorage);
            acceptor.InboundApplicationMessage += AcceptorInboundApplicationMsgEvent;
            acceptor.Error += OnError;
            acceptor.Warning += OnWarning;
            // ReuseInboundMessage / ReuseEventArguments are deliberately NOT set above this line:
            // the message mode has to be chosen first (see the note below), exactly as for the initiator.
            acceptor.MessageMode = MessageMode.FlatMessage; // need to set this before ReuseInboundMessage to decrease memory consumption
            acceptor.MemoryPoolSettings.NumberOfGroupInstances = 0;
            acceptor.MemoryPoolSettings.NumberOfGroups = 0;
            acceptor.MemoryPoolSettings.LengthOfGroupInstanceArray = 0;
            acceptor.ReuseInboundMessage = true;
            acceptor.ReuseEventArguments = true;
            acceptor.ResendingQueueSize = 0;
            acceptor.LogonAsAcceptor();
            acceptors.Add(acceptor, new AcceptorSessionInfo());

            Session initiator = new(TargetCompId + i, SenderCompId + i, dictionary, false, SessionStorageType.MemoryBasedStorage);
            initiator.InboundApplicationMessage += InitiatorInboundApplicationMsgEvent;
            initiator.Error += OnError;
            initiator.Warning += OnWarning;
            initiator.MessageGrouping = 200;
            initiator.MessageMode = MessageMode.FlatMessage; // need to set this before ReuseInboundMessage to decrease memory consumption
            initiator.MemoryPoolSettings.NumberOfGroupInstances = 0;
            initiator.MemoryPoolSettings.NumberOfGroups = 0;
            initiator.MemoryPoolSettings.LengthOfGroupInstanceArray = 0;
            initiator.ReuseInboundMessage = true;
            initiator.ReuseEventArguments = true;
            initiator.ResendingQueueSize = 0;
            initiator.LogonAsInitiator(IPAddress.Loopback, settings.ListenPorts[0], 30);
            initiators.Add(initiator, new InitiatorSessionInfo());
        }

        // Warm-up phase: round-robin the order across all initiators.
        for (var i = 0; i < WarmupNumberOfMessages; ++i)
        {
            foreach (var session in initiators.Keys)
            {
                session.Send(order);
            }
        }

        // A News message marks the end of the warm-up; the acceptor handler sends it back and
        // the initiator handler then signals WarmupIsFinished.
        var news = new Message(OnixS.Fix.Fix42.MsgType.News, dictionary);
        news.Set(OnixS.Fix.Fix42.Tag.Headline, "Warmup is finished");
        foreach (var session in initiators.Keys)
        {
            session.Send(news);
        }

        foreach (var info in initiators.Values)
        {
            info.WarmupIsFinished.Wait();
        }

        // Measured phase: only the Send calls are between the two timestamps.
        var sendStart = Stopwatch.GetTimestamp();
        for (var i = 0; i < NumberOfMessages; ++i)
        {
            foreach (var session in initiators.Keys)
            {
                session.Send(order);
            }
        }

        var sendFinish = Stopwatch.GetTimestamp();

        // Same marker mechanism for the end of the measured phase.
        news.Set(OnixS.Fix.Fix42.Tag.Headline, "Testing is finished");
        foreach (var session in initiators.Keys)
        {
            session.Send(news);
        }

        foreach (var info in initiators.Values)
        {
            info.TestingIsFinished.Wait();
        }

        foreach (var session in initiators.Keys)
        {
            session.Logout();
            session.Dispose();
        }

        foreach (var session in acceptors.Keys)
        {
            session.Logout();
            session.Dispose();
        }

        // messages / elapsed seconds, with elapsed seconds = ticks / Stopwatch.Frequency (integer arithmetic).
        var sendMessagesPerSec = Stopwatch.Frequency * NumberOfSessions * NumberOfMessages / (sendFinish - sendStart);

        // The receive window spans from the earliest start to the latest finish over all acceptors.
        long receiveStart = acceptors.Values.Min(info => info.ReceiveStart);
        long receiveFinish = acceptors.Values.Max(info => info.ReceiveFinish);
        var receiveMessagesPerSec = Stopwatch.Frequency * NumberOfSessions * NumberOfMessages / (receiveFinish - receiveStart);

        WriteLine($"Throughput on the sending side: {sendMessagesPerSec:n0} msg/sec");
        WriteLine($"Throughput on the receiving side: {receiveMessagesPerSec:n0} msg/sec");

        Engine.Shutdown();
    }
    catch (Exception ex)
    {
        WriteLine("Exception: " + ex);
    }
}

/// <summary>
/// Attempts to raise the current process to real-time priority. On failure a warning with a
/// platform-specific hint is printed and the benchmark continues at the current priority.
/// </summary>
private static void TrySetRealTimePriority()
{
    try
    {
        // Process is IDisposable; release the handle as soon as the priority has been set.
        using var process = Process.GetCurrentProcess();
        process.PriorityClass = ProcessPriorityClass.RealTime;
    }
    catch (Exception ex)
    {
        WriteLine("\nWarning: unable to set real-time priority to this process - " + ex.Message);
        WriteLine(RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
            ? "To enable process priority control run this benchmark under administrator"
            : "To enable process priority control run this benchmark using 'sudo dotnet Throughput.dll'");
        WriteLine();
    }
}
/// <summary>
/// Handles application-level messages received by an initiator session. Only News messages
/// are acted upon: the headline selects which wait handle of the sending session is signalled.
/// </summary>
/// <param name="sender">The <c>Session</c> that received the message; used as the key into <c>initiators</c>.</param>
/// <param name="args">Event data; the message is read from its <c>FlatMessage</c> property.</param>
private static void InitiatorInboundApplicationMsgEvent(object sender, InboundMessageEventArgs args)
{
    IMessage inbound = args.FlatMessage;
    if (!inbound.CompareType(OnixS.Fix.Fix42.MsgType.News))
    {
        return;
    }

    var headline = inbound.Get(OnixS.Fix.Fix42.Tag.Headline);
    if (headline == "Warmup is finished")
    {
        // Releases the main thread's wait that precedes the measured phase.
        initiators[(Session)sender].WarmupIsFinished.Set();
    }
    else if (headline == "Testing is finished")
    {
        // Releases the main thread's wait that precedes logout and reporting.
        initiators[(Session)sender].TestingIsFinished.Set();
    }
}
/// <summary>
/// Handles application-level messages received by an acceptor session. News messages mark
/// the boundaries of the measured phase: they are sent back to the counterparty and the
/// matching timestamp is stored for the session. All other messages are ignored.
/// </summary>
/// <param name="sender">The <c>Session</c> that received the message; used as the key into <c>acceptors</c>.</param>
/// <param name="args">Event data; the message is read from its <c>FlatMessage</c> property.</param>
private static void AcceptorInboundApplicationMsgEvent(object sender, InboundMessageEventArgs args)
{
    IMessage message = args.FlatMessage;
    if (!message.CompareType(OnixS.Fix.Fix42.MsgType.News))
    {
        // This handler runs for every inbound order; return before any cast or dictionary
        // lookup so that per-order work is limited to the type check.
        return;
    }

    var session = (Session)sender;
    var headline = message.Get(OnixS.Fix.Fix42.Tag.Headline);
    if (headline == "Warmup is finished")
    {
        // Echo first, stamp afterwards: the time spent sending the echo stays outside the receive window.
        session.Send(message);
        acceptors[session].ReceiveStart = Stopwatch.GetTimestamp();
    }
    else if (headline == "Testing is finished")
    {
        // Stamp first, echo afterwards: again the echo stays outside the receive window.
        acceptors[session].ReceiveFinish = Stopwatch.GetTimestamp();
        session.Send(message);
    }
}
/// <summary>
/// Writes a session error to the console, prefixed with "Session Error: ".
/// </summary>
/// <param name="sender">The object that raised the event (not used).</param>
/// <param name="e">Error details; its string representation is printed.</param>
private static void OnError(object sender, SessionErrorEventArgs e) =>
    WriteLine("Session Error: " + e);
/// <summary>
/// Writes a session warning to the console, prefixed with "Session Warning: ".
/// </summary>
/// <param name="sender">The object that raised the event (not used).</param>
/// <param name="e">Warning details; its string representation is printed.</param>
private static void OnWarning(object sender, SessionWarningEventArgs e) =>
    WriteLine("Session Warning: " + e);
/// <summary>
/// Locates the license folder relative to the application's base directory.
/// </summary>
/// <returns>
/// The "license" folder six levels above the base directory when that folder exists;
/// otherwise the one seven levels above, returned without checking that it exists.
/// </returns>
private static string GetLicenseStoreFolder()
{
    string baseDirectory = AppContext.BaseDirectory;
    string nearCandidate = Path.Join(baseDirectory, "../../../../../../license");

    // When the near candidate is missing we assume the application runs after
    // `dotnet publish` using the default path, which is one directory level deeper.
    return Directory.Exists(nearCandidate)
        ? nearCandidate
        : Path.Join(baseDirectory, "../../../../../../../license");
}
/// <summary>
/// Builds the banner printed at start-up: the sample title followed by the name and
/// version of the assembly that defines <c>Engine</c>.
/// </summary>
/// <returns>The banner text, ending with a newline.</returns>
private static string GetProductName()
{
    var engineAssembly = typeof(Engine).Assembly.GetName();
    string name = engineAssembly.Name;
    Version version = engineAssembly.Version;
    return $"Throughput Benchmark Sample\n\n{name} version {version}\n";
}
}
}