// Resending Messages Sample
// Source code
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using NLog.Extensions.Logging;
using OnixS.Fix;
using OnixS.Fix.Fix42;
namespace ResendingMessagesSample
{
static class ResendingMessagesSample
{
// FIX protocol version passed to every Session constructed by the CaseN methods.
const ProtocolVersion FixVersion = ProtocolVersion.Fix42;
// Port added to EngineSettings.ListenPorts in Main.
const int ListenPort = 10450;
// Port the initiator connects to on the loopback address; equal to ListenPort because
// both the acceptor and the initiator sessions are created inside this sample.
const int CounterpartyPort = ListenPort;
// Value passed as the last argument of LogonAsInitiator.
// NOTE(review): presumably the heartbeat interval in seconds — confirm against the engine API.
const int HeartBtInt = 30;
// CompIDs of the two sessions; the acceptor session is constructed with them swapped.
const string SenderCompId = "Initiator";
const string TargetCompId = "Acceptor";
// Number of orders sent in each of the two batches in SimulateSequenceGap.
const int OrderCount = 5;
// Amount subtracted from the acceptor's InSeqNum to provoke the sequence gap.
const int SequenceGapValue = 3;
/// <summary>
/// Builds the path of the "license" folder relative to the application's base directory.
/// </summary>
/// <returns>
/// The folder five levels above the base directory when it exists; otherwise the folder
/// six levels above (returned without checking that it exists).
/// </returns>
private static string GetLicenseStoreFolder()
{
    string baseDirectory = AppContext.BaseDirectory;
    string nearCandidate = Path.Join(baseDirectory, "../../../../../license");

    // The deeper fallback is the layout expected after `dotnet publish` with default paths.
    return Directory.Exists(nearCandidate)
        ? nearCandidate
        : Path.Join(baseDirectory, "../../../../../../license");
}
/// <summary>
/// Shows a prompt and blocks until a key is pressed. Despite the name, any key ends the wait.
/// Returns immediately, without sleeping or printing, when standard input is redirected.
/// </summary>
/// <param name="waitText">Prompt written to the console, preceded by an empty line.</param>
static void WaitUntilEnterKey(string waitText)
{
    if (!Console.IsInputRedirected)
    {
        // NOTE(review): the one-second delay presumably lets pending session output
        // reach the console before the prompt appears — confirm.
        System.Threading.Thread.Sleep(1000);

        Console.WriteLine("\n" + waitText);
        Console.ReadKey();
    }
}
/// <summary>
/// Console-tracing event handlers for the acceptor session. Every line is prefixed with
/// the acceptor's CompID (<c>TargetCompId</c>).
/// </summary>
static class AcceptorListener
{
    /// <summary>
    /// Prints an application-level message received by the acceptor, mentioning
    /// PossDupFlag when the flag is set on the message.
    /// </summary>
    public static void OnInboundApplicationMsg(object sender, InboundMessageEventArgs e)
    {
        string possDupSuffix = e.Message.GetFlag(Tag.PossDupFlag) ? " with PossDupFlag=Y:\n" : ":\n";
        Console.WriteLine("\n[" + TargetCompId + "] - Inbound application-level message" + possDupSuffix + e.Message);
    }

    /// <summary>
    /// Prints an application-level message sent by the acceptor, mentioning
    /// PossDupFlag when the flag is set on the message.
    /// </summary>
    public static void OnOutboundApplicationMsg(object sender, MessageEventArgs e)
    {
        string possDupSuffix = e.Message.GetFlag(Tag.PossDupFlag) ? " with PossDupFlag=Y:\n" : ":\n";

        // This handler is attached to the OutboundApplicationMessage event, so the label reads "Outbound".
        Console.WriteLine("\n[" + TargetCompId + "] - Outbound application-level message" + possDupSuffix + e.Message);
    }

    /// <summary>
    /// Prints a session-level message received by the acceptor. Sequence Reset messages
    /// get a dedicated line showing MsgSeqNum and NewSeqNo.
    /// </summary>
    public static void OnInboundSessionMsg(object sender, InboundMessageEventArgs e)
    {
        // NOTE(review): the sessions run FIX 4.2 (FixVersion) while the type constant is taken
        // from the Fix44 namespace — presumably the MsgType values are identical; confirm.
        if (e.Message.Type == OnixS.Fix.Fix44.MsgType.SequenceReset)
        {
            Console.WriteLine("\n[" + TargetCompId + "] - Inbound <Sequence Reset> message, MsgSeqNum=" +
                e.Message.Get(Tag.MsgSeqNum) + ", NewSeqNo=" + e.Message.Get(Tag.NewSeqNo) + ":\n" + e.Message);
        }
        else
        {
            string possDupSuffix = e.Message.GetFlag(Tag.PossDupFlag) ? " with PossDupFlag=Y:\n" : ":\n";
            Console.WriteLine("\n[" + TargetCompId + "] - Inbound session-level message" + possDupSuffix + e.Message);
        }
    }

    /// <summary>
    /// Prints a session-level message sent by the acceptor. Resend Request messages
    /// get a dedicated line showing BeginSeqNo and EndSeqNo.
    /// </summary>
    public static void OnOutboundSessionMsg(object sender, MessageEventArgs e)
    {
        if (e.Message.Type == OnixS.Fix.Fix44.MsgType.ResendRequest)
        {
            Console.WriteLine("\n[" + TargetCompId + "] - Outbound <Resend Request> message, BeginSeqNo=" +
                e.Message.Get(Tag.BeginSeqNo) + ", EndSeqNo=" + e.Message.Get(Tag.EndSeqNo) + ":\n" + e.Message);
        }
        else
        {
            Console.WriteLine("\n[" + TargetCompId + "] - Outbound session-level message:\n" + e.Message);
        }
    }

    /// <summary>Prints the previous and the new state on every session state change.</summary>
    public static void OnStateChange(object sender, SessionStateChangeEventArgs e)
    {
        Console.WriteLine("\n[" + TargetCompId + "] - Session's state is changed, prevState="
            + e.PrevState.ToString()
            + ", newState="
            + e.NewState.ToString());
    }

    /// <summary>Prints a warning reported by the acceptor session.</summary>
    public static void OnWarning(object sender, SessionWarningEventArgs e)
    {
        Console.WriteLine("\n[" + TargetCompId + "] - Session warning: " + e.ToString());
    }

    /// <summary>Prints an error reported by the acceptor session.</summary>
    public static void OnError(object sender, SessionErrorEventArgs e)
    {
        Console.WriteLine("\n[" + TargetCompId + "] - Session error: " + e.ToString());
    }
}
/// <summary>
/// Event handlers for the initiator session. Every console line is prefixed with the
/// initiator's CompID (<c>SenderCompId</c>).
/// </summary>
static class InitiatorListener
{
    /// <summary>
    /// Value that <see cref="OnMessageResending"/> copies into
    /// <c>MessageResendingEventArgs.AllowResending</c>. Defaults to <c>false</c>.
    /// </summary>
    public static bool ResendMode { get; set; } = false;

    /// <summary>
    /// Reports the MsgSeqNum of the message about to be resent and answers the event
    /// with the current <see cref="ResendMode"/>.
    /// </summary>
    public static void OnMessageResending(object sender, MessageResendingEventArgs e)
    {
        string seqNum = e.Message.Get(Tag.MsgSeqNum);
        Console.WriteLine("\n[" + SenderCompId + "] - MessageResending event is called for message with MsgSeqNum=" +
            seqNum + ", AllowResending is " + ResendMode.ToString());

        e.AllowResending = ResendMode;
    }

    /// <summary>Prints a warning reported by the initiator session.</summary>
    public static void OnWarning(object sender, SessionWarningEventArgs e)
    {
        string details = e.ToString();
        Console.WriteLine("\n[" + SenderCompId + "] - Session warning: " + details);
    }

    /// <summary>Prints an error reported by the initiator session.</summary>
    public static void OnError(object sender, SessionErrorEventArgs e)
    {
        string details = e.ToString();
        Console.WriteLine("\n[" + SenderCompId + "] - Session error: " + details);
    }
}
/// <summary>
/// Runs the scenario shared by every case: connects the two sessions over loopback, sends
/// a batch of orders, lowers the acceptor's incoming sequence number by
/// <c>SequenceGapValue</c>, sends a second batch, and finally logs both sessions out.
/// </summary>
/// <param name="acceptor">Session that accepts the connection; its traffic is traced to the console.</param>
/// <param name="initiator">Session that connects and sends the orders.</param>
/// <param name="caseName">Case description shown in the first interactive prompt.</param>
static void SimulateSequenceGap(Session acceptor, Session initiator, string caseName)
{
    // Acceptor: trace messages, state changes, warnings and errors, then start accepting.
    acceptor.InboundApplicationMessage += AcceptorListener.OnInboundApplicationMsg;
    acceptor.InboundSessionMessage += AcceptorListener.OnInboundSessionMsg;
    acceptor.OutboundApplicationMessage += AcceptorListener.OnOutboundApplicationMsg;
    acceptor.OutboundSessionMessage += AcceptorListener.OnOutboundSessionMsg;
    acceptor.StateChanged += AcceptorListener.OnStateChange;
    acceptor.Warning += AcceptorListener.OnWarning;
    acceptor.Error += AcceptorListener.OnError;
    acceptor.ResetLocalSequenceNumbers();
    acceptor.LogonAsAcceptor();

    // Initiator: only warnings and errors are traced here; cases add MessageResending themselves.
    initiator.ResetLocalSequenceNumbers();
    initiator.Warning += InitiatorListener.OnWarning;
    initiator.Error += InitiatorListener.OnError;

    IPAddress acceptorHost = IPAddress.Loopback;
    Console.WriteLine($"\nConnecting to Acceptor on host={acceptorHost.ToString()} port={CounterpartyPort}...");
    initiator.LogonAsInitiator(acceptorHost, CounterpartyPort, HeartBtInt);

    // One message instance is reused for every order; only ClOrdID changes between sends.
    var order = new Message(OnixS.Fix.Fix44.MsgType.NewOrderSingle, initiator.Dictionary);
    int lastOrderId = 0;

    Console.WriteLine($"\nSending {OrderCount} regular orders...");
    SendOrderBatch();
    PauseBeforeNextStep("Press any key to check [" + caseName + "] ...");

    Console.WriteLine($"\nLet's decrease the incoming sequence number of Acceptor by {SequenceGapValue}" +
        $" and send {OrderCount} orders to simulate a sequence gap.");
    acceptor.InSeqNum -= SequenceGapValue;
    SendOrderBatch();
    PauseBeforeNextStep("Press any key to disconnect Initiator and complete the case...");

    initiator.Logout("The session is disconnected by Initiator");
    acceptor.Logout();

    // Sends OrderCount orders, continuing the ClOrdID numbering from the previous batch.
    void SendOrderBatch()
    {
        for (int sent = 0; sent < OrderCount; sent++)
        {
            lastOrderId++;
            order.Set(Tag.ClOrdID, lastOrderId);
            initiator.Send(order);
        }
    }

    // Waits for a key press in interactive mode, otherwise sleeps for 100 ms.
    void PauseBeforeNextStep(string prompt)
    {
        if (!interactive)
        {
            System.Threading.Thread.Sleep(100);
            return;
        }

        WaitUntilEnterKey(prompt);
    }
}
/// <summary>
/// Case 1: runs the sequence-gap scenario with both sessions left at their constructed
/// settings and without subscribing to the initiator's MessageResending event.
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case1(string caseName)
{
    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
/// <summary>
/// Case 2: runs the sequence-gap scenario with the initiator's MessageResending handler
/// attached and <c>InitiatorListener.ResendMode</c> set to <c>false</c>.
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case2(string caseName)
{
    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);

    // The handler answers every MessageResending event with AllowResending = false.
    InitiatorListener.ResendMode = false;
    initiatorSession.MessageResending += InitiatorListener.OnMessageResending;

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
/// <summary>
/// Case 3: runs the sequence-gap scenario with the initiator's MessageResending handler
/// attached and <c>InitiatorListener.ResendMode</c> set to <c>true</c>.
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case3(string caseName)
{
    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);

    // The handler answers every MessageResending event with AllowResending = true.
    InitiatorListener.ResendMode = true;
    initiatorSession.MessageResending += InitiatorListener.OnMessageResending;

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
/// <summary>
/// Case 4: like case 3, but the acceptor's <c>ResendRequestMaximumRange</c> is set to
/// one less than the simulated gap (<c>SequenceGapValue - 1</c>).
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case4(string caseName)
{
    const int MaximumRange = SequenceGapValue - 1;

    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    acceptorSession.ResendRequestMaximumRange = MaximumRange;

    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);
    InitiatorListener.ResendMode = true;
    initiatorSession.MessageResending += InitiatorListener.OnMessageResending;

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
/// <summary>
/// Case 5: like case 3, but the initiator's <c>ResendingQueueSize</c> is set to one less
/// than the simulated gap (<c>SequenceGapValue - 1</c>).
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case5(string caseName)
{
    const int QueueSize = SequenceGapValue - 1;

    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);

    InitiatorListener.ResendMode = true;
    initiatorSession.MessageResending += InitiatorListener.OnMessageResending;
    initiatorSession.ResendingQueueSize = QueueSize;

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
/// <summary>
/// Case 6: like case 3, but the acceptor has <c>RequestOnlyMissedMessages</c> enabled.
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case6(string caseName)
{
    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    acceptorSession.RequestOnlyMissedMessages = true;

    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);
    InitiatorListener.ResendMode = true;
    initiatorSession.MessageResending += InitiatorListener.OnMessageResending;

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
/// <summary>
/// Case 7: like case 6, with the acceptor's <c>ResendRequestMaximumRange</c> additionally
/// set to one less than the simulated gap (<c>SequenceGapValue - 1</c>).
/// </summary>
/// <param name="caseName">Case description forwarded to the scenario.</param>
static void Case7(string caseName)
{
    const int MaximumRange = SequenceGapValue - 1;

    using var acceptorSession = new Session(TargetCompId, SenderCompId, FixVersion);
    acceptorSession.RequestOnlyMissedMessages = true;
    acceptorSession.ResendRequestMaximumRange = MaximumRange;

    using var initiatorSession = new Session(SenderCompId, TargetCompId, FixVersion);
    InitiatorListener.ResendMode = true;
    initiatorSession.MessageResending += InitiatorListener.OnMessageResending;

    SimulateSequenceGap(acceptorSession, initiatorSession, caseName);
}
// Signature shared by Case1..Case7 so Main can store them in its menu and invoke them by number.
public delegate void Case(string caseName);
// Set by Main. When true, SimulateSequenceGap waits for a key press between steps;
// when false, it sleeps briefly instead.
static bool interactive;
/// <summary>
/// The main entry point for the application. Initializes the engine, then either shows
/// the case menu (no arguments and a real console) or runs case 1 unattended.
/// </summary>
/// <param name="args">
/// Command-line arguments. Their content is not inspected; passing any argument selects
/// the unattended run.
/// </param>
/// <returns>0 on success; 1 when an exception was caught.</returns>
static int Main(string[] args)
{
    try
    {
        Console.WriteLine("ResendingMessages sample");

        var settings = new EngineSettings()
        {
            LicenseStore = GetLicenseStoreFolder(),
            LoggerProvider = new NLogLoggerProvider()
        };
        settings.ListenPorts.Add(ListenPort);
        Engine.Init(settings);

        var menuItems = new Dictionary<int, KeyValuePair<Case, string>>
        {
            { 1, new KeyValuePair<Case, string>(Case1, "Default resend functionality without using any specific settings and MessageResending events in the Initiator") },
            { 2, new KeyValuePair<Case, string>(Case2, "Default resend functionality with using the MessageResending event which sets AllowResending to false") },
            { 3, new KeyValuePair<Case, string>(Case3, "Default resend functionality with using the MessageResending event which sets AllowResending to true") },
            { 4, new KeyValuePair<Case, string>(Case4, "Default resend functionality with a limitation of the resend request range") },
            { 5, new KeyValuePair<Case, string>(Case5, "Default resend functionality with a limitation of the resending queue size") },
            { 6, new KeyValuePair<Case, string>(Case6, "Second option of resend functionality (RequestOnlyMissedMessages=true) with using the MessageResending event which sets AllowResending to true") },
            { 7, new KeyValuePair<Case, string>(Case7, "Second option of resend functionality (RequestOnlyMissedMessages=true) with a limitation of the resend request range") }
        };

        // Console.ReadKey throws InvalidOperationException when standard input is redirected,
        // so the menu is only offered when keys can actually be read from a console.
        interactive = args.Length == 0 && !Console.IsInputRedirected;

        if (interactive)
        {
            while (true)
            {
                Console.WriteLine("\nSelect resend functionality case you want to perform or press <Enter> to exit: \n");
                for (int index = 1; index <= menuItems.Count; ++index)
                {
                    Console.WriteLine(index.ToString() + ") " + menuItems[index].Value);
                }

                ConsoleKeyInfo key = Console.ReadKey();
                if (key.Key == ConsoleKey.Enter)
                {
                    break;
                }

                // A single key press is read, so only one-digit menu numbers can be selected.
                if (int.TryParse(key.KeyChar.ToString(), out int selectedItemNumber)
                    && menuItems.TryGetValue(selectedItemNumber, out KeyValuePair<Case, string> selected))
                {
                    selected.Key(selected.Value);
                }
                else
                {
                    Console.WriteLine("\nInvalid command requested. Repeat your choice.");
                }
            }
        }
        else
        {
            // Unattended run: execute the first menu entry with its own description.
            KeyValuePair<Case, string> defaultCase = menuItems[1];
            defaultCase.Key(defaultCase.Value);
        }

        // NOTE(review): Engine.Shutdown is skipped when an exception is thrown above —
        // confirm whether it should also run on the failure path.
        Engine.Shutdown();
    }
    catch (Exception ex)
    {
        Console.WriteLine("Exception: " + ex);
        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && ex.Message.Contains("ErrorCode=10013"))
        {
            Console.WriteLine("The socket is already in use, or access to the port is restricted on this OS. Please try to change the listen port in EngineSettings.ListenPorts to another one.");
            Console.WriteLine("You can view a list of which ports are excluded from your user by running this command: 'netsh interface ipv4 show excludedportrange protocol=tcp'");
        }
        return 1;
    }
    finally
    {
        // From https://github.com/NLog/NLog/wiki/Tutorial:
        // NET Application running on Mono / Linux are required to stop threads / timers before entering application shutdown phase.
        // Failing to do this will cause unhandled exceptions and segmentation faults, and other unpredictable behavior.
        NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
    }
    return 0;
}
}
}