Sample :: Thread Pool
Thread Pool
Description
This sample demonstrates how to use the ThreadPool mode.
Usage
- Run the sample:
- win: ThreadPool.bat
- linux: ThreadPool.sh
- Clean everything:
- win: clean.bat
- linux: clean.sh
Source Code
import biz.onixs.fix.dictionary.Version;
import biz.onixs.fix.engine.ConnectionMode;
import biz.onixs.fix.engine.Engine;
import biz.onixs.fix.engine.EngineSettings;
import biz.onixs.fix.engine.Session;
import biz.onixs.fix.parser.Message;
import biz.onixs.fix.tag.FIX40;
import biz.onixs.fix.tag.Tag;
import biz.onixs.util.Utils;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Processes incoming messages.
*/
public class ThreadPool implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPool.class);
private static final String SETTINGS_RESOURCE = "sample/ThreadPool.properties";
private Version fixVersion;
@Override
public void run() {
try {
LOG.info("ThreadPool");
LOG.info("The application is starting...");
//
LOG.info("Loading settings from: {}", SETTINGS_RESOURCE);
final Settings settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
fixVersion = Version.getByNumber(settings.getString("FIXVersion"));
//
final EngineSettings engineSettings = new EngineSettings();
engineSettings.init(settings);
// Set connection threading mode to thread pool.
engineSettings.setConnectionMode(ConnectionMode.THREAD_POOL);
// Set the number of threads in the pool to 2.
engineSettings.setThreadPoolSize(2);
//
engineSettings.setThreadPoolSpinningTimeout(30_000L);
//
LOG.info("Starting the Engine...");
final Engine engine = Engine.init(engineSettings);
//
final Session acceptor = new Session(
settings.getString("TargetCompID"), settings.getString("SenderCompID"), fixVersion);
acceptor.logonAsAcceptor();
//
final Session initiator = new Session(
settings.getString("SenderCompID"), settings.getString("TargetCompID"), fixVersion);
initiator.logonAsInitiator(
settings.getString("CounterpartyHost"), settings.getInteger("CounterpartyPort"),
settings.getInteger("HeartBtInt"), true);
//
Utils.waitForEnter("Press 'Enter' to send an order, disconnect the session and terminate the application.");
//
initiator.send(createOrder());
//
initiator.logout("The session is disconnected");
//
initiator.dispose();
acceptor.dispose();
//
LOG.info("Engine shutdown...");
engine.shutdown();
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
} finally {
LOG.info("The application is stopped.");
}
}
private Message createOrder() {
final Message order = Message.create(FIX40.MsgType.Order_Single, fixVersion);
order.set(Tag.HandlInst, "1")
.set(Tag.ClOrdID, "Unique identifier for Order")
.set(Tag.Symbol, "IBM")
.set(Tag.Side, "1")
.set(Tag.OrderQty, 1000)
.set(Tag.OrdType, "1");
return order;
}
public static void main(final String[] args) {
(new ThreadPool()).run();
}
}