Database Session Storage Sample
Source code
using OnixS.Fix;
using OnixS.Fix.Fix44;
using System;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using NLog.Extensions.Logging;
namespace DatabaseSessionStorage
{
static class Sample
{
private static string GetLicenseStoreFolder()
{
var path = Path.Join(AppContext.BaseDirectory, "../../../../../license");
if (Directory.Exists(path))
return path;
// expecting it run after dotnet publish using default paths
return Path.Join(AppContext.BaseDirectory, "../../../../../../license");
}
public static int Main()
{
try
{
var settings = new EngineSettings()
{
LicenseStore = GetLicenseStoreFolder(),
LoggerProvider = new NLogLoggerProvider()
};
const int listenPort = 10450;
settings.ListenPorts.Add(listenPort);
Engine.Init(settings);
Run(listenPort);
}
catch (Exception ex)
{
Console.WriteLine("Error while executing sample: {0}", ex);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && ex.Message.Contains("ErrorCode=10013"))
{
Console.WriteLine("The socket is already in use, or access to the port is restricted on this OS. Please try to change the listen port in EngineSettings.ListenPorts to another one.");
Console.WriteLine("You can view a list of which ports are excluded from your user by running this command: 'netsh interface ipv4 show excludedportrange protocol=tcp'");
}
return 1;
}
finally
{
Engine.Shutdown();
// From https://github.com/NLog/NLog/wiki/Tutorial:
// NET Application running on Mono / Linux are required to stop threads / timers before entering application shutdown phase.
// Failing to do this will cause unhandled exceptions and segmentation faults, and other unpredictable behavior.
NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
}
return 0;
}
const ProtocolVersion version = ProtocolVersion.Fix44;
private static void Run(int port)
{
const string Sender = "FileBasedStorage";
const string Target = "DatabaseStorage";
using var acceptor = new Session(Sender, Target, version, false);
acceptor.InboundSessionMessage += (sender, args) => { Console.WriteLine("\nAcceptor received " + args.Message.ToString('|', FixStringFormat.TagName)); };
acceptor.InboundApplicationMessage += (sender, args) =>
{
Console.WriteLine("\nAcceptor received " + args.Message.ToString('|', FixStringFormat.TagName));
inboundApplicationMessageEvent.Set();
};
acceptor.Error += OnError;
acceptor.Warning += OnWarning;
acceptor.LogonAsAcceptor();
using var storage = new SqliteSessionStorage("./SessionStorage.db3", Sender, Target, version, Engine.Instance.Settings.ResendingQueueSize);
using var initiator = new Session(Target, Sender, version, false, storage);
initiator.InboundSessionMessage += (sender, args) => { Console.WriteLine("\nInitiator received " + args.Message.ToString('|', FixStringFormat.TagName)); };
initiator.MessageResending += (sender, e) => { e.AllowResending = true; };
initiator.Error += OnError;
initiator.Warning += OnWarning;
initiator.LogonAsInitiator(IPAddress.Loopback, port);
var order = CreateOrder();
initiator.Send(order);
inboundApplicationMessageEvent.WaitOne();
acceptor.SendResendRequest(1);
inboundApplicationMessageEvent.WaitOne();
initiator.Logout();
acceptor.Logout();
}
private static void OnError(object sender, SessionErrorEventArgs e)
{
Console.WriteLine("Session Error: " + e.ToString());
}
private static void OnWarning(object sender, SessionWarningEventArgs e)
{
Console.WriteLine("Session Warning: " + e.ToString());
}
private static readonly AutoResetEvent inboundApplicationMessageEvent = new AutoResetEvent(false);
public static Message CreateOrder()
{
var order = new Message(MsgType.NewOrderSingle, version);
order.Set(Tag.HandlInst, HandlInst.AutoExecPriv)
.Set(Tag.ClOrdID, "Unique identifier for Order")
.Set(Tag.Symbol, "AAPL")
.Set(Tag.Side, Side.Buy)
.Set(Tag.OrderQty, 1000)
.Set(Tag.OrdType, OrdType.Market);
return order;
}
}
}
SQLite Session Storage
using Microsoft.Data.Sqlite;
using OnixS.Fix;
using OnixS.Fix.Fix42;
using OnixS.Fix.Storage;
using System;
using System.Collections.Generic;
using System.Linq;
using Tag = OnixS.Fix.Fix50.Tag;
namespace DatabaseSessionStorage
{
sealed class SqliteSessionStorage : ISessionStorage
{
private readonly SqliteConnection connection;
private readonly long stateId = -1;
private int inSeqNum;
private int outSeqNum;
private bool terminated;
private DateTime sessionCreationTime;
private bool closed = false;
private long maxStorageSize;
private MessageMode messageMode;
public SqliteSessionStorage(string dataSource, string senderCompId, string targetCompId, ProtocolVersion version, long resendingQueueSize)
{
maxStorageSize = resendingQueueSize;
var createNewSessionEntry = true;
var connectionStringBuilder = new SqliteConnectionStringBuilder
{
DataSource = dataSource
};
connection = new SqliteConnection(connectionStringBuilder.ConnectionString);
connection.Open();
var selectCmd = connection.CreateCommand();
selectCmd.CommandText = $"SELECT * FROM [States] WHERE [SenderCompID] = '{senderCompId}' AND [TargetCompID] = '{targetCompId}' AND [Version] = '{ProtocolVersionHelper.ToString(version)}' ORDER BY [SessionCreationTime] DESC LIMIT 1";
using var selectCmdReader = selectCmd.ExecuteReader();
if (selectCmdReader.Read())
{
const int stateIdColumnOrdinal = 0;
const int stateSessionCreationTimeColumnOrdinal = 1;
const int stateInSeqNumColumnOrdinal = 2;
const int stateOutSeqNumColumnOrdinal = 3;
const int stateTerminatedColumnOrdinal = 6;
stateId = selectCmdReader.GetInt64(stateIdColumnOrdinal);
sessionCreationTime = new DateTime(selectCmdReader.GetInt64(stateSessionCreationTimeColumnOrdinal));
inSeqNum = selectCmdReader.GetInt32(stateInSeqNumColumnOrdinal);
outSeqNum = selectCmdReader.GetInt32(stateOutSeqNumColumnOrdinal);
terminated = selectCmdReader.GetInt16(stateTerminatedColumnOrdinal) > 0;
if (!Terminated)
{
createNewSessionEntry = false;
var command = connection.CreateCommand();
command.CommandText = $"SELECT * FROM [Summary] WHERE [StateId] = {stateId} AND [InOut] = 1 ORDER BY [Id] DESC LIMIT {maxStorageSize}";
using var reader = command.ExecuteReader();
const int summaryMessageColumnOrdinal = 4;
while (reader.Read())
{
var rawMessage = Convert.FromBase64String(reader.GetString(summaryMessageColumnOrdinal));
var message = Message.Parse(rawMessage, Engine.Instance.MessageInfoDictionaryManager);
if (message.GetFlag(Tag.PossDupFlag))
continue;
// Skip all session level messages except for Reject, cause it is the only administrative message which can be resent.
if (message.IsSessionLevel() && message.Type != MsgType.Reject)
continue;
sentMessages.Insert(0, message); //select command have DESC ordering
}
}
}
if(createNewSessionEntry)
{
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
sessionCreationTime = DateTime.UtcNow;
command.CommandText = $"INSERT INTO States VALUES(NULL, {SessionCreationTime.Ticks}, 0, 0, '{senderCompId}', '{targetCompId}', 0, '{ProtocolVersionHelper.ToString(version)}')";
command.ExecuteNonQuery();
// get primary key value for the inserted state entry
var getRowIdCmd = connection.CreateCommand();
getRowIdCmd.CommandText = $"SELECT last_insert_rowid()";
using var getRowIdCmdReader = getRowIdCmd.ExecuteReader();
{
if (getRowIdCmdReader.Read())
stateId = getRowIdCmdReader.GetInt64(0);
}
transaction.Commit();
}
}
public void Clear()
{
sentMessages.Clear();
}
public void Close(bool keepSequenceNumbersBetweenFixConnections, bool doBackup = false)
{
if (closed)
return;
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"UPDATE States SET Terminated = {!keepSequenceNumbersBetweenFixConnections} WHERE Id = {stateId}";
command.ExecuteNonQuery();
transaction.Commit();
closed = true;
}
public string Id
{
get
{
return "SqliteSessionStorage";
}
}
public int InSeqNum
{
get
{
return inSeqNum;
}
set
{
inSeqNum = value;
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"UPDATE States SET InSeqNum = {inSeqNum} WHERE Id = {stateId}";
command.ExecuteNonQuery();
transaction.Commit();
}
}
public int OutSeqNum
{
get
{
return outSeqNum;
}
set
{
outSeqNum = value;
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"UPDATE States SET OutSeqNum = {outSeqNum} WHERE Id = {stateId}";
command.ExecuteNonQuery();
transaction.Commit();
}
}
public DateTime SessionCreationTime
{
get
{
return sessionCreationTime;
}
set
{
sessionCreationTime = value;
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"UPDATE States SET SessionCreationTime = '{sessionCreationTime.Ticks}' WHERE Id = {stateId}";
command.ExecuteNonQuery();
transaction.Commit();
}
}
public MessageMode MessageMode
{
get { return messageMode; }
set { messageMode = value; }
}
public void Flush()
{
// NOOP
}
private readonly List<IMessage> sentMessages = new List<IMessage>();
public IList<IMessage> GetOutboundMessages(int beginSequenceNumber, int endSequenceNumber)
{
return sentMessages.Where((message) => message.SeqNum >= beginSequenceNumber && message.SeqNum <= endSequenceNumber).OrderBy((message) => message.SeqNum).ToList();
}
public void StoreInboundMessage(ReadOnlySpan<byte> rawMessage, int msgSeqNum, bool isOriginal)
{
if (closed)
return;
InSeqNum = msgSeqNum;
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"INSERT INTO Summary VALUES(NULL, {stateId}, 0, '{DateTime.UtcNow.ToString("yyyy MM dd-HH:mm:ss.fffffff")}', '{Convert.ToBase64String(rawMessage)}')";
command.ExecuteNonQuery();
transaction.Commit();
}
public void StoreOutboundMessage(ReadOnlySpan<byte> rawMessage, int msgSeqNum, bool isOriginal = true, bool warmUp = false, DateTime timestamp = default)
{
if (warmUp)
return;
if (closed)
return;
if (isOriginal)
{
sentMessages.Add(Message.Parse(rawMessage, Engine.Instance.MessageInfoDictionaryManager));
if (sentMessages.Count > MaxStorageSize)
sentMessages.RemoveAt(0);
}
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"INSERT INTO Summary VALUES(NULL, {stateId}, 1, '{DateTime.UtcNow.ToString("yyyy MM dd-HH:mm:ss.fffffff")}', '{Convert.ToBase64String(rawMessage)}')";
command.ExecuteNonQuery();
transaction.Commit();
if (OutSeqNum < msgSeqNum)
OutSeqNum = msgSeqNum;
}
public void Dispose()
{
connection.Dispose();
}
public SessionStorageType Type => SessionStorageType.PluggableStorage;
public bool Terminated
{
get
{
return terminated;
}
set
{
terminated = value;
using var transaction = connection.BeginTransaction();
var command = connection.CreateCommand();
command.CommandText = $"UPDATE States SET Terminated = {terminated} WHERE Id = {stateId}";
command.ExecuteNonQuery();
transaction.Commit();
}
}
public long MaxStorageSize
{
get => maxStorageSize;
set
{
maxStorageSize = value;
if (sentMessages.Count > MaxStorageSize)
sentMessages.RemoveRange(0, sentMessages.Count - (int)MaxStorageSize);
}
}
};
}