• Version 1.16.0
Show / Hide Table of Contents

Database Session Storage Sample Project

This sample demonstrates how to create a custom database session storage.

© Onix Solutions

Source code


using OnixS.Fix;
using OnixS.Fix.Fix44;
using System;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using NLog.Extensions.Logging;

namespace DatabaseSessionStorage
{
    /// <summary>
    /// Console sample that runs a FIX 4.4 acceptor and initiator in one process over the loopback interface,
    /// with the initiator session backed by a <c>SqliteSessionStorage</c> instance.
    /// </summary>
    static class Sample
    {
        const ProtocolVersion version = ProtocolVersion.Fix44;

        // Set by the acceptor's InboundApplicationMessage handler; Run blocks on it to pace the scenario.
        private static readonly AutoResetEvent inboundApplicationMessageEvent = new AutoResetEvent(false);

        /// <summary>
        /// Resolves the license folder relative to the application base directory.
        /// </summary>
        /// <returns>
        /// The folder five levels above the base directory when it exists; otherwise the folder six levels above
        /// (returned without checking that it exists).
        /// </returns>
        private static string GetLicenseStoreFolder()
        {
            var nearCandidate = Path.Join(AppContext.BaseDirectory, "../../../../../license");

            // Fallback: the sample is expected to be running from the default `dotnet publish` output,
            // which sits one directory level deeper.
            return Directory.Exists(nearCandidate)
                ? nearCandidate
                : Path.Join(AppContext.BaseDirectory, "../../../../../../license");
        }

        /// <summary>
        /// Entry point: initializes the engine, runs the scenario and always shuts the engine and NLog down.
        /// </summary>
        /// <returns>0 when the scenario completes, 1 when any exception is caught.</returns>
        public static int Main()
        {
            const int listenPort = 10450;
            var exitCode = 0;

            try
            {
                var engineSettings = new EngineSettings()
                {
                    LicenseStore = GetLicenseStoreFolder(),
                    LoggerProvider = new NLogLoggerProvider()
                };

                engineSettings.ListenPorts.Add(listenPort);

                Engine.Init(engineSettings);

                Run(listenPort);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error while executing sample: {0}", ex);

                // Error code 10013 is reported on Windows when the port cannot be bound; print a hint for that case only.
                var portUnavailableOnWindows =
                    RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && ex.Message.Contains("ErrorCode=10013");

                if (portUnavailableOnWindows)
                {
                    Console.WriteLine("The socket is already in use, or access to the port is restricted on this OS. Please try to change the listen port in EngineSettings.ListenPorts to another one.");
                    Console.WriteLine("You can view a list of which ports are excluded from your user by running this command: 'netsh interface ipv4 show excludedportrange protocol=tcp'");
                }

                exitCode = 1;
            }
            finally
            {
                Engine.Shutdown();

                // From https://github.com/NLog/NLog/wiki/Tutorial:
                // .NET applications running on Mono / Linux must stop threads and timers before entering the shutdown phase.
                // Failing to do so causes unhandled exceptions, segmentation faults and other unpredictable behavior.
                NLog.LogManager.Shutdown(); // Flush and close down internal threads and timers
            }

            return exitCode;
        }

        /// <summary>
        /// Runs the scenario: logs an acceptor and an initiator on, sends one order from the initiator,
        /// has the acceptor request a resend starting at sequence number 1, then logs both sessions out.
        /// </summary>
        /// <param name="port">Loopback TCP port the initiator connects to.</param>
        private static void Run(int port)
        {
            const string acceptorCompId = "FileBasedStorage";
            const string initiatorCompId = "DatabaseStorage";

            // No storage argument is passed for the acceptor session.
            using var acceptor = new Session(acceptorCompId, initiatorCompId, version, false);

            acceptor.InboundSessionMessage += (sender, args) =>
                Console.WriteLine("\nAcceptor received " + args.Message.ToString('|', FixStringFormat.TagName));

            acceptor.InboundApplicationMessage += (sender, args) =>
            {
                Console.WriteLine("\nAcceptor received " + args.Message.ToString('|', FixStringFormat.TagName));

                // Release the WaitOne call in the scenario below.
                inboundApplicationMessageEvent.Set();
            };

            acceptor.Error += OnError;
            acceptor.Warning += OnWarning;

            acceptor.LogonAsAcceptor();

            // The initiator session is given the custom SQLite-backed storage.
            using var storage = new SqliteSessionStorage("./SessionStorage.db3", acceptorCompId, initiatorCompId, version, Engine.Instance.Settings.ResendingQueueSize);
            using var initiator = new Session(initiatorCompId, acceptorCompId, version, false, storage);

            initiator.InboundSessionMessage += (sender, args) =>
                Console.WriteLine("\nInitiator received " + args.Message.ToString('|', FixStringFormat.TagName));

            // Allow every message the counterparty asks to be resent.
            initiator.MessageResending += (sender, e) => e.AllowResending = true;

            initiator.Error += OnError;
            initiator.Warning += OnWarning;

            initiator.LogonAsInitiator(IPAddress.Loopback, port);

            var newOrder = CreateOrder();
            initiator.Send(newOrder);

            // Wait until the acceptor has received the order.
            inboundApplicationMessageEvent.WaitOne();

            acceptor.SendResendRequest(1);

            // Wait until the acceptor has received an application message again after the resend request.
            inboundApplicationMessageEvent.WaitOne();

            initiator.Logout();
            acceptor.Logout();
        }

        /// <summary>Writes a session error to the console.</summary>
        private static void OnError(object sender, SessionErrorEventArgs e)
            => Console.WriteLine("Session Error: " + e.ToString());

        /// <summary>Writes a session warning to the console.</summary>
        private static void OnWarning(object sender, SessionWarningEventArgs e)
            => Console.WriteLine("Session Warning: " + e.ToString());

        /// <summary>
        /// Builds the New Order Single message sent by the sample: a market order to buy 1000 AAPL.
        /// </summary>
        /// <returns>The populated order message.</returns>
        public static Message CreateOrder()
        {
            var newOrderSingle = new Message(MsgType.NewOrderSingle, version);

            newOrderSingle
                .Set(Tag.HandlInst, HandlInst.AutoExecPriv)
                .Set(Tag.ClOrdID, "Unique identifier for Order")
                .Set(Tag.Symbol, "AAPL")
                .Set(Tag.Side, Side.Buy)
                .Set(Tag.OrderQty, 1000)
                .Set(Tag.OrdType, OrdType.Market);

            return newOrderSingle;
        }
    }
}

SQLite Session Storage


using Microsoft.Data.Sqlite;
using OnixS.Fix;
using OnixS.Fix.Fix42;
using OnixS.Fix.Storage;
using System;
using System.Collections.Generic;
using System.Linq;
using Tag = OnixS.Fix.Fix50.Tag;

namespace DatabaseSessionStorage
{
    /// <summary>
    /// <see cref="ISessionStorage"/> implementation that keeps session state and raw messages in a SQLite database.
    /// </summary>
    /// <remarks>
    /// The database must already contain the <c>States</c> and <c>Summary</c> tables; this class does not create them.
    /// Rows are read and written positionally, so the column order is relied upon:
    /// <c>States</c> = Id, SessionCreationTime, InSeqNum, OutSeqNum, SenderCompID, TargetCompID, Terminated, Version;
    /// <c>Summary</c> = Id, StateId, InOut, timestamp text, Base64-encoded raw message.
    /// Original outbound messages are also cached in memory to serve resend requests.
    /// </remarks>
    sealed class SqliteSessionStorage : ISessionStorage
    {
        // Column positions in the [States] table.
        private const int StateIdColumnOrdinal = 0;
        private const int StateSessionCreationTimeColumnOrdinal = 1;
        private const int StateInSeqNumColumnOrdinal = 2;
        private const int StateOutSeqNumColumnOrdinal = 3;
        private const int StateTerminatedColumnOrdinal = 6;

        // Column position of the Base64-encoded message in the [Summary] table.
        private const int SummaryMessageColumnOrdinal = 4;

        // Values of the [InOut] column in the [Summary] table.
        private const int InboundDirection = 0;
        private const int OutboundDirection = 1;

        private const string SummaryTimestampFormat = "yyyy MM dd-HH:mm:ss.fffffff";

        private readonly SqliteConnection connection;
        private readonly long stateId = -1;
        private readonly List<IMessage> sentMessages = new List<IMessage>();
        private int inSeqNum;
        private int outSeqNum;
        private bool terminated;
        private DateTime sessionCreationTime;
        private bool closed = false;
        private long maxStorageSize;
        private MessageMode messageMode;

        /// <summary>
        /// Opens the database and either resumes the most recent non-terminated session state for the given
        /// identity or inserts a fresh state row.
        /// </summary>
        /// <param name="dataSource">SQLite data source (database file path).</param>
        /// <param name="senderCompId">SenderCompID identifying the session.</param>
        /// <param name="targetCompId">TargetCompID identifying the session.</param>
        /// <param name="version">FIX protocol version identifying the session.</param>
        /// <param name="resendingQueueSize">Maximum number of outbound messages kept for resending.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="senderCompId"/> or <paramref name="targetCompId"/> is <c>null</c>.
        /// </exception>
        public SqliteSessionStorage(string dataSource, string senderCompId, string targetCompId, ProtocolVersion version, long resendingQueueSize)
        {
            // Validate before any resource is acquired; a null value cannot be bound as a query parameter.
            if (senderCompId is null)
                throw new ArgumentNullException(nameof(senderCompId));

            if (targetCompId is null)
                throw new ArgumentNullException(nameof(targetCompId));

            maxStorageSize = resendingQueueSize;

            var connectionStringBuilder = new SqliteConnectionStringBuilder
            {
                DataSource = dataSource
            };

            connection = new SqliteConnection(connectionStringBuilder.ConnectionString);

            try
            {
                connection.Open();

                var versionText = ProtocolVersionHelper.ToString(version);

                if (TryRestoreActiveState(senderCompId, targetCompId, versionText, out var restoredStateId))
                {
                    stateId = restoredStateId;
                    LoadSentMessages(restoredStateId);
                }
                else
                {
                    // No stored state, or the latest one is terminated: start a new session.
                    // Sequence numbers and the terminated flag keep their defaults (0 / false),
                    // matching the row that is inserted.
                    sessionCreationTime = DateTime.UtcNow;
                    stateId = InsertState(senderCompId, targetCompId, versionText, sessionCreationTime);
                }
            }
            catch
            {
                // Do not leak the open connection when initialization fails part-way.
                connection.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Reads the most recently created state row for the session identity and, when it is not terminated,
        /// copies its creation time and sequence numbers into this instance.
        /// </summary>
        /// <returns><c>true</c> when a non-terminated state was found and restored; otherwise <c>false</c>.</returns>
        private bool TryRestoreActiveState(string senderCompId, string targetCompId, string versionText, out long restoredStateId)
        {
            restoredStateId = -1;

            using var command = connection.CreateCommand();
            command.CommandText = "SELECT * FROM [States] WHERE [SenderCompID] = $senderCompId AND [TargetCompID] = $targetCompId AND [Version] = $version ORDER BY [SessionCreationTime] DESC LIMIT 1";
            command.Parameters.AddWithValue("$senderCompId", senderCompId);
            command.Parameters.AddWithValue("$targetCompId", targetCompId);
            command.Parameters.AddWithValue("$version", versionText);

            using var reader = command.ExecuteReader();

            if (!reader.Read())
                return false;

            if (reader.GetInt16(StateTerminatedColumnOrdinal) > 0)
                return false;

            restoredStateId = reader.GetInt64(StateIdColumnOrdinal);
            sessionCreationTime = new DateTime(reader.GetInt64(StateSessionCreationTimeColumnOrdinal));
            inSeqNum = reader.GetInt32(StateInSeqNumColumnOrdinal);
            outSeqNum = reader.GetInt32(StateOutSeqNumColumnOrdinal);
            terminated = false;

            return true;
        }

        /// <summary>
        /// Fills the in-memory resend cache from the newest stored outbound messages of the given state,
        /// in ascending storage order.
        /// </summary>
        private void LoadSentMessages(long sessionStateId)
        {
            using var command = connection.CreateCommand();
            command.CommandText = "SELECT * FROM [Summary] WHERE [StateId] = $stateId AND [InOut] = $inOut ORDER BY [Id] DESC LIMIT $limit";
            command.Parameters.AddWithValue("$stateId", sessionStateId);
            command.Parameters.AddWithValue("$inOut", OutboundDirection);
            command.Parameters.AddWithValue("$limit", maxStorageSize);

            using var reader = command.ExecuteReader();

            while (reader.Read())
            {
                var rawMessage = Convert.FromBase64String(reader.GetString(SummaryMessageColumnOrdinal));
                var message = Message.Parse(rawMessage, Engine.Instance.MessageInfoDictionaryManager);

                // Messages that were themselves resends are not cached again.
                if (message.GetFlag(Tag.PossDupFlag))
                    continue;

                // Skip all session level messages except for Reject, because it is the only administrative message which can be resent.
                if (message.IsSessionLevel() && message.Type != MsgType.Reject)
                    continue;

                sentMessages.Add(message);
            }

            // Rows arrive newest-first (DESC); the cache is kept oldest-first.
            sentMessages.Reverse();
        }

        /// <summary>
        /// Inserts a new, non-terminated state row with zero sequence numbers and returns its primary key.
        /// </summary>
        private long InsertState(string senderCompId, string targetCompId, string versionText, DateTime creationTime)
        {
            using var transaction = connection.BeginTransaction();

            using (var insertCommand = connection.CreateCommand())
            {
                insertCommand.Transaction = transaction;
                insertCommand.CommandText = "INSERT INTO States VALUES(NULL, $creationTime, 0, 0, $senderCompId, $targetCompId, 0, $version)";
                insertCommand.Parameters.AddWithValue("$creationTime", creationTime.Ticks);
                insertCommand.Parameters.AddWithValue("$senderCompId", senderCompId);
                insertCommand.Parameters.AddWithValue("$targetCompId", targetCompId);
                insertCommand.Parameters.AddWithValue("$version", versionText);
                insertCommand.ExecuteNonQuery();
            }

            long insertedStateId;

            // Get the primary key value for the inserted state entry.
            using (var rowIdCommand = connection.CreateCommand())
            {
                rowIdCommand.Transaction = transaction;
                rowIdCommand.CommandText = "SELECT last_insert_rowid()";
                insertedStateId = Convert.ToInt64(rowIdCommand.ExecuteScalar(), System.Globalization.CultureInfo.InvariantCulture);
            }

            transaction.Commit();

            return insertedStateId;
        }

        /// <summary>
        /// Updates a single column of this session's state row inside its own transaction.
        /// </summary>
        /// <param name="columnName">
        /// Column to update. Always one of the literal names used in this class, never external input,
        /// which is why it may be placed into the command text.
        /// </param>
        /// <param name="value">New column value, bound as a parameter.</param>
        private void UpdateStateColumn(string columnName, object value)
        {
            using var transaction = connection.BeginTransaction();
            using var command = connection.CreateCommand();

            command.Transaction = transaction;
            command.CommandText = $"UPDATE States SET {columnName} = $value WHERE Id = $stateId";
            command.Parameters.AddWithValue("$value", value);
            command.Parameters.AddWithValue("$stateId", stateId);
            command.ExecuteNonQuery();

            transaction.Commit();
        }

        /// <summary>
        /// Appends a raw message to the <c>Summary</c> table, Base64-encoded and stamped with the current UTC time.
        /// </summary>
        /// <param name="direction">Value for the <c>InOut</c> column (0 = inbound, 1 = outbound).</param>
        /// <param name="rawMessage">Raw message bytes.</param>
        private void InsertSummaryRow(int direction, ReadOnlySpan<byte> rawMessage)
        {
            // Invariant culture keeps the stored text identical regardless of the machine's regional settings.
            var timestampText = DateTime.UtcNow.ToString(SummaryTimestampFormat, System.Globalization.CultureInfo.InvariantCulture);
            var encodedMessage = Convert.ToBase64String(rawMessage);

            using var transaction = connection.BeginTransaction();
            using var command = connection.CreateCommand();

            command.Transaction = transaction;
            command.CommandText = "INSERT INTO Summary VALUES(NULL, $stateId, $inOut, $timestamp, $message)";
            command.Parameters.AddWithValue("$stateId", stateId);
            command.Parameters.AddWithValue("$inOut", direction);
            command.Parameters.AddWithValue("$timestamp", timestampText);
            command.Parameters.AddWithValue("$message", encodedMessage);
            command.ExecuteNonQuery();

            transaction.Commit();
        }

        /// <summary>Empties the in-memory cache of sent messages. Stored rows are not touched.</summary>
        public void Clear()
        {
            sentMessages.Clear();
        }

        /// <summary>
        /// Marks the stored state as terminated unless sequence numbers are to be kept, then stops accepting
        /// messages. Subsequent calls do nothing.
        /// </summary>
        /// <param name="keepSequenceNumbersBetweenFixConnections">
        /// When <c>true</c> the state row stays non-terminated so it is resumed by the next instance.
        /// </param>
        /// <param name="doBackup">Not used by this implementation.</param>
        public void Close(bool keepSequenceNumbersBetweenFixConnections, bool doBackup = false)
        {
            if (closed)
                return;

            UpdateStateColumn("Terminated", keepSequenceNumbersBetweenFixConnections ? 0 : 1);

            closed = true;
        }

        /// <summary>Fixed identifier of this storage implementation.</summary>
        public string Id => "SqliteSessionStorage";

        /// <summary>Last inbound sequence number; setting it also persists the value.</summary>
        public int InSeqNum
        {
            get => inSeqNum;
            set
            {
                inSeqNum = value;
                UpdateStateColumn("InSeqNum", value);
            }
        }

        /// <summary>Last outbound sequence number; setting it also persists the value.</summary>
        public int OutSeqNum
        {
            get => outSeqNum;
            set
            {
                outSeqNum = value;
                UpdateStateColumn("OutSeqNum", value);
            }
        }

        /// <summary>Session creation time; setting it also persists the value as ticks.</summary>
        public DateTime SessionCreationTime
        {
            get => sessionCreationTime;
            set
            {
                sessionCreationTime = value;

                // Stored as an integer, the same representation used when the state row is inserted and read.
                UpdateStateColumn("SessionCreationTime", value.Ticks);
            }
        }

        /// <summary>Message mode; held in memory only.</summary>
        public MessageMode MessageMode
        {
            get => messageMode;
            set => messageMode = value;
        }

        /// <summary>Does nothing: every write is committed immediately.</summary>
        public void Flush()
        {
            // NOOP
        }

        /// <summary>
        /// Returns the cached sent messages whose sequence number lies in the inclusive range, ordered by sequence number.
        /// </summary>
        public IList<IMessage> GetOutboundMessages(int beginSequenceNumber, int endSequenceNumber)
        {
            return sentMessages
                .Where(message => message.SeqNum >= beginSequenceNumber && message.SeqNum <= endSequenceNumber)
                .OrderBy(message => message.SeqNum)
                .ToList();
        }

        /// <summary>
        /// Persists an inbound message and records its sequence number as the inbound sequence number.
        /// Ignored once the storage is closed.
        /// </summary>
        public void StoreInboundMessage(ReadOnlySpan<byte> rawMessage, int msgSeqNum, bool isOriginal)
        {
            if (closed)
                return;

            InSeqNum = msgSeqNum;

            InsertSummaryRow(InboundDirection, rawMessage);
        }

        /// <summary>
        /// Persists an outbound message, caches it for resending when it is an original (not a resend),
        /// and advances the outbound sequence number when <paramref name="msgSeqNum"/> is higher.
        /// Ignored for warm-up calls and once the storage is closed.
        /// </summary>
        /// <remarks>
        /// NOTE(review): <paramref name="timestamp"/> is not used; the row is stamped with the current UTC time.
        /// </remarks>
        public void StoreOutboundMessage(ReadOnlySpan<byte> rawMessage, int msgSeqNum, bool isOriginal = true, bool warmUp = false, DateTime timestamp = default)
        {
            if (warmUp || closed)
                return;

            if (isOriginal)
            {
                sentMessages.Add(Message.Parse(rawMessage, Engine.Instance.MessageInfoDictionaryManager));

                // Keep the cache bounded by dropping the oldest entry.
                if (sentMessages.Count > MaxStorageSize)
                    sentMessages.RemoveAt(0);
            }

            InsertSummaryRow(OutboundDirection, rawMessage);

            if (OutSeqNum < msgSeqNum)
                OutSeqNum = msgSeqNum;
        }

        /// <summary>Closes the database connection.</summary>
        public void Dispose()
        {
            connection.Dispose();
        }

        /// <summary>Identifies this storage as a pluggable storage.</summary>
        public SessionStorageType Type => SessionStorageType.PluggableStorage;

        /// <summary>Whether the session state is terminated; setting it also persists the value.</summary>
        public bool Terminated
        {
            get => terminated;
            set
            {
                terminated = value;
                UpdateStateColumn("Terminated", value ? 1 : 0);
            }
        }

        /// <summary>
        /// Maximum number of sent messages kept in memory; lowering it drops the oldest cached messages.
        /// </summary>
        public long MaxStorageSize
        {
            get => maxStorageSize;
            set
            {
                maxStorageSize = value;

                if (sentMessages.Count > MaxStorageSize)
                    sentMessages.RemoveRange(0, sentMessages.Count - (int)MaxStorageSize);
            }
        }
    }
}

In this article
Back to top Copyright © Onix Solutions.
Generated by DocFX