This sample illustrates throttling of outgoing messages on the buy side.
This sample can be run together with the “ThrottlingSellSide” sample. The “ThrottlingSellSide” sample must be started first.
| import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.TimestampFormat;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * FIX Initiator - Buy Side.
 *
 * <p>Connects to the counterparty, sends {@code NoOfMessages} single orders one at a time
 * (calling {@link Session#throttle()} before each send) and waits after every order until
 * either an Execution Report or a session-level Reject has been received for it.
 */
public class ThrottlingBuySide implements Session.InboundApplicationMessageListener,
        Session.StateChangeListener, Session.MessageResendingListener,
        Session.InboundSessionMessageListener, Session.WarningListener, Session.ErrorListener,
        Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingBuySide.class);
    private static final String SETTINGS_RESOURCE = "sample/ThrottlingBuySide.properties";

    /** Released by {@link #onStateChange} when the session reaches ESTABLISHED. */
    private final Semaphore sessionIsEstablished = new Semaphore(0);
    /** Released once per received Execution Report or Reject. */
    private final Semaphore orderHandlingComplete = new Semaphore(0);

    private Settings settings = null;
    private Version fixVersion = null;
    private Session session = null;
    private int totalOrdersHandled = 0;
    private int totalMessagesRejected = 0;

    /**
     * Runs the sample: loads settings, starts the engine, establishes the session, sends the
     * configured number of orders and logs out.
     *
     * <p>The session and the engine are released in a {@code finally} block, so they are
     * cleaned up even when an earlier step fails.
     */
    @Override
    public void run() {
        Engine engine = null;
        try {
            LOG.info("ThrottlingBuySide");
            LOG.info("The application is starting...");

            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);

            LOG.info("Starting the Engine...");
            engine = Engine.init(settings);

            createSession();
            establishConnection();

            LOG.info("Waiting for session establishment...");
            sessionIsEstablished.acquire();

            final int noOfMessages = settings.getInteger("NoOfMessages");
            for (int i = 0; i < noOfMessages; ++i) {
                final Message order = createOrder();
                // NOTE(review): presumably blocks until the configured throttling limit
                // allows another message - confirm against the engine documentation.
                session.throttle();
                session.send(order);
                // Wait until the counterparty has answered (Execution Report or Reject).
                orderHandlingComplete.acquire();
            }

            session.logout("The session is disconnected by ThrottlingBuySide");
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that callers of run() can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            releaseResources(engine);
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Disposes the session (if created), logs the counters and shuts the engine down (if started).
     * Failures during cleanup are logged and never propagated.
     *
     * @param engine the engine returned by {@code Engine.init}, or {@code null} if it was not started
     */
    private void releaseResources(final Engine engine) {
        try {
            if (session != null) {
                session.dispose();
            }
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        }

        LOG.info("Orders handled: {}, messages rejected: {}", totalOrdersHandled, totalMessagesRejected);

        if (engine != null) {
            try {
                LOG.info("Engine shutdown...");
                engine.shutdown();
            } catch (final Exception e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Creates the session from the {@code FIXVersion}, {@code SenderCompID}, {@code TargetCompID}
     * and {@code keepSequenceNumbersBetweenFixConnections} settings, registers this object as
     * listener for all session events and applies the throttling limit.
     */
    private void createSession() {
        fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
        session = new Session(settings.getString("SenderCompID"), settings.getString("TargetCompID"),
                fixVersion, settings.getBoolean("keepSequenceNumbersBetweenFixConnections"));

        session.setSenderSubID("SenderSubID (50) field")
                .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec)
                .setInboundApplicationMessageListener(this)
                .setInboundSessionMessageListener(this)
                .addStateChangeListener(this)
                .setMessageResendingListener(this)
                .setErrorListener(this)
                .setWarningListener(this);
        updateThrottlingLimit(session);
    }

    /**
     * Applies the throttling limit from the {@code ThrottlingMessages}, {@code ThrottlingInterval}
     * and {@code ThrottlingIntervalTimeUnit} settings. A negative {@code ThrottlingMessages}
     * value leaves the session's throttling configuration untouched.
     *
     * @param session the session to configure
     */
    private void updateThrottlingLimit(Session session) {
        final int messages = settings.getInteger("ThrottlingMessages");
        if (messages < 0) {
            return;
        }
        final long interval = settings.getLong("ThrottlingInterval");
        final TimeUnit timeUnit =
                TimeUnit.valueOf(settings.getString("ThrottlingIntervalTimeUnit").toUpperCase(Locale.ROOT));
        session.setThrottlingLimit(messages, interval, timeUnit);
    }

    /** Logs on as initiator to {@code CounterpartyHost}:{@code CounterpartyPort}. */
    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{}...", host, port);
        session.logonAsInitiator(host, port, true);
    }

    /**
     * Builds a sample Order Single message.
     *
     * @return the new order message
     */
    private Message createOrder() {
        final Message order = Message.create(FIX40.MsgType.Order_Single, fixVersion);
        // NOTE(review): every order carries the same placeholder ClOrdID; a real application
        // would presumably need a unique value per order.
        order.set(Tag.HandlInst, "1")
                .set(Tag.ClOrdID, "Unique identifier for Order")
                .set(Tag.Symbol, "IBM")
                .set(Tag.Side, "1")
                .set(Tag.OrderQty, 1000)
                .set(Tag.OrdType, "1");
        return order;
    }

    /**
     * Logs the incoming application-level message; an Execution Report counts as a handled
     * order and lets the sending loop continue.
     */
    @Override
    public void onInboundApplicationMessage(final Object sender,
                                            final Session.InboundApplicationMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming application-level message: {}", message);
        if (message.checkType(FIX40.MsgType.Execution_Report)) {
            LOG.info("Execution report received.");
            ++totalOrdersHandled;
            orderHandlingComplete.release();
        }
        // Processing of the incoming application-level message...
    }

    /**
     * Logs the incoming session-level message; a Reject is counted and lets the sending loop
     * continue.
     */
    @Override
    public void onInboundSessionMessage(final Object sender,
                                        final Session.InboundSessionMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming session-level message: {}", message);
        // Same type check as in onInboundApplicationMessage, for consistency.
        if (message.checkType(FIX40.MsgType.Reject)) {
            ++totalMessagesRejected;
            LOG.info("Rejection {}: message {} rejected.", totalMessagesRejected, message.get(Tag.RefSeqNum));
            orderHandlingComplete.release();
        }
        // Processing of the incoming session-level message...
    }

    /** Logs the state transition and signals {@link #run()} once the session is ESTABLISHED. */
    @Override
    public void onStateChange(final Object sender, final Session.StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
        if (SessionState.ESTABLISHED == newState) {
            sessionIsEstablished.release();
        }
    }

    /** Logs the reported session error. */
    @Override
    public void onError(final Object sender, final Session.ErrorArgs args) {
        LOG.error("{}", args);
    }

    /** Logs the reported session warning. */
    @Override
    public void onWarning(final Object sender, final Session.WarningArgs args) {
        LOG.warn("{}", args);
    }

    /**
     * Logs the resend request for a previously sent message.
     *
     * @return always {@code false}, i.e. the message is skipped (GapFill will be sent)
     */
    @Override
    public boolean onMessageResending(final Object sender, final Session.MessageResendingArgs args) {
        LOG.info("Message resending request: {}", args.getMsg());
        // Return false if it's necessary to skip this message (GapFill will be sent).
        return false;
    }

    /** Logs the sequence number range that is about to be resent. */
    @Override
    public void onMessageResendingStarted(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is about to start from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    /** Logs the sequence number range that has been resent. */
    @Override
    public void onMessageResendingFinished(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is finished from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    /** Application entry point; command line arguments are ignored. */
    public static void main(final String[] args) {
        (new ThrottlingBuySide()).run();
    }
} |
This sample illustrates throttling of incoming messages on the sell side.
This sample can be run together with the “ThrottlingBuySide” sample. This sample (“ThrottlingSellSide”) must be started first.
| import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.engine.SessionState;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.GuidGenerator;
import biz.onixs.util.Throttler;
import biz.onixs.util.TimestampFormat;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Processes incoming messages.
 *
 * <p>Accepts a FIX connection, passes every incoming application-level message through a
 * {@link Throttler}, answers messages over the limit with a Reject and answers accepted
 * Order Single messages with an Execution Report.
 */
public class ThrottlingSellSide implements Session.InboundApplicationMessageListener,
        Session.StateChangeListener, Session.ErrorListener, Session.WarningListener,
        Session.MessageResendingListener, Session.InboundSessionMessageListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingSellSide.class);
    private static final String SETTINGS_RESOURCE = "sample/ThrottlingSellSide.properties";

    /** Released by {@link #onStateChange} each time the session enters AWAIT_LOGON. */
    private final Semaphore sessionIsDisconnected = new Semaphore(0);
    /** Released by {@link #onStateChange} when the session reaches ESTABLISHED. */
    private final Semaphore sessionIsEstablished = new Semaphore(0);
    private final GuidGenerator guidGenerator = new GuidGenerator();
    /** Also used as the monitor in {@link #onInboundApplicationMessage}, hence final. */
    private final Throttler throttler = new Throttler();

    private Settings settings = null;
    private Version fixVersion = null;
    private Session session = null;
    private int totalOrdersHandled = 0;
    private int totalMessagesRejected = 0;

    /**
     * Runs the sample: loads settings, configures the throttler, starts the engine, waits for
     * the counterparty to connect and to disconnect again, then logs out.
     *
     * <p>The session and the engine are released in a {@code finally} block, so they are
     * cleaned up even when an earlier step fails.
     */
    @Override
    public void run() {
        Engine engine = null;
        try {
            LOG.info("ThrottlingSellSide");
            LOG.info("The application is starting...");

            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            settings = new PropertyBasedSettings(SETTINGS_RESOURCE);

            updateThrottlingLimit();

            LOG.info("Starting the Engine...");
            engine = Engine.init(settings);

            createSession();
            session.logonAsAcceptor();
            // NOTE(review): consumes the permit released on the first AWAIT_LOGON transition,
            // presumably triggered by logonAsAcceptor() - confirm against the engine docs.
            sessionIsDisconnected.acquire();

            LOG.info("Waiting for connection...");
            sessionIsEstablished.acquire();

            LOG.info("Waiting for disconnection...");
            // Blocks until the session enters AWAIT_LOGON again.
            sessionIsDisconnected.acquire();
            session.logout();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that callers of run() can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            releaseResources(engine);
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Disposes the session (if created), logs the counters and shuts the engine down (if started).
     * Failures during cleanup are logged and never propagated.
     *
     * @param engine the engine returned by {@code Engine.init}, or {@code null} if it was not started
     */
    private void releaseResources(final Engine engine) {
        try {
            if (session != null) {
                session.dispose();
            }
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        }

        LOG.info("Orders handled: {}, messages rejected: {}", totalOrdersHandled, totalMessagesRejected);

        if (engine != null) {
            try {
                LOG.info("Engine shutdown...");
                engine.shutdown();
            } catch (final Exception e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Creates the session from the {@code FIXVersion}, {@code SenderCompID}, {@code TargetCompID}
     * and {@code keepSequenceNumbersBetweenFixConnections} settings and registers this object
     * as listener for all session events.
     */
    private void createSession() {
        fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
        session = new Session(settings.getString("SenderCompID"), settings.getString("TargetCompID"),
                fixVersion, settings.getBoolean("keepSequenceNumbersBetweenFixConnections"));

        session.setInboundApplicationMessageListener(this)
                .setInboundSessionMessageListener(this)
                .addStateChangeListener(this)
                .setErrorListener(this)
                .setWarningListener(this)
                .setMessageResendingListener(this)
                .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec);
    }

    /**
     * Resets the throttler from the {@code ThrottlingMessages}, {@code ThrottlingInterval} and
     * {@code ThrottlingIntervalTimeUnit} settings; the interval is converted to milliseconds.
     * A negative {@code ThrottlingMessages} value leaves the throttler as constructed.
     */
    private void updateThrottlingLimit() {
        final int messages = settings.getInteger("ThrottlingMessages");
        if (messages < 0) {
            return;
        }
        final long interval = settings.getLong("ThrottlingInterval");
        final TimeUnit timeUnit =
                TimeUnit.valueOf(settings.getString("ThrottlingIntervalTimeUnit").toUpperCase(Locale.ROOT));
        throttler.reset(messages, timeUnit.toMillis(interval));
    }

    /**
     * Builds an Execution Report that echoes the identifying fields of the given order.
     *
     * @param order the received order; ClOrdID, Symbol, Side and OrderQty are read from it
     * @return the new Execution Report message
     */
    private Message createExecutionReport(final Message order) {
        final Message execReport = Message.create(FIX40.MsgType.Execution_Report, fixVersion);
        execReport.set(Tag.OrderID, order.get(Tag.ClOrdID))
                .set(Tag.ExecID, guidGenerator.generate())
                .set(Tag.ExecTransType, "0")
                .set(Tag.OrdStatus, "0")
                .set(Tag.Symbol, order.get(Tag.Symbol))
                .set(Tag.Side, order.get(Tag.Side))
                .set(Tag.LastQty, order.get(Tag.OrderQty))
                .set(Tag.LastPx, "100.0")
                .set(Tag.OrderQty, order.get(Tag.OrderQty))
                .set(Tag.CumQty, order.get(Tag.OrderQty))
                .set(Tag.AvgPx, "100.0");
        return execReport;
    }

    /**
     * Throttles the incoming application-level message: when the throttler does not admit it,
     * a Reject referring to its sequence number is sent and processing stops. Otherwise an
     * Order Single is answered with an Execution Report.
     */
    @Override
    public void onInboundApplicationMessage(final Object sender,
                                            final Session.InboundApplicationMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming application-level message: {}", message);

        // Throttle message
        synchronized (throttler) {
            // NOTE(review): a non-zero result is treated as "limit exceeded"; the exact
            // meaning of the returned value should be confirmed in the Throttler docs.
            if (throttler.tryAdd(System.currentTimeMillis()) != 0) {
                // Reject the message
                final Message rejectMessage = Message.create(FIX40.MsgType.Reject, fixVersion);
                rejectMessage.set(Tag.RefSeqNum, message.getSeqNum());
                session.send(rejectMessage);
                ++totalMessagesRejected;
                LOG.info("Rejection {}: message {} rejected due to throttling limit exhausting.",
                        totalMessagesRejected, message.getSeqNum());
                return;
            }
        }

        if (message.checkType(FIX40.MsgType.Order_Single)) {
            LOG.info("Order received.");
            final Message executionReport = createExecutionReport(message);
            LOG.info("Sending execution report in response: {}", executionReport);
            ++totalOrdersHandled;
            session.send(executionReport);
        }
    }

    /** Logs the incoming session-level message. */
    @Override
    public void onInboundSessionMessage(final Object sender,
                                        final Session.InboundSessionMessageArgs args) {
        final Message message = args.getMsg();
        LOG.info("Incoming session-level message: {}", message);
        // Processing of the incoming session-level message...
    }

    /**
     * Logs the state transition and signals {@link #run()}: AWAIT_LOGON releases
     * {@code sessionIsDisconnected}, ESTABLISHED releases {@code sessionIsEstablished}.
     */
    @Override
    public void onStateChange(final Object sender, final Session.StateChangeArgs args) {
        final SessionState newState = args.getNewState();
        final SessionState prevState = args.getPrevState();
        LOG.info("Session state changed from {} to {}", prevState, newState);
        if (SessionState.AWAIT_LOGON == newState) {
            sessionIsDisconnected.release();
        } else if (SessionState.ESTABLISHED == newState) {
            sessionIsEstablished.release();
        }
    }

    /** Logs the reported session error. */
    @Override
    public void onError(final Object sender, final Session.ErrorArgs args) {
        LOG.error("{}", args);
    }

    /** Logs the reported session warning. */
    @Override
    public void onWarning(final Object sender, final Session.WarningArgs args) {
        LOG.warn("{}", args);
    }

    /**
     * Logs the resend request for a previously sent message.
     *
     * @return always {@code false}, i.e. the message is skipped (GapFill will be sent)
     */
    @Override
    public boolean onMessageResending(final Object sender, final Session.MessageResendingArgs args) {
        LOG.info("Message resending request: {}", args.getMsg());
        // Return false if it's necessary to skip this message (GapFill will be sent).
        return false;
    }

    /** Logs the sequence number range that is about to be resent. */
    @Override
    public void onMessageResendingStarted(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is about to start from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    /** Logs the sequence number range that has been resent. */
    @Override
    public void onMessageResendingFinished(final Object sender, final long beginSeqNum, final long endSeqNum) {
        LOG.info("Message resending is finished from {} to {} seq. numbers", beginSeqNum, endSeqNum);
    }

    /** Application entry point; command line arguments are ignored. */
    public static void main(final String[] args) {
        (new ThrottlingSellSide()).run();
    }
} |