Samples :: Message Throttling

Throttling Buy Side

Description

The sample illustrates throttling for buy side and for outgoing messages.

This sample can be run together with the “ThrottlingSellSide” sample. The “ThrottlingSellSide” must be started first.

Usage

  • Run the sample:
    • win: 2-ThrottlingBuySide.bat
    • linux: 2-ThrottlingBuySide.sh

Source Code

import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.TimestampFormat;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * FIX Initiator - Buy Side.
 */
public class ThrottlingBuySide implements Session.InboundApplicationMessageListener, Session.StateChangeListener,
        Session.MessageResendingListener, Session.InboundSessionMessageListener, Session.WarningListener,
        Session.ErrorListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingBuySide.class);
    private static final String SETTINGS_RESOURCE = "sample/ThrottlingBuySide.properties";
    private final Semaphore sessionIsEstablished = new Semaphore(0);
    private final Semaphore orderHandlingComplete = new Semaphore(0);
    private Settings settings = null;
    private Version fixVersion = null;
    private Session session = null;
    private int totalOrdersHandled = 0;
    private int totalMessagesRejected = 0;

    public void run() {
        try {
            LOG.info("ThrottlingBuySide");
            LOG.info("The application is starting...");
            //
            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
            //
            LOG.info("Starting the Engine...");
            final Engine engine = Engine.init(settings);
            //
            createSession();
            //
            establishConnection();
            //
            LOG.info("Waiting for session establishment...");
            sessionIsEstablished.acquire();
            //
            int noOfMessages = settings.getInteger("NoOfMessages");
            for(int i = 0; i < noOfMessages; ++i) {
                final Message order = createOrder();
                session.throttle();
                session.send(order);
                //
                orderHandlingComplete.acquire();
            }
            //
            session.logout("The session is disconnected by ThrottlingBuySide");
            session.dispose();
            //
            LOG.info("Orders handled: " + totalOrdersHandled + ", messages rejected: " + totalMessagesRejected);
            //
            LOG.info("Engine shutdown...");
            engine.shutdown();
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            LOG.info("The application is stopped.");
        }
    }

    private void createSession() {
        fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
        session = new Session(settings.getString("SenderCompID"), settings.getString("TargetCompID"),
                fixVersion, settings.getBoolean("keepSequenceNumbersBetweenFixConnections"));
        //
        session.setSenderSubID("SenderSubID (50) field")
                .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec)
                .setInboundApplicationMessageListener(this)
                .setInboundSessionMessageListener(this)
                .addStateChangeListener(this)
                .setMessageResendingListener(this)
                .setErrorListener(this)
                .setWarningListener(this);
        updateThrottlingLimit(session);
    }

    private void updateThrottlingLimit(Session session) {
        final int messages = settings.getInteger("ThrottlingMessages");
        if (messages < 0) {
            return;
        }
        final long interval = settings.getLong("ThrottlingInterval");
        final TimeUnit timeUnit = TimeUnit.valueOf(settings.getString("ThrottlingIntervalTimeUnit").toUpperCase(Locale.ROOT));
        session.setThrottlingLimit(messages, interval, timeUnit);
    }

    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{}...", host, port);
        session.logonAsInitiator(host, port, true);
    }

    private Message createOrder() {
        final Message order = Message.create(FIX40.MsgType.Order_Single, fixVersion);
        order.set(Tag.HandlInst, "1")
                .set(Tag.ClOrdID, "Unique identifier for Order")
                .set(Tag.Symbol, "IBM")
                .set(Tag.Side, "1")
                .set(Tag.OrderQty, 1000)
                .set(Tag.OrdType, "1");
        return order;
    }

    @Override
    public void onInboundApplicationMessage(final Object sender, final Session.InboundApplicationMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming application-level message: {}", message);
        if (message.checkType(FIX40.MsgType.Execution_Report)) {
            LOG.info("Execution report received.");
            ++totalOrdersHandled;
            orderHandlingComplete.release();
        }
        // Processing of the incoming application-level message...
    }

    @Override
    public void onInboundSessionMessage(final Object sender, final Session.InboundSessionMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming session-level message: {}", message);

        if (message.getType().equals(FIX40.MsgType.Reject)) {
            ++totalMessagesRejected;
            LOG.info("Rejection " + totalMessagesRejected + ": message " + message.get(Tag.RefSeqNum) + " rejected.");
            orderHandlingComplete.release();
        }

        // Processing of the incoming session-level message...
    }

    @Override
    public void onStateChange(final Object sender, final Session.StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
        if (SessionState.ESTABLISHED == newState) {
            sessionIsEstablished.release();
        }
    }

    @Override
    public void onError(final Object sender, final Session.ErrorArgs args) {
        LOG.error("{}", args);
    }

    @Override
    public void onWarning(final Object sender, final Session.WarningArgs args) {
        LOG.warn("{}", args);
    }

    @Override
    public boolean onMessageResending(final Object sender, final Session.MessageResendingArgs args) {
        LOG.info("Message resending request: {}", args.getMsg());
        // Return false if it's necessary to skip this message (GapFill will be sent).
        return false;
    }

    @Override
    public void onMessageResendingStarted(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is about to start from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    @Override
    public void onMessageResendingFinished(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is finished from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    public static void main(final String[] args) {
        (new ThrottlingBuySide()).run();
    }
}

Throttling Sell Side

Description

The sample illustrates throttling for sell side and for incoming messages.

This sample can be run together with the “ThrottlingBuySide” sample. The “ThrottlingSellSide” must be started first.

Usage

  • Run the sample:
    • win: 2-ThrottlingSellSide.bat
    • linux: 2-ThrottlingSellSide.sh

Source Code

import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.GuidGenerator;
import biz.onixs.util.Throttler;
import biz.onixs.util.TimestampFormat;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Processes incoming messages.
 */
public class ThrottlingSellSide implements Session.InboundApplicationMessageListener, Session.StateChangeListener,
        Session.ErrorListener, Session.WarningListener, Session.MessageResendingListener,
        Session.InboundSessionMessageListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingSellSide.class);
    private static final String SETTINGS_RESOURCE = "sample/ThrottlingSellSide.properties";
    private final Semaphore sessionIsDisconnected = new Semaphore(0);
    private final Semaphore sessionIsEstablished = new Semaphore(0);
    private final GuidGenerator guidGenerator = new GuidGenerator();
    private Settings settings = null;
    private Version fixVersion = null;
    private Session session = null;
    private Throttler throttler = new Throttler();
    private int totalOrdersHandled = 0;
    private int totalMessagesRejected = 0;

    public void run() {
        try {
            LOG.info("ThrottlingSellSide");
            LOG.info("The application is starting...");
            //
            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
            //
            updateThrottlingLimit();
            //
            LOG.info("Starting the Engine...");
            final Engine engine = Engine.init(settings);
            //
            createSession();
            session.logonAsAcceptor();
            sessionIsDisconnected.acquire();
            //
            LOG.info("Waiting for connection...");
            sessionIsEstablished.acquire();
            //
            LOG.info("Waiting for disconnection...");
            sessionIsDisconnected.acquire();
            session.logout();
            session.dispose();
            //
            LOG.info("Orders handled: " + totalOrdersHandled + ", messages rejected: " + totalMessagesRejected);
            //
            LOG.info("Engine shutdown...");
            engine.shutdown();
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            LOG.info("The application is stopped.");
        }
    }

    private void createSession() {
        fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
        session = new Session(settings.getString("SenderCompID"),
                settings.getString("TargetCompID"),
                fixVersion, settings.getBoolean("keepSequenceNumbersBetweenFixConnections"));
        //
        session.setInboundApplicationMessageListener(this)
                .setInboundSessionMessageListener(this)
                .addStateChangeListener(this)
                .setErrorListener(this)
                .setWarningListener(this)
                .setMessageResendingListener(this)
                .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec);
    }

    private void updateThrottlingLimit() {
        final int messages = settings.getInteger("ThrottlingMessages");
        if (messages < 0) {
            return;
        }
        final long interval = settings.getLong("ThrottlingInterval");
        final TimeUnit timeUnit = TimeUnit.valueOf(settings.getString("ThrottlingIntervalTimeUnit").toUpperCase(Locale.ROOT));
        throttler.reset(messages, timeUnit.toMillis(interval));
    }

    private Message createExecutionReport(final Message order) {
        final Message execReport = Message.create(FIX40.MsgType.Execution_Report, fixVersion);
        execReport.set(Tag.OrderID, order.get(Tag.ClOrdID))
                .set(Tag.ExecID, guidGenerator.generate())
                .set(Tag.ExecTransType, "0")
                .set(Tag.OrdStatus, "0")
                .set(Tag.Symbol, order.get(Tag.Symbol))
                .set(Tag.Side, order.get(Tag.Side))
                .set(Tag.LastQty, order.get(Tag.OrderQty))
                .set(Tag.LastPx, "100.0")
                .set(Tag.OrderQty, order.get(Tag.OrderQty))
                .set(Tag.CumQty, order.get(Tag.OrderQty))
                .set(Tag.AvgPx, "100.0");
        return execReport;
    }

    @Override
    public void onInboundApplicationMessage(final Object sender, final Session.InboundApplicationMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming application-level message: {}", message);

        // Throttle message
        synchronized (throttler) {
            if (throttler.tryAdd(System.currentTimeMillis()) != 0) {
                // Reject the message
                final Message rejectMessage = Message.create(FIX40.MsgType.Reject, fixVersion);
                rejectMessage.set(Tag.RefSeqNum, message.getSeqNum());
                session.send(rejectMessage);
                ++totalMessagesRejected;
                LOG.info("Rejection " + totalMessagesRejected + ": message " + message.getSeqNum() + " rejected due to throttling limit exhausting.");
                return;
            }
        }

        if (message.checkType(FIX40.MsgType.Order_Single)) {
            LOG.info("Order received.");
            final Message executionReport = createExecutionReport(message);
            LOG.info("Sending execution report in response: {}", executionReport);
            ++totalOrdersHandled;
            session.send(executionReport);
        }
    }

    @Override
    public void onInboundSessionMessage(final Object sender, final Session.InboundSessionMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming session-level message: {}", message);
        // Processing of the incoming session-level message...
    }

    @Override
    public void onStateChange(final Object sender, final Session.StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
        if (SessionState.AWAIT_LOGON == newState) {
            sessionIsDisconnected.release();
        } else if (SessionState.ESTABLISHED == newState) {
            sessionIsEstablished.release();
        }
    }

    @Override
    public void onError(final Object sender, final Session.ErrorArgs args) {
        LOG.error("{}", args);
    }

    @Override
    public void onWarning(final Object sender, final Session.WarningArgs args) {
        LOG.warn("{}", args);
    }

    @Override
    public boolean onMessageResending(final Object sender, final Session.MessageResendingArgs args) {
        LOG.info("Message resending request: {}", args.getMsg());
        // Return false if it's necessary to skip this message (GapFill will be sent).
        return false;
    }

    @Override
    public void onMessageResendingStarted(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is about to start from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    @Override
    public void onMessageResendingFinished(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is finished from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    public static void main(final String[] args) {
        (new ThrottlingSellSide()).run();
    }
}