Samples :: Message Queue
Message Queue
Description
CME iLink 3 Binary Order Entry sample that connects to the configured host and port. After the
session is established, it sends a SingleOrder - New (MsgType D) message. Incoming messages are
queued and processed on a separate thread.
Directory Contents
| Item | Description |
|---|---|
| conf | configuration directory |
| src | source code |
Usage
- Run the sample (scripts are under
src/main/script/):- win: MessageQueue.bat
- linux: MessageQueue.sh
- Clean everything (scripts are under
samples/src/main/script/at the repo root):- win: clean.bat
- linux: clean.sh
Arguments
The sample expects [MarketSegmentId] [Host] [Port]. Set CMD_LINE_ARGS in the script or pass the
arguments when running the main class directly.
Source Code
import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.SessionState;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.*;
import biz.onixs.sbe.ByteDecoder;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.Utils;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Message Queue sample.
*/
public class MessageQueue implements InboundSessionMessageListener, InboundApplicationMessageListener,
OutboundSessionMessageListener, OutboundApplicationMessageListener,
StateChangeListener, WarningListener, ErrorListener, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MessageQueue.class);
private static final String SETTINGS_RESOURCE = "sample/MessageQueue.properties";
private final int marketSegmentId;
private final String host;
private final int port;
private Session session = null;
private Settings settings = null;
private final BlockingQueue<IMessage> messageQueue = new LinkedBlockingQueue<>();
private final MessageProcessor messageProcessor = new MessageProcessor(messageQueue);
private static final long PartyDetailsListReqId = 11012;
public MessageQueue(final int marketSegmentId, final String host, final int port) {
this.marketSegmentId = marketSegmentId;
this.host = host;
this.port = port;
}
public void run() {
try {
LOG.info("Message Queue Sample");
LOG.info("The application is starting...");
//
LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
//
LOG.info("Starting the Handler...");
// Init the Handler singleton once per process; shutdown in finally.
Handler.init(settings);
//
messageProcessor.start();
//
createSession();
//
subscribeListeners();
//
establishConnection();
//
Utils.waitForEnter("\nPress \"Enter\" to send the order.\n");
//
// Send one order; inbound/outbound messages are queued for processing.
final IMessage order = createLimitOrder(session.getDecoder(), 205787, 100, 5);
session.send(order);
//
Utils.waitForEnter("\nPress \"Enter\" to disconnect the session and terminate the application.\n");
session.terminate();
} catch (final RuntimeException e) {
LOG.error(e.getMessage(), e);
} finally {
messageProcessor.stop();
if (Handler.isInited()) {
try {
LOG.info("Handler shutdown...");
Handler.getInstance().shutdown();
} catch (final RuntimeException e) {
LOG.error("Handler shutdown error", e);
}
}
LOG.info("The application is stopped.");
}
}
private void createSession() {
final SessionSettings sessionSettings = new SessionSettings();
sessionSettings.init(settings);
session = new Session(sessionSettings, marketSegmentId);
}
private void subscribeListeners() {
session.addStateChangeListener(this);
session.setOutboundSessionMessageListener(this);
session.setOutboundApplicationMessageListener(this);
session.setInboundSessionMessageListener(this);
session.setInboundApplicationMessageListener(this);
session.setErrorListener(this);
session.setWarningListener(this);
}
private void establishConnection() {
LOG.info("Establishing connection to {}:{} ...", host, port);
session.logon(host, port);
}
public static IMessage createLimitOrder(final ByteDecoder decoder, final int securityId, final int securityPrice,
final int orderQuantity) {
final byte[] buffer = new byte[1024];
Arrays.fill(buffer, (byte) 0);
final IMessage message = decoder.encode(buffer, 0, buffer.length, ApplicationLayerMsgType.NewOrderSingleId);
setCommonOrderFields(message, securityId, orderQuantity);
message.setChar(Tag.OrdType, CME.OrderTypeReq.Limit);
final ScaledDecimal priceDecimal = new ScaledDecimal(securityPrice, -9);
message.setDecimal(Tag.Price, priceDecimal);
return message;
}
private static void setCommonOrderFields(final IMessage message, final int securityId, final int orderQuantity) {
message.setLong(Tag.PartyDetailsListReqID, PartyDetailsListReqId);
message.setInt(Tag.SecurityID, securityId);
message.setUnsignedInt(Tag.OrderQty, orderQuantity);
message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
message.setString(Tag.SenderID, "GFP");
message.setString(Tag.ClOrdID, "SomeOrderID");
message.setLong(Tag.OrderRequestID, 1L);
message.setString(Tag.Location, "UK");
message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
}
@Override
public void onInboundSessionMessage(final Object sender, final InboundSessionMessageArgs args) {
final IMessage message = session.getSettings().isInboundMessageReuse() ?
args.getMsg().clone() : args.getMsg();
LOG.info("Incoming session-level message: {}", message);
try {
messageQueue.put(message);
} catch (final InterruptedException e) {
LOG.error("Error adding message to processing queue: " + args.getMsg(), e);
}
}
@Override
public void onInboundApplicationMessage(final Object sender, final InboundApplicationMessageArgs args) {
final IMessage message = session.getSettings().isInboundMessageReuse() ?
args.getMsg().clone() : args.getMsg();
LOG.info("Incoming application-level message: {}", message);
try {
messageQueue.put(message);
} catch (final InterruptedException e) {
LOG.error("Error adding message to processing queue: " + args.getMsg(), e);
}
}
@Override
public void onOutboundSessionMessage(final Object sender, final OutboundSessionMessageArgs args) {
final IMessage message = args.getMsg();
LOG.info("Outgoing session-level message: {}", message);
}
@Override
public void onOutboundApplicationMessage(final Object sender, final OutboundApplicationMessageArgs args) {
final IMessage message = args.getMsg();
LOG.info("Outgoing application-level message: {}", message);
}
@Override
public void onStateChange(final Object sender, final StateChangeArgs args) {
final SessionState newState = args.getNewState();
final SessionState prevState = args.getPrevState();
LOG.info("Session state changed from {} to {}", prevState, newState);
}
@Override
public void onWarning(final Object sender, final WarningArgs args) {
LOG.warn("{}", args.getDescription());
}
@Override
public void onError(final Object sender, final ErrorArgs args) {
LOG.error("{}", args.getDescription());
}
public static void main(final String[] args) {
if (3 > args.length) {
LOG.error("Usage: [MarketSegmentId] [Host] [Port]");
} else {
final int marketSegmentId = Integer.parseInt(args[0]);
final String host = args[1];
final int port = Integer.parseInt(args[2]);
final MessageQueue messageQueue = new MessageQueue(marketSegmentId, host, port);
messageQueue.run();
}
}
}
The following code is used in the sample:
import biz.onixs.sbe.IMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
public class MessageProcessor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
private static final long PROCESSING_PAUSE = 1L;
private final BlockingQueue<IMessage> messageQueue;
private Thread thread = null;
private volatile boolean running = true;
public MessageProcessor(final BlockingQueue<IMessage> messageQueue) {
this.messageQueue = messageQueue;
}
public void start() {
thread = new Thread(this, "MessageProcessor");
thread.start();
}
public void stop() {
running = false;
final Thread activeThread = thread;
if (null == activeThread) {
return;
}
if (activeThread == Thread.currentThread()) {
activeThread.interrupt();
return;
}
// Cooperative shutdown; interrupt take() and wait briefly.
activeThread.interrupt();
try {
activeThread.join(2000L);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void run() {
LOG.info("MessageProcessor thread started.");
while (running) {
LOG.info("MessageProcessor is checking for messages...");
try {
final IMessage message = messageQueue.take();
if (!running) {
break;
}
process(message);
} catch (final InterruptedException e) {
// Exit cleanly on shutdown interrupt.
if (!running) {
Thread.currentThread().interrupt();
break;
}
}
}
LOG.info("MessageProcessor thread stopped.");
}
private static void process(final IMessage message) throws InterruptedException {
LOG.info("Processing message for {} secs: {}", PROCESSING_PAUSE, message);
Thread.sleep(PROCESSING_PAUSE * 1000L);
// time-consuming logic goes here
LOG.info("Processed.");
}
}
Java CME iLink3 Handler