Throttling Sell Side Sample
Source code
using System;
using System.Collections.Generic;
using OnixS.Fix;
using System.Globalization;
using System.IO;
using OnixS.Fix.Fix44;
using System.Runtime.InteropServices;
using NLog.Extensions.Logging;
namespace ThrottlingSellSide
{
/// <summary>
/// Waits for incoming connections and processes incoming messages.
/// </summary>
internal class Acceptor
{
private const ProtocolVersion fixVersion = ProtocolVersion.Fix44;
private const int MessagesPerSecondLimit = 10;
private TimeSpan ThrottlingInterval = TimeSpan.FromSeconds(1);
private Acceptor() { throttler = new Throttler(MessagesPerSecondLimit, ThrottlingInterval); }
private static string GetLicenseStoreFolder()
{
string path = Path.Join(AppContext.BaseDirectory, "../../../../../license");
if (Directory.Exists(path))
return path;
// expecting it run after dotnet publish using default paths
return Path.Join(AppContext.BaseDirectory, "../../../../../../license");
}
private void Run()
{
Console.WriteLine("Throttling Sell Side Sample");
var settings = new EngineSettings()
{
SendLogoutOnException = true,
SendLogoutOnInvalidLogon = true, // E.g. to send a Logout when the sequence number of the incoming Logon (A) message is less than expected.
LicenseStore = GetLicenseStoreFolder(),
LoggerProvider = new NLogLoggerProvider()
};
settings.ListenPorts.Add(10450);
Engine.Init(settings);
Engine.Instance.Error += (object sender, EngineErrorEventArgs args) =>
{
Console.WriteLine("Engine Error: " + args.ToString());
};
Engine.Instance.Warning += (object sender, EngineWarningEventArgs args) =>
{
Console.WriteLine("Engine Warning: " + args.ToString());
};
var dictionary = fixVersion.ToDictionary();
const string SenderCompID = "ThrottlingSellSide";
const string TargetCompID = "ThrottlingBuySide";
using var session = new Session(SenderCompID, TargetCompID, dictionary);
session.InboundApplicationMessage += OnInboundApplicationMessage;
session.StateChanged += (object sender, SessionStateChangeEventArgs e) =>
{
Console.WriteLine("Session state: " + e.NewState.ToString());
};
session.Error += (object sender, SessionErrorEventArgs e) =>
{
Console.WriteLine("Session Error: " + e.ToString());
};
session.Warning += (object sender, SessionWarningEventArgs e) =>
{
Console.WriteLine("Warning: " + e.ToString());
};
throttler.Reset(MessagesPerSecondLimit, ThrottlingInterval);
session.LogonAsAcceptor();
Console.WriteLine("\nAwaiting session-initiator with "
+ "\n SenderCompID (49) = " + TargetCompID
// Note: from the counterparty point of view SenderCompID is TargetCompID
+ "\n TargetCompID (56) = " + SenderCompID
+ "\n FIX version = " + fixVersion
+ "\non port " + Engine.Instance.Settings.ListenPorts[0] + " ...");
while (true)
{
Console.Write("Press 'X' to quit");
// Start a console read operation. Do not display the input.
ConsoleKeyInfo cki = Console.ReadKey(true);
// Announce the name of the key that was pressed .
Console.WriteLine($" Key pressed: {cki.Key}\n");
// Exit if the user pressed the 'X' key.
if (cki.Key == ConsoleKey.X)
break;
}
Console.WriteLine("Orders handled: " + totalOrdersHandled + ", messages rejected: " + totalMessagesRejected);
Engine.Shutdown();
}
private void OnInboundApplicationMessage(object sender, InboundMessageEventArgs e)
{
Console.WriteLine("Received application-level message:\n" + e.Message.ToString());
try
{
var sn = (Session)sender;
if (throttler.TryThrottle() != TimeSpan.Zero)
{
// Reject the message
sn.SendReject(e.Message.SeqNum, "The message is rejected due to throttling limit exhausting");
++totalMessagesRejected;
Console.WriteLine("Rejection " + totalMessagesRejected + ": message " + e.Message.SeqNum + " rejected due to throttling limit exhausting.");
return;
}
if (e.Message.Type == MsgType.NewOrderSingle)
{
Message execReport = CreateExecutionReport(e.Message, OrdStatus.New, ExecType.New);
sn.Send(execReport);
Console.WriteLine("Sent to the counterparty:\n" + execReport);
++totalOrdersHandled;
}
}
catch (Exception ex)
{
Console.WriteLine("Exception during the processing of the incoming message: " + ex);
}
}
private Message CreateExecutionReport(Message order, string orderStatus, string executionType)
{
var report = new Message(MsgType.ExecutionReport, order.Dictionary);
++orderCounter;
report[Tag.ClOrdID] = order[Tag.ClOrdID];
report[Tag.OrderID] = "OrderID_ " + DateTime.Now.ToString("HHmmss", CultureInfo.InvariantCulture);
report[Tag.ExecID] = "ExecID_" + orderCounter;
report[Tag.OrdStatus] = orderStatus;
report[Tag.ExecType] = executionType;
report[Tag.Symbol] = order[Tag.Symbol];
report[Tag.Side] = order[Tag.Side];
report[Tag.OrdType] = order[Tag.OrdType];
report[Tag.OrderQty] = order[Tag.OrderQty];
report[Tag.LeavesQty] = order[Tag.OrderQty];
report.Set(Tag.CumQty, 0)
.Set(Tag.AvgPx, 100.0);
return report;
}
private int orderCounter = 0;
private Throttler throttler;
private int totalOrdersHandled = 0;
private int totalMessagesRejected = 0;
/// <summary>
/// The main entry point for the application.
/// </summary>
private static int Main()
{
try
{
var acceptor = new Acceptor();
acceptor.Run();
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && ex.Message.Contains("ErrorCode=10013"))
{
Console.WriteLine("The socket is already in use, or access to the port is restricted on this OS. Please try to change the listen port in EngineSettings.ListenPorts to another one.");
Console.WriteLine("You can view a list of which ports are excluded from your user by running this command: 'netsh interface ipv4 show excludedportrange protocol=tcp'");
}
return 1;
}
finally
{
// From https://github.com/NLog/NLog/wiki/Tutorial:
// NET Application running on Mono / Linux are required to stop threads / timers before entering application shutdown phase.
// Failing to do this will cause unhandled exceptions and segmentation faults, and other unpredictable behavior.
NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
}
return 0;
}
}
}