Samples :: Utils
Message Queue Runner
Description
There is a FIX Engine requirement that session listener can't perform time-consuming task. This sample shows how to use message queue and single thread to perform time-consuming tasks outside session listener. This approach is also known as “Producer-Consumer design pattern”.
Usage
- Run the sample:
- win: MessageQueueRunner.bat
- linux: MessageQueueRunner.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX50;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueueRunner implements Session.InboundApplicationMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(MessageQueueRunner.class);
private static final int MSG_NUM = 5;
private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
private final MessageProcessor messageProcessor = new MessageProcessor(messageQueue);
public void run() {
Engine.init(-1);
messageProcessor.start();
// Emulating inbound application message event
LOG.info("Messages to process: " + MSG_NUM);
for (int i = 0; MSG_NUM > i; i++) {
final Message message = createDemoMessage(i);
final Session.InboundApplicationMessageArgs args = new Session.InboundApplicationMessageArgs(message);
onInboundApplicationMessage(this, args);
}
Utils.waitForEnterToTerminateApp();
messageProcessor.stop();
Engine.getInstance().shutdown();
}
@Override
public void onInboundApplicationMessage(final Object sender, final Session.InboundApplicationMessageArgs args) {
try {
messageQueue.put(args.getMsg());
} catch (final InterruptedException e) {
LOG.error("Error adding message to processing queue: {}", args.getMsg(), e);
}
}
private static Message createDemoMessage(final int i) {
final Message newsMessage = Message.create(FIX50.MsgType.News, Version.FIX50);
newsMessage.set(Tag.Headline, "Headline " + i);
final Group linesOfTextGrp = newsMessage.setGroup(Tag.LinesOfText, 1);
linesOfTextGrp.set(Tag.Text, 0, "Text " + i);
return newsMessage;
}
public static void main(final String[] args) {
try {
final MessageQueueRunner messageQueueRunner = new MessageQueueRunner();
messageQueueRunner.run();
} catch (final Throwable throwable) {
LOG.error(throwable.getMessage(), throwable);
}
}
}
The following class is used in the sample:
import biz.onixs.fix.parser.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
public class MessageProcessor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
private static final long PROCESSING_PAUSE = 1L;
private final BlockingQueue<Message> messageQueue;
private Thread thread = null;
private boolean running = true;
public MessageProcessor(final BlockingQueue<Message> messageQueue) {
this.messageQueue = messageQueue;
}
public void start() {
thread = new Thread(this, "MessageProcessor");
thread.start();
}
public void stop() {
running = false;
if (!thread.isInterrupted()) {
thread.interrupt();
}
}
public void run() {
LOG.info("MessageProcessor thread started.");
while (running) {
LOG.info("MessageProcessor is checking for messages...");
try {
final Message message = messageQueue.take();
if (running) {
process(message);
}
} catch (final InterruptedException ignored) {
}
}
LOG.info("MessageProcessor thread stopped.");
}
private static void process(final Message message) throws InterruptedException {
LOG.info("Processing message for {} secs: {}", PROCESSING_PAUSE, message);
Thread.sleep(PROCESSING_PAUSE * 1000L);
// time-consuming logic goes here
LOG.info("Processed.");
}
}
Executor Runner
Description
There is a FIX Engine requirement that session listener can't perform time-consuming task.This sample shows how to use standard thread pool to perform time-consuming tasks outside session listener.
Usage
- Run the sample:
- win: ExecutorRunner.bat
- linux: ExecutorRunner.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX50;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorRunner implements Session.InboundApplicationMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorRunner.class);
private static final int MSG_NUM = 5;
private static final int THREAD_POOL_SIZE = 2;
private final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public void run() {
Engine.init(-1);
// Emulating inbound application message event
LOG.info("Messages to process: " + MSG_NUM);
for (int i = 0; 5 > i; i++) {
final Message message = createDemoMessage(i);
final Session.InboundApplicationMessageArgs args = new Session.InboundApplicationMessageArgs(message);
onInboundApplicationMessage(this, args);
}
Utils.waitForEnterToTerminateApp();
executorService.shutdownNow();
Engine.getInstance().shutdown();
}
@Override
public void onInboundApplicationMessage(final Object sender, final Session.InboundApplicationMessageArgs args) {
executorService.submit(new MessageProcessingTask(args.getMsg()));
}
private static Message createDemoMessage(final int i) {
final Message newsMessage = Message.create(FIX50.MsgType.News, Version.FIX50);
newsMessage.set(Tag.Headline, "Headline " + i);
final Group linesOfTextGrp = newsMessage.setGroup(Tag.LinesOfText, 1);
linesOfTextGrp.set(Tag.Text, 0, "Text " + i);
return newsMessage;
}
public static void main(final String[] args) {
try {
final ExecutorRunner executorRunner = new ExecutorRunner();
executorRunner.run();
} catch (final Throwable throwable) {
LOG.error(throwable.getMessage(), throwable);
}
}
}
The following class is used in the sample:
import biz.onixs.fix.parser.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageProcessingTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MessageProcessingTask.class);
private static final int PROCESSING_PAUSE = 1;
private final Message message;
public MessageProcessingTask(final Message message) {
this.message = message;
}
public void run() {
LOG.info("{}: processing message for {} secs: {}", Thread.currentThread(), PROCESSING_PAUSE,
message);
// time-consuming logic goes here
try {
Thread.sleep((long) (PROCESSING_PAUSE * 1000));
LOG.info("{}: message processed.", Thread.currentThread());
} catch (final InterruptedException ignored) {
}
}
}
Storage Reader
Description
This sample app demonstrates how to access FIX messages in file session storage.
Usage
- Run the sample:
- win: StorageReader.bat
- linux: StorageReader.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.EngineSettings;
import biz.onixs.fix.engine.SessionId;
import biz.onixs.fix.engine.storage.SessionStorage;
import biz.onixs.fix.engine.storage.StorageRepository;
import biz.onixs.fix.engine.storage.StorageRepositoryManager;
import biz.onixs.fix.parser.FixMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class StorageReader {
private static final Logger LOG = LoggerFactory.getLogger(StorageReader.class);
private final Version fixVersion = Version.FIX40;
public static void main(final String[] args) {
final StorageReader storageReader = new StorageReader();
storageReader.run();
}
private void run() {
try {
final EngineSettings engineSettings = new EngineSettings();
engineSettings.setStorageFolder("conf/sample/storage");
Engine.init(engineSettings);
final StorageRepositoryManager storageRepositoryManager = Engine.getInstance().getStorageRepositoryManager();
final StorageRepository storageRepository = storageRepositoryManager.getFileStorageRepository();
final SessionId sessionId = new SessionId("BuySide", "SellSide", fixVersion);
LOG.info("sessionId={}", sessionId);
final SessionStorage sessionStorage = storageRepository.create(sessionId, false);
LOG.info("sessionStorage.getId()='{}'", sessionStorage.getId());
processStorage(sessionStorage);
sessionStorage.close(false);
} catch (final RuntimeException e) {
LOG.error(e.getMessage().replace("\r", "\\r"), e);
} finally {
if (Engine.isInited())
Engine.getInstance().shutdown();
}
}
private static void processStorage(final SessionStorage sessionStorage) {
LOG.info("Getting outgoing messages...");
final long outSeqNum = sessionStorage.getOutSeqNum();
if (0L == outSeqNum) {
LOG.info("No outgoing messages found in the storage.");
} else {
final List<FixMessage> msgs = sessionStorage.getOutboundMessages(1L, outSeqNum);
for (final FixMessage msg : msgs) {
processMessage(msg);
}
}
}
private static void processMessage(final FixMessage message) {
LOG.info("message='{}'", message);
}
}
Message Fields Iteration
Description
This sample app demonstrates how to iterate over all Message fields.
Usage
- Run the sample:
- win: MessageFieldsIteration.bat
- linux: MessageFieldsIteration.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.parser.Field;
import biz.onixs.fix.parser.FixBlock;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.GroupInstance;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX50;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageFieldsIteration {
private static final Logger LOG = LoggerFactory.getLogger(MessageFieldsIteration.class);
private void run() {
Engine.init(-1);
final Message msg = createMessageWithNestedRepeatingGroups();
iterateOverMessageFields(msg);
Engine.getInstance().shutdown();
}
private void iterateOverMessageFields(final Message msg) {
iterateOverFixBlockFields(msg, "");
}
private void iterateOverFixBlockFields(final FixBlock fixBlock, final String prefix) {
for (final Field fld : fixBlock) {
LOG.info("{}{}" + '=' + "{}", prefix, fld.getTag(), fld.get());
if (fixBlock.hasGroup(fld.getTag())) {
final Group group = fixBlock.getGroup(fld.getTag());
iterateOverGroupInstances(group, prefix);
}
}
}
private void iterateOverGroupInstances(final Group group, final String prefix) {
for (final GroupInstance grpInstance : group) {
iterateOverFixBlockFields(grpInstance, prefix + " ");
}
}
private static Message createMessageWithNestedRepeatingGroups() {
final Message msg = Message.create(FIX50.MsgType.NewOrderSingle, Version.FIX50);
final int CustomTag = 10000;
msg.set(CustomTag, "CustomTag");
// Create the repeating group.
final Group partiesGroup = msg.setGroup(FIX50.Tag.NoPartyIDs, 3);
// Set the given field in all group instances of the repeating group.
int i = 0;
for (final GroupInstance grpInstance : partiesGroup ) {
grpInstance.set(FIX50.Tag.PartyID, "value" + i++);
}
// Create the nested repeating group.
final Group partiesSubGroup = partiesGroup.setGroup(FIX50.Tag.NoPartySubIDs, 0, 6);
// Set the given field in all group instances of the nested repeating group.
i = 0;
for (final GroupInstance grpInstance : partiesSubGroup) {
grpInstance.set(FIX50.Tag.PartySubID, "value" + i++);
}
//
LOG.info("Message with nested repeating group: {}", msg);
//
return msg;
}
public static void main(final String[] args) {
try {
LOG.info("Message Fields Iteration");
LOG.info("The application is starting...");
final MessageFieldsIteration iteration = new MessageFieldsIteration();
iteration.run();
} catch (final Throwable throwable) {
LOG.error(throwable.getMessage(), throwable);
} finally {
LOG.info("The application is stopped.");
}
}
}
Converter Runner
Description
This sample app demonstrates how to convert Fix message to JSON or XML format.
Usage
- Run the sample:
- win: ConverterRunner.bat
- linux: ConverterRunner.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.fixml.Utils;
import biz.onixs.fix.parser.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class ConverterRunner {
private static final Logger LOG = LoggerFactory.getLogger(ConverterRunner.class);
public void run() throws IOException {
Engine.init();
//
final String inFixFile = "sample.converter/MarketDataRequest.txt";
LOG.info("Loading FIX message from file: {}", inFixFile);
final Message inFixMessage = Utils.readFixMessage(inFixFile);
LOG.info("Input FIX message: \n{}", inFixMessage);
//
final MessageToJSONConverter fix2Json = new MessageToJSONConverter(Version.FIX44);
final String jsonMessage = fix2Json.apply(inFixMessage);
LOG.info("Output converted JSON message: {}{}", System.lineSeparator(), jsonMessage);
LOG.info("Output converted out of box JSON message: {}{}", System.lineSeparator(), inFixMessage.toJson());
//
final MessageToXMLConverter fix2Xml = new MessageToXMLConverter(Version.FIX44);
final String xmlMessage = fix2Xml.apply(inFixMessage);
LOG.info("Output converted XML message: {}{}", System.lineSeparator(), xmlMessage);
LOG.info("Output converted out of box XML message: {}{}", System.lineSeparator(), inFixMessage.toXml());
//
Engine.getInstance().shutdown();
}
public static void main(final String[] args) {
try {
final ConverterRunner converterRunner = new ConverterRunner();
converterRunner.run();
} catch (final Throwable throwable) {
LOG.error(throwable.getMessage(), throwable);
}
}
}
The following classes are used in the sample:
import biz.onixs.fix.dictionary.ContainerDefinition;
import biz.onixs.fix.dictionary.Dictionary;
import biz.onixs.fix.dictionary.DictionaryManager;
import biz.onixs.fix.dictionary.FieldDefinition;
import biz.onixs.fix.dictionary.GroupDefinition;
import biz.onixs.fix.dictionary.MessageDefinition;
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.parser.Field;
import biz.onixs.fix.parser.FixBlock;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.GroupInstance;
import biz.onixs.fix.parser.Message;
import biz.onixs.util.ValuePtr;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.function.Function;
public class MessageToJSONConverter implements Function<Message, String> {
private static final String PREFIX = " ";
private static final String SPACES = System.lineSeparator() + " \t";
//List of the Header tags according to the FIX5.0SP2 specification
private static final Integer[] header = {8, 9, 35, 49, 56, 115, 128, 90, 91, 34, 50, 142, 57, 143, 116,
144, 129, 145, 43, 97, 52, 122, 212, 213, 347, 369, 1128, 1129, 1156, 627, 628, 629, 630};
private static final Collection<Integer> headerTags = new HashSet<>(Arrays.asList(header));
//List of the Trailer tags according to the FIX5.0SP2 specification
private static final Integer[] trailer = {93, 89, 10};
private static final Collection<Integer> trailerTags = new HashSet<>(Arrays.asList(trailer));
private final Dictionary dictionary;
MessageToJSONConverter(final Version fixVersion) {
dictionary = DictionaryManager.getDictionary(fixVersion);
}
public static String info() {
return "Info";
}
@Override
public String apply(final Message message) {
final StringBuilder strBdr = new StringBuilder();
final MessageDefinition msgDef = dictionary.get(new ValuePtr(message.getType()));
messageToJson(message, msgDef, strBdr);
return strBdr.toString();
}
private static StringBuilder deleteLastCommaIfPresent(final StringBuilder bdr) {
for(int i = bdr.length() - 1; 0 <= i; i--) {
final char c = bdr.charAt(i);
if (0 > SPACES.indexOf(c)) {
if (',' == c) {
return bdr.deleteCharAt(i);
} else {
break;
}
}
}
return bdr;
}
private static void messageToJson(final FixBlock msg, final MessageDefinition def, final StringBuilder strBdr) {
final StringBuilder headerBdr = new StringBuilder(PREFIX + "\"Header\": {" + System.lineSeparator());
final StringBuilder bodyBdr = new StringBuilder(PREFIX + "\"Body\": {" + System.lineSeparator());
final StringBuilder trailerBdr = new StringBuilder(PREFIX + "\"Trailer\": {" + System.lineSeparator());
for (final Field fld : msg) {
final int tag = fld.getTag();
final StringBuilder curBdr = headerTags.contains(tag) ? headerBdr :
(trailerTags.contains(tag) ? trailerBdr : bodyBdr);
fieldToJson(msg, def, fld, curBdr, PREFIX);
curBdr.append(",").append(System.lineSeparator());
}
deleteLastCommaIfPresent(headerBdr).append(PREFIX).append("},").append(System.lineSeparator());
deleteLastCommaIfPresent(bodyBdr).append(PREFIX).append("},").append(System.lineSeparator());
deleteLastCommaIfPresent(trailerBdr).append(PREFIX).append("}").append(System.lineSeparator());
strBdr.append("{").append(System.lineSeparator());
strBdr.append(headerBdr);
strBdr.append(bodyBdr);
strBdr.append(trailerBdr);
strBdr.append("}");
}
private static void fixBlockToJson(final FixBlock fixBlk, final ContainerDefinition def,
final StringBuilder strBdr, final String prefix) {
strBdr.append(prefix).append("{").append(System.lineSeparator());
for (final Iterator<Field> iterator = fixBlk.iterator(); iterator.hasNext(); ) {
final Field fld = iterator.next();
fieldToJson(fixBlk, def, fld, strBdr, prefix);
trailingComma(strBdr, iterator.hasNext());
}
strBdr.append(prefix).append("}");
}
private static void fieldToJson(final FixBlock fixBlk, final ContainerDefinition def, final Field fld,
final StringBuilder strBdr, final String prefix) {
final int tag = fld.getTag();
final FieldDefinition fldDef = def.getFieldByTag(tag);
if (fldDef.isGroupDefined()) {
final Group grp = fixBlk.getGroup(tag);
if (null != grp) {
groupToJson(grp, fldDef, strBdr, prefix + PREFIX);
}
} else {
final String name = getFieldName(fldDef);
final String value = fld.get();
strBdr.append(prefix).append(PREFIX).append("\"").append(name).append("\": \"").append(value).append("\"");
}
}
private static void groupToJson(final Iterable<GroupInstance> grp, final FieldDefinition leadingFldDef,
final StringBuilder strBdr, final String prefix) {
final String name = getFieldName(leadingFldDef);
strBdr.append(prefix).append("\"").append(name).append("\": [").append(System.lineSeparator());
final GroupDefinition grpDef = leadingFldDef.getGroupDefinition();
for (final Iterator<GroupInstance> it = grp.iterator(); it.hasNext(); ) {
final GroupInstance grpInst = it.next();
fixBlockToJson(grpInst, grpDef, strBdr, prefix);
trailingComma(strBdr, it.hasNext());
}
strBdr.append(prefix).append("]");
}
private static void trailingComma(final StringBuilder strBdr, final boolean isCommaRequired) {
if (isCommaRequired) {
strBdr.append(",");
}
strBdr.append(System.lineSeparator());
}
private static String getFieldName(final FieldDefinition def) {
return (null != def.getTagNameBytes()) ?
new String(def.getTagNameBytes(), StandardCharsets.ISO_8859_1) : "undefined";
}
}
import biz.onixs.fix.dictionary.ContainerDefinition;
import biz.onixs.fix.dictionary.Dictionary;
import biz.onixs.fix.dictionary.DictionaryManager;
import biz.onixs.fix.dictionary.FieldDefinition;
import biz.onixs.fix.dictionary.GroupDefinition;
import biz.onixs.fix.dictionary.MessageDefinition;
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.parser.Field;
import biz.onixs.fix.parser.FixBlock;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.GroupInstance;
import biz.onixs.fix.parser.Message;
import biz.onixs.util.ValuePtr;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
public class MessageToXMLConverter implements Function<Message, String> {
private static final String PREFIX = " ";
private final Dictionary dictionary;
MessageToXMLConverter(final Version fixVersion) {
dictionary = DictionaryManager.getDictionary(fixVersion);
}
@Override
public String apply(final Message message) {
final StringBuilder strBdr = new StringBuilder("<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>");
strBdr.append(System.lineSeparator()).append(System.lineSeparator());
strBdr.append("<XML>").append(System.lineSeparator());
final MessageDefinition msgDef = dictionary.get(new ValuePtr(message.getType()));
strBdr.append(PREFIX).append("<message name=\"").append(msgDef.getMsgName()).append("\"/>");
strBdr.append(System.lineSeparator());
fixBlock2xml(message, msgDef, strBdr, PREFIX + PREFIX);
strBdr.append(PREFIX).append("</message>").append(System.lineSeparator());
strBdr.append("</XML>");
return strBdr.toString();
}
private static void fixBlock2xml(final FixBlock fixBlk, final ContainerDefinition def,
final StringBuilder strBdr, final String prefix) {
for (final Field fld : fixBlk) {
final int tag = fld.getTag();
final FieldDefinition fldDef = def.getFieldByTag(tag);
if (fldDef.isGroupDefined()) {
final Group grp = fixBlk.getGroup(tag);
if (null != grp) {
group2xml(grp, fldDef, strBdr, prefix);
}
} else {
final String name = getFieldName(fldDef);
final String value = fld.get();
strBdr.append(prefix).append("<field name=\"").append(name).append("\" tag=\"").append(tag)
.append("\" value=\"").append(value).append("\"/>").append(System.lineSeparator());
}
}
}
private static void group2xml(final Group grp, final FieldDefinition leadingFldDef,
final StringBuilder strBdr, final String prefix) {
final String name = getFieldName(leadingFldDef);
final int tag = leadingFldDef.getTag();
final int numberOfInstances = grp.getNumberOfInstances();
strBdr.append(prefix).append("<group name=\"").append(name).append("\" tag=\"").append(tag)
.append("\" numberofinstances=\"").append(numberOfInstances)
.append("\">").append(System.lineSeparator());
final GroupDefinition grpDef = leadingFldDef.getGroupDefinition();
int index = 0;
for (final GroupInstance grpInst : grp) {
groupInstance2xml(grpInst, index++, grpDef, strBdr, prefix + PREFIX);
}
strBdr.append(prefix).append("</group>").append(System.lineSeparator());
}
private static void groupInstance2xml(final FixBlock grpInst, final int index, final ContainerDefinition def,
final StringBuilder strBdr, final String prefix) {
strBdr.append(prefix).append("<instance index=\"").append(index).append("\">").append(System.lineSeparator());
fixBlock2xml(grpInst, def, strBdr, prefix + PREFIX);
strBdr.append(prefix).append("</instance>").append(System.lineSeparator());
}
private static String getFieldName(final FieldDefinition def) {
return (null != def.getTagNameBytes()) ?
new String(def.getTagNameBytes(), StandardCharsets.ISO_8859_1) : Integer.toString(def.getTag());
}
}