OnixS C++ CME MDP Premium Market Data Handler  5.8.9
API Documentation
Replay Sample

This sample demonstrates how to use the replay functionality of the Handler.

Source code

using namespace OnixS::CME::MDH;
// Options.
class Configuration
: public ChannelConfiguration
, public ConnectivityConfiguration
, public LoggerConfiguration
{
public:
Configuration(size_t qty, char** args)
: ConfigurationBase(qty, args)
, ChannelConfiguration(qty, args, 310)
, ConnectivityConfiguration(qty, args)
, LoggerConfiguration(qty, args, "Replay.log")
{
const unsigned optionQty = countOption("log") + countOption("pcap") + countOption("datamine");
if (1 < optionQty)
{
throw std::runtime_error("Cannot initialize market data replay "
"due to conflict in the input parameters. "
"Multiple replay sources (Log, PCAP, DataMine) "
"selected at the same time. Please, fix the command "
"line arguments to use only one type of market data "
"source (either log or PCAP or DataMine). ");
}
}
bool replayPcap() const
{
return exist(options(), "pcap");
}
bool replayDatamine() const
{
return exist(options(), "datamine");
}
bool filesToReplay(FileList& files) const
{
const OptionArgs* args = options().args("log");
if (!args)
{
args = options().args("pcap");
if (!args)
{
args = options().args("datamine");
}
}
if (args)
{
std::copy(args->begin(), args->end(), std::back_inserter(files));
return true;
}
return false;
}
private:
unsigned countOption(const char* name) const
{
assert(ONIXS_CMEMDH_NULLPTR != name);
return exist(options(), name) ? 1 : 0;
}
void showOptions(std::ostream& out) const ONIXS_CMEMDH_OVERRIDE
{
out << " --log [file1 [file2 [..]]]" << std::endl
<< "\tTells to replay the given list of log files. " << std::endl
<< "\tIf no files are specified, sample data is replayed. " << std::endl
<< std::endl
<< " --pcap [file1 [file2 [..]]]" << std::endl
<< "\tTells to replay the given list of PCAP files. " << std::endl
<< "\tIf no files are specified, sample data is replayed. " << std::endl
<< std::endl
<< " --datamine [file1 [file2 [..]]]" << std::endl
<< "\tTells to replay the given list of CME DataMine (historical) files. " << std::endl
<< "\tIf no files are specified, sample data is replayed. " << std::endl
<< std::endl;
ChannelConfiguration::showOptions(out);
ConnectivityConfiguration::showOptions(out);
LoggerConfiguration::showOptions(out);
}
};
// Interface of a replay strategy for a particular kind of recorded data.
//
// The sample uses it in three steps: gather() fills in the file list
// when none was given, configure() adjusts the Handler settings before
// the replay, and replay() feeds the files to the Handler.
class ReplayManager
{
public:
    // Allows destruction through a pointer to the base class.
    virtual ~ReplayManager() {}

    // Adds to `files` the files found for replay in the given folder.
    // The channel identifier is available to implementations which
    // need it to select the files.
    virtual void gather(FileList& files, const std::string& folder, ChannelId channel) = 0;

    // Updates Handler settings with source-specific parameters.
    virtual void configure(HandlerSettings& settings) = 0;

    // Replays the given files using the given Handler instance.
    virtual void replay(const FileList& files, Handler& handler) = 0;

protected:
    // Only descendants are allowed to construct the base part.
    ReplayManager() {}
};
// Replay strategy for log files.
struct LogReplayManager : ReplayManager
{
    LogReplayManager() {}

    // Collects log files for the given channel from the given folder.
    void gather(FileList& list, const std::string& folder, ChannelId channel) ONIXS_CMEMDH_OVERRIDE
    {
        gatherLogFiles(list, channel, folder);
    }

    // Intentionally leaves the settings untouched.
    void configure(HandlerSettings& /*settings*/) ONIXS_CMEMDH_OVERRIDE
    {
        // Usually, the Handler takes all the parameters it needs
        // from the log files being replayed. Should the data be
        // replayed with different session settings, the Handler's
        // parameters are to be updated here. Note, however, that
        // in such a case the replay itself requires additional
        // configuration as well.
    }

    // Replays the given log files through the given Handler.
    void replay(const FileList& files, Handler& handler) ONIXS_CMEMDH_OVERRIDE
    {
        replayLogFiles(files, handler);
    }
};
// Replay strategy for PCAP files.
class PcapReplayManager : public ReplayManager
{
public:
    PcapReplayManager() {}

    // Exposes the address aliases of the replay settings for editing.
    // If any alias is defined, replay() passes the settings along.
    NetAddressAliases& aliases()
    {
        return supplements_.aliases();
    }

    // Collects files with the ".pcap" extension from the given folder.
    // The channel identifier is not used.
    void gather(FileList& files, const std::string& folder, ChannelId) ONIXS_CMEMDH_OVERRIDE
    {
        gatherFiles(files, folder, ".pcap");
    }

    // Intentionally leaves the settings untouched.
    void configure(HandlerSettings& /*settings*/) ONIXS_CMEMDH_OVERRIDE
    {
    }

    // Replays the given PCAP files through the given Handler.
    void replay(const FileList& files, Handler& handler) ONIXS_CMEMDH_OVERRIDE
    {
        const bool hasAliases = !supplements_.aliases().empty();

        if (hasAliases)
        {
            // Supplemental settings are passed only if there is
            // at least one alias defined.
            replayPcapFiles(files, handler, supplements_);
        }
        else
        {
            replayPcapFiles(files, handler);
        }
    }

private:
    // Additional replay parameters (holds the address aliases).
    PcapReplaySettings supplements_;
};
// Replay strategy for CME DataMine (historical) files.
class DatamineReplayManager : public ReplayManager
{
public:
    // If `useInstrumentCache` is true, the instrument cache file is set
    // to "data/SecDef-310.dat"; otherwise, the path is left empty.
    explicit DatamineReplayManager(bool useInstrumentCache)
        : instruments_(useInstrumentCache ? "data/SecDef-310.dat" : "")
    {
    }

    // Collects files with the ".gz" extension from the given folder.
    // The channel identifier is not used.
    void gather(FileList& files, const std::string& folder, ChannelId) ONIXS_CMEMDH_OVERRIDE
    {
        gatherFiles(files, folder, ".gz");
    }

    // Applies DataMine-specific parameters to the Handler settings.
    void configure(HandlerSettings& settings) ONIXS_CMEMDH_OVERRIDE
    {
        // NOTE(review): joinRecovery() is called without arguments and
        // its result is discarded. The source had the parentheses spread
        // over two lines with nothing in between, so an argument may have
        // been lost -- confirm the intended value against the product sample.
        settings.session().joinRecovery();

        // Path to the instrument cache (may be empty, see the constructor).
        settings.instrumentCache(instruments_);
    }

    // Replays the given DataMine files through the given Handler.
    void replay(const FileList& files, Handler& handler) ONIXS_CMEMDH_OVERRIDE
    {
        replayDatamineFiles(files, handler);
    }

private:
    // Instrument cache file passed to the Handler settings.
    std::string instruments_;
};
// Spawns market data processing from the previously recorded log file.
class Application
{
public:
// Initializes the sample according to the arguments.
Application(const Configuration& configuration)
: configuration_(configuration)
{
// Configures replay manager for the
// selected data (either log or PCAP).
configuration_.filesToReplay(files_);
if (configuration_.replayPcap())
{
ScopedPtr<PcapReplayManager> pcapManager(new PcapReplayManager());
if (files_.empty())
{
// The Handler is configured to use connectivity
// configuration for NRCert environment, whereas
// recorded packets belong to production environment.
// The following steps force the PCAP replay machinery
// to map data from production incremental feed to the
// certification one.
pcapManager->aliases()[NetFeedConnection("224.0.31.1", 14310)] = NetFeedConnection("224.0.28.1", 14310);
}
replayManager_.reset(pcapManager.release());
}
else if (configuration_.replayDatamine())
{
replayManager_.reset(new DatamineReplayManager(files_.empty()));
}
else
{
replayManager_.reset(new LogReplayManager());
}
if (files_.empty())
{
replayManager_->gather(files_, "data", configuration_.channel());
}
}
// Allows instances to be destroyed.
~Application() {}
// Spawns market data processing from
// the previously recorded log file.
void run()
{
// Even in replay, handler still logs its events.
FileLoggerEventTracer loggerTracer;
const ScopedPtr<Logger> logger(constructFileLogger(configuration_, &loggerTracer));
// Constructs handler for replay in regular way.
Handler handler;
// A bit of configuring for the instance.
HandlerSettings& settings = handler.settings();
setLicense(settings);
settings.channel(configuration_.channel()).connectivityConfigurationFile(configuration_.connectivityFile());;
settings.bookManagement().mboBooks().maintain(true);
// Additional turn-up.
replayManager_->configure(settings);
// Binds the given logger to the Handler.
apply(settings.logging(), configuration_, logger.get());
// Associates events processing with handler.
SecurityEventTracer securityEventTracer;
securityEventTracer.bind(handler);
MarketDataEventTracer marketDataTracer;
marketDataTracer.bind(handler);
ServiceEventTracer serviceTracer;
serviceTracer.bind(handler);
std::cout << "Replaying the recorded market data.. " << std::endl;
replayManager_->replay(files_, handler);
std::cout << std::endl << "Done. " << std::endl << std::endl;
}
// A bit of information on the sample.
static void identify()
{
std::cout << "Market Data Replay for the CME MDP Premium Market Data Handler, v" << toStr(Version::current())
<< ". "
<< std::endl
<< std::endl;
}
private:
const Configuration& configuration_;
ScopedPtr<ReplayManager> replayManager_;
FileList files_;
// Prohibits copy construction.
Application(const Application&);
// No re-assignments for this time.
Application& operator=(const Application&);
};
int main(int qty, char** args)
{
try
{
Application::identify();
const Configuration configuration(qty, args);
if (configuration.show())
{
configuration.show(std::cout);
}
else
{
Application(configuration).run();
}
return 0;
}
catch (const std::exception& ex)
{
std::cerr << std::endl << "ERROR: " << ex.what() << std::endl;
return 1;
}
}