Samples :: Message Queue
Message Queue
Description
CME iLink 3 Binary Order Entry sample that connects to the pre-defined host and port. When the session is established, the ‘SingleOrder - New’ (MsgType=‘D’) SBE message is sent to the counterparty. Incoming messages are stored in a queue and processed in a different thread.
Usage
- Run the sample:
- win: MessageQueue.bat
- linux: MessageQueue.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.SessionState;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.*;
import biz.onixs.sbe.ByteDecoder;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.Utils;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Message Queue sample.
 *
 * <p>Creates a session for the given market segment, logs on to the given host and port, sends a
 * single limit order on request and hands every inbound message over to a {@link MessageProcessor}
 * through a blocking queue, so that the time-consuming processing runs on a separate thread.
 */
public class MessageQueue implements InboundSessionMessageListener, InboundApplicationMessageListener,
        OutboundSessionMessageListener, OutboundApplicationMessageListener,
        StateChangeListener, WarningListener, ErrorListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageQueue.class);
    private static final String SETTINGS_RESOURCE = "sample/MessageQueue.properties";
    /** Value written to the PartyDetailsListReqID field of every order built by this sample. */
    private static final long PARTY_DETAILS_LIST_REQ_ID = 11012;

    private final int marketSegmentId;
    private final String host;
    private final int port;
    private Session session = null;
    private Settings settings = null;
    /** Queue shared between the listener callbacks (producers) and the message processor (consumer). */
    private final BlockingQueue<IMessage> messageQueue = new LinkedBlockingQueue<>();
    private final MessageProcessor messageProcessor = new MessageProcessor(messageQueue);

    /**
     * Creates the sample.
     *
     * @param marketSegmentId market segment the session is created for
     * @param host            host to log on to
     * @param port            port to log on to
     */
    public MessageQueue(final int marketSegmentId, final String host, final int port) {
        this.marketSegmentId = marketSegmentId;
        this.host = host;
        this.port = port;
    }

    /**
     * Runs the whole sample scenario: loads settings, starts the handler and the message processor,
     * logs on, sends one order and finally releases everything that was started.
     *
     * <p>Runtime failures are logged rather than propagated. Clean-up happens in {@code finally},
     * so a failure half-way through (for example a rejected logon) does not leave the processor
     * thread or the handler running.
     */
    @Override
    public void run() {
        Handler handler = null;
        boolean processorStarted = false;
        boolean connected = false;
        try {
            LOG.info("Message Queue Sample");
            LOG.info("The application is starting...");

            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);

            LOG.info("Starting the Handler...");
            handler = Handler.init(settings);

            messageProcessor.start();
            processorStarted = true;

            createSession();
            subscribeListeners();

            establishConnection();
            connected = true;

            Utils.waitForEnter("\nPress \"Enter\" to send the order.\n");

            final IMessage order = createLimitOrder(session.getDecoder(), 205787, 100, 5);
            session.send(order);

            Utils.waitForEnter("\nPress \"Enter\" to disconnect the session and terminate the application.\n");
            // Cleared before the call so that a failing terminate() is not retried during clean-up.
            connected = false;
            session.terminate();
        } catch (final RuntimeException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            releaseResources(handler, processorStarted, connected);
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Releases, in the order session, processor, handler, whatever {@link #run()} managed to start.
     *
     * @param handler          the initialized handler, or {@code null} if initialization never happened
     * @param processorStarted whether the message processor thread was started
     * @param connected        whether the session was logged on and has not been terminated yet
     */
    private void releaseResources(final Handler handler, final boolean processorStarted, final boolean connected) {
        if (connected) {
            try {
                session.terminate();
            } catch (final RuntimeException e) {
                LOG.error("Failed to terminate the session: {}", e.getMessage(), e);
            }
        }
        if (processorStarted) {
            messageProcessor.stop();
        }
        if (handler != null) {
            LOG.info("Handler shutdown...");
            try {
                handler.shutdown();
            } catch (final RuntimeException e) {
                LOG.error("Failed to shut down the handler: {}", e.getMessage(), e);
            }
        }
    }

    /** Builds the session from the loaded settings for the configured market segment. */
    private void createSession() {
        final SessionSettings sessionSettings = new SessionSettings();
        sessionSettings.init(settings);
        session = new Session(sessionSettings, marketSegmentId);
    }

    /** Registers this object for every session callback it implements. */
    private void subscribeListeners() {
        session.addStateChangeListener(this);
        session.setOutboundSessionMessageListener(this);
        session.setOutboundApplicationMessageListener(this);
        session.setInboundSessionMessageListener(this);
        session.setInboundApplicationMessageListener(this);
        session.setErrorListener(this);
        session.setWarningListener(this);
    }

    /** Logs on to the configured host and port. */
    private void establishConnection() {
        LOG.info("Establishing connection to {}:{} ...", host, port);
        session.logon(host, port);
    }

    /**
     * Encodes a NewOrderSingle limit order into a freshly allocated 1024-byte buffer.
     *
     * @param decoder       decoder used to encode the message
     * @param securityId    value for the SecurityID field
     * @param securityPrice value passed to {@link ScaledDecimal} together with {@code -9}
     *                      (presumably mantissa and exponent - confirm against the ScaledDecimal API)
     * @param orderQuantity value for the OrderQty field
     * @return the encoded order message
     */
    public static IMessage createLimitOrder(final ByteDecoder decoder, final int securityId, final int securityPrice,
                                            final int orderQuantity) {
        final byte[] buffer = new byte[1024];
        Arrays.fill(buffer, (byte) 0);
        final IMessage message = decoder.encode(buffer, 0, buffer.length, ApplicationLayerMsgType.NewOrderSingleId);
        setCommonOrderFields(message, securityId, orderQuantity);
        message.setChar(Tag.OrdType, CME.OrderTypeReq.Limit);
        final ScaledDecimal priceDecimal = new ScaledDecimal(securityPrice, -9);
        message.setDecimal(Tag.Price, priceDecimal);
        return message;
    }

    /**
     * Fills in the fields that do not depend on the order type; all values except the security id
     * and the quantity are fixed sample values.
     */
    private static void setCommonOrderFields(final IMessage message, final int securityId, final int orderQuantity) {
        message.setLong(Tag.PartyDetailsListReqID, PARTY_DETAILS_LIST_REQ_ID);
        message.setInt(Tag.SecurityID, securityId);
        message.setUnsignedInt(Tag.OrderQty, orderQuantity);
        message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
        message.setString(Tag.SenderID, "GFP");
        message.setString(Tag.ClOrdID, "SomeOrderID");
        message.setLong(Tag.OrderRequestID, 1L);
        message.setString(Tag.Location, "UK");
        message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
        message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
        message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
        message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
    }

    /**
     * Puts a message on the processing queue.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored so the
     * owner of that thread can still observe the interruption, and the failure is logged.
     *
     * @param message  the message to enqueue
     * @param original the message as delivered by the session, used for the error log only
     */
    private void enqueue(final IMessage message, final Object original) {
        try {
            messageQueue.put(message);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Error adding message to processing queue: {}", original, e);
        }
    }

    /** Queues an inbound session-level message; it is cloned first when inbound message reuse is enabled. */
    @Override
    public void onInboundSessionMessage(final Object sender, final InboundSessionMessageArgs args) {
        final IMessage message = session.getSettings().isInboundMessageReuse()
                ? args.getMsg().clone() : args.getMsg();
        LOG.info("Incoming session-level message: {}", message);
        enqueue(message, args.getMsg());
    }

    /** Queues an inbound application-level message; it is cloned first when inbound message reuse is enabled. */
    @Override
    public void onInboundApplicationMessage(final Object sender, final InboundApplicationMessageArgs args) {
        final IMessage message = session.getSettings().isInboundMessageReuse()
                ? args.getMsg().clone() : args.getMsg();
        LOG.info("Incoming application-level message: {}", message);
        enqueue(message, args.getMsg());
    }

    /** Logs an outgoing session-level message. */
    @Override
    public void onOutboundSessionMessage(final Object sender, final OutboundSessionMessageArgs args) {
        final IMessage message = args.getMsg();
        LOG.info("Outgoing session-level message: {}", message);
    }

    /** Logs an outgoing application-level message. */
    @Override
    public void onOutboundApplicationMessage(final Object sender, final OutboundApplicationMessageArgs args) {
        final IMessage message = args.getMsg();
        LOG.info("Outgoing application-level message: {}", message);
    }

    /** Logs a session state transition. */
    @Override
    public void onStateChange(final Object sender, final StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
    }

    /** Logs a warning reported by the session. */
    @Override
    public void onWarning(final Object sender, final WarningArgs args) {
        LOG.warn("{}", args.getDescription());
    }

    /** Logs an error reported by the session. */
    @Override
    public void onError(final Object sender, final ErrorArgs args) {
        LOG.error("{}", args.getDescription());
    }

    /**
     * Entry point.
     *
     * @param args {@code [MarketSegmentId] [Host] [Port]}; the first and the third must be integers
     */
    public static void main(final String[] args) {
        if (args.length < 3) {
            LOG.error("Usage: [MarketSegmentId] [Host] [Port]");
            return;
        }
        int marketSegmentId;
        int port;
        try {
            marketSegmentId = Integer.parseInt(args[0]);
            port = Integer.parseInt(args[2]);
        } catch (final NumberFormatException e) {
            // Report malformed numbers the same way as missing arguments instead of crashing.
            LOG.error("MarketSegmentId and Port must be integers. Usage: [MarketSegmentId] [Host] [Port]", e);
            return;
        }
        final String host = args[1];
        final MessageQueue messageQueue = new MessageQueue(marketSegmentId, host, port);
        messageQueue.run();
    }
}
The following code is used in the sample:
import biz.onixs.sbe.IMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
/**
 * Consumes messages from a blocking queue on a dedicated thread named "MessageProcessor".
 *
 * <p>{@link #start()} and {@link #stop()} are meant to be called from a controlling thread;
 * the state they share with the worker thread is {@code volatile}.
 */
public class MessageProcessor implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
    /** Simulated processing time per message, in seconds. */
    private static final long PROCESSING_PAUSE = 1L;

    private final BlockingQueue<IMessage> messageQueue;
    // Both fields are written by the controlling thread and read by the worker thread.
    private volatile Thread thread = null;
    private volatile boolean running = true;

    /**
     * @param messageQueue queue the messages are taken from
     */
    public MessageProcessor(final BlockingQueue<IMessage> messageQueue) {
        this.messageQueue = messageQueue;
    }

    /** Starts the worker thread. */
    public void start() {
        // Re-armed so that a processor that was stopped earlier can be started again.
        running = true;
        final Thread worker = new Thread(this, "MessageProcessor");
        thread = worker;
        worker.start();
    }

    /**
     * Asks the worker thread to stop and interrupts it so that a blocked {@code take()} or an
     * ongoing processing pause returns promptly. Does nothing harmful if the processor was never
     * started.
     */
    public void stop() {
        running = false;
        final Thread worker = thread;
        if (worker != null) {
            worker.interrupt();
        }
    }

    /**
     * Worker loop: takes messages one by one and processes them until {@link #stop()} is called
     * or the thread is interrupted.
     */
    @Override
    public void run() {
        LOG.info("MessageProcessor thread started.");
        while (running) {
            LOG.info("MessageProcessor is checking for messages...");
            try {
                final IMessage message = messageQueue.take();
                // A stop request may have arrived while the thread was blocked in take().
                if (running) {
                    process(message);
                }
            } catch (final InterruptedException e) {
                // Interruption is the stop signal: restore the flag for the thread's owner and leave
                // the loop instead of spinning on a queue that would throw again immediately.
                Thread.currentThread().interrupt();
                break;
            }
        }
        LOG.info("MessageProcessor thread stopped.");
    }

    /**
     * Simulates time-consuming processing of a single message by sleeping for
     * {@link #PROCESSING_PAUSE} seconds.
     *
     * @param message the message to process
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    private static void process(final IMessage message) throws InterruptedException {
        LOG.info("Processing message for {} secs: {}", PROCESSING_PAUSE, message);
        Thread.sleep(PROCESSING_PAUSE * 1000L);
        // time-consuming logic goes here
        LOG.info("Processed.");
    }
}