Samples :: Benchmark
Throughput Benchmark
Description
This sample is intended for measuring overall message transfer speed.
Usage
- Run the sample:
  - Windows: ThroughputBenchmark.bat
  - Linux: ThroughputBenchmark.sh
- Clean everything:
  - Windows: clean.bat
  - Linux: clean.sh
Source Code
import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.SessionSettings;
import biz.onixs.cme.ilink3.handler.storage.SessionStorageType;
import biz.onixs.cme.ilink3.testing.Emulator;
import biz.onixs.cme.ilink3.testing.TestUtility;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.PrecisionTimer;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
/**
 * Throughput Benchmark.
 *
 * <p>Starts an in-process {@link Emulator} acceptor on its own thread, connects a {@link Session} to it and
 * sends one pre-encoded message {@code WarmUpMessageNumber + MessageNumber} times. The acceptor counts the
 * received messages, discards the warm-up part and periodically logs the average throughput.
 *
 * <p>All parameters are read from {@value #SETTINGS_RESOURCE}.
 */
public class ThroughputBenchmark implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThroughputBenchmark.class);
    private static final String SETTINGS_RESOURCE = "sample/ThroughputBenchmark.properties";
    private static final Settings settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
    /** Number of messages transferred before the measurement starts. */
    private static final long WARM_UP_MESSAGE_NUM = settings.getLong("WarmUpMessageNumber", 20_000L);
    /** Number of measured messages. */
    private static final long MESSAGE_NUM = settings.getLong("MessageNumber", 100_000L);
    /** Statistics are logged every this many measured messages; a non-positive value disables them. */
    private static final long LOG_STAT_NUMBER = settings.getLong("LogStatNumber", 10_000L);
    private static final String sessionStorageTypeId = settings.getString("SessionStorageType", "NullStorage");
    private static final int MARKET_SEGMENT_ID = 54;
    /**
     * Size argument passed to {@code Emulator.receiveMessageWithoutDecodingFixedSize}.
     * NOTE(review): presumably the encoded size in bytes of the benchmark message - confirm.
     */
    private static final int BENCHMARK_MESSAGE_SIZE = 238;
    /** The acceptor sends a Sequence message every this many measured messages. */
    private static final long SEQUENCE_INTERVAL = 1_000L;

    private Session session = null;

    /**
     * Runs the benchmark: starts the acceptor thread, establishes the session, sends all messages and waits
     * for the acceptor to finish. The handler is shut down even when the benchmark fails.
     */
    @Override
    public void run() {
        Handler handler = null;
        try {
            LOG.info("Throughput Benchmark Sample");
            final Thread threadAcceptor = new Thread(new Acceptor());
            handler = Handler.init(settings);
            LOG.info("Starting the Handler...");
            threadAcceptor.start();
            createSession();
            establishConnection();
            final IMessage report = createExecutionReportNewOrder(session.getSessionId().getUuid(), 1);
            final long totalMessages = WARM_UP_MESSAGE_NUM + MESSAGE_NUM;
            for (long sent = 0L; sent < totalMessages; ++sent) {
                session.send(report);
            }
            threadAcceptor.join();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } catch (final RuntimeException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            // Shut down in 'finally' so that a failed run does not leave the handler initialized.
            if (handler != null) {
                LOG.info("Handler shutdown...");
                handler.shutdown();
            }
            LOG.info("The application is stopped.");
        }
    }

    /** Creates the session from the settings, with a 60 keep-alive interval and a 1 GiB output queue limit. */
    private void createSession() {
        final SessionSettings sessionSettings = new SessionSettings();
        sessionSettings.init(settings);
        sessionSettings.setKeepAliveInterval(60);
        sessionSettings.setOutputQueueMaxSize(1024 * 1024 * 1024);
        session = new Session(sessionSettings, MARKET_SEGMENT_ID, false,
                Handler.getInstance().getStorageRepositoryManager().getStorageType(sessionStorageTypeId));
    }

    /** Logs on to the counterparty given by the {@code CounterpartyHost} / {@code CounterpartyPort} settings. */
    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{} ...", host, port);
        session.logon(host, port);
    }

    /**
     * Encodes the message that is sent repeatedly during the benchmark.
     *
     * @param uuid    session UUID written to the {@code UUID} field
     * @param orderId currently not used by the implementation; kept for interface compatibility
     * @return the encoded message, backed by a freshly allocated 1024-byte buffer
     */
    public IMessage createExecutionReportNewOrder(final long uuid, final long orderId) {
        // A new byte array is already zero-filled, so no explicit clearing is needed.
        final byte[] buffer = new byte[1024];
        final IMessage message = session.getDecoder().encode(buffer, 0, buffer.length,
                ApplicationLayerMsgType.ExecutionReportNewId);
        message.setLong(Tag.UUID, uuid);
        message.setBoolean(Tag.PossRetransFlag, false);
        message.setLong(Tag.PartyDetailsListReqID, 1);
        message.setInt(Tag.SecurityID, 205787);
        message.setUnsignedInt(Tag.OrderQty, 1000L);
        final ScaledDecimal price = new ScaledDecimal(10L, 1);
        message.setDecimal(Tag.Price, price);
        message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
        message.setString(Tag.SenderID, "MST");
        message.setString(Tag.ClOrdID, "SomeOrderID");
        message.setLong(Tag.OrderRequestID, 1L);
        message.setString(Tag.Location, "UK");
        message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
        message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
        message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
        message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
        return message;
    }

    /**
     * Receiving side of the benchmark: accepts the connection on {@code CounterpartyPort}, counts incoming
     * messages and terminates once {@code MessageNumber} measured messages have arrived.
     */
    public static class Acceptor implements Runnable {
        private PrecisionTimer timer = null;
        private long warmUpCounter = 0L;
        private long benchmarkCounter = 0L;

        @Override
        public void run() {
            final int port = settings.getInteger("CounterpartyPort");
            try {
                final Emulator emulator = new Emulator(new TestUtility());
                emulator.setPort(port);
                emulator.acceptConnection();
                emulator.acceptNegotiation();
                emulator.acceptEstablishment();
                if (WARM_UP_MESSAGE_NUM <= 0L) {
                    // Without a warm-up phase the timer would otherwise never be created
                    // and logStat() would fail with a NullPointerException.
                    timer = new PrecisionTimer();
                }
                while (true) {
                    final int msgsReceived = emulator.receiveMessageWithoutDecodingFixedSize(BENCHMARK_MESSAGE_SIZE);
                    for (int i = 0; i < msgsReceived; ++i) {
                        if (WARM_UP_MESSAGE_NUM > warmUpCounter) {
                            warmUpCounter++;
                            if (WARM_UP_MESSAGE_NUM == warmUpCounter) {
                                LOG.info("Warm-up: {} messages transferred", WARM_UP_MESSAGE_NUM);
                                // The measurement starts when the warm-up is complete.
                                timer = new PrecisionTimer();
                            }
                        } else {
                            benchmarkCounter++;
                            // Guard against a zero interval, which would make '%' throw ArithmeticException.
                            if (LOG_STAT_NUMBER > 0L && 0L == (benchmarkCounter % LOG_STAT_NUMBER)) {
                                logStat();
                            }
                            if (0L == benchmarkCounter % SEQUENCE_INTERVAL) {
                                emulator.sendSequence(1, 0);
                            }
                            if (MESSAGE_NUM == benchmarkCounter) {
                                emulator.terminate();
                                return;
                            }
                        }
                    }
                }
            } catch (final IOException e) {
                LOG.error(e.getMessage(), e);
            }
        }

        /** Logs the average throughput and per-message time over the measured messages so far. */
        private void logStat() {
            timer.update();
            final double speed = timer.getItemsPerSecond(benchmarkCounter);
            final double latency = timer.getNanoSecondsPerItem(benchmarkCounter);
            LOG.info("Benchmark: {} / {} messages transferred, avg throughput {} msgs/sec, avg latency {} nanos",
                    benchmarkCounter, MESSAGE_NUM, speed, latency);
        }
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(final String[] args) {
        (new ThroughputBenchmark()).run();
    }
}
Latency Benchmark
Description
This sample is intended for measuring message transfer latency.
Usage
- Run the sample:
  - Windows: LatencyBenchmark.bat
  - Linux: LatencyBenchmark.sh
- Clean everything:
  - Windows: clean.bat
  - Linux: clean.sh
Source Code
import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.InboundApplicationMessageArgs;
import biz.onixs.cme.ilink3.handler.session.InboundApplicationMessageListener;
import biz.onixs.cme.ilink3.handler.session.SessionSettings;
import biz.onixs.cme.ilink3.handler.storage.SessionStorageType;
import biz.onixs.cme.ilink3.testing.Emulator;
import biz.onixs.cme.ilink3.testing.TestUtility;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.HdrHistogram.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
/**
 * Latency Benchmark.
 *
 * <p>Starts an in-process {@link Emulator} acceptor that echoes every received message back, connects a
 * {@link Session} to it and plays "ping-pong": each echoed message received by the session listener triggers
 * the next send. After {@code WarmUpMessageNumber} echoes, latencies are recorded into an HdrHistogram
 * {@link Histogram} until {@code MessageNumber} measurements have been taken.
 *
 * <p>All parameters are read from {@value #SETTINGS_RESOURCE}.
 */
public class LatencyBenchmark implements InboundApplicationMessageListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LatencyBenchmark.class);
    private static final String SETTINGS_RESOURCE = "sample/LatencyBenchmark.properties";
    private static final Settings settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
    /** Number of echoed messages processed before the measurement starts. */
    private static final long WARM_UP_MESSAGE_NUM = settings.getLong("WarmUpMessageNumber", 40_000L);
    /** Number of measured messages. */
    private static final long MESSAGE_NUM = settings.getLong("MessageNumber", 200_000L);
    /** Statistics are logged every this many measured messages; a non-positive value disables them. */
    private static final long LOG_STAT_NUMBER = settings.getLong("LogStatNumber", 20_000L);
    private static final String sessionStorageTypeId = settings.getString("SessionStorageType", "NullStorage");
    private static final int MARKET_SEGMENT_ID = 54;
    /** Highest latency value (in nanoseconds) the histogram is configured to track. */
    private static final long MAX_TRACKABLE_LATENCY_NANOS = 1_000_000_000L;

    private final Histogram histogram = new Histogram(MAX_TRACKABLE_LATENCY_NANOS, 4);
    private Session session = null;
    private long departureTime = 0L;
    private long warmUpCounter = 0L;
    private long benchmarkCounter = 0L;

    /**
     * Handles a message echoed back by the acceptor: counts it, records the latency once the warm-up is over
     * and sends the message again unless the benchmark is complete.
     *
     * @param sender the event source
     * @param args   event arguments carrying the received message
     */
    @Override
    public void onInboundApplicationMessage(final Object sender, final InboundApplicationMessageArgs args) {
        final long arrivalTime = System.nanoTime();
        final IMessage message = args.getMsg();
        if (warmUpCounter < WARM_UP_MESSAGE_NUM) {
            warmUpCounter++;
            if (warmUpCounter == WARM_UP_MESSAGE_NUM) {
                LOG.info("Warm-up: {} messages transferred", warmUpCounter);
                departureTime = System.nanoTime();
            }
            session.send(message);
            return;
        }

        benchmarkCounter++;
        // NOTE(review): the measured round trip is divided by 4 - presumably to average over the send and
        // receive steps on both sides; confirm the intended definition of "latency".
        final long latency = (arrivalTime - departureTime) / 4L;
        if (0L > latency) {
            LOG.warn("Negative latency: {}", latency);
        } else if (latency > MAX_TRACKABLE_LATENCY_NANOS) {
            // Recording a value above the highest trackable value may throw ArrayIndexOutOfBoundsException,
            // which would escape this callback; report the outlier instead.
            LOG.warn("Latency {} exceeds the histogram range of {} nanos", latency, MAX_TRACKABLE_LATENCY_NANOS);
        } else {
            histogram.recordValue(latency);
        }
        // Guard against a zero interval, which would make '%' throw ArithmeticException.
        if (LOG_STAT_NUMBER > 0L && 0L == (benchmarkCounter % LOG_STAT_NUMBER)) {
            logStat();
        }
        if (benchmarkCounter == MESSAGE_NUM) {
            logHistogram();
        } else {
            departureTime = System.nanoTime();
            session.send(message);
        }
    }

    /**
     * Runs the benchmark: starts the acceptor thread, establishes the session, sends the first message and
     * waits for the acceptor to finish. The handler is shut down even when the benchmark fails.
     */
    @Override
    public void run() {
        Handler handler = null;
        try {
            LOG.info("Latency Benchmark Sample");
            final Thread threadAcceptor = new Thread(new Acceptor());
            handler = Handler.init(settings);
            LOG.info("Starting the Handler...");
            threadAcceptor.start();
            createSession();
            establishConnection();
            final IMessage report = createExecutionReportNewOrder(session.getSessionId().getUuid(), 1);
            // Stamp the first send so that a run without warm-up does not measure against a zero timestamp.
            departureTime = System.nanoTime();
            session.send(report);
            threadAcceptor.join();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } catch (final RuntimeException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            // Shut down in 'finally' so that a failed run does not leave the handler initialized.
            if (handler != null) {
                LOG.info("Handler shutdown...");
                handler.shutdown();
            }
            LOG.info("The application is stopped.");
        }
    }

    /** Creates the session from the settings and registers this object as its inbound message listener. */
    private void createSession() {
        final SessionSettings sessionSettings = new SessionSettings();
        sessionSettings.init(settings);
        session = new Session(sessionSettings, MARKET_SEGMENT_ID, false,
                Handler.getInstance().getStorageRepositoryManager().getStorageType(sessionStorageTypeId));
        session.setInboundApplicationMessageListener(this);
    }

    /** Logs on to the counterparty given by the {@code CounterpartyHost} / {@code CounterpartyPort} settings. */
    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{} ...", host, port);
        session.logon(host, port);
    }

    /**
     * Encodes the message that is bounced between the session and the acceptor.
     *
     * @param uuid    session UUID written to the {@code UUID} field
     * @param orderId currently not used by the implementation; kept for interface compatibility
     * @return the encoded message, backed by a freshly allocated 1024-byte buffer
     */
    public IMessage createExecutionReportNewOrder(final long uuid, final long orderId) {
        // A new byte array is already zero-filled, so no explicit clearing is needed.
        final byte[] buffer = new byte[1024];
        final IMessage message = session.getDecoder().encode(buffer, 0, buffer.length,
                ApplicationLayerMsgType.ExecutionReportNewId);
        message.setLong(Tag.UUID, uuid);
        message.setBoolean(Tag.PossRetransFlag, false);
        message.setLong(Tag.PartyDetailsListReqID, 1);
        message.setInt(Tag.SecurityID, 205787);
        message.setUnsignedInt(Tag.OrderQty, 1000L);
        final ScaledDecimal price = new ScaledDecimal(10L, 1);
        message.setDecimal(Tag.Price, price);
        message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
        message.setString(Tag.SenderID, "MST");
        message.setString(Tag.ClOrdID, "SomeOrderID");
        message.setLong(Tag.OrderRequestID, 1L);
        message.setString(Tag.Location, "UK");
        message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
        message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
        message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
        message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
        return message;
    }

    /** Logs the histogram memory footprint and its percentile distribution. */
    private void logHistogram() {
        LOG.info("Histogram estimated memory footprint in bytes: {}", histogram.getEstimatedFootprintInBytes());
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Closing the PrintStream flushes any buffered output before the bytes are logged.
        try (PrintStream printStream = new PrintStream(byteArrayOutputStream)) {
            histogram.outputPercentileDistribution(printStream, 1.0);
        }
        LOG.info("Histogram:\n{}", byteArrayOutputStream);
    }

    /** Logs the progress together with the mean and minimum latency recorded so far. */
    private void logStat() {
        LOG.info("Benchmark: {} / {} messages transferred, mean/min latency {} / {} nanos",
                benchmarkCounter, MESSAGE_NUM, histogram.getMean(), histogram.getMinValue());
    }

    /**
     * Echoing side of the benchmark: accepts the connection on {@code CounterpartyPort}, sends every received
     * message back and terminates after {@code WarmUpMessageNumber + MessageNumber} messages.
     */
    public static class Acceptor implements Runnable {
        private long counter = 0L;

        @Override
        public void run() {
            final int port = settings.getInteger("CounterpartyPort");
            try {
                final Emulator emulator = new Emulator(new TestUtility());
                emulator.setPort(port);
                emulator.acceptConnection();
                emulator.acceptNegotiation();
                emulator.acceptEstablishment();
                while (true) {
                    final IMessage msg = emulator.receiveMessage();
                    emulator.send(msg);
                    counter++;
                    if ((WARM_UP_MESSAGE_NUM + MESSAGE_NUM) == counter) {
                        emulator.terminate();
                        break;
                    }
                }
            } catch (final IOException e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(final String[] args) {
        (new LatencyBenchmark()).run();
    }
}
Java CME iLink3 Handler