Pluggable Session Storage Sample
Source code
using OnixS.Fix;
using OnixS.Fix.Fix44;
using System;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using NLog.Extensions.Logging;
namespace PluggableSessionStorage
{
static class Sample
{
private static string GetLicenseStoreFolder()
{
string path = Path.Join(AppContext.BaseDirectory, "../../../../../license");
if (Directory.Exists(path))
return path;
// expecting it run after dotnet publish using default paths
return Path.Join(AppContext.BaseDirectory, "../../../../../../license");
}
public static int Main()
{
try
{
var settings = new EngineSettings()
{
LicenseStore = GetLicenseStoreFolder(),
LoggerProvider = new NLogLoggerProvider()
};
const int listenPort = 10450;
settings.ListenPorts.Add(listenPort);
Engine.Init(settings);
Run(listenPort);
}
catch (Exception ex)
{
Console.WriteLine("Error while executing sample: {0}", ex);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && ex.Message.Contains("ErrorCode=10013"))
{
Console.WriteLine("The socket is already in use, or access to the port is restricted on this OS. Please try to change the listen port in EngineSettings.ListenPorts to another one.");
Console.WriteLine("You can view a list of which ports are excluded from your user by running this command: 'netsh interface ipv4 show excludedportrange protocol=tcp'");
}
return 1;
}
finally
{
Engine.Shutdown();
// From https://github.com/NLog/NLog/wiki/Tutorial:
// NET Application running on Mono / Linux are required to stop threads / timers before entering application shutdown phase.
// Failing to do this will cause unhandled exceptions and segmentation faults, and other unpredictable behavior.
NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
}
return 0;
}
const ProtocolVersion version = ProtocolVersion.Fix44;
private static void Run(int port)
{
var storage = new MyStorage();
const string Sender = "FileBasedStorage";
const string Target = "PluggableStorage";
var acceptor = new Session(Sender, Target, version, false);
acceptor.InboundSessionMessage += (object sender, InboundMessageEventArgs args) =>
{
Console.WriteLine("\nAcceptor received " + args.Message.ToString('|', FixStringFormat.TagName));
};
acceptor.InboundApplicationMessage += (object sender, InboundMessageEventArgs args) =>
{
Console.WriteLine("\nAcceptor received " + args.Message.ToString('|', FixStringFormat.TagName));
inboundApplicationMessageEvent.Set();
};
acceptor.Error += OnError;
acceptor.Warning += OnWarning;
acceptor.LogonAsAcceptor();
var initiator = new Session(Target, Sender, version, false, storage);
initiator.InboundSessionMessage += (object sender, InboundMessageEventArgs args) =>
{
Console.WriteLine("\nInitiator received " + args.Message.ToString('|', FixStringFormat.TagName));
};
initiator.MessageResending += (object sender, MessageResendingEventArgs e) =>
{
e.AllowResending = true;
};
initiator.Error += OnError;
initiator.Warning += OnWarning;
initiator.LogonAsInitiator(IPAddress.Loopback, port);
Message order = CreateOrder();
initiator.Send(order);
inboundApplicationMessageEvent.WaitOne();
acceptor.SendResendRequest(1);
inboundApplicationMessageEvent.WaitOne();
initiator.Logout();
acceptor.Logout();
initiator.Dispose();
acceptor.Dispose();
}
private static void OnError(object sender, SessionErrorEventArgs e)
{
Console.WriteLine("Session Error: " + e.ToString());
}
private static void OnWarning(object sender, SessionWarningEventArgs e)
{
Console.WriteLine("Session Warning: " + e.ToString());
}
private readonly static AutoResetEvent inboundApplicationMessageEvent = new AutoResetEvent(false);
public static Message CreateOrder()
{
var order = new Message(MsgType.NewOrderSingle, version);
order.Set(Tag.HandlInst, HandlInst.AutoExecPriv)
.Set(Tag.ClOrdID, "Unique identifier for Order")
.Set(Tag.Symbol, "AAPL")
.Set(Tag.Side, Side.Buy)
.Set(Tag.OrderQty, 1000)
.Set(Tag.OrdType, OrdType.Market);
return order;
}
}
}
Storage
using OnixS.Fix;
using OnixS.Fix.Storage;
using System;
using System.Collections.Generic;
using System.Linq;
namespace PluggableSessionStorage
{
sealed class MyStorage : ISessionStorage
{
public void Clear()
{
outboundMessages.Clear();
}
public void Close(bool keepSequenceNumbersBetweenFixConnections, bool doBackup = false)
{
// NOOP
}
public string Id
{
get
{
return "MyStorage";
}
}
public int InSeqNum { get; set; }
public int OutSeqNum { get; set; }
private readonly Dictionary<int, IMessage> outboundMessages = new Dictionary<int, IMessage>();
public DateTime SessionCreationTime { get; set; }
public MessageMode MessageMode { get; set; }
public void Flush()
{
// NOOP
}
public IList<IMessage> GetOutboundMessages(int beginSequenceNumber, int endSequenceNumber)
{
return outboundMessages.Where((pair) => pair.Key >= beginSequenceNumber && pair.Key <= endSequenceNumber).OrderBy((pair) => pair.Key).Select((pair) => pair.Value).ToList();
}
public void StoreInboundMessage(ReadOnlySpan<byte> rawMessage, int msgSeqNum, bool isOriginal)
{
InSeqNum = msgSeqNum;
}
public void StoreOutboundMessage(ReadOnlySpan<byte> rawMessage, int msgSeqNum, bool isOriginal = true, bool warmUp = false, DateTime timestamp = default)
{
OutSeqNum = msgSeqNum;
if (!warmUp && !outboundMessages.ContainsKey(msgSeqNum))
{
outboundMessages.Add(msgSeqNum, Message.Parse(rawMessage, Engine.Instance.MessageInfoDictionaryManager));
}
}
public void Dispose()
{
// NOOP
}
public SessionStorageType Type => SessionStorageType.PluggableStorage;
public bool Terminated { get; set; }
public long MaxStorageSize { get; set; }
};
}