Samples :: Message Queue

Message Queue

Description

A CME iLink 3 Binary Order Entry sample that connects to the pre-defined host and port. When the session is established, the ‘SingleOrder - New’ (MsgType=‘D’) SBE message is sent to the counterparty. Incoming messages are stored in a queue and processed in a separate thread.

Usage

  • Run the sample:
    • win: MessageQueue.bat
    • linux: MessageQueue.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code

import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.SessionState;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.*;
import biz.onixs.sbe.ByteDecoder;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.Utils;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Message Queue sample.
 *
 * <p>Creates a session for the given market segment, logs on to the given host and port, sends one
 * limit order on request and then terminates the session. Every inbound session-level and
 * application-level message is put on a {@link BlockingQueue} that is drained by a
 * {@code MessageProcessor}, so processing does not happen inside the listener callbacks.
 */
public class MessageQueue implements InboundSessionMessageListener, InboundApplicationMessageListener,
        OutboundSessionMessageListener, OutboundApplicationMessageListener,
        StateChangeListener, WarningListener, ErrorListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageQueue.class);
    private static final String SETTINGS_RESOURCE = "sample/MessageQueue.properties";
    /** Value written to the PartyDetailsListReqID field of every order built by this sample. */
    private static final long PARTY_DETAILS_LIST_REQ_ID = 11012;
    private final int marketSegmentId;
    private final String host;
    private final int port;
    private Session session = null;
    private Settings settings = null;
    /** Hand-off queue between the listener callbacks (producers) and the message processor (consumer). */
    private final BlockingQueue<IMessage> messageQueue = new LinkedBlockingQueue<>();
    private final MessageProcessor messageProcessor = new MessageProcessor(messageQueue);

    /**
     * Creates the sample.
     *
     * @param marketSegmentId market segment the session is created for
     * @param host            host to log on to
     * @param port            port to log on to
     */
    public MessageQueue(final int marketSegmentId, final String host, final int port) {
        this.marketSegmentId = marketSegmentId;
        this.host = host;
        this.port = port;
    }

    /**
     * Runs the whole sample scenario: load settings, start the handler and the message processor,
     * log on, send one limit order, terminate the session and shut everything down.
     *
     * <p>The message processor and the handler are released in {@code finally} blocks so that a
     * failure in any step (for example a failed logon) does not leave the processor thread running.
     * Runtime exceptions are logged, not propagated.
     */
    @Override
    public void run() {
        try {
            LOG.info("Message Queue Sample");
            LOG.info("The application is starting...");
            //
            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
            //
            LOG.info("Starting the Handler...");
            final Handler handler = Handler.init(settings);
            try {
                messageProcessor.start();
                try {
                    runSession();
                } finally {
                    // Always stop the worker thread, even when the session scenario failed.
                    messageProcessor.stop();
                }
            } finally {
                LOG.info("Handler shutdown...");
                handler.shutdown();
            }
        } catch (final RuntimeException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            LOG.info("The application is stopped.");
        }
    }

    /** Creates the session, logs on, sends a single limit order on request and terminates the session. */
    private void runSession() {
        createSession();
        //
        subscribeListeners();
        //
        establishConnection();
        //
        Utils.waitForEnter("\nPress \"Enter\" to send the order.\n");
        //
        final IMessage order = createLimitOrder(session.getDecoder(), 205787, 100, 5);
        session.send(order);
        //
        Utils.waitForEnter("\nPress \"Enter\" to disconnect the session and terminate the application.\n");
        session.terminate();
    }

    /** Builds the session settings from the loaded settings and creates the session. */
    private void createSession() {
        final SessionSettings sessionSettings = new SessionSettings();
        sessionSettings.init(settings);
        session = new Session(sessionSettings, marketSegmentId);
    }

    /** Registers this object for every session event it implements a listener for. */
    private void subscribeListeners() {
        session.addStateChangeListener(this);
        session.setOutboundSessionMessageListener(this);
        session.setOutboundApplicationMessageListener(this);
        session.setInboundSessionMessageListener(this);
        session.setInboundApplicationMessageListener(this);
        session.setErrorListener(this);
        session.setWarningListener(this);
    }

    /** Logs on to the configured host and port. */
    private void establishConnection() {
        LOG.info("Establishing connection to {}:{} ...", host, port);
        session.logon(host, port);
    }

    /**
     * Encodes a NewOrderSingle limit order into a fresh 1024-byte buffer.
     *
     * @param decoder       decoder used to encode the message
     * @param securityId    value for the SecurityID field
     * @param securityPrice first argument of the {@code ScaledDecimal} stored in the Price field; the
     *                      second argument is fixed to -9 (presumably mantissa and exponent - confirm
     *                      against the ScaledDecimal documentation)
     * @param orderQuantity value for the OrderQty field
     * @return the encoded order message
     */
    public static IMessage createLimitOrder(final ByteDecoder decoder, final int securityId, final int securityPrice,
                                            final int orderQuantity) {
        // A newly allocated byte array is already zero-filled, so no explicit fill is required.
        final byte[] buffer = new byte[1024];
        final IMessage message = decoder.encode(buffer, 0, buffer.length, ApplicationLayerMsgType.NewOrderSingleId);
        setCommonOrderFields(message, securityId, orderQuantity);
        message.setChar(Tag.OrdType, CME.OrderTypeReq.Limit);
        final ScaledDecimal priceDecimal = new ScaledDecimal(securityPrice, -9);
        message.setDecimal(Tag.Price, priceDecimal);
        return message;
    }

    /**
     * Fills the order fields that do not depend on the order type. Apart from the security id and
     * the quantity, all values are fixed sample values.
     */
    private static void setCommonOrderFields(final IMessage message, final int securityId, final int orderQuantity) {
        message.setLong(Tag.PartyDetailsListReqID, PARTY_DETAILS_LIST_REQ_ID);
        message.setInt(Tag.SecurityID, securityId);
        message.setUnsignedInt(Tag.OrderQty, orderQuantity);
        message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
        message.setString(Tag.SenderID, "GFP");
        message.setString(Tag.ClOrdID, "SomeOrderID");
        message.setLong(Tag.OrderRequestID, 1L);
        message.setString(Tag.Location, "UK");
        message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
        message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
        message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
        message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
    }

    /** Logs the inbound session-level message and queues it for asynchronous processing. */
    @Override
    public void onInboundSessionMessage(final Object sender, final InboundSessionMessageArgs args) {
        final IMessage received = args.getMsg();
        final IMessage message = detachFromHandler(received);
        LOG.info("Incoming session-level message: {}", message);
        enqueue(message, received);
    }

    /** Logs the inbound application-level message and queues it for asynchronous processing. */
    @Override
    public void onInboundApplicationMessage(final Object sender, final InboundApplicationMessageArgs args) {
        final IMessage received = args.getMsg();
        final IMessage message = detachFromHandler(received);
        LOG.info("Incoming application-level message: {}", message);
        enqueue(message, received);
    }

    /**
     * Returns the instance that may be kept after the callback returns: a clone when inbound message
     * reuse is enabled in the session settings, otherwise the received message itself.
     * NOTE(review): presumably the handler overwrites a reused instance with the next message - confirm.
     */
    private IMessage detachFromHandler(final IMessage received) {
        return session.getSettings().isInboundMessageReuse() ? received.clone() : received;
    }

    /**
     * Puts the message on the processing queue.
     *
     * @param message  instance to queue
     * @param received message as delivered to the listener, used for the error log only
     */
    private void enqueue(final IMessage message, final IMessage received) {
        try {
            messageQueue.put(message);
        } catch (final InterruptedException e) {
            // Keep the interrupt visible to the calling thread instead of silently clearing it.
            Thread.currentThread().interrupt();
            LOG.error("Error adding message to processing queue: {}", received, e);
        }
    }

    /** Logs the outbound session-level message. */
    @Override
    public void onOutboundSessionMessage(final Object sender, final OutboundSessionMessageArgs args) {
        final IMessage message = args.getMsg();
        LOG.info("Outgoing session-level message: {}", message);
    }

    /** Logs the outbound application-level message. */
    @Override
    public void onOutboundApplicationMessage(final Object sender, final OutboundApplicationMessageArgs args) {
        final IMessage message = args.getMsg();
        LOG.info("Outgoing application-level message: {}", message);
    }

    /** Logs the session state transition. */
    @Override
    public void onStateChange(final Object sender, final StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
    }

    /** Logs the warning description. */
    @Override
    public void onWarning(final Object sender, final WarningArgs args) {
        LOG.warn("{}", args.getDescription());
    }

    /** Logs the error description. */
    @Override
    public void onError(final Object sender, final ErrorArgs args) {
        LOG.error("{}", args.getDescription());
    }

    /**
     * Entry point.
     *
     * @param args {@code [MarketSegmentId] [Host] [Port]}; the first and the third must be integers
     */
    public static void main(final String[] args) {
        if (args.length < 3) {
            LOG.error("Usage: [MarketSegmentId] [Host] [Port]");
            return;
        }
        final int marketSegmentId;
        final int port;
        try {
            marketSegmentId = Integer.parseInt(args[0]);
            port = Integer.parseInt(args[2]);
        } catch (final NumberFormatException e) {
            // Report bad arguments the same way as missing ones instead of dying with a stack trace.
            LOG.error("MarketSegmentId and Port must be integers: {}", e.getMessage());
            LOG.error("Usage: [MarketSegmentId] [Host] [Port]");
            return;
        }
        final String host = args[1];
        final MessageQueue messageQueue = new MessageQueue(marketSegmentId, host, port);
        messageQueue.run();
    }
}

The following code is used in the sample:

import biz.onixs.sbe.IMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;

/**
 * Takes messages from a queue and processes them one by one on a dedicated thread.
 *
 * <p>{@link #start()} and {@link #stop()} are called from the controlling thread, while
 * {@link #run()} executes on the worker thread created by {@link #start()}.
 */
public class MessageProcessor implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
    /** Simulated processing time per message, in seconds. */
    private static final long PROCESSING_PAUSE = 1L;
    private final BlockingQueue<IMessage> messageQueue;
    private Thread thread = null;
    /**
     * Cleared by {@link #stop()} on the controlling thread and read by the worker thread; volatile
     * so that the worker is guaranteed to observe the stop request.
     */
    private volatile boolean running = true;

    /**
     * @param messageQueue queue the messages to process are taken from
     */
    public MessageProcessor(final BlockingQueue<IMessage> messageQueue) {
        this.messageQueue = messageQueue;
    }

    /** Creates and starts the worker thread named "MessageProcessor". */
    public void start() {
        thread = new Thread(this, "MessageProcessor");
        thread.start();
    }

    /**
     * Requests the worker thread to stop and interrupts it so that a blocking {@code take()} or an
     * in-progress processing pause returns immediately. Does not wait for the thread to finish.
     * Safe to call when {@link #start()} has not been called.
     */
    public void stop() {
        running = false;
        final Thread worker = thread;
        if (worker != null) {
            // Interrupting an already interrupted thread is harmless, so no pre-check is needed.
            worker.interrupt();
        }
    }

    /** Worker loop: blocks on the queue and processes each message until a stop is requested. */
    @Override
    public void run() {
        LOG.info("MessageProcessor thread started.");
        while (running) {
            LOG.info("MessageProcessor is checking for messages...");
            try {
                final IMessage message = messageQueue.take();
                if (running) {
                    process(message);
                }
            } catch (final InterruptedException ignored) {
                // The interrupt is only a wake-up signal sent by stop(); the loop condition decides
                // whether to exit. The flag is deliberately not restored here, because that would
                // make the next take() fail immediately while 'running' is still true.
            }
        }
        LOG.info("MessageProcessor thread stopped.");
    }

    /**
     * Simulates time-consuming processing of a single message by sleeping for
     * {@link #PROCESSING_PAUSE} seconds.
     *
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    private static void process(final IMessage message) throws InterruptedException {
        LOG.info("Processing message for {} secs: {}", PROCESSING_PAUSE, message);
        Thread.sleep(PROCESSING_PAUSE * 1000L);
        // time-consuming logic goes here
        LOG.info("Processed.");
    }
}