Samples :: Database Storage

Database Storage Sample

Description

The example demonstrates the externalization of FIX Engine state into persistent storage. Engine data can be stored in external, network-accessible storage that is physically located outside the node where the FIX Engine is running. The example illustrates the architectural principle only. It is not a complete solution for clustered or active-active deployments.

The external storage contains sent and received FIX messages and communication session state. These data correspond to the file-based mechanisms:

  • .summary files store messages
  • .state files store session state

In this implementation, a functional equivalent of these mechanisms is provided using a JDBC-compatible database. The database may be deployed on remote or fault-tolerant infrastructure. This allows the FIX Engine state to be restored when the engine starts on another node.

Log files are not part of this mechanism. They are stored in the local file system in the usual manner.

The example provides:

  • DatabaseSessionStorage, which implements SessionStorage and persists session state and message summaries using JDBC
  • A sample application that registers the custom storage with the FIX Engine
  • A performance meter that generates a large number of outbound messages

DatabaseStorageRepository provides only minimal functionality. It does not implement advanced session data management policies, such as state transfer between sessions. Such functionality may be required in production environments.

Session Ownership Scope

This sample does not provide a distributed ownership protocol for the same session identity across multiple engine processes.

For production deployments, session identities should be partitioned so that ownership keys do not overlap between processes: SenderCompID for initiator role and TargetCompID for acceptor role. In this model, session uniqueness inside one process is handled by the FIX Engine and its storage repository, while different processes do not contend for the same identities.

If cross-process session allocation is required, it should be handled by a higher-level control plane that assigns sessions to engine instances. Implementing this coordination is out of scope for this sample.

The biz.onixs.fix.engine.storage.SessionStorage and biz.onixs.fix.engine.storage.StorageRepository are implemented.

Usage

  • Run the sample:
    • win: DatabaseStorageSample.bat
    • linux: DatabaseStorageSample.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code

Sample application entry point:


import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.EngineSettings;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.engine.storage.SessionStorageType;
import biz.onixs.fix.engine.storage.StorageRepository;
import biz.onixs.fix.engine.storage.StorageRepositoryManager;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Sample application showing how to register and use database-backed session storage.
 */
public class DatabaseStorageSample {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseStorageSample.class);
    private static final String SETTINGS_RESOURCE = "sample/DatabaseSessionStorage.properties";
    Settings storageSettings = new PropertyBasedSettings(SETTINGS_RESOURCE);
    private static final String MY_STORAGE_TYPE_ID = "MyStorage";
    private static final String SENDER_CID = "A";
    private static final String TARGET_CID = "B";
    private static final Version FIX_VERSION = Version.FIX50_SP2;
    private static final int PORT = 10001;
    private static final long EVENT_WAIT_TIMEOUT_SECONDS = 10L;
    private final Semaphore initiatorEstablished = new Semaphore(0);
    private final Semaphore acceptorEstablished = new Semaphore(0);
    private final Semaphore executionReportReceived = new Semaphore(0);
    private SessionStorageType myStorageType = null;

    /**
     * Runs the sample by initializing the engine and exercising a session pair.
     */
    private void run() {
        Engine engine = null;
        Session initiator = null;
        Session acceptor = null;
        try {
            engine = initEngine();
            LOG.info("Engine inited.");
            //
            initiator = createSession(SENDER_CID, TARGET_CID);
            acceptor = createSession(TARGET_CID, SENDER_CID);
            configureApplicationFlow(initiator, acceptor);
            //
            LOG.info("Establishing sessions...");
            acceptor.logonAsAcceptor();
            initiator.logonAsInitiator("localhost", PORT);
            waitFor("initiator session establishment", initiatorEstablished);
            waitFor("acceptor session establishment", acceptorEstablished);
            //
            final Message order = createOrder();
            LOG.info("Sending order: {}", order);
            initiator.send(order);
            waitFor("execution report reception", executionReportReceived);
        } catch (final RuntimeException e) {
            LOG.error("{}", e.getMessage(), e);
        } finally {
            LOG.info("Closing sessions...");
            closeSession("initiator", initiator);
            closeSession("acceptor", acceptor);
            shutdownEngine(engine);
        }
    }

    /**
     * Initializes the FIX engine and registers a custom storage repository.
     */
    private Engine initEngine() {
        final EngineSettings engineSettings = new EngineSettings();
        engineSettings.addListenPort(PORT);
        // Set the storage folder name, but in this sample it is not used.
        // The custom session storage stores messages in the database.
        engineSettings.setStorageFolder("storage-DatabaseStorageSample");
        //
        final Engine engine = Engine.init(engineSettings);
        // Get the storage repository manager in order to register a custom session storage repository.
        final StorageRepositoryManager repositoryManager = engine.getStorageRepositoryManager();
        // Create instance of the custom session storage repository.
        final StorageRepository myStorageRepository = new DatabaseStorageRepository(storageSettings);
        // Register custom session storage.
        myStorageType = repositoryManager.register(MY_STORAGE_TYPE_ID, myStorageRepository);
        //
        return engine;
    }

    /**
     * Creates a session using the custom storage type.
     */
    private Session createSession(final String senderCompId, final String targetCompId) {
        // Create a session with the custom session storage.
        return new Session(senderCompId, targetCompId, FIX_VERSION, myStorageType);
    }

    /**
     * Configures state and application message handlers for both sessions.
     */
    private void configureApplicationFlow(final Session initiator, final Session acceptor) {
        initiator.addStateChangeListener((sender, args) -> {
            if (SessionState.ESTABLISHED == args.getNewState()) {
                initiatorEstablished.release();
            }
        });

        acceptor.addStateChangeListener((sender, args) -> {
            if (SessionState.ESTABLISHED == args.getNewState()) {
                acceptorEstablished.release();
            }
        });

        initiator.setInboundApplicationMessageListener((sender, args) -> {
            final Message message = args.getMsg();
            LOG.info("Initiator received application message: {}", message);
            if (message.checkType(FIX40.MsgType.Execution_Report)) {
                executionReportReceived.release();
            }
        });

        acceptor.setInboundApplicationMessageListener((sender, args) -> {
            final Message message = args.getMsg();
            LOG.info("Acceptor received application message: {}", message);
            if (message.checkType(FIX40.MsgType.Order_Single)) {
                final Message executionReport = createExecutionReport(message);
                LOG.info("Sending execution report in response: {}", executionReport);
                acceptor.send(executionReport);
            }
        });
    }

    /**
     * Creates a sample NewOrderSingle application message.
     */
    private Message createOrder() {
        final Message order = Message.create(FIX40.MsgType.Order_Single, FIX_VERSION);
        order.set(Tag.HandlInst, "1")
                .set(Tag.ClOrdID, "Order-" + System.currentTimeMillis())
                .set(Tag.Symbol, "IBM")
                .set(Tag.Side, "1")
                .set(Tag.OrderQty, 1000)
                .set(Tag.OrdType, "1");
        return order;
    }

    /**
     * Creates a simple execution report in response to an incoming order.
     */
    private Message createExecutionReport(final Message order) {
        final Message execReport = Message.create(FIX40.MsgType.Execution_Report, FIX_VERSION);
        execReport.set(Tag.OrderID, order.get(Tag.ClOrdID))
                .set(Tag.ExecID, "Exec-" + System.nanoTime())
                .set(Tag.ExecTransType, "0")
                .set(Tag.OrdStatus, "0")
                .set(Tag.Symbol, order.get(Tag.Symbol))
                .set(Tag.Side, order.get(Tag.Side))
                .set(Tag.LastQty, order.get(Tag.OrderQty))
                .set(Tag.LastPx, "100.0")
                .set(Tag.OrderQty, order.get(Tag.OrderQty))
                .set(Tag.CumQty, order.get(Tag.OrderQty))
                .set(Tag.AvgPx, "100.0");
        return execReport;
    }

    /**
     * Waits for an asynchronous sample event.
     */
    private void waitFor(final String eventName, final Semaphore semaphore) {
        try {
            if (!semaphore.tryAcquire(EVENT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for: " + eventName);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for: " + eventName, e);
        }
    }

    /**
     * Logs out and disposes a session, suppressing cleanup failures.
     */
    private void closeSession(final String sessionName, final Session session) {
        if (session == null) {
            return;
        }
        try {
            session.logout();
        } catch (RuntimeException e) {
            LOG.warn("Failed to logout {} session: {}", sessionName, e.getMessage());
        }
        try {
            session.dispose();
        } catch (RuntimeException e) {
            LOG.warn("Failed to dispose {} session: {}", sessionName, e.getMessage());
        }
    }

    /**
     * Shuts down the engine, suppressing cleanup failures.
     */
    private void shutdownEngine(final Engine engine) {
        if (engine == null) {
            return;
        }
        try {
            engine.shutdown();
            LOG.info("Engine shutdown.");
        } catch (RuntimeException e) {
            LOG.warn("Failed to shutdown engine: {}", e.getMessage());
        }
    }

    /**
     * Entry point for running the sample.
     */
    public static void main(final String[] args) {
        try {
            final DatabaseStorageSample sample = new DatabaseStorageSample();
            sample.run();
        } catch (final Throwable throwable) {
            LOG.error(throwable.getMessage(), throwable);
        }
    }
}

The code for the classes used in the example is shown below.

DatabaseStorageRepository

Storage repository implementation:

import biz.onixs.fix.engine.SessionId;
import biz.onixs.fix.engine.storage.SessionStorage;
import biz.onixs.fix.engine.storage.StorageRepository;
import biz.onixs.util.settings.Settings;

/**
 * Storage repository that creates database-backed session storage instances.
 */
public class DatabaseStorageRepository implements StorageRepository {
    Settings settings;
    private int maxStorageSize = 1000;

    /**
     * Creates a repository configured with the provided settings.
     */
    public DatabaseStorageRepository(Settings settings) {
        this.settings = settings;
    }

    /**
     * Creates session storage and optionally clears it for a clean start.
     */
    @Override
    public SessionStorage create(final SessionId sessionId, final boolean cleanStart) {
        final SessionStorage storage = new DatabaseSessionStorage(sessionId, settings, cleanStart);
        storage.setMaxStorageSize(getMaxStorageSize());
        return storage;
    }

    /**
     * Returns the maximum number of messages to keep in storage.
     */
    @Override
    public int getMaxStorageSize() {
        return maxStorageSize;
    }

    /**
     * Updates the maximum number of messages to keep in storage.
     */
    @Override
    public void setMaxStorageSize(final int maxStorageSize) {
        if (0 > maxStorageSize) {
            throw new IllegalArgumentException("0 > maxStorageSize");
        }
        this.maxStorageSize = maxStorageSize;
    }
}

DatabaseSessionStorage

Session storage implementation:


import biz.onixs.fix.engine.SessionId;
import biz.onixs.fix.engine.storage.SessionStorage;
import biz.onixs.fix.parser.FixFlatMessage;
import biz.onixs.fix.parser.FixMessage;
import biz.onixs.util.ByteBuffer;
import biz.onixs.util.settings.Settings;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
 * Database-backed session storage implementation of {@link SessionStorage}.
 *
 * Supported DBs:
 *  - H2 (embedded)
 *  - MySQL / MariaDB
 *  - PostgreSQL
 *
 * The DB dialect and JDBC connection parameters are expected to be provided via properties.
 */
public class DatabaseSessionStorage implements SessionStorage {
    // ---- properties keys (external .properties) ----
    private static final String PROP_DIALECT  = "db.dialect";
    private static final String PROP_URL      = "db.url";
    private static final String PROP_USER     = "db.user";
    private static final String PROP_PASSWORD = "db.password";

    private final String id;
    private final DatabaseDialect dialect;
    private final DatabaseSessionStateOperations sessionState;
    private final DatabaseSessionSummaryOperations sessionSummary;
    private final DatabaseTransactionHelper transactionHelper;

    private boolean closed = false;
    private int maxStorageSize = 1000;

    private Connection connection = null;

    /**
     * Creates session storage for the provided session identity and settings.
     */
    DatabaseSessionStorage(final SessionId sessionId, final Settings settings) {
        this(sessionId, settings, false);
    }

    /**
     * Creates session storage for the provided session identity and settings.
     */
    DatabaseSessionStorage(final SessionId sessionId, final Settings settings, final boolean cleanStart) {
        Objects.requireNonNull(sessionId, "null == sessionId");
        Objects.requireNonNull(settings, "null == settings");

        id = sessionId.getSenderCompId() + '-' + sessionId.getTargetCompId() + '-' +
                sessionId.getFixVersion().getId() + '-' + sessionId.getToken();

        this.dialect = parseDialect(requireSetting(settings, PROP_DIALECT));

        createDatabaseConnection(settings);

        new DatabaseStorageSchemaInitializer(connection, dialect).initializeSchema();

        sessionState = new DatabaseSessionStateOperations(
                connection,
                sessionId.getSenderCompId(),
                sessionId.getTargetCompId(),
                sessionId.getFixVersion().getId(),
                sessionId.getToken()
        );
        sessionSummary = new DatabaseSessionSummaryOperations(connection);
        transactionHelper = new DatabaseTransactionHelper(connection);

        // Find or create session state record following file-based storage lifecycle.
        transactionHelper.inTransaction(() -> {
            sessionState.loadOrInsert(cleanStart);
        });
    }

    /**
     * Creates the JDBC connection based on the provided settings.
     */
    private void createDatabaseConnection(Settings settings) {
        final String url = requireSetting(settings, PROP_URL);
        final String user = settings.getString(PROP_USER, "");
        final String pass = settings.getString(PROP_PASSWORD, "");

        try {
            connection = DriverManager.getConnection(url, user, pass);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to open JDBC connection for url=" + url, e);
        }
    }

    /**
     * Reads a required setting or throws if missing/blank.
     */
    private static String requireSetting(Settings settings, String key) {
        final String v = settings.getString(key);
        if (v == null || v.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return v.trim();
    }

    /**
     * Parses the database dialect from a configuration value.
     */
    private static DatabaseDialect parseDialect(String v) {
        switch (v.trim().toLowerCase(Locale.ROOT)) {
            case "h2":
                return DatabaseDialect.H2;
            case "mysql":
                return DatabaseDialect.MYSQL;
            case "postgres":
                return DatabaseDialect.POSTGRES;
            default:
                throw new IllegalArgumentException("Unknown db.dialect: " + v);
        }
    }

    /**
     * Implements {@link SessionStorage#clear()} by resetting state and removing summary rows.
     */
    @Override
    public synchronized void clear() {
        transactionHelper.inTransaction(() -> {
            sessionState.resetState();
            sessionSummary.deleteSummaryForState(sessionState.getSessionRecordId());
        });
    }

    /**
     * Implements {@link SessionStorage#close(boolean)} by marking termination and closing the connection.
     */
    @Override
    public synchronized void close(final boolean terminated) {
        RuntimeException failure = null;
        final Connection currentConnection = connection;
        try {
            if (currentConnection != null && sessionState.getSessionRecordId() > 0) {
                sessionState.markTerminated(terminated);
            }
        } catch (SQLException e) {
            failure = new RuntimeException(e);
        } catch (RuntimeException e) {
            failure = e;
        } finally {
            if (currentConnection != null) {
                try {
                    currentConnection.close();
                } catch (SQLException e) {
                    if (failure == null) {
                        failure = new RuntimeException(e);
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            connection = null;
            closed = true;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Implements {@link SessionStorage#getOutboundMessages(long, long)} by loading outbound messages from storage.
     */
    @Override
    public synchronized List<FixMessage> getOutboundMessages(final long beginSequenceNumber,
                                                             final long endSequenceNumber) {
        if (beginSequenceNumber < 1L) {
            throw new IllegalArgumentException("beginSequenceNumber < 1");
        }

        sessionState.ensureStateLoaded();
        try {
            return sessionSummary.loadOutboundMessages(
                    sessionState.getSessionRecordId(),
                    beginSequenceNumber,
                    endSequenceNumber,
                    getMaxStorageSize()
            );
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Implements {@link SessionStorage#storeInboundMessage(ByteBuffer, long, boolean)} by persisting inbound summaries.
     */
    @Override
    public synchronized void storeInboundMessage(final ByteBuffer rawMessage, final long msgSeqNum,
                                                 final boolean isOriginal) {
        Objects.requireNonNull(rawMessage, "rawMessage");
        if (msgSeqNum < 1L) {
            throw new IllegalArgumentException("msgSeqNum < 1");
        }

        if (closed) {
            return;
        }

        sessionState.ensureStateLoaded();

        transactionHelper.inTransaction(() -> {
            sessionSummary.insertInboundMessage(
                    sessionState.getSessionRecordId(),
                    rawMessage,
                    msgSeqNum,
                    isOriginal
            );

            // The ability to cache message sequence numbers depends on the overall storage management.
            if ((getInSeqNum() + 1L) == msgSeqNum) {
                setInSeqNum(msgSeqNum);
            }
        });
    }

    /**
     * Implements {@link SessionStorage#storeOutboundMessage(ByteBuffer, long, boolean, boolean)} by persisting outbound summaries.
     */
    @Override
    public synchronized void storeOutboundMessage(final ByteBuffer rawMessage, final long msgSeqNum,
                                                  final boolean isOriginal, final boolean warmUp) {
        Objects.requireNonNull(rawMessage, "rawMessage");
        if (msgSeqNum < 1L) {
            throw new IllegalArgumentException("msgSeqNum < 1");
        }

        if (closed || warmUp) {
            return;
        }

        sessionState.ensureStateLoaded();

        transactionHelper.inTransaction(() -> {
            sessionSummary.insertOutboundMessage(
                    sessionState.getSessionRecordId(),
                    rawMessage,
                    msgSeqNum,
                    isOriginal
            );

            // The ability to cache message sequence numbers depends on the overall storage management.
            if (getOutSeqNum() < msgSeqNum) {
                setOutSeqNum(msgSeqNum);
            }
        });
    }

    /**
     * Implements {@link SessionStorage#getId()} by returning the session identifier.
     */
    @Override
    public synchronized String getId() {
        return id;
    }

    /**
     * Implements {@link SessionStorage#getInSeqNum()} by reading the stored inbound sequence number.
     */
    @Override
    public synchronized long getInSeqNum() {
        return readStoredSequenceNumber(DatabaseElementNames.COL_STATE_IN_SEQ_NUM);
    }

    /**
     * Implements {@link SessionStorage#setInSeqNum(long)} by updating the stored inbound sequence number.
     */
    @Override
    public synchronized void setInSeqNum(final long msgSeqNum) {
        updateStoredSequenceNumber(DatabaseElementNames.COL_STATE_IN_SEQ_NUM, msgSeqNum);
    }

    /**
     * Implements {@link SessionStorage#getOutSeqNum()} by reading the stored outbound sequence number.
     */
    @Override
    public synchronized long getOutSeqNum() {
        return readStoredSequenceNumber(DatabaseElementNames.COL_STATE_OUT_SEQ_NUM);
    }

    /**
     * Implements {@link SessionStorage#setOutSeqNum(long)} by updating the stored outbound sequence number.
     */
    @Override
    public synchronized void setOutSeqNum(final long msgSeqNum) {
        updateStoredSequenceNumber(DatabaseElementNames.COL_STATE_OUT_SEQ_NUM, msgSeqNum);
    }

    /**
     * Implements {@link SessionStorage#getMaxStorageSize()} by returning the current cap.
     */
    @Override
    public synchronized int getMaxStorageSize() {
        return maxStorageSize;
    }

    /**
     * Implements {@link SessionStorage#setMaxStorageSize(int)} by updating the maximum stored message count.
     */
    @Override
    public synchronized void setMaxStorageSize(final int maxStorageSize) {
        if (0 > maxStorageSize) {
            throw new IllegalArgumentException("0 > maxStorageSize");
        }
        this.maxStorageSize = maxStorageSize;
    }

    /**
     * Implements {@link SessionStorage#flush(boolean)} as a no-op for JDBC-backed storage.
     */
    @Override
    public synchronized void flush(boolean b) throws IOException {
        // no-op
    }

    /**
     * Builds a FIX message from the stored raw buffer.
     */
    static FixMessage createMessage(final ByteBuffer buffer) {
        return (buffer == null) ? null : new FixFlatMessage(buffer);
    }

    /**
     * Reads a stored sequence number from the state table.
     */
    private long readStoredSequenceNumber(String columnName) {
        try {
            return sessionState.readStoredSequenceNumber(columnName);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Updates a stored sequence number in the state table.
     */
    private void updateStoredSequenceNumber(String columnName, long value) {
        try {
            sessionState.updateStoredSequenceNumber(columnName, value);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

DatabaseSessionStateOperations

Session state operations:


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Operations for session state records stored in the states table.
 */
final class DatabaseSessionStateOperations {
    private final Connection connection;
    private final String senderCompId;
    private final String targetCompId;
    private final String fixVersion;
    private final String sessionToken;
    private boolean loadedStateTerminated = false;
    private int sessionRecordId = 0;

    /**
     * Creates state operations for a specific session identity.
     */
    DatabaseSessionStateOperations(Connection connection,
                                   String senderCompId,
                                   String targetCompId,
                                   String fixVersion,
                                   String sessionToken) {
        this.connection = connection;
        this.senderCompId = senderCompId;
        this.targetCompId = targetCompId;
        this.fixVersion = fixVersion;
        this.sessionToken = sessionToken;
    }

    /**
     * Loads an active state row or creates a new one according to file-based storage lifecycle.
     */
    void loadOrInsert(final boolean cleanStart) throws SQLException {
        if (cleanStart) {
            insertStateRecord();
            return;
        }

        if (!loadLatestStateRecord()) {
            insertStateRecord();
            return;
        }

        if (loadedStateTerminated) {
            // Keep terminated row unchanged and start a new storage generation.
            insertStateRecord();
        }
    }

    /**
     * Returns the current state record id.
     */
    int getSessionRecordId() {
        return sessionRecordId;
    }

    /**
     * Ensures the state record has been loaded or inserted.
     */
    void ensureStateLoaded() {
        if (sessionRecordId <= 0) {
            throw new IllegalStateException("Session state is not loaded");
        }
    }

    /**
     * Resets the state row's sequence numbers and terminated flag.
     */
    void resetState() throws SQLException {
        ensureStateLoaded();
        final String sqlUpdate =
                "UPDATE " + DatabaseElementNames.TABLE_STATES +
                        " SET " +
                        DatabaseElementNames.COL_STATE_IN_SEQ_NUM + "=?, " +
                        DatabaseElementNames.COL_STATE_OUT_SEQ_NUM + "=?, " +
                        DatabaseElementNames.COL_STATE_TERMINATED + "=? " +
                        "WHERE " + DatabaseElementNames.COL_ID + "=?";

        try (PreparedStatement ps = connection.prepareStatement(sqlUpdate)) {
            ps.setLong(1, 0L);
            ps.setLong(2, 0L);
            ps.setLong(3, 0L);
            ps.setInt(4, sessionRecordId);
            ps.executeUpdate();
        }
    }

    /**
     * Updates the terminated flag in the state row.
     */
    void markTerminated(boolean terminated) throws SQLException {
        ensureStateLoaded();
        try (PreparedStatement ps = connection.prepareStatement(
                "UPDATE " + DatabaseElementNames.TABLE_STATES +
                        " SET " + DatabaseElementNames.COL_STATE_TERMINATED + "=? " +
                        " WHERE " + DatabaseElementNames.COL_ID + "=?")) {
            ps.setInt(1, terminated ? 1 : 0);
            ps.setInt(2, sessionRecordId);
            ps.executeUpdate();
        }
    }

    /**
     * Reads a sequence number column from the state row.
     */
    long readStoredSequenceNumber(String columnName) throws SQLException {
        ensureStateLoaded();
        final String sql =
                "SELECT " + columnName +
                        " FROM " + DatabaseElementNames.TABLE_STATES +
                        " WHERE " + DatabaseElementNames.COL_ID + "=?";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setInt(1, sessionRecordId);
            try (ResultSet rs = ps.executeQuery()) {
                if (!rs.next()) {
                    throw new SQLException("No rows returned");
                }
                return rs.getLong(1);
            }
        }
    }

    /**
     * Updates a sequence number column in the state row.
     */
    void updateStoredSequenceNumber(String columnName, long value) throws SQLException {
        ensureStateLoaded();
        final String sql =
                "UPDATE " + DatabaseElementNames.TABLE_STATES +
                        " SET " + columnName + "=? " +
                        " WHERE " + DatabaseElementNames.COL_ID + "=?";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setLong(1, value);
            ps.setInt(2, sessionRecordId);
            ps.executeUpdate();
        }
    }

    /**
     * Inserts a new state row and captures the generated id.
     */
    private void insertStateRecord() throws SQLException {
        final String sqlInsert =
                "INSERT INTO " + DatabaseElementNames.TABLE_STATES + " (" +
                        DatabaseElementNames.COL_STATE_IN_SEQ_NUM + ", " +
                        DatabaseElementNames.COL_STATE_OUT_SEQ_NUM + ", " +
                        DatabaseElementNames.COL_STATE_SENDER_COMP_ID + ", " +
                        DatabaseElementNames.COL_STATE_TARGET_COMP_ID + ", " +
                        DatabaseElementNames.COL_STATE_TERMINATED + ", " +
                        DatabaseElementNames.COL_STATE_VERSION + ", " +
                        DatabaseElementNames.COL_STATE_TOKEN + ", " +
                        DatabaseElementNames.COL_STATE_SESSION_CREATION_TIME +
                        ") VALUES (0, 0, ?, ?, 0, ?, ?, ?)";

        try (PreparedStatement stInsert = connection.prepareStatement(sqlInsert, Statement.RETURN_GENERATED_KEYS)) {
            stInsert.setString(1, senderCompId);
            stInsert.setString(2, targetCompId);
            stInsert.setString(3, fixVersion);
            stInsert.setString(4, sessionToken);
            // UTC epoch millis
            stInsert.setLong(5, System.currentTimeMillis());

            int rows = stInsert.executeUpdate();
            if (rows != 1) {
                throw new SQLException("Expected 1 row inserted into " + DatabaseElementNames.TABLE_STATES + " but got " + rows);
            }

            try (ResultSet keys = stInsert.getGeneratedKeys()) {
                if (keys.next()) {
                    sessionRecordId = keys.getInt(1);
                } else {
                    throw new SQLException("No generated key returned for " + DatabaseElementNames.TABLE_STATES);
                }
            }
        }
    }

    /**
     * Loads the newest state row for the current session identity.
     */
    private boolean loadLatestStateRecord() throws SQLException {
        final String sqlAll =
                "SELECT " + DatabaseElementNames.COL_ID + ", " + DatabaseElementNames.COL_STATE_TERMINATED +
                        " FROM " + DatabaseElementNames.TABLE_STATES +
                        " WHERE " + DatabaseElementNames.COL_STATE_SENDER_COMP_ID + "=? " +
                        "AND " + DatabaseElementNames.COL_STATE_TARGET_COMP_ID + "=? " +
                        "AND " + DatabaseElementNames.COL_STATE_VERSION + "=? " +
                        "AND " + DatabaseElementNames.COL_STATE_TOKEN + "=? " +
                        "ORDER BY " + DatabaseElementNames.COL_STATE_SESSION_CREATION_TIME + " DESC, " +
                        DatabaseElementNames.COL_ID + " DESC";

        try (PreparedStatement st = connection.prepareStatement(sqlAll)) {
            st.setString(1, senderCompId);
            st.setString(2, targetCompId);
            st.setString(3, fixVersion);
            st.setString(4, sessionToken);

            try (ResultSet rs = st.executeQuery()) {
                if (!rs.next()) {
                    return false;
                }
                sessionRecordId = rs.getInt(DatabaseElementNames.COL_ID);
                loadedStateTerminated = rs.getInt(DatabaseElementNames.COL_STATE_TERMINATED) != 0;
                return true;
            }
        }
    }
}

DatabaseSessionSummaryOperations

Message summary operations:


import biz.onixs.fix.parser.FixMessage;
import biz.onixs.util.ByteBuffer;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;

/**
 * Operations for session message summary records stored in the summary table.
 */
final class DatabaseSessionSummaryOperations {
    private final Connection connection;

    /**
     * Creates summary operations for the provided connection.
     */
    DatabaseSessionSummaryOperations(Connection connection) {
        this.connection = connection;
    }

    /**
     * Deletes all summary rows for the given session state id.
     */
    void deleteSummaryForState(int sessionRecordId) throws SQLException {
        final String sqlDelete =
                "DELETE FROM " + DatabaseElementNames.TABLE_SUMMARY +
                        " WHERE " + DatabaseElementNames.COL_SUMMARY_STATE_ID + "=?";
        try (PreparedStatement psDelete = connection.prepareStatement(sqlDelete)) {
            psDelete.setInt(1, sessionRecordId);
            psDelete.executeUpdate();
        }
    }

    /**
     * Loads outbound messages for the given session and range.
     */
    List<FixMessage> loadOutboundMessages(int sessionRecordId,
                                          long beginSequenceNumber,
                                          long endSequenceNumber,
                                          int maxStorageSize) throws SQLException {
        if (maxStorageSize <= 0) {
            return new ArrayList<>();
        }

        final String sql =
                "SELECT " + DatabaseElementNames.COL_SUMMARY_SEQ_NUM + ", " +
                        DatabaseElementNames.COL_SUMMARY_MESSAGE +
                        " FROM " + DatabaseElementNames.TABLE_SUMMARY +
                        " WHERE " +
                        DatabaseElementNames.COL_SUMMARY_STATE_ID + "=? " +
                        "AND " + DatabaseElementNames.COL_SUMMARY_IN_OUT + "=0 " +
                        "AND " + DatabaseElementNames.COL_SUMMARY_SEQ_NUM + ">=? " +
                        "AND " + DatabaseElementNames.COL_SUMMARY_SEQ_NUM + "<=? " +
                        "ORDER BY " + DatabaseElementNames.COL_SUMMARY_SEQ_NUM + " DESC, " +
                        DatabaseElementNames.COL_SUMMARY_IS_ORIGINAL + " DESC, " +
                        DatabaseElementNames.COL_SUMMARY_TIMESTAMP + " DESC, " +
                        DatabaseElementNames.COL_ID + " DESC";

        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setInt(1, sessionRecordId);
            ps.setLong(2, beginSequenceNumber);
            ps.setLong(3, endSequenceNumber);

            try (ResultSet rs = ps.executeQuery()) {
                final List<FixMessage> result = new ArrayList<>();
                long previousSeqNum = Long.MIN_VALUE;
                while (rs.next()) {
                    final long seqNum = rs.getLong(DatabaseElementNames.COL_SUMMARY_SEQ_NUM);
                    if (seqNum == previousSeqNum) {
                        continue;
                    }

                    byte[] data = Base64.getDecoder().decode(rs.getString(DatabaseElementNames.COL_SUMMARY_MESSAGE));
                    previousSeqNum = seqNum;

                    FixMessage msg = DatabaseSessionStorage.createMessage(new ByteBuffer(data));
                    result.add(msg);

                    if (result.size() >= maxStorageSize) {
                        break;
                    }
                }
                Collections.reverse(result);
                return result;
            }
        }
    }

    /**
     * Inserts an inbound message summary row.
     */
    void insertInboundMessage(int sessionRecordId,
                              ByteBuffer rawMessage,
                              long msgSeqNum,
                              boolean isOriginal) throws SQLException {
        final String sqlInsert =
                "INSERT INTO " + DatabaseElementNames.TABLE_SUMMARY + " (" +
                        DatabaseElementNames.COL_SUMMARY_STATE_ID + ", " +
                        DatabaseElementNames.COL_SUMMARY_IN_OUT + ", " +
                        DatabaseElementNames.COL_SUMMARY_SEQ_NUM + ", " +
                        DatabaseElementNames.COL_SUMMARY_TIMESTAMP + ", " +
                        DatabaseElementNames.COL_SUMMARY_IS_ORIGINAL + ", " +
                        DatabaseElementNames.COL_SUMMARY_MESSAGE +
                        ") VALUES (?, 1, ?, ?, ?, ?)";

        try (PreparedStatement ps = connection.prepareStatement(sqlInsert)) {
            ps.setInt(1, sessionRecordId);
            ps.setLong(2, msgSeqNum);
            ps.setLong(3, System.currentTimeMillis()); // UTC epoch millis
            ps.setInt(4, isOriginal ? 1 : 0);
            ps.setString(5, Base64.getEncoder().encodeToString(rawMessage.getCopy()));
            ps.executeUpdate();
        }
    }

    /**
     * Inserts an outbound message summary row.
     */
    void insertOutboundMessage(int sessionRecordId,
                               ByteBuffer rawMessage,
                               long msgSeqNum,
                               boolean isOriginal) throws SQLException {
        final String sqlInsert =
                "INSERT INTO " + DatabaseElementNames.TABLE_SUMMARY + " (" +
                        DatabaseElementNames.COL_SUMMARY_STATE_ID + ", " +
                        DatabaseElementNames.COL_SUMMARY_IN_OUT + ", " +
                        DatabaseElementNames.COL_SUMMARY_SEQ_NUM + ", " +
                        DatabaseElementNames.COL_SUMMARY_TIMESTAMP + ", " +
                        DatabaseElementNames.COL_SUMMARY_IS_ORIGINAL + ", " +
                        DatabaseElementNames.COL_SUMMARY_MESSAGE +
                        ") VALUES (?, 0, ?, ?, ?, ?)";

        try (PreparedStatement ps = connection.prepareStatement(sqlInsert)) {
            ps.setInt(1, sessionRecordId);
            ps.setLong(2, msgSeqNum);
            ps.setLong(3, System.currentTimeMillis()); // UTC epoch millis
            ps.setInt(4, isOriginal ? 1 : 0);
            ps.setString(5, Base64.getEncoder().encodeToString(rawMessage.getCopy()));
            ps.executeUpdate();
        }
    }
}

DatabaseStorageSchemaInitializer

Schema initialization helper:


import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Initializes the database storage schema by creating tables and indexes.
 *
 * Exact SQL syntax depends on the chosen database dialect.
 */
class DatabaseStorageSchemaInitializer {
    private final Connection connection;
    private final DatabaseDialect dialect;

    /**
     * Creates a schema initializer for the given connection and dialect.
     */
    public DatabaseStorageSchemaInitializer(Connection connection, DatabaseDialect dialect) {
        this.connection = connection;
        this.dialect = dialect;
    }

    /**
     * Creates the required tables and indexes if they do not already exist.
     */
    public void initializeSchema() {
        final String largeText = sqlLargeTextType();

        try (Statement st = connection.createStatement()) {
            final String sqlCreateStates = "CREATE TABLE IF NOT EXISTS " + DatabaseElementNames.TABLE_STATES + " (" +
                    DatabaseElementNames.COL_ID + " " + sqlIntPkAutoType() + " NOT NULL, " +
                    DatabaseElementNames.COL_STATE_SESSION_CREATION_TIME + " BIGINT NOT NULL, " +
                    DatabaseElementNames.COL_STATE_IN_SEQ_NUM + " BIGINT NOT NULL, " +
                    DatabaseElementNames.COL_STATE_OUT_SEQ_NUM + " BIGINT NOT NULL, " +
                    DatabaseElementNames.COL_STATE_SENDER_COMP_ID + " " + largeText + " NOT NULL, " +
                    DatabaseElementNames.COL_STATE_TARGET_COMP_ID + " " + largeText + " NOT NULL, " +
                    DatabaseElementNames.COL_STATE_TERMINATED + " INTEGER NOT NULL, " +
                    DatabaseElementNames.COL_STATE_TOKEN + " " + largeText + " NOT NULL, " +
                    DatabaseElementNames.COL_STATE_VERSION + " " + largeText + " NOT NULL, " +
                    "CHECK (" + DatabaseElementNames.COL_STATE_TERMINATED + " IN (0, 1)), " +
                    "PRIMARY KEY (" + DatabaseElementNames.COL_ID + ")" +
                    ")";
            st.execute(sqlCreateStates);

            final String sqlCreateSummary = "CREATE TABLE IF NOT EXISTS " + DatabaseElementNames.TABLE_SUMMARY + " (" +
                    DatabaseElementNames.COL_ID + " " + sqlBigIntPkAutoType() + " NOT NULL, " +
                    DatabaseElementNames.COL_SUMMARY_STATE_ID + " INTEGER NOT NULL, " +
                    DatabaseElementNames.COL_SUMMARY_IN_OUT + " INTEGER NOT NULL, " +
                    DatabaseElementNames.COL_SUMMARY_IS_ORIGINAL + " INTEGER NOT NULL, " +
                    DatabaseElementNames.COL_SUMMARY_SEQ_NUM + " BIGINT NOT NULL, " +
                    DatabaseElementNames.COL_SUMMARY_TIMESTAMP + " BIGINT NOT NULL, " +
                    DatabaseElementNames.COL_SUMMARY_MESSAGE + " " + largeText + " NOT NULL, " +
                    "CHECK (" + DatabaseElementNames.COL_SUMMARY_IN_OUT + " IN (0, 1)), " +
                    "CHECK (" + DatabaseElementNames.COL_SUMMARY_IS_ORIGINAL + " IN (0, 1)), " +
                    "PRIMARY KEY (" + DatabaseElementNames.COL_ID + "), " +
                    "CONSTRAINT fk_" + DatabaseElementNames.TABLE_SUMMARY + "_" + DatabaseElementNames.COL_SUMMARY_STATE_ID + " " +
                    "FOREIGN KEY (" + DatabaseElementNames.COL_SUMMARY_STATE_ID + ") " +
                    "REFERENCES " + DatabaseElementNames.TABLE_STATES + " (" + DatabaseElementNames.COL_ID + ")" +
                    ")";
            st.execute(sqlCreateSummary);

            try {
                createIndexIfMissing(
                        st,
                        DatabaseElementNames.IDX_STATES_LOOKUP,
                        DatabaseElementNames.TABLE_STATES,
                        sqlStatesLookupIndexColumns()
                );
                createIndexIfMissing(
                        st,
                        DatabaseElementNames.IDX_STATES_CREATED_AT,
                        DatabaseElementNames.TABLE_STATES,
                        sqlStatesCreatedAtIndexColumns()
                );
            } catch (SQLException e) {
                // Best-effort for legacy schemas where identity columns may be non-indexable large text types.
            }

            createIndexIfMissing(
                    st,
                    DatabaseElementNames.IDX_SUMMARY_STATE_INOUT_SEQ,
                    DatabaseElementNames.TABLE_SUMMARY,
                    DatabaseElementNames.COL_SUMMARY_STATE_ID + ", " +
                            DatabaseElementNames.COL_SUMMARY_IN_OUT + ", " +
                            DatabaseElementNames.COL_SUMMARY_SEQ_NUM
            );
            createIndexIfMissing(
                    st,
                    DatabaseElementNames.IDX_SUMMARY_RESEND_LOOKUP,
                    DatabaseElementNames.TABLE_SUMMARY,
                    DatabaseElementNames.COL_SUMMARY_STATE_ID + ", " +
                            DatabaseElementNames.COL_SUMMARY_IN_OUT + ", " +
                            DatabaseElementNames.COL_SUMMARY_SEQ_NUM + ", " +
                            DatabaseElementNames.COL_SUMMARY_IS_ORIGINAL + ", " +
                            DatabaseElementNames.COL_SUMMARY_TIMESTAMP + ", " +
                            DatabaseElementNames.COL_ID
            );

        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the given index when supported by the current dialect.
     */
    private void createIndexIfMissing(Statement st, String indexName, String table, String colsSql) throws SQLException {
        switch (dialect) {
            case POSTGRES:
            case H2: {
                String sql =
                        "CREATE INDEX IF NOT EXISTS " + indexName +
                                " ON " + table +
                                " (" + colsSql + ")";
                st.execute(sql);
                break;
            }
            case MYSQL: {
                String sql =
                        "CREATE INDEX " + indexName +
                                " ON " + table +
                                " (" + colsSql + ")";
                try {
                    st.execute(sql);
                } catch (SQLException e) {
                    // MySQL: Duplicate key name (index already exists)
                    if (e.getErrorCode() != 1061) {
                        throw e;
                    }
                }
                break;
            }
            default:
                throw new IllegalStateException("Unsupported dialect: " + dialect);
        }
    }

    /**
     * Large text SQL type. Kept in one place because it differs across DBs.
     */
    private String sqlLargeTextType() {
        switch (dialect) {
            case POSTGRES:
                return "TEXT";
            case MYSQL:
                return "LONGTEXT";
            case H2:
            default:
                return "CLOB";
        }
    }

    /**
     * States lookup index columns tuned per dialect.
     */
    private String sqlStatesLookupIndexColumns() {
        switch (dialect) {
            case MYSQL:
                return DatabaseElementNames.COL_STATE_SENDER_COMP_ID + "(32), " +
                        DatabaseElementNames.COL_STATE_TARGET_COMP_ID + "(32), " +
                        DatabaseElementNames.COL_STATE_VERSION + "(16), " +
                        DatabaseElementNames.COL_STATE_TOKEN + "(32), " +
                        DatabaseElementNames.COL_ID;
            case POSTGRES:
            case H2:
            default:
                return DatabaseElementNames.COL_STATE_SENDER_COMP_ID + ", " +
                        DatabaseElementNames.COL_STATE_TARGET_COMP_ID + ", " +
                        DatabaseElementNames.COL_STATE_VERSION + ", " +
                        DatabaseElementNames.COL_STATE_TOKEN + ", " +
                        DatabaseElementNames.COL_ID;
        }
    }

    /**
     * State lookup columns that prefer newest generation by creation time and id.
     */
    private String sqlStatesCreatedAtIndexColumns() {
        switch (dialect) {
            case MYSQL:
                return DatabaseElementNames.COL_STATE_SENDER_COMP_ID + "(32), " +
                        DatabaseElementNames.COL_STATE_TARGET_COMP_ID + "(32), " +
                        DatabaseElementNames.COL_STATE_VERSION + "(16), " +
                        DatabaseElementNames.COL_STATE_TOKEN + "(32), " +
                        DatabaseElementNames.COL_STATE_SESSION_CREATION_TIME + ", " +
                        DatabaseElementNames.COL_ID;
            case POSTGRES:
            case H2:
            default:
                return DatabaseElementNames.COL_STATE_SENDER_COMP_ID + ", " +
                        DatabaseElementNames.COL_STATE_TARGET_COMP_ID + ", " +
                        DatabaseElementNames.COL_STATE_VERSION + ", " +
                        DatabaseElementNames.COL_STATE_TOKEN + ", " +
                        DatabaseElementNames.COL_STATE_SESSION_CREATION_TIME + ", " +
                        DatabaseElementNames.COL_ID;
        }
    }

    /**
     * Auto-increment PK for INT.
     */
    private String sqlIntPkAutoType() {
        switch (dialect) {
            case POSTGRES:
                return "INTEGER GENERATED BY DEFAULT AS IDENTITY";
            case MYSQL:
            case H2:
            default:
                return "INTEGER AUTO_INCREMENT";
        }
    }

    /**
     * Auto-increment PK for BIGINT.
     */
    private String sqlBigIntPkAutoType() {
        switch (dialect) {
            case POSTGRES:
                return "BIGINT GENERATED BY DEFAULT AS IDENTITY";
            case MYSQL:
            case H2:
            default:
                return "BIGINT AUTO_INCREMENT";
        }
    }

}

DatabaseTransactionHelper

Transaction helper:


import java.sql.Connection;
import java.sql.SQLException;

/**
 * Helper for running JDBC work within a transaction boundary.
 */
final class DatabaseTransactionHelper {
    @FunctionalInterface
    interface SqlRunnable {
        /**
         * Executes JDBC work within a transaction.
         */
        void run() throws SQLException;
    }

    private final Connection connection;

    /**
     * Creates a helper bound to the provided connection.
     */
    DatabaseTransactionHelper(Connection connection) {
        this.connection = connection;
    }

    /**
     * Runs the work inside a transaction, committing on success and rolling back on failure.
     */
    void inTransaction(SqlRunnable work) {
        final boolean storedAutoCommit = getAutoCommitUnchecked();
        try {
            connection.setAutoCommit(false);
            work.run();
            connection.commit();
        } catch (SQLException e) {
            rollbackUnchecked();
            throw new RuntimeException(e);
        } catch (RuntimeException | Error e) {
            rollbackUnchecked();
            throw e;
        } finally {
            setAutoCommitUnchecked(storedAutoCommit);
        }
    }

    /**
     * Reads auto-commit without checked exception leakage.
     */
    private boolean getAutoCommitUnchecked() {
        try {
            return connection.getAutoCommit();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sets auto-commit without checked exception leakage.
     */
    private void setAutoCommitUnchecked(boolean value) {
        try {
            connection.setAutoCommit(value);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Attempts a rollback without checked exception leakage.
     */
    private void rollbackUnchecked() {
        try {
            connection.rollback();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

DatabaseElementNames

Schema names and constants:


/**
 * Defines table, column, and index names used by the database storage schema.
 */
public class DatabaseElementNames {
    // --- table names ---
    static final String TABLE_STATES  = "states";
    static final String TABLE_SUMMARY = "summary";
    // --- common name for the ID column ---
    static final String COL_ID = "id";
    // --- states columns ---
    static final String COL_STATE_SESSION_CREATION_TIME = "sessioncreatedat";
    static final String COL_STATE_IN_SEQ_NUM            = "incomingseqnum";
    static final String COL_STATE_OUT_SEQ_NUM           = "outgoingseqnum";
    static final String COL_STATE_SENDER_COMP_ID        = "sendercompid";
    static final String COL_STATE_TARGET_COMP_ID        = "targetcompid";
    static final String COL_STATE_TERMINATED            = "isterminated";
    static final String COL_STATE_TOKEN                 = "sessiontoken";
    static final String COL_STATE_VERSION               = "fixversion";
    static final String IDX_STATES_LOOKUP               = "idx_states_lookup";
    static final String IDX_STATES_CREATED_AT           = "idx_states_created_at";
    // --- summary columns ---
    static final String COL_SUMMARY_STATE_ID     = "stateid";
    static final String COL_SUMMARY_IN_OUT       = "direction";
    static final String COL_SUMMARY_SEQ_NUM      = "msgseqnum";
    // --- summary index name ---
    static final String IDX_SUMMARY_STATE_INOUT_SEQ =
            "idx_" + TABLE_SUMMARY + "_" + COL_SUMMARY_STATE_ID + "_" + COL_SUMMARY_IN_OUT + "_" + COL_SUMMARY_SEQ_NUM;
    static final String IDX_SUMMARY_RESEND_LOOKUP =
            "idx_" + TABLE_SUMMARY + "_" + COL_SUMMARY_STATE_ID + "_" + COL_SUMMARY_IN_OUT + "_resend";
    static final String COL_SUMMARY_TIMESTAMP    = "sendingtime";
    static final String COL_SUMMARY_IS_ORIGINAL  = "isoriginal";
    static final String COL_SUMMARY_MESSAGE      = "rawmessage";
}

DatabaseDialect

Supported database dialects:


/**
 * Supported database dialects for the storage schema.
 */
public enum DatabaseDialect {
    H2,
    MYSQL,
    POSTGRES
}

Storage Performance Meter

Description

This app measures session storage performance.

Usage

  • Run the sample:
    • win: StoragePerfMeter.bat
    • linux: StoragePerfMeter.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code

Performance meter main class:


import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.SessionId;
import biz.onixs.fix.engine.storage.SessionStorage;
import biz.onixs.fix.engine.storage.StorageRepository;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.sample.storage.database.DatabaseStorageRepository;
import biz.onixs.util.ByteBuffer;
import biz.onixs.util.PrecisionTimer;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This app measures session storage performance.
 */
public class StoragePerfMeter {
    private static final Logger LOG = LoggerFactory.getLogger(StoragePerfMeter.class);
    private static final String SETTINGS_RESOURCE = "sample/DatabaseSessionStorage.properties";
    private final Settings storageSettings = new PropertyBasedSettings(SETTINGS_RESOURCE);
    private static final String MSG = "8=FIX.4.0\u00019=82\u000135=D\u000149=0\u000156=0\u000134=1\u0001" +
                                        "52=99990909-17:17:17\u000111=ClOrdID\u000121=1\u000155=IBM\u0001" +
                                        "54=1\u000138=1000\u000140=1\u000110=014\u0001";
    private static final int MSG_NUM = 1000000;
    private final ByteBuffer rawMessage = new ByteBuffer();
    private Message message = null;

    private void run() {
        try {
            LOG.info("StoragePerf started");
            Engine.init();
            message = new Message(MSG);
            rawMessage.clear();
            message.assemble(rawMessage);
            test();
        } catch (final RuntimeException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            if (Engine.isInited()) {
                try {
                    Engine.getInstance().shutdown();
                } catch (final RuntimeException e) {
                    LOG.warn("Failed to shutdown engine: {}", e.getMessage());
                }
            }
        }
    }

    private void test() {
        //
        final StorageRepository storageRepository = new DatabaseStorageRepository(storageSettings);
        //
        final SessionId sessionId = new SessionId("A", "B", Version.FIX50_SP2);
        final PrecisionTimer timer = new PrecisionTimer();
        //
        final SessionStorage sessionStorage = storageRepository.create(sessionId, true);
        //
        LOG.info("Storing {} messages...", MSG_NUM);
        timer.start();
        for (int i = 0; MSG_NUM > i; i++) {
            sessionStorage.storeOutboundMessage(rawMessage, message.getSeqNum(), message.isOriginal(), false);
        }
        timer.stop();
        LOG.info("Messages stored in {} msecs.", timer.getElapsedMilliSeconds());
        sessionStorage.close(false);
        //
        timer.reset();
        LOG.info("Re-opening the session storage...");
        timer.start();
        final SessionStorage sessionStorageReopen = storageRepository.create(sessionId, false);
        timer.stop();
        LOG.info("Session storage is re-opened in {} msecs.", timer.getElapsedMilliSeconds());
        sessionStorageReopen.close(false);
    }

    public static void main(final String[] args) {
        (new StoragePerfMeter()).run();
    }
}