Samples :: Benchmark

Throughput Benchmark

Description

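This sample measures overall message throughput: a session sends a fixed number of messages to an in-process emulator, which counts them and reports the average number of messages transferred per second.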

Usage

  • Run the sample:
    • win: ThroughputBenchmark.bat
    • linux: ThroughputBenchmark.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code

import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.SessionSettings;
import biz.onixs.cme.ilink3.handler.storage.SessionStorageType;
import biz.onixs.cme.ilink3.testing.Emulator;
import biz.onixs.cme.ilink3.testing.TestUtility;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.PrecisionTimer;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;

/**
 * Throughput Benchmark.
 */
public class ThroughputBenchmark implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ThroughputBenchmark.class);
    private static final String SETTINGS_RESOURCE = "sample/ThroughputBenchmark.properties";
    private static final Settings settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
    private static final long WARM_UP_MESSAGE_NUM = settings.getLong("WarmUpMessageNumber", 20_000L);
    private static final long MESSAGE_NUM = settings.getLong("MessageNumber", 100_000L);
    private static final long LOG_STAT_NUMBER = settings.getLong("LogStatNumber", 10_000L);
    private static final String sessionStorageTypeId = settings.getString("SessionStorageType", "NullStorage");
    private final int MarketSegmentId = 54;
    private Session session = null;

    public void run() {
        try {
            LOG.info("Getting Started Sample");
            final Thread threadAcceptor = new Thread(new Acceptor());
            final Handler handler = Handler.init(settings);
            LOG.info("Starting the Handler...");
            threadAcceptor.start();
            createSession();
            establishConnection();
            final IMessage report = createExecutionReportNewOrder(session.getSessionId().getUuid(), 1);
            for (long l = 0L; (WARM_UP_MESSAGE_NUM + MESSAGE_NUM) > l; ++l) {
                session.send(report);
            }
            threadAcceptor.join();
            LOG.info("Handler shutdown...");
            handler.shutdown();
        } catch (final RuntimeException | InterruptedException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            LOG.info("The application is stopped.");
        }
    }

    private void createSession() {
        final SessionSettings sessionSettings = new SessionSettings();
        sessionSettings.init(settings);
        sessionSettings.setKeepAliveInterval(60);
        sessionSettings.setOutputQueueMaxSize(1024 * 1024 * 1024);
        session = new Session(sessionSettings, MarketSegmentId, false,
                Handler.getInstance().getStorageRepositoryManager().getStorageType(sessionStorageTypeId));
    }

    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{} ...", host, port);
        session.logon(host, port);
    }

    public IMessage createExecutionReportNewOrder(final long uuid, final long orderId) {
        final byte[] buffer = new byte[1024];
        Arrays.fill(buffer, (byte) 0);
        final IMessage message = session.getDecoder().encode(buffer, 0, buffer.length,
                ApplicationLayerMsgType.ExecutionReportNewId);
        message.setLong(Tag.UUID, uuid);
        message.setBoolean(Tag.PossRetransFlag, false);
        message.setLong(Tag.PartyDetailsListReqID, 1);
        message.setInt(Tag.SecurityID, 205787);
        message.setUnsignedInt(Tag.OrderQty, 1000L);
        final ScaledDecimal price = new ScaledDecimal(10L, 1);
        message.setDecimal(Tag.Price, price);
        message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
        message.setString(Tag.SenderID, "MST");
        message.setString(Tag.ClOrdID, "SomeOrderID");
        message.setLong(Tag.OrderRequestID, 1L);
        message.setString(Tag.Location, "UK");
        message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
        message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
        message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
        message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
        return message;
    }

    public static class Acceptor implements Runnable {
        private PrecisionTimer timer = null;
        private long warmUpCounter = 0L;
        private long benchmarkCounter = 0L;

        @Override
        public void run() {
            final int port = settings.getInteger("CounterpartyPort");
            try {
                final Emulator emulator = new Emulator(new TestUtility());
                emulator.setPort(port);
                emulator.acceptConnection();
                emulator.acceptNegotiation();
                emulator.acceptEstablishment();
                while (true) {
                    final int msgsReceived = emulator.receiveMessageWithoutDecodingFixedSize(238);
                    for (int i = 0; i < msgsReceived; ++i) {
                        if (WARM_UP_MESSAGE_NUM > warmUpCounter) {
                            warmUpCounter++;
                            if (WARM_UP_MESSAGE_NUM == warmUpCounter) {
                                LOG.info("Warm-up: {} messages transferred", WARM_UP_MESSAGE_NUM);
                                timer = new PrecisionTimer();
                            }
                        } else {
                            benchmarkCounter++;
                            if (0L == (benchmarkCounter % LOG_STAT_NUMBER)) {
                                logStat();
                            }
                            if (0L == benchmarkCounter % 1000) {
                                emulator.sendSequence(1, 0);
                            }
                            if (MESSAGE_NUM == benchmarkCounter) {
                                emulator.terminate();
                                return;
                            }
                        }
                    }
                }
            } catch (final IOException e) {
                LOG.error(e.getMessage(), e);
            }
        }

        private void logStat() {
            timer.update();
            final double speed = timer.getItemsPerSecond(benchmarkCounter);
            final double latency = timer.getNanoSecondsPerItem(benchmarkCounter);
            LOG.info("Benchmark: {} / {} messages transferred, avg throughput {} msgs/sec, avg latency {} nanos",
                    benchmarkCounter, MESSAGE_NUM, speed, latency);
        }
    }

    public static void main(final String[] args) {
        (new ThroughputBenchmark()).run();
    }
}

Latency Benchmark

Description

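This sample measures message transfer latency: a single message is echoed back and forth between a session and an in-process emulator, and the resulting latency distribution is reported as a histogram.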

Usage

  • Run the sample:
    • win: LatencyBenchmark.bat
    • linux: LatencyBenchmark.sh
  • Clean everything:
    • win: clean.bat
    • linux: clean.sh

Source Code

import biz.onixs.cme.ilink3.handler.ApplicationLayerMsgType;
import biz.onixs.cme.ilink3.handler.CME;
import biz.onixs.cme.ilink3.handler.Handler;
import biz.onixs.cme.ilink3.handler.Session;
import biz.onixs.cme.ilink3.handler.Tag;
import biz.onixs.cme.ilink3.handler.session.InboundApplicationMessageArgs;
import biz.onixs.cme.ilink3.handler.session.InboundApplicationMessageListener;
import biz.onixs.cme.ilink3.handler.session.SessionSettings;
import biz.onixs.cme.ilink3.handler.storage.SessionStorageType;
import biz.onixs.cme.ilink3.testing.Emulator;
import biz.onixs.cme.ilink3.testing.TestUtility;
import biz.onixs.sbe.IMessage;
import biz.onixs.util.ScaledDecimal;
import biz.onixs.util.settings.PropertyBasedSettings;
import biz.onixs.util.settings.Settings;
import org.HdrHistogram.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;

/**
 * Latency Benchmark.
 *
 * <p>Starts an in-process {@link Emulator} acceptor that echoes every message back, connects a
 * {@link Session} to it and sends one message. Each echoed message is re-sent from the inbound listener, so
 * exactly one message is in flight at a time. After the warm-up phase the time between a send and the
 * arrival of its echo is recorded into a {@link Histogram}.
 *
 * <p>All tunables are read from {@code sample/LatencyBenchmark.properties}.
 */
public class LatencyBenchmark implements InboundApplicationMessageListener, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LatencyBenchmark.class);
    private static final String SETTINGS_RESOURCE = "sample/LatencyBenchmark.properties";
    private static final Settings settings = new PropertyBasedSettings(SETTINGS_RESOURCE);
    /** Number of echoes exchanged before the measurement starts. */
    private static final long WARM_UP_MESSAGE_NUM = settings.getLong("WarmUpMessageNumber", 40_000L);
    /** Number of echoes measured. */
    private static final long MESSAGE_NUM = settings.getLong("MessageNumber", 200_000L);
    /** Statistics are logged every time this many measured echoes have arrived. */
    private static final long LOG_STAT_NUMBER = settings.getLong("LogStatNumber", 20_000L);
    private static final String sessionStorageTypeId = settings.getString("SessionStorageType", "NullStorage");
    /** Highest latency, in nanoseconds, the histogram is configured to track. */
    private static final long MAX_TRACKABLE_LATENCY_NANOS = 1_000_000_000L;
    private static final int MARKET_SEGMENT_ID = 54;
    private final Histogram histogram = new Histogram(MAX_TRACKABLE_LATENCY_NANOS, 4);
    private Session session = null;
    private long departureTime = 0L;
    private long warmUpCounter = 0L;
    private long benchmarkCounter = 0L;

    /**
     * Handles an echoed message: during warm-up it is simply re-sent; afterwards the latency is recorded
     * and the message is re-sent until {@code MessageNumber} echoes have been measured, at which point the
     * histogram is logged and nothing more is sent.
     *
     * @param sender not used
     * @param args   carries the inbound message that is re-sent
     */
    @Override
    public void onInboundApplicationMessage(final Object sender, final InboundApplicationMessageArgs args) {
        final long arrivalTime = System.nanoTime();
        final IMessage message = args.getMsg();
        if (warmUpCounter < WARM_UP_MESSAGE_NUM) {
            warmUpCounter++;
            if (warmUpCounter == WARM_UP_MESSAGE_NUM) {
                LOG.info("Warm-up: {} messages transferred", warmUpCounter);
                // The next echo is the first measured one, so stamp its departure.
                departureTime = System.nanoTime();
            }
            session.send(message);
        } else {
            benchmarkCounter++;
            // NOTE(review): the round trip is divided by 4, presumably to approximate one of its four legs
            // (session send, emulator receive, emulator send, session receive) - confirm.
            final long latency = (arrivalTime - departureTime) / 4L;
            if (0L > latency) {
                LOG.warn("Negative latency: {}", latency);
            } else if (latency > MAX_TRACKABLE_LATENCY_NANOS) {
                // Recording a value beyond the histogram range throws and would break the callback.
                LOG.warn("Latency {} nanos exceeds the histogram range of {} nanos; sample skipped",
                        latency, MAX_TRACKABLE_LATENCY_NANOS);
            } else {
                histogram.recordValue(latency);
            }
            // Guard against LogStatNumber = 0, which would be a division by zero.
            if (LOG_STAT_NUMBER > 0L && 0L == (benchmarkCounter % LOG_STAT_NUMBER)) {
                logStat();
            }
            if (benchmarkCounter == MESSAGE_NUM) {
                logHistogram();
            } else {
                departureTime = System.nanoTime();
                session.send(message);
            }
        }
    }

    /**
     * Runs the benchmark: initializes the handler, starts the echoing acceptor thread, logs on, sends the
     * first message and waits for the acceptor to finish.
     *
     * <p>The handler is always shut down once it has been initialized, even if the benchmark fails.
     */
    @Override
    public void run() {
        try {
            LOG.info("Latency Benchmark Sample");
            final Thread threadAcceptor = new Thread(new Acceptor());
            // Daemon: if the initiator side fails (e.g. logon error) the acceptor may stay blocked waiting
            // for a connection; it must not keep the JVM alive in that case. The normal path still waits
            // for it via join() below.
            threadAcceptor.setDaemon(true);
            final Handler handler = Handler.init(settings);
            try {
                LOG.info("Starting the Handler...");
                threadAcceptor.start();
                createSession();
                establishConnection();
                final IMessage report = createExecutionReportNewOrder(session.getSessionId().getUuid(), 1);
                // Stamp the first departure so that the first echo has a valid reference time even when
                // WarmUpMessageNumber is 0 and no warm-up echo ever sets it.
                departureTime = System.nanoTime();
                session.send(report);
                threadAcceptor.join();
            } finally {
                LOG.info("Handler shutdown...");
                handler.shutdown();
            }
        } catch (final InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } catch (final RuntimeException e) {
            LOG.error(e.getMessage(), e);
        } finally {
            LOG.info("The application is stopped.");
        }
    }

    /**
     * Creates the session from the sample settings, using the storage type named by the
     * {@code SessionStorageType} setting, and registers this object as its inbound message listener.
     */
    private void createSession() {
        final SessionSettings sessionSettings = new SessionSettings();
        sessionSettings.init(settings);
        session = new Session(sessionSettings, MARKET_SEGMENT_ID, false,
                Handler.getInstance().getStorageRepositoryManager().getStorageType(sessionStorageTypeId));
        session.setInboundApplicationMessageListener(this);
    }

    /**
     * Logs the session on to the host/port given by the {@code CounterpartyHost} and
     * {@code CounterpartyPort} settings.
     */
    private void establishConnection() {
        final String host = settings.getString("CounterpartyHost");
        final int port = settings.getInteger("CounterpartyPort");
        LOG.info("Establishing connection to {}:{} ...", host, port);
        session.logon(host, port);
    }

    /**
     * Builds the message that is bounced between the session and the emulator.
     *
     * <p>Requires the session to have been created, because the message is encoded through it.
     *
     * @param uuid    value stored in the {@code UUID} field of the message
     * @param orderId currently unused; kept for interface compatibility
     * @return the encoded message, backed by a freshly allocated 1024-byte buffer
     */
    public IMessage createExecutionReportNewOrder(final long uuid, final long orderId) {
        // A new Java array is already zero-filled, so no explicit clearing is needed.
        final byte[] buffer = new byte[1024];
        final IMessage message = session.getDecoder().encode(buffer, 0, buffer.length,
                ApplicationLayerMsgType.ExecutionReportNewId);
        message.setLong(Tag.UUID, uuid);
        message.setBoolean(Tag.PossRetransFlag, false);
        message.setLong(Tag.PartyDetailsListReqID, 1);
        message.setInt(Tag.SecurityID, 205787);
        message.setUnsignedInt(Tag.OrderQty, 1000L);
        final ScaledDecimal price = new ScaledDecimal(10L, 1);
        message.setDecimal(Tag.Price, price);
        message.setUnsignedByte(Tag.Side, CME.SideReq.Sell);
        message.setString(Tag.SenderID, "MST");
        message.setString(Tag.ClOrdID, "SomeOrderID");
        message.setLong(Tag.OrderRequestID, 1L);
        message.setString(Tag.Location, "UK");
        message.setUnsignedByte(Tag.TimeInForce, CME.TimeInForce.Day);
        message.setUnsignedByte(Tag.ManualOrderIndicator, CME.ManualOrdIndReq.Automated);
        message.setUnsignedByte(Tag.ExecInst, CME.ExecInst.AON);
        message.setUnsignedByte(Tag.ExecutionMode, CME.ExecMode.Aggressive);
        return message;
    }

    /** Logs the histogram's estimated memory footprint and its percentile distribution. */
    private void logHistogram() {
        LOG.info("Histogram estimated memory footprint in bytes: {}", histogram.getEstimatedFootprintInBytes());
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Closing the PrintStream flushes everything into the byte array before it is logged.
        try (PrintStream printStream = new PrintStream(byteArrayOutputStream)) {
            histogram.outputPercentileDistribution(printStream, 1.0);
        }
        LOG.info("Histogram:\n{}", byteArrayOutputStream);
    }

    /** Logs the progress together with the mean and minimum latency recorded so far. */
    private void logStat() {
        LOG.info("Benchmark: {} / {} messages transferred, mean/min latency {} / {} nanos",
                benchmarkCounter, MESSAGE_NUM, histogram.getMean(), histogram.getMinValue());
    }

    /**
     * Echoing side of the benchmark: accepts the connection through an {@link Emulator} and sends every
     * received message straight back.
     */
    public static class Acceptor implements Runnable {
        private long counter = 0L;

        /**
         * Accepts the connection, then echoes messages until {@code WarmUpMessageNumber + MessageNumber}
         * messages have been echoed, at which point the emulator is terminated.
         */
        @Override
        public void run() {
            final int port = settings.getInteger("CounterpartyPort");
            try {
                final Emulator emulator = new Emulator(new TestUtility());
                emulator.setPort(port);
                emulator.acceptConnection();
                emulator.acceptNegotiation();
                emulator.acceptEstablishment();
                while (true) {
                    final IMessage msg = emulator.receiveMessage();
                    emulator.send(msg);
                    counter++;
                    if ((WARM_UP_MESSAGE_NUM + MESSAGE_NUM) == counter) {
                        emulator.terminate();
                        break;
                    }
                }
            } catch (final IOException e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Entry point: runs the benchmark on the calling thread.
     *
     * @param args ignored
     */
    public static void main(final String[] args) {
        (new LatencyBenchmark()).run();
    }
}