Samples :: Market Data Feed
Market Data Feed Sample
Description
This sample illustrates receiving of market data.
Directory Contents
Item | Description |
---|---|
conf/sample/MarketDataBuySide.properties | Buy side configuration |
conf/sample/MarketDataSellSide.properties | Sell side configuration |
conf/logback.xml | Logback logging facility configuration |
Usage
- Run the Sell Side sample:
- win: 1-MarketDataSellSide.bat
- linux: 1-MarketDataSellSide.sh
- Run the Buy Side sample:
- win: 2-MarketDataBuySide.bat
- linux: 2-MarketDataBuySide.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.parser.FixMessage;
import biz.onixs.fix.parser.Group;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX42;
import biz.onixs.fix.tag.FIX44;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Scanner;
/**
* Processes incoming messages.
*/
public class MarketDataSellSide implements
Engine.DynamicAcceptorListener,
Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MarketDataSellSide.class);
private static final String SETTINGS_RESOURCE = "sample/MarketDataSellSide.properties";
private Settings settings = null;
MarketDataPublisher publisher = new MarketDataPublisher();
Thread marketDataSource = null;
volatile boolean stopMarketData = false;
public void run() {
try {
LOG.info("SimpleSellSide");
LOG.info("The application is starting...");
//
LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
//
LOG.info("Starting the Engine...");
final Engine engine = Engine.init(settings);
engine.addDynamicAcceptorListener(this);
//
stopMarketData = false;
marketDataSource = new Thread(() -> generateMarketData());
marketDataSource.start();
//
System.out.println("Type \"stop\" and press Enter to exit MD publisher...");
Scanner scanner = new Scanner(System.in);
for(String line = scanner.nextLine();!line.toLowerCase().equals("stop");line = scanner.nextLine()) {}
stopMarketData = true;
marketDataSource.join();
//
publisher.shutdown();
//
LOG.info("Engine shutdown...");
engine.shutdown();
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
} finally {
LOG.info("The application is stopped.");
}
}
@Override
public void onDynamicAcceptor(Object o, Engine.DynamicAcceptorArgs dynamicAcceptorArgs) {
FixMessage fixMsg = dynamicAcceptorArgs.getIncomingLogonMessage();
Message msg = fixMsg.getStructuredMessage(fixMsg.getVersion());
String username = msg.get(Tag.Username);
String password = msg.get(Tag.Password);
if (settings.getString("Username").equals(username) && settings.getString("Password").equals(password)) {
LOG.info("New subscription requested");
Session session = new Session(fixMsg.getSenderCompID(), fixMsg.getTargetCompID(), fixMsg.getVersion());
dynamicAcceptorArgs.setCreatedSession(session);
publisher.addSession(session);
LOG.info("New subscription created: " + session.getSenderCompID() + "-" + session.getTargetCompID());
} else {
dynamicAcceptorArgs.setRejectReason("Wrong username/password pair.");
LOG.warn("Wrong logon attempt");
}
}
public void generateMarketData() {
while(!stopMarketData) {
Message msg = Message.create(FIX44.MsgType.MarketDataSnapshotFullRefresh, Version.FIX44);
msg.set(Tag.Symbol, "OIL");
Group grp = msg.setGroup(Tag.NoMDEntries, 2);
grp.set(Tag.MDEntryType, 0, FIX44.MDEntryType.Bid);
grp.set(Tag.MDEntryType, 1, FIX44.MDEntryType.Offer);
msg.updateChecksum();
publisher.dispatchMessage(msg);
publisher.scanSessions();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void main(final String[] args) {
(new MarketDataSellSide()).run();
}
}
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX44;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.TimestampFormat;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
/**
* FIX Initiator - Buy Side.
*/
public class MarketDataBuySide implements
Session.InboundApplicationMessageListener,
Session.StateChangeListener,
Session.OutboundSessionMessageListener,
Session.MessageResendingListener,
Session.InboundSessionMessageListener,
Session.WarningListener,
Session.ErrorListener,
Runnable {
private final Logger LOG = LoggerFactory.getLogger(MarketDataBuySide.class);
private final String SETTINGS_RESOURCE = "sample/MarketDataBuySide.properties";
private final Semaphore sessionIsEstablished = new Semaphore(0);
private final Semaphore sessionStopped = new Semaphore(0);
private Settings settings = null;
private Version fixVersion = null;
private Session session = null;
private final String senderCompId;
private final String targetCompId;
public MarketDataBuySide(String[] args) {
LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
//
senderCompId = args.length > 0 ? args[0] : settings.getString("SenderCompID");
targetCompId = args.length > 1 ? args[1] : settings.getString("TargetCompID");
}
public void run() {
try {
LOG.info("MarketDataBuySide");
LOG.info("The application is starting...");
//
LOG.info("Starting the Engine...");
final Engine engine = Engine.init(settings);
//
createSession();
//
establishConnection();
//
LOG.info("Waiting for session establishment...");
sessionIsEstablished.acquire();
//
LOG.info("Waiting for session stop...");
System.out.println("Type \"stop\" and press Enter to exit MD publisher...");
Scanner scanner = new Scanner(System.in);
for(String line = scanner.nextLine();!line.toLowerCase().equals("stop");line = scanner.nextLine()) {}
//
session.logout("The session is disconnected by SimpleBuySide");
session.dispose();
//
LOG.info("Engine shutdown...");
engine.shutdown();
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
} finally {
LOG.info("The application is stopped.");
}
}
private void createSession() {
fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
session = new Session(senderCompId, targetCompId, fixVersion,
settings.getBoolean("keepSequenceNumbersBetweenFixConnections"));
//
session.setSenderSubID("SenderSubID (50) field")
.setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec)
.setInboundApplicationMessageListener(this)
.setInboundSessionMessageListener(this)
.setOutboundSessionMessageListener(this)
.addStateChangeListener(this)
.setMessageResendingListener(this)
.setErrorListener(this)
.setWarningListener(this);
}
private void establishConnection() {
final String host = settings.getString("CounterpartyHost");
final int port = settings.getInteger("CounterpartyPort");
LOG.info("Establishing connection to {}:{}...", host, port);
session.logonAsInitiator(host, port, true);
}
@Override
public void onInboundApplicationMessage(final Object sender, final Session.InboundApplicationMessageArgs args) {
final Message message = args.getMsg();
LOG.info("Incoming application-level message: {}", message);
if (message.checkType(FIX44.MsgType.MarketDataSnapshotFullRefresh)) {
LOG.info("MD Snapshot Full Refresh received.");
}
}
@Override
public void onInboundSessionMessage(final Object sender, final Session.InboundSessionMessageArgs args) {
final Message message = args.getMsg();
LOG.info("Incoming session-level message: {}", message);
// Processing of the incoming session-level message...
}
@Override
public void onStateChange(final Object sender, final Session.StateChangeArgs args) {
final SessionState newState = args.getNewState();
final SessionState prevState = args.getPrevState();
LOG.info("Session state changed from {} to {}", prevState, newState);
if (SessionState.ESTABLISHED == newState) {
sessionIsEstablished.release();
}
}
@Override
public void onError(final Object sender, final Session.ErrorArgs args) {
LOG.error("{}", args);
}
@Override
public void onWarning(final Object sender, final Session.WarningArgs args) {
LOG.warn("{}", args);
}
@Override
public boolean onMessageResending(final Object sender, final Session.MessageResendingArgs args) {
LOG.info("Message resending request: {}", args.getMsg());
// Return false if it's necessary to skip this message (GapFill will be sent).
return false;
}
@Override
public void onMessageResendingStarted(final Object sender, final long beginSeqNum, final long endSeqNum) {
LOG.info("Message resending is about to start from {} to {} seq. numbers", beginSeqNum, endSeqNum);
}
@Override
public void onMessageResendingFinished(final Object sender, final long beginSeqNum, final long endSeqNum) {
LOG.info("Message resending is finished from {} to {} seq. numbers", beginSeqNum, endSeqNum);
}
@Override
public void onOutboundSessionMessage(Object o, Session.OutboundSessionMessageArgs outboundSessionMessageArgs) {
final Message msg = outboundSessionMessageArgs.getMsg();
if (msg.checkType(FIX44.MsgType.Logon)) {
msg.set(Tag.Username, settings.getString("Username"));
msg.set(Tag.Password, settings.getString("Password"));
}
}
public static void main(final String[] args) {
(new MarketDataBuySide(args)).run();
}
}
Error during retrieving content skip as ignoreDownloadError activated.