Samples :: Market Data Feed

Market Data Feed Sample

Description

This sample illustrates publishing market data from a sell-side FIX acceptor and receiving it on a buy-side FIX initiator.

Directory Contents

Item Description
conf/sample/MarketDataBuySide.properties Buy side configuration
conf/sample/MarketDataSellSide.properties Sell side configuration
conf/logback.xml Logback logging facility configuration

Usage

  • Run the Sell Side sample:
    • win: 1-MarketDataSellSide.bat
    • linux: 1-MarketDataSellSide.sh
  • Run the Buy Side sample:
    • win: 2-MarketDataBuySide.bat
    • linux: 2-MarketDataBuySide.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code

import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.parser.FixMessage;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX42;
import biz.onixs.fix.tag.FIX44;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Scanner;

/**
 * FIX Acceptor - Sell Side.
 *
 * <p>Starts the FIX engine, accepts incoming sessions on demand (after checking the
 * Username/Password carried by the Logon message against the settings) and periodically
 * hands a Market Data Snapshot / Full Refresh message to the {@link MarketDataPublisher}
 * until "stop" is typed on the console or standard input is closed.
 */
public class MarketDataSellSide implements
        Engine.DynamicAcceptorListener,
        Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MarketDataSellSide.class);
    private static final String SETTINGS_RESOURCE = "sample/MarketDataSellSide.properties";
    /** Console command that terminates the application. */
    private static final String STOP_COMMAND = "stop";
    /** Pause between two generated market data messages, in milliseconds. */
    private static final long PUBLISH_INTERVAL_MILLIS = 500;

    private Settings settings = null;
    MarketDataPublisher publisher = new MarketDataPublisher();
    Thread marketDataSource = null;
    /** Set to {@code true} to make {@link #generateMarketData()} leave its loop. */
    volatile boolean stopMarketData = false;

    /**
     * Runs the sample: loads the settings, starts the engine and the market data thread,
     * waits for the stop command and then releases everything that was started.
     *
     * <p>Clean-up is performed in a {@code finally} block so that the market data thread
     * and the engine are stopped even when start-up or console handling fails.
     */
    @Override
    public void run() {
        Engine engine = null;
        try {
            LOG.info("MarketDataSellSide");
            LOG.info("The application is starting...");
            //
            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
            //
            LOG.info("Starting the Engine...");
            engine = Engine.init(settings);
            engine.addDynamicAcceptorListener(this);
            //
            stopMarketData = false;
            marketDataSource = new Thread(this::generateMarketData, "market-data-source");
            marketDataSource.start();
            //
            System.out.println("Type \"stop\" and press Enter to exit MD publisher...");
            waitForStopCommand();
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            shutdown(engine);
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Blocks until the stop command is read from standard input or the input is closed.
     *
     * <p>{@code hasNextLine()} is checked first so that end-of-input ends the wait
     * instead of raising {@code NoSuchElementException}.
     */
    private static void waitForStopCommand() {
        // System.in is intentionally not closed here.
        final Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            if (STOP_COMMAND.equalsIgnoreCase(scanner.nextLine().trim())) {
                return;
            }
        }
        LOG.info("Standard input is closed, stopping.");
    }

    /**
     * Stops the market data thread and, if the engine was started, shuts down the
     * publisher and the engine. Each step is attempted even if a previous one fails.
     *
     * @param engine the started engine, or {@code null} if engine start-up did not succeed
     */
    private void shutdown(final Engine engine) {
        stopMarketData = true;
        final Thread source = marketDataSource;
        if (source != null) {
            try {
                source.join();
            } catch (final InterruptedException e) {
                // Keep the interrupt visible to the caller and continue with the clean-up.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for the market data thread to stop");
            }
        }
        if (engine == null) {
            return;
        }
        try {
            publisher.shutdown();
        } catch (final Exception e) {
            LOG.error("Publisher shutdown failed", e);
        }
        try {
            LOG.info("Engine shutdown...");
            engine.shutdown();
        } catch (final Exception e) {
            LOG.error("Engine shutdown failed", e);
        }
    }

    /**
     * Decides whether to accept an incoming connection for which no session exists yet.
     *
     * <p>When the Username and Password fields of the incoming Logon match the configured
     * values, a new session is created, handed back to the engine and registered with the
     * publisher; otherwise a reject reason is set.
     *
     * @param o                   the event source
     * @param dynamicAcceptorArgs carries the incoming Logon and receives the decision
     */
    @Override
    public void onDynamicAcceptor(Object o, Engine.DynamicAcceptorArgs dynamicAcceptorArgs) {
        final FixMessage fixMsg = dynamicAcceptorArgs.getIncomingLogonMessage();
        final Message msg = fixMsg.getStructuredMessage(fixMsg.getVersion());
        final String username = msg.get(Tag.Username);
        final String password = msg.get(Tag.Password);
        if (!credentialsMatch(username, password)) {
            dynamicAcceptorArgs.setRejectReason("Wrong username/password pair.");
            LOG.warn("Wrong logon attempt");
            return;
        }
        LOG.info("New subscription requested");
        // NOTE(review): the CompIDs are passed exactly as read from the incoming Logon;
        // confirm this is the sender/target orientation the Session constructor expects.
        final Session session =
                new Session(fixMsg.getSenderCompID(), fixMsg.getTargetCompID(), fixMsg.getVersion());
        dynamicAcceptorArgs.setCreatedSession(session);
        publisher.addSession(session);
        LOG.info("New subscription created: {}-{}", session.getSenderCompID(), session.getTargetCompID());
    }

    /**
     * Compares the supplied credentials with the configured "Username" and "Password" settings.
     *
     * @return {@code true} only if both settings are present and equal to the supplied values;
     *         a missing setting is treated as a mismatch rather than causing an exception
     */
    private boolean credentialsMatch(final String username, final String password) {
        final String expectedUsername = settings.getString("Username");
        final String expectedPassword = settings.getString("Password");
        return expectedUsername != null && expectedPassword != null
                && expectedUsername.equals(username) && expectedPassword.equals(password);
    }

    /**
     * Builds a two-entry (Bid/Offer) Market Data Snapshot / Full Refresh message for the
     * "OIL" symbol, passes it to the publisher and repeats every
     * {@value #PUBLISH_INTERVAL_MILLIS} ms until {@link #stopMarketData} is set or the
     * thread is interrupted.
     */
    public void generateMarketData() {
        while (!stopMarketData) {
            final Message msg = Message.create(FIX44.MsgType.MarketDataSnapshotFullRefresh, Version.FIX44);
            msg.set(Tag.Symbol, "OIL");
            final Group grp = msg.setGroup(Tag.NoMDEntries, 2);
            grp.set(Tag.MDEntryType, 0, FIX44.MDEntryType.Bid);
            grp.set(Tag.MDEntryType, 1, FIX44.MDEntryType.Offer);
            msg.updateChecksum();
            publisher.dispatchMessage(msg);
            publisher.scanSessions();
            try {
                Thread.sleep(PUBLISH_INTERVAL_MILLIS);
            } catch (final InterruptedException e) {
                // Treat interruption as a request to stop; restore the flag for the caller.
                Thread.currentThread().interrupt();
                LOG.info("Market data generation interrupted, stopping.");
                return;
            }
        }
    }

    public static void main(final String[] args) {
        (new MarketDataSellSide()).run();
    }
}
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX44;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.TimestampFormat;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Scanner;
import java.util.concurrent.Semaphore;

/**
 * FIX Initiator - Buy Side.
 *
 * <p>Creates a session, logs on to the counterparty configured in the settings (adding
 * Username/Password to the outgoing Logon), logs every received message and stays
 * connected until "stop" is typed on the console or standard input is closed.
 */
public class MarketDataBuySide implements
        Session.InboundApplicationMessageListener,
        Session.StateChangeListener,
        Session.OutboundSessionMessageListener,
        Session.MessageResendingListener,
        Session.InboundSessionMessageListener,
        Session.WarningListener,
        Session.ErrorListener,
        Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MarketDataBuySide.class);
    private static final String SETTINGS_RESOURCE = "sample/MarketDataBuySide.properties";
    /** Console command that terminates the application. */
    private static final String STOP_COMMAND = "stop";

    /** Released by {@link #onStateChange} when the session reaches the ESTABLISHED state. */
    private final Semaphore sessionIsEstablished = new Semaphore(0);
    private final Settings settings;
    private Version fixVersion = null;
    private Session session = null;
    private final String senderCompId;
    private final String targetCompId;

    /**
     * Loads the settings and resolves the session identifiers.
     *
     * @param args optional command-line overrides: {@code args[0]} replaces the
     *             "SenderCompID" setting and {@code args[1]} the "TargetCompID" setting
     */
    public MarketDataBuySide(String[] args) {
        LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
        settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
        //
        senderCompId = args.length > 0 ? args[0] : settings.getString("SenderCompID");
        targetCompId = args.length > 1 ? args[1] : settings.getString("TargetCompID");
    }

    /**
     * Runs the sample: starts the engine, creates and connects the session, waits for the
     * stop command and logs out.
     *
     * <p>The session is disposed and the engine is shut down in a {@code finally} block so
     * that they are released even when connecting or console handling fails.
     */
    @Override
    public void run() {
        Engine engine = null;
        try {
            LOG.info("MarketDataBuySide");
            LOG.info("The application is starting...");
            //
            LOG.info("Starting the Engine...");
            engine = Engine.init(settings);
            //
            createSession();
            //
            establishConnection();
            //
            LOG.info("Waiting for session establishment...");
            sessionIsEstablished.acquire();
            //
            LOG.info("Waiting for session stop...");
            System.out.println("Type \"stop\" and press Enter to exit MD subscriber...");
            waitForStopCommand();
            //
            session.logout("The session is disconnected by MarketDataBuySide");
        } catch (final InterruptedException e) {
            // Restore the flag so that code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            disposeSession();
            shutdownEngine(engine);
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Blocks until the stop command is read from standard input or the input is closed.
     *
     * <p>{@code hasNextLine()} is checked first so that end-of-input ends the wait
     * instead of raising {@code NoSuchElementException}.
     */
    private static void waitForStopCommand() {
        // System.in is intentionally not closed here.
        final Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            if (STOP_COMMAND.equalsIgnoreCase(scanner.nextLine().trim())) {
                return;
            }
        }
        LOG.info("Standard input is closed, stopping.");
    }

    /** Disposes the session if one was created; failures are logged, not propagated. */
    private void disposeSession() {
        if (session == null) {
            return;
        }
        try {
            session.dispose();
        } catch (final Exception e) {
            LOG.error("Session disposal failed", e);
        }
    }

    /**
     * Shuts the engine down if it was started; failures are logged, not propagated.
     *
     * @param engine the started engine, or {@code null} if engine start-up did not succeed
     */
    private void shutdownEngine(final Engine engine) {
        if (engine == null) {
            return;
        }
        try {
            LOG.info("Engine shutdown...");
            engine.shutdown();
        } catch (final Exception e) {
            LOG.error("Engine shutdown failed", e);
        }
    }

    /**
     * Creates the session from the "FIXVersion" and
     * "keepSequenceNumbersBetweenFixConnections" settings and registers this object as
     * the listener for all session events.
     */
    private void createSession() {
        fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
        session = new Session(senderCompId, targetCompId, fixVersion,
                settings.getBoolean("keepSequenceNumbersBetweenFixConnections"));
        //
        session.setSenderSubID("SenderSubID (50) field")
                .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec)
                .setInboundApplicationMessageListener(this)
                .setInboundSessionMessageListener(this)
                .setOutboundSessionMessageListener(this)
                .addStateChangeListener(this)
                .setMessageResendingListener(this)
                .setErrorListener(this)
                .setWarningListener(this);
    }

    /** Logs on as initiator to the "CounterpartyHost":"CounterpartyPort" taken from the settings. */
    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{}...", host, port);
        session.logonAsInitiator(host, port, true);
    }

    /** Logs every incoming application-level message and notes Market Data snapshots. */
    @Override
    public void onInboundApplicationMessage(final Object sender, final Session.InboundApplicationMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming application-level message: {}", message);
        if (message.checkType(FIX44.MsgType.MarketDataSnapshotFullRefresh)) {
            LOG.info("MD Snapshot Full Refresh received.");
        }
    }

    /** Logs every incoming session-level message. */
    @Override
    public void onInboundSessionMessage(final Object sender, final Session.InboundSessionMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming session-level message: {}", message);
        // Processing of the incoming session-level message...
    }

    /** Logs the state transition and unblocks {@link #run()} once the session is established. */
    @Override
    public void onStateChange(final Object sender, final Session.StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
        if (SessionState.ESTABLISHED == newState) {
            sessionIsEstablished.release();
        }
    }

    /** Logs a session error. */
    @Override
    public void onError(final Object sender, final Session.ErrorArgs args) {
        LOG.error("{}", args);
    }

    /** Logs a session warning. */
    @Override
    public void onWarning(final Object sender, final Session.WarningArgs args) {
        LOG.warn("{}", args);
    }

    /**
     * Logs the message the counterparty asked to be resent.
     *
     * @return always {@code false}, so the message is skipped instead of being resent
     */
    @Override
    public boolean onMessageResending(final Object sender, final Session.MessageResendingArgs args) {
        LOG.info("Message resending request: {}", args.getMsg());
        // Return false if it's necessary to skip this message (GapFill will be sent).
        return false;
    }

    /** Logs the sequence number range for which resending is about to start. */
    @Override
    public void onMessageResendingStarted(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is about to start from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    /** Logs the sequence number range for which resending has finished. */
    @Override
    public void onMessageResendingFinished(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is finished from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    /** Adds the configured Username and Password to every outgoing Logon message. */
    @Override
    public void onOutboundSessionMessage(Object o, Session.OutboundSessionMessageArgs outboundSessionMessageArgs) {
        final Message msg = outboundSessionMessageArgs.getMsg();
        if (msg.checkType(FIX44.MsgType.Logon)) {
            msg.set(Tag.Username, settings.getString("Username"));
            msg.set(Tag.Password, settings.getString("Password"));
        }
    }

    public static void main(final String[] args) {
        (new MarketDataBuySide(args)).run();
    }
}
Error during retrieving content skip as ignoreDownloadError activated.