OnixS C++ CME MDP Premium Market Data Handler 5.9.0
Users' manual and API documentation
Loading...
Searching...
No Matches
Advanced Sample

This sample demonstrates advanced techniques for using the Handler.

// Brings the names from the OnixS::CME::MDH namespace into scope for the rest of the sample.
using namespace OnixS::CME::MDH;
// Aliases for code clarity.
// Sequence of channel identifiers; filled from the "channels" command-line option.
typedef std::vector<ChannelId> ChannelIds;
// Command-line interface of the sample application.
//
// Combines the option groups for connectivity, feeds, session, book
// management and logging, and adds the sample-specific "channels" option.
class Configuration
    : public ConnectivityConfiguration
    , public FeedConfiguration
    , public SessionConfiguration
    , public BookManagementConfiguration
    , public LoggerConfiguration
{
public:
    // Hands the raw command-line arguments to every option group.
    // "LogFileForAllHandlers.txt" is passed to the logger option group
    // (presumably the default log file name - confirm against LoggerConfiguration).
    Configuration(size_t qty, char** args)
        : ConfigurationBase(qty, args)
        , ConnectivityConfiguration(qty, args)
        , FeedConfiguration(qty, args)
        , SessionConfiguration(qty, args)
        , BookManagementConfiguration(qty, args)
        , LoggerConfiguration(qty, args, "LogFileForAllHandlers.txt")
    {
    }

    // Appends the values of the "channels" option to `ids`.
    // Returns the outcome of the option lookup; callers treat `false`
    // as "no channels were specified".
    bool channels(ChannelIds& ids) const
    {
        const bool specified = args<ChannelId>(options(), "channels", std::back_inserter(ids), 1);

        return specified;
    }

private:
    // Prints the help for the "channels" option, then the help of each
    // inherited option group.
    void showOptions(std::ostream& out) const ONIXS_CMEMDH_OVERRIDE
    {
        out << " --channels id1 [id2 [id3 ..]]" << std::endl;
        out << "\tDefines channels whose market data is to be processed. " << std::endl;
        out << "\tChannel 310 is used if nothing specified. " << std::endl;
        out << std::endl;

        SessionConfiguration::showOptions(out);
        BookManagementConfiguration::showOptions(out);
        ConnectivityConfiguration::showOptions(out);
        FeedConfiguration::showOptions(out);
        LoggerConfiguration::showOptions(out);
    }
};
// Couple of utility classes to make code simpler to read.
// Function object that starts the handler it is applied to.
// Intended for use with algorithms such as std::for_each.
struct HandlerStarter
{
    // Const-qualified because the functor carries no state; this lets it be
    // invoked through const references and const copies as well.
    void operator()(Handler& handler) const
    {
        handler.start();
    }
};
// Function object that stops the handler it is applied to.
// Intended for use with algorithms such as std::for_each.
struct HandlerStopper
{
    // Const-qualified because the functor carries no state; this lets it be
    // invoked through const references and const copies as well.
    void operator()(Handler& handler) const
    {
        handler.stop();
    }
};
class Application
{
public:
Application(const Configuration& configuration)
: configuration_(configuration)
, handlers_(ONIXS_CMEMDH_NULLPTR)
, handlerQty_(0)
{
ChannelIds channels;
if (!configuration_.channels(channels))
channels.push_back(310);
try
{
logger_.reset(constructFileLogger(configuration_, &loggerTracer_));
const UInt32 workingThreads = ((10 > channels.size()) ? 1 : ((20 > channels.size()) ? 2 : 4));
feedEngine_.reset(constructFeedEngine());
workingThreads_.reset(constructFeedEngineThreads(*feedEngine_, workingThreads, &threadTracer_));
tcpRecovery_.reset(constructTcpRecovery(configuration_));
constructHandlers(channels);
}
catch (...)
{
destructEverything();
throw;
}
}
~Application()
{
destructEverything();
}
// Let’s start the market data processing and wait until the user interrupts it.
void run()
{
std::cout << "Starting processing market data.. " << std::endl
<< "Press Ctrl+C to stop and exit. " << std::endl;
Handler* const endOfHandlers = handlers_ + handlerQty_;
std::for_each(handlers_, endOfHandlers, HandlerStarter());
std::cout << std::endl << std::endl;
while (!InterruptDetector::instance().detected())
{
CurrentThread::sleep(1);
}
std::cout << "Stopping processing market data.. " << std::endl;
std::for_each(handlers_, endOfHandlers, HandlerStopper());
std::cout << std::endl << "Done. " << std::endl << std::endl;
}
// A bit of information on the sample.
static void identify()
{
std::cout << "Advanced Sample for CME MDP Premium Market Data Handler, v" << toStr(Version::current()) << ". "
<< std::endl
<< std::endl;
}
private:
void destructEverything()
{
delete[] handlers_;
tcpRecovery_.reset();
workingThreads_.reset();
feedEngine_.reset();
logger_.reset();
}
// Constructs handlers for the given channels.
void constructHandlers(const ChannelIds& channels)
{
handlers_ = new Handler[channels.size()]();
handlerQty_ = 0;
for (ChannelIds::const_iterator channel = channels.begin(); channel != channels.end(); ++channel, ++handlerQty_)
{
Handler& handler = handlers_[handlerQty_];
HandlerSettings& settings = handler.settings();
setLicense(settings);
settings.channel(*channel).connectivityConfigurationFile(configuration_.connectivityFile());
// Engine & Connectivity.
settings.feeds().engine(feedEngine_.get());
// Additional steps on feeds.
apply(settings.feeds(), configuration_);
// Processing flow.
apply(settings.session(), configuration_, tcpRecovery_.get());
// Books.
apply(settings.bookManagement(), configuration_);
// Logging.
apply(settings.logging(), configuration_, logger_.get());
// Events.
securityTracer_.bind(handler);
marketDataTracer_.bind(handler);
serviceTracer_.bind(handler);
}
}
const Configuration& configuration_;
ScopedPtr<Logger> logger_;
ScopedPtr<NetFeedEngine> feedEngine_;
ScopedPtr<FeedEngineThreadPool> workingThreads_;
ScopedPtr<TcpRecoveryService> tcpRecovery_;
Handler* handlers_;
size_t handlerQty_;
SecurityEventTracer securityTracer_;
MarketDataEventTracer marketDataTracer_;
ServiceEventTracer serviceTracer_;
FeedEngineThreadPoolEventTracer threadTracer_;
FileLoggerEventTracer loggerTracer_;
// Prohibits copy construction.
Application(const Application&);
// No re-assignments for this time.
Application& operator=(const Application&);
};
// Entry point of the sample.
//
// Prints the sample banner and parses the command line. If
// configuration.show() reports true, the configuration is printed to
// standard output (presumably the usage help - confirm against
// ConfigurationBase); otherwise the network settings are checked and the
// application runs until interrupted.
//
// Returns 0 on success and 1 if any exception reached this level.
int main(int qty, char** args)
{
    try
    {
        Application::identify();

        Configuration configuration(qty, args);

        if (configuration.show())
        {
            configuration.show(std::cout);
        }
        else
        {
            checkNetworkSettings(configuration);

            Application(configuration).run();
        }

        return 0;
    }
    catch (const std::exception& ex)
    {
        std::cerr << std::endl << "ERROR: " << ex.what() << std::endl;

        return 1;
    }
    catch (...)
    {
        // Without this handler an exception not derived from std::exception
        // would reach std::terminate, where stack unwinding (and therefore
        // the Application teardown) is not guaranteed to happen.
        std::cerr << std::endl << "ERROR: Unknown error occurred." << std::endl;

        return 1;
    }
}