The sample illustrates throttling for buy side and for outgoing messages.
This sample can be run together with the “ThrottlingSellSide” sample. The “ThrottlingSellSide” must be started first.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | import biz.onixs.fix.dictionary.Version; import biz.onixs.fix.engine.Engine; import biz.onixs.fix.engine.Session; import biz.onixs.fix.engine.SessionState; import biz.onixs.fix.parser.Message; import biz.onixs.fix.tag.FIX40; import biz.onixs.fix.tag.Tag; import biz.onixs.util.TimestampFormat; import biz.onixs.util.settings.PropertyBasedSettings; import biz.onixs.util.settings.Settings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Locale; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * FIX Initiator - Buy Side. */ public class ThrottlingBuySide implements Session.InboundApplicationMessageListener, Session.StateChangeListener, Session.MessageResendingListener, Session.InboundSessionMessageListener, Session.WarningListener, Session.ErrorListener, Runnable { private static final Logger LOG = LoggerFactory.getLogger(ThrottlingBuySide. class ); private static final String SETTINGS_RESOURCE = "sample/ThrottlingBuySide.properties" ; private final Semaphore sessionIsEstablished = new Semaphore( 0 ); private final Semaphore orderHandlingComplete = new Semaphore( 0 ); private Settings settings = null ; private Version fixVersion = null ; private Session session = null ; private int totalOrdersHandled = 0 ; private int totalMessagesRejected = 0 ; public void run() { try { LOG.info( "ThrottlingBuySide" ); LOG.info( "The application is starting..." ); // LOG.info( "Loading settings from: {}" , SETTINGS_RESOURCE); settings = new PropertyBasedSettings(SETTINGS_RESOURCE); // LOG.info( "Starting the Engine..." ); final Engine engine = Engine.init(settings); // createSession(); // establishConnection(); // LOG.info( "Waiting for session establishment..." ); sessionIsEstablished.acquire(); // int noOfMessages = settings.getInteger( "NoOfMessages" ); for ( int i = 0 ; i < noOfMessages; ++i) { final Message order = createOrder(); session.throttle(); session.send(order); // orderHandlingComplete.acquire(); } // session.logout( "The session is disconnected by ThrottlingBuySide" ); session.dispose(); // LOG.info( "Orders handled: " + totalOrdersHandled + ", messages rejected: " + totalMessagesRejected); // LOG.info( "Engine shutdown..." ); engine.shutdown(); } catch ( final Exception e) { LOG.error(e.getMessage(), e); } finally { LOG.info( "The application is stopped." ); } } private void createSession() { fixVersion = Version.getByNumber(settings.getString( "FIXVersion" )); session = new Session(settings.getString( "SenderCompID" ), settings.getString( "TargetCompID" ), fixVersion, settings.getBoolean( "keepSequenceNumbersBetweenFixConnections" )); // session.setSenderSubID( "SenderSubID (50) field" ) .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec) .setInboundApplicationMessageListener( this ) .setInboundSessionMessageListener( this ) .addStateChangeListener( this ) .setMessageResendingListener( this ) .setErrorListener( this ) .setWarningListener( this ); updateThrottlingLimit(session); } private void updateThrottlingLimit(Session session) { final int messages = settings.getInteger( "ThrottlingMessages" ); if (messages < 0 ) { return ; } final long interval = settings.getLong( "ThrottlingInterval" ); final TimeUnit timeUnit = TimeUnit.valueOf(settings.getString( "ThrottlingIntervalTimeUnit" ).toUpperCase(Locale.ROOT)); session.setThrottlingLimit(messages, interval, timeUnit); } private void establishConnection() { final String host = settings.getString( "CounterpartyHost" ); final int port = settings.getInteger( "CounterpartyPort" ); LOG.info( "Establishing connection to {}:{}..." , host, port); session.logonAsInitiator(host, port, true ); } private Message createOrder() { final Message order = Message.create(FIX40.MsgType.Order_Single, fixVersion); order.set(Tag.HandlInst, "1" ) .set(Tag.ClOrdID, "Unique identifier for Order" ) .set(Tag.Symbol, "IBM" ) .set(Tag.Side, "1" ) .set(Tag.OrderQty, 1000 ) .set(Tag.OrdType, "1" ); return order; } @Override public void onInboundApplicationMessage( final Object sender, final Session.InboundApplicationMessageArgs args) { final Message message = args.getMsg(); LOG.info( "Incoming application-level message: {}" , message); if (message.checkType(FIX40.MsgType.Execution_Report)) { LOG.info( "Execution report received." ); ++totalOrdersHandled; orderHandlingComplete.release(); } // Processing of the incoming application-level message... } @Override public void onInboundSessionMessage( final Object sender, final Session.InboundSessionMessageArgs args) { final Message message = args.getMsg(); LOG.info( "Incoming session-level message: {}" , message); if (message.getType().equals(FIX40.MsgType.Reject)) { ++totalMessagesRejected; LOG.info( "Rejection " + totalMessagesRejected + ": message " + message.get(Tag.RefSeqNum) + " rejected." ); orderHandlingComplete.release(); } // Processing of the incoming session-level message... } @Override public void onStateChange( final Object sender, final Session.StateChangeArgs args) { final SessionState newState = args.getNewState(); final SessionState prevState = args.getPrevState(); LOG.info( "Session state changed from {} to {}" , prevState, newState); if (SessionState.ESTABLISHED == newState) { sessionIsEstablished.release(); } } @Override public void onError( final Object sender, final Session.ErrorArgs args) { LOG.error( "{}" , args); } @Override public void onWarning( final Object sender, final Session.WarningArgs args) { LOG.warn( "{}" , args); } @Override public boolean onMessageResending( final Object sender, final Session.MessageResendingArgs args) { LOG.info( "Message resending request: {}" , args.getMsg()); // Return false if it's necessary to skip this message (GapFill will be sent). return false ; } @Override public void onMessageResendingStarted( final Object sender, final long beginSeqNum, final long endSeqNum) { LOG.info( "Message resending is about to start from {} to {} seq. numbers" , beginSeqNum, endSeqNum); } @Override public void onMessageResendingFinished( final Object sender, final long beginSeqNum, final long endSeqNum) { LOG.info( "Message resending is finished from {} to {} seq. numbers" , beginSeqNum, endSeqNum); } public static void main( final String[] args) { ( new ThrottlingBuySide()).run(); } } |
The sample illustrates throttling for sell side and for incoming messages.
This sample can be run together with the “ThrottlingBuySide” sample. The “ThrottlingSellSide” must be started first.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | import biz.onixs.fix.dictionary.Version; import biz.onixs.fix.engine.Engine; import biz.onixs.fix.engine.Session; import biz.onixs.fix.engine.SessionState; import biz.onixs.fix.parser.Message; import biz.onixs.fix.tag.FIX40; import biz.onixs.fix.tag.Tag; import biz.onixs.util.GuidGenerator; import biz.onixs.util.Throttler; import biz.onixs.util.TimestampFormat; import biz.onixs.util.settings.PropertyBasedSettings; import biz.onixs.util.settings.Settings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Locale; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * Processes incoming messages. */ public class ThrottlingSellSide implements Session.InboundApplicationMessageListener, Session.StateChangeListener, Session.ErrorListener, Session.WarningListener, Session.MessageResendingListener, Session.InboundSessionMessageListener, Runnable { private static final Logger LOG = LoggerFactory.getLogger(ThrottlingSellSide. class ); private static final String SETTINGS_RESOURCE = "sample/ThrottlingSellSide.properties" ; private final Semaphore sessionIsDisconnected = new Semaphore( 0 ); private final Semaphore sessionIsEstablished = new Semaphore( 0 ); private final GuidGenerator guidGenerator = new GuidGenerator(); private Settings settings = null ; private Version fixVersion = null ; private Session session = null ; private Throttler throttler = new Throttler(); private int totalOrdersHandled = 0 ; private int totalMessagesRejected = 0 ; public void run() { try { LOG.info( "ThrottlingSellSide" ); LOG.info( "The application is starting..." ); // LOG.info( "Loading settings from: {}" , SETTINGS_RESOURCE); settings = new PropertyBasedSettings(SETTINGS_RESOURCE); // updateThrottlingLimit(); // LOG.info( "Starting the Engine..." ); final Engine engine = Engine.init(settings); // createSession(); session.logonAsAcceptor(); sessionIsDisconnected.acquire(); // LOG.info( "Waiting for connection..." ); sessionIsEstablished.acquire(); // LOG.info( "Waiting for disconnection..." ); sessionIsDisconnected.acquire(); session.logout(); session.dispose(); // LOG.info( "Orders handled: " + totalOrdersHandled + ", messages rejected: " + totalMessagesRejected); // LOG.info( "Engine shutdown..." ); engine.shutdown(); } catch ( final Exception e) { LOG.error(e.getMessage(), e); } finally { LOG.info( "The application is stopped." ); } } private void createSession() { fixVersion = Version.getByNumber(settings.getString( "FIXVersion" )); session = new Session(settings.getString( "SenderCompID" ), settings.getString( "TargetCompID" ), fixVersion, settings.getBoolean( "keepSequenceNumbersBetweenFixConnections" )); // session.setInboundApplicationMessageListener( this ) .setInboundSessionMessageListener( this ) .addStateChangeListener( this ) .setErrorListener( this ) .setWarningListener( this ) .setMessageResendingListener( this ) .setSendingTimeFormat(TimestampFormat.YYYYMMDDHHMMSSMsec); } private void updateThrottlingLimit() { final int messages = settings.getInteger( "ThrottlingMessages" ); if (messages < 0 ) { return ; } final long interval = settings.getLong( "ThrottlingInterval" ); final TimeUnit timeUnit = TimeUnit.valueOf(settings.getString( "ThrottlingIntervalTimeUnit" ).toUpperCase(Locale.ROOT)); throttler.reset(messages, timeUnit.toMillis(interval)); } private Message createExecutionReport( final Message order) { final Message execReport = Message.create(FIX40.MsgType.Execution_Report, fixVersion); execReport.set(Tag.OrderID, order.get(Tag.ClOrdID)) .set(Tag.ExecID, guidGenerator.generate()) .set(Tag.ExecTransType, "0" ) .set(Tag.OrdStatus, "0" ) .set(Tag.Symbol, order.get(Tag.Symbol)) .set(Tag.Side, order.get(Tag.Side)) .set(Tag.LastQty, order.get(Tag.OrderQty)) .set(Tag.LastPx, "100.0" ) .set(Tag.OrderQty, order.get(Tag.OrderQty)) .set(Tag.CumQty, order.get(Tag.OrderQty)) .set(Tag.AvgPx, "100.0" ); return execReport; } @Override public void onInboundApplicationMessage( final Object sender, final Session.InboundApplicationMessageArgs args) { final Message message = args.getMsg(); LOG.info( "Incoming application-level message: {}" , message); // Throttle message synchronized (throttler) { if (throttler.tryAdd(System.currentTimeMillis()) != 0 ) { // Reject the message final Message rejectMessage = Message.create(FIX40.MsgType.Reject, fixVersion); rejectMessage.set(Tag.RefSeqNum, message.getSeqNum()); session.send(rejectMessage); ++totalMessagesRejected; LOG.info( "Rejection " + totalMessagesRejected + ": message " + message.getSeqNum() + " rejected due to throttling limit exhausting." ); return ; } } if (message.checkType(FIX40.MsgType.Order_Single)) { LOG.info( "Order received." ); final Message executionReport = createExecutionReport(message); LOG.info( "Sending execution report in response: {}" , executionReport); ++totalOrdersHandled; session.send(executionReport); } } @Override public void onInboundSessionMessage( final Object sender, final Session.InboundSessionMessageArgs args) { final Message message = args.getMsg(); LOG.info( "Incoming session-level message: {}" , message); // Processing of the incoming session-level message... } @Override public void onStateChange( final Object sender, final Session.StateChangeArgs args) { final SessionState newState = args.getNewState(); final SessionState prevState = args.getPrevState(); LOG.info( "Session state changed from {} to {}" , prevState, newState); if (SessionState.AWAIT_LOGON == newState) { sessionIsDisconnected.release(); } else if (SessionState.ESTABLISHED == newState) { sessionIsEstablished.release(); } } @Override public void onError( final Object sender, final Session.ErrorArgs args) { LOG.error( "{}" , args); } @Override public void onWarning( final Object sender, final Session.WarningArgs args) { LOG.warn( "{}" , args); } @Override public boolean onMessageResending( final Object sender, final Session.MessageResendingArgs args) { LOG.info( "Message resending request: {}" , args.getMsg()); // Return false if it's necessary to skip this message (GapFill will be sent). return false ; } @Override public void onMessageResendingStarted( final Object sender, final long beginSeqNum, final long endSeqNum) { LOG.info( "Message resending is about to start from {} to {} seq. numbers" , beginSeqNum, endSeqNum); } @Override public void onMessageResendingFinished( final Object sender, final long beginSeqNum, final long endSeqNum) { LOG.info( "Message resending is finished from {} to {} seq. numbers" , beginSeqNum, endSeqNum); } public static void main( final String[] args) { ( new ThrottlingSellSide()).run(); } } |