Benchmark Latency Sample
Source code
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using OnixS.Fix;
using OnixS.Fix.Fix42;
namespace Benchmark.Latency
{
using static Console;
internal static class Latency
{
private static SessionTimeMarks[] marks;
private static bool active;
private static int counter;
private static readonly ManualResetEventSlim ready = new(true);
/// <summary>
/// The main entry point for the application.
/// </summary>
[STAThread]
private static int Main(string[] args)
{
WriteLine(GetProductName());
WriteLine("\n\tUsage: Latency (numberOfMessages) (sendPeriodUsec) (warmupPeriodUsec)\n");
#if DEBUG
const int DefaultNumberOfMessages = 1000;
Console.WriteLine("\n\n\nWARNING: Avoid using the Debug build for benchmarking. The Debug version runs 10-100 times slower!!!\n\n\n");
return 2;
#else
const int DefaultNumberOfMessages = 10000;
#endif
var numberOfMessages = args.Length > 0 ? int.Parse(args[0]) : DefaultNumberOfMessages;
var sendPeriodUsec = args.Length > 1 ? int.Parse(args[1]) : 100;
var warmupPeriodUsec = args.Length > 2 ? int.Parse(args[2]) : 100;
WriteLine($"\tCurrent parameters: numberOfMessages={numberOfMessages}; sendPeriodUsec={sendPeriodUsec}; warmupPeriodUsec={warmupPeriodUsec}.");
try
{
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.RealTime;
}
catch (Exception ex)
{
Write($"\nWARNING: unable to the set real-time process priority: {ex.Message}.\n To set the real-time priority run this benchmark ");
WriteLine(RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? "under administrator." : "with root access (e.g. using 'sudo').");
}
try
{
var settings = new EngineSettings
{
LicenseStore = GetLicenseStoreFolder(),
Dictionary = "LowLatencyDictionary.xml"
};
settings.ListenPorts.Add(10450);
Engine.Init(settings);
var dictionary = Engine.Instance.MessageInfoDictionaryManager["LowLatencyDictionaryId"];
var rawMessage = "8=FIX.4.2\u00019=3\u000135=D\u000149=OnixS\u000156=CME\u000134=01\u0001" +
"52=20120709-10:10:54\u000111=90001008\u0001109=ClientID\u000121=1\u0001" +
"55=ABC\u000154=1\u000138=100\u000140=1\u000160=20120709-10:10:54\u000110=000\u0001";
// Let's validate the message to make sure that all fields are described in the data dictionary.
var sourceMessage = Message.Parse(rawMessage, dictionary, MessageValidationFlags.StrictCreated);
// Use Serialized Message for the best latency.
var order = new SerializedMessage(sourceMessage);
marks = new SessionTimeMarks[numberOfMessages];
const string SenderCompId = "Acceptor";
const string TargetCompId = "Initiator";
const int MaximumNumberOfWarmupMessages = 10000;
var numberOfWarmupMessages = numberOfMessages > MaximumNumberOfWarmupMessages ? MaximumNumberOfWarmupMessages : numberOfMessages;
// Warm-up to avoid JIT compilation issues and make the first calls fast.
// Separate sessions are used for warm-up. These session should be disposed before real sessions start to work.
CreateSessionsAndMeasure("WU" + SenderCompId, "WU" + TargetCompId, dictionary, order, numberOfWarmupMessages, sendPeriodUsec, warmupPeriodUsec);
// Measurement.
CreateSessionsAndMeasure(SenderCompId, TargetCompId, dictionary, order, numberOfMessages, sendPeriodUsec, warmupPeriodUsec);
ReportResults("Latency", marks, numberOfMessages);
}
catch (Exception ex)
{
WriteLine("Exception: " + ex);
return 1;
}
finally
{
if (Engine.IsInitialized)
Engine.Shutdown();
}
return 0;
}
private static void CreateSessionsAndMeasure(string SenderCompId, string TargetCompId, IMessageInfoDictionary dictionary, SerializedMessage order,
int numberOfMessages, int sendPeriodUsec, int warmupPeriodUsec)
{
using var acceptor = new Session(SenderCompId, TargetCompId, dictionary, false, SessionStorageType.MemoryBasedStorage);
// See https://ref.onixs.biz/net-core-fix-engine-guide/articles/low-latency-best-practices.html.
ConfigureLowLatencySettings(acceptor);
acceptor.InboundApplicationMessage += OnInboundApplicationMessage;
acceptor.BytesReceived += OnBytesReceived;
acceptor.Error += OnError;
acceptor.Warning += OnWarning;
acceptor.ReceivingThreadAffinity = [1];
acceptor.SendingThreadAffinity = [2];
acceptor.LogonAsAcceptor();
using var initiator = new Session(TargetCompId, SenderCompId, dictionary, false, SessionStorageType.MemoryBasedStorage);
ConfigureLowLatencySettings(initiator);
initiator.MessageSending += OnSendingMessage;
initiator.Error += OnError;
initiator.Warning += OnWarning;
initiator.ReceivingThreadAffinity = [3];
initiator.SendingThreadAffinity = [4];
initiator.LogonAsInitiator(IPAddress.Loopback, Engine.Instance.Settings.ListenPorts[0], 0);
active = true;
Measure(initiator, order, numberOfMessages, sendPeriodUsec, warmupPeriodUsec);
active = false;
ready.Wait();
initiator.Logout();
acceptor.Logout();
}
/// <summary>
/// Configures low-latency settings.
///
/// See https://ref.onixs.biz/net-core-fix-engine-guide/articles/low-latency-best-practices.html.
/// </summary>
/// <param name="session"></param>
private static void ConfigureLowLatencySettings(Session session)
{
session.ReceiveSpinningTimeout = 10000;
session.ResendingQueueSize = 0;
session.ReuseEventArguments = true;
session.ReuseInboundMessage = true;
session.ValidateRepeatingGroupEntryCount = false;
session.ValidateCheckSum = false;
}
private static void SpinWait(int microseconds)
{
var start = TimestampHelper.Ticks;
while (TimestampHelper.ElapsedMicroseconds(start) < microseconds)
Thread.SpinWait(10);
}
private static void OnBytesReceived(ReadOnlySpan<byte> args)
{
if (!active)
return;
marks[counter].RecvStart = TimestampHelper.Ticks;
}
private static void OnInboundApplicationMessage(object sender, InboundMessageEventArgs args)
{
if (!active)
return;
marks[counter].RecvFinish = TimestampHelper.Ticks;
++counter;
ready.Set();
}
private static void OnSendingMessage(object sender, MessageSendingEventArgs bytes)
{
if (!active)
return;
marks[counter].SendFinish = TimestampHelper.Ticks;
}
private static void OnError(object sender, SessionErrorEventArgs e)
{
WriteLine("Session Error: " + e);
}
private static void OnWarning(object sender, SessionWarningEventArgs e)
{
WriteLine("Session Warning: " + e);
}
private static void Measure(Session session, SerializedMessage order, int numberOfMessages, int sendPeriodUsec, int warmupPeriodUsec)
{
counter = 0;
var clientIdRef = order.Find(Tag.ClientID);
if (!clientIdRef.IsValid)
throw new LatencyException("clientID field is not found");
var clientIdKey = order.AllocateKey(clientIdRef);
const string ShortClientId = "ClientID";
const string LongClientId = "ClientIDClientIDClientIDClientID";
for (var i = 0; i < numberOfMessages; ++i)
{
ready.Wait();
ready.Reset();
order.Set(clientIdKey, i % 2 == 0 ? ShortClientId : LongClientId);
var currentMarkIndex = counter;
marks[currentMarkIndex].SendStart = TimestampHelper.Ticks;
session.Send(order);
marks[currentMarkIndex].OverallSendFinish = TimestampHelper.Ticks;
if (warmupPeriodUsec > 0)
{
var start = TimestampHelper.Ticks;
do
{
SpinWait(warmupPeriodUsec);
session.WarmUp(order);
} while (TimestampHelper.ElapsedMicroseconds(start) < sendPeriodUsec);
}
else if (sendPeriodUsec > 0)
{
SpinWait(sendPeriodUsec);
}
}
}
private static void ProcessAndReportDurations(string durationName, List<double> durations)
{
durations.Sort();
WriteLine(
"\t{0,-22} min: {1,-8:F} median: {2,-8:F} 99%: {3,-8:F}",
durationName,
durations[0],
durations[durations.Count / 2],
durations[Convert.ToInt32(Math.Ceiling(durations.Count * 99 / 100.0)) - 1]
);
}
private static void ReportResults(string testName, SessionTimeMarks[] marksArray, int count)
{
var baseName = testName + '_';
using StreamWriter receiveStream = new(baseName + "recv.csv", true)
, sendStream = new(baseName + "send.csv", true)
, overallSendStream = new(baseName + "overallsend.csv", true)
, oneWayStream = new(baseName + "oneway.csv", true)
, sendAndReceiveStream = new(baseName + "sendrecv.csv", true);
var sendDurations = new List<double>();
var overallSendDurations = new List<double>();
var receiveDurations = new List<double>();
var sendAndReceiveDurations = new List<double>();
var oneWayDurations = new List<double>();
const int MaxNumberStringLength = 26;
var buff = new char[MaxNumberStringLength];
ReadOnlySpan<char> Format(double value)
{
const string NumberFormat = "0.000";
if (value.TryFormat(buff, out var length, NumberFormat))
return new ReadOnlySpan<char>(buff, 0, length);
return value.ToString(NumberFormat);
}
for (var i = 0; i < count; ++i)
{
var m = marksArray[i];
var receiveDuration = m.RecvSpan / TimestampHelper.TicksPerMicrosecond;
receiveDurations.Add(receiveDuration);
receiveStream.WriteLine(Format(receiveDuration));
var sendDuration = m.SendSpan / TimestampHelper.TicksPerMicrosecond;
sendDurations.Add(sendDuration);
sendStream.WriteLine(Format(sendDuration));
var overallSendDuration = m.OverallSendSpan / TimestampHelper.TicksPerMicrosecond;
overallSendDurations.Add(overallSendDuration);
overallSendStream.WriteLine(Format(overallSendDuration));
var sendAndReceiveDuration = sendDuration + receiveDuration;
sendAndReceiveDurations.Add(sendAndReceiveDuration);
sendAndReceiveStream.WriteLine(Format(sendAndReceiveDuration));
var oneWayDuration = m.OneWaySpan / TimestampHelper.TicksPerMicrosecond;
oneWayDurations.Add(oneWayDuration);
oneWayStream.WriteLine(Format(oneWayDuration));
}
WriteLine("\r\n{0} (microseconds): ", testName);
ProcessAndReportDurations("Engine Receive", receiveDurations);
ProcessAndReportDurations("Engine Send", sendDurations);
ProcessAndReportDurations("Engine Send+Receive", sendAndReceiveDurations);
ProcessAndReportDurations("Overall Send", overallSendDurations);
ProcessAndReportDurations("One-Way (Round-Trip/2)", oneWayDurations);
}
private static string GetLicenseStoreFolder()
{
var path = Path.Join(AppContext.BaseDirectory, "../../../../../../license");
if (Directory.Exists(path))
return path;
// expecting it run after dotnet publish using default paths
return Path.Join(AppContext.BaseDirectory, "../../../../../../../license");
}
private static string GetProductName()
{
var assemblyName = typeof(Engine).Assembly.GetName();
return $"Latency Benchmark Sample\n\n{assemblyName.Name} version {assemblyName.Version}";
}
}
[Serializable]
public class LatencyException : Exception
{
public LatencyException(string message)
: base(message)
{
}
}
}