OnixS C++ CME MDP Premium Market Data Handler  5.8.9
API Documentation
FixMessaging Sample

This sample demonstrates how to use FIX-like messaging.

Source code

using namespace OnixS::CME::MDH;
// Aggregates the command-line driven option groups used by this sample:
// channel, connectivity, feeds and logging.
class Configuration
    : public ChannelConfiguration
    , public ConnectivityConfiguration
    , public FeedConfiguration
    , public LoggerConfiguration
{
public:
    // Forwards the raw command-line arguments to every option group.
    // The sample-specific defaults passed along are the value 310 for the
    // channel group and "FixMessaging.log" for the logger group.
    Configuration(size_t qty, char** args)
        : ConfigurationBase(qty, args)
        , ChannelConfiguration(qty, args, 310)
        , ConnectivityConfiguration(qty, args)
        , FeedConfiguration(qty, args)
        , LoggerConfiguration(qty, args, "FixMessaging.log")
    {
    }

private:
    // Prints the options of each group, in the order the groups are inherited.
    void showOptions(std::ostream& stream) const ONIXS_CMEMDH_OVERRIDE
    {
        ChannelConfiguration::showOptions(stream);

        ConnectivityConfiguration::showOptions(stream);

        FeedConfiguration::showOptions(stream);

        LoggerConfiguration::showOptions(stream);
    }
};
// FIX message manipulations.
//
// Prints selected fields of the given FIX-like message to the given stream.
//
// - Messages of type "X": prints the transact time and the match event
//   indicator (each in structured and raw numeric form) followed by one line
//   per entry of the NoMDEntries repeating group.
// - Messages of type "d": prints the maturity month-year of the security
//   when that field can be obtained.
// - Messages of any other type are ignored.
//
// Everything is written to `output`; nothing goes to std::cout unless the
// caller passes std::cout.
void trace(std::ostream& output, const FIX::Message& message)
{
    using namespace OnixS::CME::MDH::FIX;

    if (message.type() == "X")
    {
        const Field transact = message[Tags::TransactTime];
        const Field indicator = message[Tags::MatchEventIndicator];

        // Any time-stamp field can be successfully converted into
        // unsigned 64-bit integer representing ticks since the epoch.
        // Bit-set fields can be accessed either as raw integers or
        // as structured entities as they are exposed by SBE messages.
        output << "Transact time: " << toStr(transact.cast<Timestamp>()) << "." << std::endl
               << "Transact time (Ticks from Epoch): " << toStr(transact.cast<UInt64>()) << "." << std::endl
               << "Event Indicator: " << toStr(indicator.cast<MatchEventIndicator>()) << "." << std::endl
               << "Event Indicator (Raw Numeric Value): " << toStr(indicator.cast<UInt32>()) << "." << std::endl;

        const Group entries = message.group(Tags::NoMDEntries);

        if (entries)
        {
            for (Group::Size index = 0; index != entries.size(); ++index)
            {
                const GroupEntry entry = entries[index];

                output << "[" << index << "] {Price=";

                // The price is printed only when the field is present.
                if (const Field px = entry[Tags::MDEntryPx])
                {
                    output << toStr(px.cast<Decimal>());
                }

                Int32 size = 0;

                // If size is not available then the casting
                // attempt fails and the size is not printed.
                if (entry[Tags::MDEntrySize].tryCast(size))
                {
                    output << ",Size=" << size;
                }

                const Field side = entry[Tags::MDEntryType];

                output << ",Side=" << toStr(side.cast<EntryType>())
                       << ",Side(Raw)=" << toStr(side.cast<EntryType::Base>());

                const Field action = entry[Tags::MDUpdateAction];

                // In addition to accessing the field value in a raw FIX
                // format, it's possible to cast the value to the corresponding
                // enumeration type. In such case, the field value can be
                // compared with enumeration constants.
                output << ",Action=" << toStr(action.cast<UpdateAction>())
                       << ",Action(Raw)=" << toStr(action.cast<UpdateAction::Base>());

                output << "}" << std::endl;
            }
        }
    }
    else if (message.type() == "d")
    {
        // NOTE(review): the declarations of `securityId` and `mmy` were missing
        // from this listing. The tag name (Tags::SecurityID) and the types
        // (Int32, MaturityMonthYear) are assumed from the surrounding API
        // usage - confirm against the SDK headers.
        const Int32 securityId = message[Tags::SecurityID].cast<Int32>();

        MaturityMonthYear mmy;

        if (message[Tags::MaturityMonthYear].tryCast(mmy))
        {
            output << "Maturity Month-Year for $" << securityId << ": " << toStr(mmy) << std::endl;
        }
    }
}
// Listener which dumps every FIX-like message it receives to the standard output.
class FixMessageTracer : FIX::MessageListener
{
public:
    // Registers this instance as the market data listener of the given handler.
    void bind(Handler& handler)
    {
        HandlerSettings& settings = handler.settings();

        settings.listeners().marketData(this);
    }

private:
    // Invoked for each FIX-like message; the originating handler is not used.
    void onMessage(Handler&, const FIX::MessageArgs& event) ONIXS_CMEMDH_OVERRIDE
    {
        std::ostream& sink = std::cout;

        trace(sink, event.message());
    }
};
// Traces issues from various sources (Handler, Feed Engine, Logger, etc.).
//
// Each report is first composed in a private buffer and then written to the
// target stream with a single insertion.
class IssueTracer
    : public HandlerListener
{
public:
    // Reports issues to the standard output.
    IssueTracer()
        : out_(std::cout)
    {
    }

    // Reports issues to the given stream. Only a reference is kept,
    // thus the stream must stay alive while the tracer is in use.
    IssueTracer(std::ostream& output)
        : out_(output)
    {
    }

    ~IssueTracer() {}

    // Registers this instance as the handler-level listener of the given handler.
    void bind(Handler& handler)
    {
        handler.settings().listeners().handler(this);
    }

private:
    // Handler issues.

    void onWarning(Handler& handler, const WarningArgs& warning) ONIXS_CMEMDH_OVERRIDE
    {
        reportHandlerIssue(handler, "WARNING", warning);
    }

    void onError(Handler& handler, const ErrorArgs& error) ONIXS_CMEMDH_OVERRIDE
    {
        reportHandlerIssue(handler, "ERROR", error);
    }

    // Feed Engine issues.

    void onFeedEngineThreadIssue(const FeedEngineThreadPool&, const Char* issue) ONIXS_CMEMDH_OVERRIDE
    {
        reportAlert("Issue occurred in working thread. ", issue);
    }

    // File Logger issues.

    void onFileLoggerIssue(const FileLogger&, const Char* issue) ONIXS_CMEMDH_OVERRIDE
    {
        reportAlert("Issue occurred while logging data. ", issue);
    }

    // Composes "[<channel>] <severity>: <details>" followed by a line break
    // and writes the whole line to the target stream.
    template <typename Details>
    void reportHandlerIssue(Handler& handler, const char* severity, const Details& details)
    {
        std::stringstream line;

        line << '[' << handler.settings().channel() << "] " << severity << ": " << details << '\n';

        out_ << line.str();
    }

    // Composes "ALERT!!! <origin><issue>" followed by a line break
    // and writes the whole line to the target stream.
    void reportAlert(const char* origin, const Char* issue)
    {
        std::stringstream line;

        line << "ALERT!!! " << origin << issue << '\n';

        out_ << line.str();
    }

    std::ostream& out_;
};
// Ties together the logger, the feed engine, the handler and the listeners
// of the sample, and drives market data processing.
class Application
{
public:
    // Constructs handler for the given channel.
    //
    // Only a reference to the configuration is kept, so the configuration
    // must outlive the application. Any exception raised while wiring the
    // components is re-thrown after the partially built components are released.
    Application(const Configuration& configuration)
        : configuration_(configuration)
    {
        try
        {
            logger_.reset(constructFileLogger(configuration_, &issueTracer_));

            feedEngine_.reset(constructFeedEngine());

            handler_.reset(new Handler());

            HandlerSettings& settings = handler_->settings();

            setLicense(settings);

            settings.channel(configuration_.channel()).connectivityConfigurationFile(configuration_.connectivityFile());

            settings.feeds().engine(feedEngine_.get());

            apply(settings.feeds(), configuration);

            applyDefaults(settings.session());

            apply(settings.logging(), configuration, logger_.get());

            fixMessageTracer_.bind(*handler_);
            issueTracer_.bind(*handler_);
        }
        catch (...)
        {
            // Releases the components in the reverse order of their construction.
            handler_.reset();
            feedEngine_.reset();
            logger_.reset();

            throw;
        }
    }

    // Releases all the components. Members are destroyed in the reverse
    // order of their declaration (see the private section below).
    ~Application() {}

    // Spawns market data processing and waits until user interrupts it.
    //
    // Exceptions derived from std::exception raised by the feed engine while
    // processing are reported to std::cerr and processing continues.
    void run()
    {
        std::cout << "Starting processing market data.. " << std::endl << "Press Ctrl+C to stop exit. " << std::endl;

        handler_->start();

        while (!InterruptDetector::instance().detected())
        {
            try
            {
                feedEngine_->process();
            }
            catch (const std::exception& ex)
            {
                std::cerr << std::endl
                          << "WARNING! Feed engine raised an "
                             "issue while processing incoming data. "
                          << ex.what() << std::endl;
            }
        }

        std::cout << "Stopping processing market data.. " << std::endl;

        handler_->stop();

        std::cout << std::endl << std::endl << "Done. " << std::endl << std::endl;
    }

    // Prints the name and the version of the sample.
    static void identify()
    {
        std::cout << "FIX Messaging for CME MDP Premium Market Data Handler, v" << toStr(Version::current()) << ". "
                  << std::endl
                  << std::endl;
    }

private:
    // Prohibits copy construction.
    Application(const Application&);

    // No re-assignments for this time.
    Application& operator=(const Application&);

    const Configuration& configuration_;

    // The listeners are declared BEFORE the components they are registered
    // with. The logger receives a raw pointer to issueTracer_ and the handler
    // receives raw pointers to both tracers; declaring the tracers first
    // makes them the last to be destroyed, so those pointers never dangle
    // while the logger, the feed engine or the handler are being torn down.
    FixMessageTracer fixMessageTracer_;
    IssueTracer issueTracer_;

    // Destroyed in the order: handler, feed engine, logger.
    ScopedPtr<Logger> logger_;
    ScopedPtr<NetFeedEngine> feedEngine_;
    ScopedPtr<Handler> handler_;
};
// Entry point of the sample.
//
// Prints the sample banner, parses the command line and either shows the
// configuration (when requested) or runs market data processing until the
// user interrupts it. Returns 0 on success and 1 if a std::exception escapes.
int main(int qty, char** args)
{
    try
    {
        Application::identify();

        const Configuration configuration(qty, args);

        // Showing the options was requested: print them and finish.
        if (configuration.show())
        {
            configuration.show(std::cout);

            return 0;
        }

        checkNetworkSettings(configuration);

        Application app(configuration);

        app.run();

        return 0;
    }
    catch (const std::exception& failure)
    {
        std::cerr << std::endl << "ERROR: " << failure.what() << std::endl;
    }

    return 1;
}