Benchmark Throughput Sample
Source code
using OnixS.Fix;
using OnixS.Fix.Fix42;
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
namespace Benchmark.Throughput
{
using static Console;
internal static class Throughput
{
private static readonly ManualResetEventSlim testingIsFinished = new(false);
private static readonly ManualResetEventSlim warmupIsFinished = new(false);
private static long receiveStart;
private static long receiveFinish;
#if DEBUG
private const int WarmupNumberOfMessages = 1000;
private const int NumberOfMessages = 1000;
#else
private const int WarmupNumberOfMessages = 300000;
private const int NumberOfMessages = 1000000;
#endif
enum Mode
{
Localhost,
Initiator,
Acceptor
}
/// <summary>
/// The main entry point for the application.
/// </summary>
[STAThread]
private static void Main(string[] argv)
{
WriteLine(GetProductName());
try
{
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.RealTime;
}
catch (Exception ex)
{
WriteLine("\nWarning: unable to set real-time priority to this process - " + ex.Message);
WriteLine(RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
? "To enable process priority control run this benchmark under administrator"
: "To enable process priority control run this benchmark using 'sudo dotnet Throughput.dll'");
WriteLine();
}
var mode = Mode.Localhost;
var host = "127.0.0.1";
try
{
if (argv.Length > 0)
{
mode = (Mode)Enum.Parse(typeof(Mode), argv[0]);
if (mode == Mode.Initiator)
{
if (argv.Length > 1)
{
host = argv[1];
}
else
{
throw new Exception("The Initiator needs the host name to connect to as the second argument");
}
}
}
var settings = new EngineSettings()
{
Dictionary = "ThroughputFixDictionary.xml",
LicenseStore = GetLicenseStoreFolder(),
LogBeforeSending = false,
LogInboundMessages = false,
LogOutboundMessages = false,
ReceiveSpinningTimeout = 10000,
ReceiveBufferSize = 10 * 1024 * 1024,
ResendingQueueSize = 0,
SendBufferSize = 10 * 1024 * 1024,
TcpNoDelay = false,
ValidateRepeatingGroupEntryCount = false,
ValidateCheckSum = false
};
settings.LicenseStores.Add(".");
const int Port = 10450;
if (mode != Mode.Initiator)
{
settings.ListenPorts.Add(Port);
}
Engine.Init(settings);
IMessageInfoDictionary dictionary = Engine.Instance.MessageInfoDictionaryManager["ThroughputDictionaryId"];
const string SenderCompId = "Acceptor";
const string TargetCompId = "Initiator";
Session acceptor = null;
if (mode != Mode.Initiator)
{
acceptor = new(SenderCompId, TargetCompId, dictionary, false, SessionStorageType.MemoryBasedStorage);
acceptor.InboundApplicationMessage += InboundApplicationMsgEvent;
acceptor.Error += OnError;
acceptor.Warning += OnWarning;
acceptor.ReuseInboundMessage = true;
acceptor.ReuseEventArguments = true;
acceptor.ReceivingThreadAffinity = [1];
acceptor.SendingThreadAffinity = [2];
WriteLine($"Acceptor is waiting for the incoming connection on port {Port}..");
acceptor.LogonAsAcceptor();
}
Session initiator = null;
if (mode != Mode.Acceptor)
{
initiator = new(TargetCompId, SenderCompId, dictionary, false, SessionStorageType.MemoryBasedStorage);
initiator.Error += OnError;
initiator.Warning += OnWarning;
initiator.MessageGrouping = 200;
initiator.ReuseInboundMessage = true;
initiator.ReceivingThreadAffinity = [3];
initiator.SendingThreadAffinity = [4];
initiator.LogonAsInitiator(host, Port, 30);
SerializedMessage order = CreateOrder(dictionary);
for (var i = 0; i < WarmupNumberOfMessages; ++i)
initiator.Send(order);
var email = new Message(MsgType.News, dictionary);
email.Set(Tag.Headline, "Warmup is finished");
initiator.Send(email);
if (mode == Mode.Localhost)
warmupIsFinished.Wait();
var sendStart = Stopwatch.GetTimestamp();
for (var i = 0; i < NumberOfMessages; ++i)
initiator.Send(order);
var sendFinish = Stopwatch.GetTimestamp();
email.Set(Tag.Headline, "Testing is finished");
initiator.Send(email);
var sendMessagesPerSec = Stopwatch.Frequency * NumberOfMessages / (sendFinish - sendStart);
WriteLine($"Throughput on the sending side: {string.Format("{0:n0}", sendMessagesPerSec)} msg/sec");
}
if (mode != Mode.Initiator) {
testingIsFinished.Wait();
var receiveMessagesPerSec = Stopwatch.Frequency * NumberOfMessages / (receiveFinish - receiveStart);
WriteLine($"Throughput on the receiving side: {string.Format("{0:n0}", receiveMessagesPerSec)} msg/sec");
}
acceptor?.Logout();
initiator?.Logout();
Engine.Shutdown();
}
catch (Exception ex)
{
WriteLine("Exception: " + ex);
}
}
private static SerializedMessage CreateOrder(IMessageInfoDictionary dictionary)
{
var rawMessage =
"8=FIX.4.2\u00019=3\u000135=D\u000149=OnixS\u000156=CME\u000134=01\u000152=20120709-10:10:54\u0001" +
"11=90001008\u000121=1\u000155=ABC\u000154=1\u000138=100\u000140=1\u000160=20120709-10:10:54\u000110=000\u0001";
var message = Message.Parse(rawMessage, dictionary, MessageValidationFlags.StrictCreated);
message.Validate();
var order = new SerializedMessage(message);
return order;
}
private static void InboundApplicationMsgEvent(object sender, InboundMessageEventArgs args)
{
Message message = args.Message;
if (message.CompareType(MsgType.News))
{
string headline = message[Tag.Headline];
if (headline == "Warmup is finished")
{
warmupIsFinished.Set();
receiveStart = Stopwatch.GetTimestamp();
}
else if (headline == "Testing is finished")
{
receiveFinish = Stopwatch.GetTimestamp();
testingIsFinished.Set();
}
}
}
private static void OnError(object sender, SessionErrorEventArgs e)
{
WriteLine("Session Error: " + e);
}
private static void OnWarning(object sender, SessionWarningEventArgs e)
{
WriteLine("Session Warning: " + e);
}
private static string GetLicenseStoreFolder()
{
string path = Path.Join(AppContext.BaseDirectory, "../../../../../../license");
if (Directory.Exists(path))
return path;
// We assume to run after `dotnet publish` using the default path.
return Path.Join(AppContext.BaseDirectory, "../../../../../../../license");
}
private static string GetProductName()
{
var assemblyName = typeof(Engine).Assembly.GetName();
return $"Throughput Benchmark Sample\n\n{assemblyName.Name} version {assemblyName.Version}\n";
}
}
}