Sample :: Thread Pool

Thread Pool

Description

This sample demonstrates how to run FIX sessions with the Engine's connection mode set to thread pool (ConnectionMode.THREAD_POOL).

Usage

  • Run the sample:
    • win: ThreadPool.bat
    • linux: ThreadPool.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code


import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.ConnectionMode;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.EngineSettings;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.Utils;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sample application that starts the FIX Engine with the connection mode set to
 * {@link ConnectionMode#THREAD_POOL}, creates an acceptor session and an initiator session
 * with mirrored SenderCompID/TargetCompID values, sends a single order from the initiator
 * and then releases the sessions and shuts the Engine down.
 *
 * <p>All configuration values are read from the {@value #SETTINGS_RESOURCE} resource.
 */
public class ThreadPool implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThreadPool.class);
    private static final String SETTINGS_RESOURCE = "sample/ThreadPool.properties";

    /** FIX version read from the settings in {@link #run()}; also used to build the order message. */
    private Version fixVersion;

    /**
     * Runs the whole sample scenario.
     *
     * <p>Any exception raised while running is logged rather than propagated. The sessions
     * and the Engine that were successfully created are always released in the
     * {@code finally} block, so a failure half-way through (for example during logon)
     * does not leave the Engine running.
     */
    @Override
    public void run() {
        // Declared outside the try block so that the finally block can release whatever was created.
        Engine engine = null;
        Session acceptor = null;
        Session initiator = null;
        try {
            LOG.info("ThreadPool");
            LOG.info("The application is starting...");
            //
            LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
            final Settings settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
            fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
            //
            final EngineSettings engineSettings = new EngineSettings();
            engineSettings.init(settings);
            // Set connection threading mode to thread pool.
            engineSettings.setConnectionMode(ConnectionMode.THREAD_POOL);
            // Set the number of threads in the pool to 2.
            engineSettings.setThreadPoolSize(2);
            // Set the thread pool spinning timeout.
            // NOTE(review): the unit of this value is defined by EngineSettings - confirm against its docs.
            engineSettings.setThreadPoolSpinningTimeout(30_000L);
            //
            LOG.info("Starting the Engine...");
            engine = Engine.init(engineSettings);
            // The acceptor uses the comp IDs swapped relative to the initiator.
            acceptor = new Session(
                    settings.getString("TargetCompID"), settings.getString("SenderCompID"), fixVersion);
            acceptor.logonAsAcceptor();
            //
            initiator = new Session(
                    settings.getString("SenderCompID"), settings.getString("TargetCompID"), fixVersion);
            initiator.logonAsInitiator(
                    settings.getString("CounterpartyHost"), settings.getInteger("CounterpartyPort"),
                    settings.getInteger("HeartBtInt"), true);
            //
            Utils.waitForEnter("Press 'Enter' to send an order, disconnect the session and terminate the application.");
            //
            initiator.send(createOrder());
            //
            initiator.logout("The session is disconnected");
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            // Same release order as the normal flow: initiator, acceptor, then the Engine.
            disposeQuietly(initiator);
            disposeQuietly(acceptor);
            shutdownQuietly(engine);
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Disposes the given session, logging (not propagating) any failure so that the
     * remaining cleanup steps still run.
     *
     * @param session the session to dispose; {@code null} if it was never created
     */
    private static void disposeQuietly(final Session session) {
        if (session == null) {
            return;
        }
        try {
            session.dispose();
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Shuts the given Engine down, logging (not propagating) any failure.
     *
     * @param engine the Engine to shut down; {@code null} if it was never initialized
     */
    private static void shutdownQuietly(final Engine engine) {
        if (engine == null) {
            return;
        }
        try {
            LOG.info("Engine shutdown...");
            engine.shutdown();
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the order message sent by the initiator.
     *
     * @return a new Order Single message for {@link #fixVersion} with the HandlInst, ClOrdID,
     *         Symbol, Side, OrderQty and OrdType fields set to fixed sample values
     */
    private Message createOrder() {
        final Message order = Message.create(FIX40.MsgType.Order_Single, fixVersion);
        order.set(Tag.HandlInst, "1")
                .set(Tag.ClOrdID, "Unique identifier for Order")
                .set(Tag.Symbol, "IBM")
                .set(Tag.Side, "1")
                .set(Tag.OrderQty, 1000)
                .set(Tag.OrdType, "1");
        return order;
    }

    /**
     * Application entry point: runs the sample on the calling thread.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(final String[] args) {
        (new ThreadPool()).run();
    }
}