This sample shows how to connect to all B3 UMDF channels.
Source code
#include <iostream>
#include <Common/Configuring.h>
#include <Common/Options.h>
#include <Common/Utils.h>
#include <Common/Helpers.h>
#include <OnixS/B3/MarketData/UMDF.h>
#include "MyListener.h"
using namespace OnixS::B3::MarketData::UMDF;
using namespace OnixS::B3::MarketData::UMDF::Messaging;
using Sample::MyListener;
// Shared-ownership pointer to a market data Handler instance.
using HandlerPtr = shared_ptr<Handler>;

// Shared-ownership pointer to the order book pool handed to the handlers.
using OrderBookPoolPtr = shared_ptr<OrderBookPool>;

// Collection of all handlers created by the sample (one per channel).
using Handlers = std::vector<HandlerPtr>;
/// Stops every handler in the collection and then empties the collection.
///
/// Each handler is stopped with `stop(true)`.
/// NOTE(review): the `true` argument presumably asks `stop` to wait until the
/// handler has actually stopped - confirm against the Handler API reference.
///
/// Clearing the vector drops the shared pointers it holds; a handler is
/// destroyed at that point if the vector held the last reference to it.
///
/// @param handlers Handlers to stop; empty on return.
void stopAndDeleteHandlers(Handlers& handlers)
{
    for (Handlers::iterator current = handlers.begin(); current != handlers.end(); ++current)
    {
        (*current)->stop(true);
    }

    handlers.clear();
}
/// Command-line configuration of the sample.
///
/// Combines the connectivity and feed option sets and adds two options of its
/// own: `--buildBooks` and `--logDir <directory>`.
class Configuration
    : public ConnectivityConfiguration
    , public FeedConfiguration
{
public:
    /// Parses the command line.
    ///
    /// ConfigurationBase is initialized here directly in addition to the two
    /// listed bases, which is what a base shared virtually by both requires.
    ///
    /// @param qty  Number of command-line arguments.
    /// @param args Command-line argument strings.
    Configuration(size_t qty, Char** args)
        : ConfigurationBase(qty, args)
        , ConnectivityConfiguration(qty, args)
        , FeedConfiguration(qty, args)
    {
    }

    /// @return Whether the `buildBooks` option is present among the parsed options.
    bool buildBooks() const
    {
        const bool requested = exist(options(), "buildBooks");
        return requested;
    }

    /// @return Value of the `logDir` option, or "logs" when it is not given.
    std::string logDir() const
    {
        const std::string directory = argOrDefault(options(), "logDir", "logs");
        return directory;
    }

private:
    /// Prints the help text: the options of both base configurations first,
    /// followed by the options introduced by this class.
    void showOptions(std::ostream& out) const ONIXS_B3_UMDF_MD_OVERRIDE
    {
        ConnectivityConfiguration::showOptions(out);
        FeedConfiguration::showOptions(out);

        out << " --buildBooks" << std::endl
            << "\tBuild order books. " << std::endl
            << std::endl
            << " --logDir <directory>" << std::endl
            << "\tTells to use the given log directory. " << std::endl
            << std::endl;
    }
};
/// Entry point of the "All Channels" sample.
///
/// Reads the channel identifiers from the connectivity file, creates one
/// Handler per channel (all sharing a single feed engine, thread pool,
/// optional order book pool and listener), starts them and waits for the user
/// to request a shutdown.
///
/// @param qty  Number of command-line arguments.
/// @param args Command-line argument strings.
/// @return 0 on normal completion (including when only the help text was
///         requested); 1 when an exception terminated the sample.
int main(int qty, char** args)
{
    clog << "OnixS C++ B3 Binary UMDF Market Data Handler All Channels sample, version " << Handler::version() << '.' << endl << endl;

    try
    {
        manageLinuxSignals();

        const Configuration configuration(qty, args);

        // Help requested: print the options and leave without connecting.
        if (configuration.show())
        {
            configuration.show(std::cout);
            return 0;
        }

        checkNetworkSettings(configuration);

        std::vector<int> channelIds;
        HandlerSettings::gatherChannelIds(channelIds, configuration.connectivityFile());

        // Settings shared by all handlers; the per-channel parts (log file
        // prefix and feeds) are overwritten inside the loop below.
        HandlerSettings handlerSettings;
        handlerSettings.licenseDirectory = "../../license";
        handlerSettings.loggerSettings.logDirectory = configuration.logDir();
        handlerSettings.loggerSettings.logLevel = LogLevel::Info;
        handlerSettings.networkInterfaceA = configuration.ifA();
        handlerSettings.networkInterfaceB = configuration.ifB();

        // A single feed engine serves every handler; the pool is sized at one
        // thread per two channels, plus one.
        SocketFeedEngine feedEngine;
        FeedEngineThreadPoolSettings feedEngineThreadPoolSettings;
        feedEngineThreadPoolSettings.threadCount( static_cast<UInt32>(channelIds.size()) / 2 + 1 );
        FeedEngineThreadPool pool(feedEngineThreadPoolSettings, &feedEngine);
        handlerSettings.feedEngine = &feedEngine;

        // The order book pool is created only when --buildBooks is given and
        // is declared before the handlers so that it outlives them.
        OrderBookPoolPtr orderBookPool;
        if (configuration.buildBooks())
        {
            OrderBookPoolSettings orderBookPoolSettings;
            orderBookPoolSettings.chunksAmount = channelIds.size() * 1000;
            orderBookPoolSettings.chunkLength = 128 * 1024;

            orderBookPool = make_shared<OrderBookPool>(orderBookPoolSettings);
            handlerSettings.orderBookPool = orderBookPool.get();
            handlerSettings.buildOrderBooks = true;
        }

        // NOTE(review): this one listener is registered with every handler
        // while the feed engine pool may run several threads, and MyListener
        // writes to a single log stream - confirm whether callbacks of
        // different handlers can run concurrently.
        // NOTE(review): MyListener opens "logs/MarketData.txt" regardless of
        // the --logDir option.
        MyListener myListener;

        Handlers handlers;
        handlers.reserve(channelIds.size());

        for (const int channelId : channelIds)
        {
            handlerSettings.loggerSettings.logFileNamePrefix = "channel_" + toStr(channelId);
            handlerSettings.loadFeeds(channelId, configuration.connectivityFile());

            auto handler = make_shared<Handler>(handlerSettings);
            handler->registerErrorListener(&myListener);
            handler->registerMessageListener(&myListener);
            handler->registerWarningListener(&myListener);
            handler->registerOrderBookListener(&myListener);

            // Store the handler before starting it so the collection owns it
            // even if start() throws.
            handlers.push_back(handler);
            handler->start();
        }

#ifdef _WIN32
        clog << "Press Enter key to stop the handlers..." << endl;
        waitUntilEnterKey();
#else
        clog << "Send SIGINT to stop the handlers..." << endl;
        SignalHelper::waitUntilSignal();
#endif

        clog << "Stopping..." << endl;
        stopAndDeleteHandlers(handlers);
        clog << "Handlers are stopped." << endl;
    }
    catch (const Exception& ex)
    {
        cerr << "EXCEPTION: " << ex.what() << endl;
        return 1;
    }
    catch (const std::exception& ex)
    {
        cerr << "EXCEPTION: " << ex.what() << endl;
        return 1;
    }
    catch (...)
    {
        cerr << "UNKNOWN EXCEPTION" << endl;
        return 1;
    }

    return 0;
}
MyListener.h:
#include <fstream>
#include <OnixS/B3/MarketData/UMDF/ErrorListener.h>
#include <OnixS/B3/MarketData/UMDF/Messaging.h>
#include <OnixS/B3/MarketData/UMDF/MessageListener.h>
#include <OnixS/B3/MarketData/UMDF/WarningListener.h>
#include <OnixS/B3/MarketData/UMDF/OrderBookListener.h>
namespace Sample
{
using namespace OnixS::B3::MarketData::UMDF;

/// Listener used by the sample for every kind of handler notification:
/// warnings, errors, market data messages and order book events.
///
/// According to the accompanying implementation, warnings and errors are
/// reported to the standard log stream, while messages and order book events
/// are written to a text log file owned by this object.
///
/// NOTE(review): ONIXS_B3_UMDF_MD_FINAL presumably expands to `final` where
/// the compiler supports it - confirm in the library's ABI/compiler header.
class MyListener :
    public MessageListener,
    public WarningListener,
    public ErrorListener,
    public OrderBookListener
{
public:
    /// Opens the market data log file ("logs/MarketData.txt" in the
    /// accompanying implementation), which throws if the file cannot be opened.
    MyListener();

    /// Warning notification.
    void onWarning (const std::string& reason) ONIXS_B3_UMDF_MD_FINAL;

    /// Error notification.
    void onError (ErrorCode::Enum code, const std::string& description) ONIXS_B3_UMDF_MD_FINAL;

    // Market data message callbacks, one per message type.
    void onSequenceReset_1(const Messaging::SequenceReset_1, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSequence_2(const Messaging::Sequence_2, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onEmptyBook_9(const Messaging::EmptyBook_9, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onChannelReset_11(const Messaging::ChannelReset_11, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSecurityStatus_3(const Messaging::SecurityStatus_3, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSecurityGroupPhase_10(const Messaging::SecurityGroupPhase_10, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSecurityDefinition_12(const Messaging::SecurityDefinition_12, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSnapshotFullRefresh_Header_30(const Messaging::SnapshotFullRefresh_Header_30, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSnapshotFullRefresh_Orders_MBO_71(const Messaging::SnapshotFullRefresh_Orders_MBO_71, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onNews_5(const Messaging::News_5, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onClosingPrice_17(const Messaging::ClosingPrice_17, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onQuantityBand_21(const Messaging::QuantityBand_21, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onPriceBand_22(const Messaging::PriceBand_22, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onOpeningPrice_15(const Messaging::OpeningPrice_15, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onTheoreticalOpeningPrice_16(const Messaging::TheoreticalOpeningPrice_16, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onAuctionImbalance_19(const Messaging::AuctionImbalance_19, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onHighPrice_24(const Messaging::HighPrice_24, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onLowPrice_25(const Messaging::LowPrice_25, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onLastTradePrice_27(const Messaging::LastTradePrice_27, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onSettlementPrice_28(const Messaging::SettlementPrice_28, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onOpenInterest_29(const Messaging::OpenInterest_29, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onOrder_MBO_50(const Messaging::Order_MBO_50, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onDeleteOrder_MBO_51(const Messaging::DeleteOrder_MBO_51, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onMassDeleteOrders_MBO_52(const Messaging::MassDeleteOrders_MBO_52, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onTrade_53(const Messaging::Trade_53, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onForwardTrade_54(const Messaging::ForwardTrade_54, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onExecutionSummary_55(const Messaging::ExecutionSummary_55, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onExecutionStatistics_56(const Messaging::ExecutionStatistics_56, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;
    void onTradeBust_57(const Messaging::TradeBust_57, const DataSource&) ONIXS_B3_UMDF_MD_FINAL;

    // Order book callbacks.
    void onOrderBookChanged(const OrderBook& book, const Messaging::SbeMessage message) ONIXS_B3_UMDF_MD_FINAL;
    void onOrderBookUpdated(const OrderBook& book) ONIXS_B3_UMDF_MD_FINAL;
    void onOrderBookOutOfDate(const OrderBook& book) ONIXS_B3_UMDF_MD_FINAL;

private:
    // Text log receiving messages and order book events. Qualified with std::
    // so the header does not depend on a using-directive made elsewhere.
    std::ofstream marketDataLog_;
};
}
MyListener.cpp:
#include <iostream>
#include "MyListener.h"
#include <OnixS/B3/MarketData/UMDF/OrderBook.h>
namespace Sample
{
namespace
{
    /// Writes one record to the log followed by std::endl, i.e. a newline
    /// plus a flush, so each record reaches the file as soon as it is logged.
    template <typename Record>
    void logRecord(std::ostream& log, const Record& record)
    {
        log << record << std::endl;
    }
}

/// Opens the market data log file and throws std::domain_error when the file
/// cannot be opened.
MyListener::MyListener()
{
    const std::string marketDataLogName("logs/MarketData.txt");

    marketDataLog_.open(marketDataLogName.c_str());

    if (marketDataLog_.fail())
    {
        throw std::domain_error("Cannot open " + marketDataLogName);
    }
}

/// Reports a handler warning on the standard log stream.
void MyListener::onWarning (const std::string& reason)
{
    std::clog << "Warning occurred. Description: '" << reason << "'" << std::endl;
}

/// Reports a handler error, with its code rendered as text, on the standard log stream.
void MyListener::onError (ErrorCode::Enum code, const std::string& description)
{
    std::clog << "Error occurred, errorCode = " << enumToString (code) << ". Description: '" << description << "'" << std::endl;
}

// Message callbacks: each one writes the received message to the market data
// log as a single record; the data source argument is not used.

void MyListener::onSequenceReset_1(const Messaging::SequenceReset_1 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSequence_2(const Messaging::Sequence_2 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onEmptyBook_9(const Messaging::EmptyBook_9 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onChannelReset_11(const Messaging::ChannelReset_11 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSecurityStatus_3(const Messaging::SecurityStatus_3 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSecurityGroupPhase_10(const Messaging::SecurityGroupPhase_10 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSecurityDefinition_12(const Messaging::SecurityDefinition_12 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSnapshotFullRefresh_Header_30(const Messaging::SnapshotFullRefresh_Header_30 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSnapshotFullRefresh_Orders_MBO_71(const Messaging::SnapshotFullRefresh_Orders_MBO_71 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onNews_5(const Messaging::News_5 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onClosingPrice_17(const Messaging::ClosingPrice_17 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onQuantityBand_21(const Messaging::QuantityBand_21 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onPriceBand_22(const Messaging::PriceBand_22 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onOpeningPrice_15(const Messaging::OpeningPrice_15 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onTheoreticalOpeningPrice_16(const Messaging::TheoreticalOpeningPrice_16 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onAuctionImbalance_19(const Messaging::AuctionImbalance_19 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onHighPrice_24(const Messaging::HighPrice_24 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onLowPrice_25(const Messaging::LowPrice_25 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onLastTradePrice_27(const Messaging::LastTradePrice_27 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onSettlementPrice_28(const Messaging::SettlementPrice_28 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onOpenInterest_29(const Messaging::OpenInterest_29 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onOrder_MBO_50(const Messaging::Order_MBO_50 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onDeleteOrder_MBO_51(const Messaging::DeleteOrder_MBO_51 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onMassDeleteOrders_MBO_52(const Messaging::MassDeleteOrders_MBO_52 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onTrade_53(const Messaging::Trade_53 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onForwardTrade_54(const Messaging::ForwardTrade_54 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onExecutionSummary_55(const Messaging::ExecutionSummary_55 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onExecutionStatistics_56(const Messaging::ExecutionStatistics_56 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

void MyListener::onTradeBust_57(const Messaging::TradeBust_57 msg, const DataSource&)
{
    logRecord(marketDataLog_, msg);
}

/// Per-message order book change notification; intentionally does nothing in this sample.
void MyListener::onOrderBookChanged(const OrderBook&, const Messaging::SbeMessage)
{
}

/// Logs the formatted contents of an order book that has been updated.
void MyListener::onOrderBookUpdated(const OrderBook& book)
{
    marketDataLog_ << "Order book updated: " << std::endl;
    marketDataLog_ << book.toFormattedString() << std::endl;
}

/// Logs the instrument identifier of an order book reported as out of date.
void MyListener::onOrderBookOutOfDate(const OrderBook& book)
{
    marketDataLog_ << "Order book out of date: " << book.instrumentId() << std::endl;
}
}