OnixS C++ CME MDP Premium Market Data Handler  5.8.3
API Documentation
Advanced Sample

This sample demonstrates advanced techniques for using the Handler.

Source code

// Makes the names from the OnixS::CME::MDH namespace usable unqualified in the rest of the sample.
using namespace OnixS::CME::MDH;
// Aliases for code clarity.
// Sequence of channel identifiers; filled from the command line and consumed when handlers are built.
typedef std::vector<ChannelId> ChannelIds;
/// Command-line interface of the sample application.
///
/// Combines the connectivity, feed, session, book management and logger
/// option groups and adds the sample-specific `--channels` option.
class Configuration
    : public ConnectivityConfiguration
    , public FeedConfiguration
    , public SessionConfiguration
    , public BookManagementConfiguration
    , public LoggerConfiguration
{
public:
    /// Initializes every option group from the same command-line arguments.
    ///
    /// ConfigurationBase is initialized here although it is not a direct
    /// base of this class (presumably a virtual base shared by the option
    /// groups - confirm against the ConfigurationBase declaration).
    ///
    /// @param qty  Number of entries in `args`.
    /// @param args Command-line arguments.
    Configuration(size_t qty, char** args)
        : ConfigurationBase(qty, args)
        , ConnectivityConfiguration(qty, args)
        , FeedConfiguration(qty, args)
        , SessionConfiguration(qty, args)
        , BookManagementConfiguration(qty, args)
        , LoggerConfiguration(qty, args, "LogFileForAllHandlers.txt")
    {
    }

    /// Appends the values of the `channels` option to the given collection.
    ///
    /// @param ids Collection receiving the channel identifiers.
    /// @return Outcome reported by the option lookup; Application treats
    ///         `false` as "no channels were specified".
    bool channels(ChannelIds& ids) const
    {
        // NOTE(review): the trailing `1` is presumably the minimal number of
        // values expected for the option - confirm against args<>().
        const bool found = args<ChannelId>(options(), "channels", std::back_inserter(ids), 1);

        return found;
    }

private:
    /// Writes the description of the `--channels` option followed by the
    /// descriptions provided by each of the combined option groups.
    ///
    /// @param out Stream receiving the text.
    void showOptions(std::ostream& out) const ONIXS_CMEMDH_OVERRIDE
    {
        // Sample-specific option.
        out << " --channels id1 [id2 [id3 ..]]" << std::endl;
        out << "\tDefines channels whose market data is to be processed. " << std::endl;
        out << "\tChannel 310 is used if nothing specified. " << std::endl;
        out << std::endl;

        // Options contributed by the base option groups.
        SessionConfiguration::showOptions(out);
        BookManagementConfiguration::showOptions(out);
        ConnectivityConfiguration::showOptions(out);
        FeedConfiguration::showOptions(out);
        LoggerConfiguration::showOptions(out);
    }
};
// Small function objects that keep the start/stop loops over handlers easy to read.

/// Function object that starts the handler it is invoked with.
struct HandlerStarter
{
    /// @param handler Handler to be started.
    void operator()(Handler& handler)
    {
        handler.start();
    }
};
/// Function object that stops the handler it is invoked with.
struct HandlerStopper
{
    /// @param handler Handler to be stopped.
    void operator()(Handler& handler)
    {
        handler.stop();
    }
};
class Application
{
public:
/// Constructs handlers for the given channels.
Application(const Configuration& configuration)
: configuration_(configuration)
, handlers_(ONIXS_CMEMDH_NULLPTR)
, handlerQty_(0)
{
ChannelIds channels;
if (!configuration_.channels(channels))
channels.push_back(310);
try
{
logger_.reset(constructFileLogger(configuration_, &loggerTracer_));
const UInt32 workingThreads = ((10 > channels.size()) ? 1 : ((20 > channels.size()) ? 2 : 4));
feedEngine_.reset(constructFeedEngine());
workingThreads_.reset(constructFeedEngineThreads(*feedEngine_, workingThreads, &threadTracer_));
tcpRecovery_.reset(constructTcpRecovery(configuration_));
constructHandlers(channels);
}
catch (...)
{
destructEverything();
throw;
}
}
~Application()
{
destructEverything();
}
// Let’s start the market data processing and wait until the user interrupts it.
void run()
{
std::cout << "Starting processing market data.. " << std::endl
<< "Press Ctrl+C to stop and exit. " << std::endl;
Handler* const endOfHandlers = handlers_ + handlerQty_;
std::for_each(handlers_, endOfHandlers, HandlerStarter());
std::cout << std::endl << std::endl;
while (!InterruptDetector::instance().detected())
{
CurrentThread::sleep(1);
}
std::cout << "Stopping processing market data.. " << std::endl;
std::for_each(handlers_, endOfHandlers, HandlerStopper());
std::cout << std::endl << "Done. " << std::endl << std::endl;
}
// A bit of information on the sample.
static void identify()
{
std::cout << "Advanced Sample for CME MDP Premium Market Data Handler, v" << toStr(Version::current()) << ". "
<< std::endl
<< std::endl;
}
private:
void destructEverything()
{
delete[] handlers_;
tcpRecovery_.reset();
workingThreads_.reset();
feedEngine_.reset();
logger_.reset();
}
// Constructs handlers for the given channels.
void constructHandlers(const ChannelIds& channels)
{
handlers_ = new Handler[channels.size()]();
handlerQty_ = 0;
for (ChannelIds::const_iterator channel = channels.begin(); channel != channels.end(); ++channel, ++handlerQty_)
{
Handler& handler = handlers_[handlerQty_];
HandlerSettings& settings = handler.settings();
setLicense(settings);
settings.channel(*channel).connectivityConfigurationFile(configuration_.connectivityFile());
// Engine & Connectivity.
settings.feeds().engine(feedEngine_.get());
// Additional steps on feeds.
apply(settings.feeds(), configuration_);
// Processing flow.
apply(settings.session(), configuration_, tcpRecovery_.get());
// Books.
apply(settings.bookManagement(), configuration_);
// Logging.
apply(settings.logging(), configuration_, logger_.get());
// Events.
securityTracer_.bind(handler);
marketDataTracer_.bind(handler);
serviceTracer_.bind(handler);
}
}
const Configuration& configuration_;
ScopedPtr<Logger> logger_;
ScopedPtr<NetFeedEngine> feedEngine_;
ScopedPtr<FeedEngineThreadPool> workingThreads_;
ScopedPtr<TcpRecoveryService> tcpRecovery_;
Handler* handlers_;
size_t handlerQty_;
SecurityEventTracer securityTracer_;
MarketDataEventTracer marketDataTracer_;
ServiceEventTracer serviceTracer_;
FeedEngineThreadPoolEventTracer threadTracer_;
FileLoggerEventTracer loggerTracer_;
// Prohibits copy construction.
Application(const Application&);
// No re-assignments for this time.
Application& operator=(const Application&);
};
/// Entry point of the sample.
///
/// Prints the sample identification and builds the configuration from the
/// command line. It then either shows the configuration output or runs the
/// application until it is interrupted.
///
/// @param qty  Number of command-line arguments.
/// @param args Command-line arguments.
/// @return 0 on normal completion, 1 if a std::exception was caught.
int main(int qty, char** args)
{
    try
    {
        Application::identify();

        Configuration configuration(qty, args);

        // When show() reports true (presumably the user asked for the
        // option summary), print it and finish without processing data.
        if (configuration.show())
        {
            configuration.show(std::cout);
            return 0;
        }

        checkNetworkSettings(configuration);

        // The temporary Application is destroyed as soon as run() returns.
        Application(configuration).run();

        return 0;
    }
    catch (const std::exception& ex)
    {
        std::cerr << std::endl;
        std::cerr << "ERROR: " << ex.what() << std::endl;

        return 1;
    }
}