OnixS C++ CME MDP Premium Market Data Handler 5.9.0
API Documentation
Loading...
Searching...
No Matches
Replay Sample

This sample demonstetes how to use replay functional of the Handler.

Source code

using namespace OnixS::CME::MDH;
// Options.
class Configuration
: public ChannelConfiguration
, public ConnectivityConfiguration
, public LoggerConfiguration
{
public:
Configuration(size_t qty, char** args)
: ConfigurationBase(qty, args)
, ChannelConfiguration(qty, args, 310)
, ConnectivityConfiguration(qty, args)
, LoggerConfiguration(qty, args, "Replay.log")
{
const unsigned optionQty = countOption("log") + countOption("pcap") + countOption("datamine");
if (1 < optionQty)
{
throw std::runtime_error("Cannot initialize market data replay "
"due to conflict in the input parameters. "
"Multiple replay sources (Log, PCAP, DataMine) "
"selected at the same time. Please, fix the command "
"line arguments to use only one type of market data "
"source (either log or PCAP or DataMine). ");
}
}
bool replayPcap() const
{
return exist(options(), "pcap");
}
bool replayDatamine() const
{
return exist(options(), "datamine");
}
bool filesToReplay(FileList& files) const
{
const OptionArgs* args = options().args("log");
if (!args)
{
args = options().args("pcap");
if (!args)
{
args = options().args("datamine");
}
}
if (args)
{
std::copy(args->begin(), args->end(), std::back_inserter(files));
return true;
}
return false;
}
private:
unsigned countOption(const char* name) const
{
assert(ONIXS_CMEMDH_NULLPTR != name);
return exist(options(), name) ? 1 : 0;
}
void showOptions(std::ostream& out) const ONIXS_CMEMDH_OVERRIDE
{
out << " --log [file1 [file2 [..]]]" << std::endl
<< "\tTells to replay the given list of log files. " << std::endl
<< "\tIf no files are specified, sample data is replayed. " << std::endl
<< std::endl
<< " --pcap [file1 [file2 [..]]]" << std::endl
<< "\tTells to replay the given list of PCAP files. " << std::endl
<< "\tIf no files are specified, sample data is replayed. " << std::endl
<< std::endl
<< " --datamine [file1 [file2 [..]]]" << std::endl
<< "\tTells to replay the given list of CME DataMine (historical) files. " << std::endl
<< "\tIf no files are specified, sample data is replayed. " << std::endl
<< std::endl;
ChannelConfiguration::showOptions(out);
ConnectivityConfiguration::showOptions(out);
LoggerConfiguration::showOptions(out);
}
};
class ReplayManager
{
public:
virtual ~ReplayManager() {}
virtual void gather(FileList&, const std::string&, ChannelId) = 0;
virtual void configure(HandlerSettings&) = 0;
virtual void replay(const FileList&, Handler&) = 0;
protected:
ReplayManager() {}
};
struct LogReplayManager : ReplayManager
{
LogReplayManager() {}
void gather(FileList& list, const std::string& folder, ChannelId channel) ONIXS_CMEMDH_OVERRIDE
{
gatherLogFiles(list, channel, folder);
}
void configure(HandlerSettings& /*settings*/) ONIXS_CMEMDH_OVERRIDE
{
// In the most common way, the Handler grabs all required
// parameters from the log files it replays. If the data
// has to be replayed with the different session settings,
// this is the right place to update Handler's parameters.
// However, in such case, the replay must be configured
// additionally.
}
void replay(const FileList& files, Handler& handler) ONIXS_CMEMDH_OVERRIDE
{
replayLogFiles(files, handler);
}
};
class PcapReplayManager : public ReplayManager
{
public:
PcapReplayManager() {}
NetAddressAliases& aliases()
{
return supplements_.aliases();
}
void gather(FileList& files, const std::string& folder, ChannelId) ONIXS_CMEMDH_OVERRIDE
{
gatherFiles(files, folder, ".pcap");
}
void configure(HandlerSettings& settings) ONIXS_CMEMDH_OVERRIDE
{
setSessionToNaturalRefresh(settings.session());
}
void replay(const FileList& files, Handler& handler) ONIXS_CMEMDH_OVERRIDE
{
if (supplements_.aliases().empty())
{
replayPcapFiles(files, handler);
}
else
{
replayPcapFiles(files, handler, supplements_);
}
}
private:
PcapReplaySettings supplements_;
};
class DatamineReplayManager : public ReplayManager
{
public:
explicit DatamineReplayManager(bool useInstrumentCache)
: instruments_(useInstrumentCache ? "data/SecDef-310.dat" : "")
{
}
void gather(FileList& files, const std::string& folder, ChannelId) ONIXS_CMEMDH_OVERRIDE
{
gatherFiles(files, folder, ".gz");
}
void configure(HandlerSettings& settings) ONIXS_CMEMDH_OVERRIDE
{
settings.session().joinRecovery(
instruments_.empty() ? JoinRecoveryOptions::Disabled : JoinRecoveryOptions::Instruments
);
settings.instrumentCache(instruments_);
}
void replay(const FileList& files, Handler& handler) ONIXS_CMEMDH_OVERRIDE
{
replayDatamineFiles(files, handler);
}
private:
std::string instruments_;
};
// Spawns market data processing from the previously recorded log file.
class Application
{
public:
// Initializes the sample according to the arguments.
Application(const Configuration& configuration)
: configuration_(configuration)
{
// Configures replay manager for the
// selected data (either log or PCAP).
configuration_.filesToReplay(files_);
if (configuration_.replayPcap())
{
ScopedPtr<PcapReplayManager> pcapManager(new PcapReplayManager());
if (files_.empty())
{
// The Handler is configured to use connectivity
// configuration for NRCert environment, whereas
// recorded packets belong to production environment.
// The following steps force the PCAP replay machinery
// to map data from production incremental feed to the
// certification one.
pcapManager->aliases()[NetFeedConnection("224.0.31.1", 14310)] = NetFeedConnection("224.0.28.1", 14310);
}
replayManager_.reset(pcapManager.release());
}
else if (configuration_.replayDatamine())
{
replayManager_.reset(new DatamineReplayManager(files_.empty()));
}
else
{
replayManager_.reset(new LogReplayManager());
}
if (files_.empty())
{
replayManager_->gather(files_, "data", configuration_.channel());
}
}
// Allows instances to be destroyed.
~Application() {}
// Spawns market data processing from
// the previously recorded log file.
void run()
{
// Even in replay, handler still logs its events.
FileLoggerEventTracer loggerTracer;
const ScopedPtr<Logger> logger(constructFileLogger(configuration_, &loggerTracer));
// Constructs handler for replay in regular way.
Handler handler;
// A bit of configuring for the instance.
HandlerSettings& settings = handler.settings();
setLicense(settings);
settings.channel(configuration_.channel()).connectivityConfigurationFile(configuration_.connectivityFile());;
settings.bookManagement().mboBooks().maintain(true);
// Additional turn-up.
replayManager_->configure(settings);
// Binds the given logger to the Handler.
apply(settings.logging(), configuration_, logger.get());
// Associates events processing with handler.
SecurityEventTracer securityEventTracer;
securityEventTracer.bind(handler);
MarketDataEventTracer marketDataTracer;
marketDataTracer.bind(handler);
ServiceEventTracer serviceTracer;
serviceTracer.bind(handler);
std::cout << "Replaying the recorded market data.. " << std::endl;
replayManager_->replay(files_, handler);
std::cout << std::endl << "Done. " << std::endl << std::endl;
}
// A bit of information on the sample.
static void identify()
{
std::cout << "Market Data Replay for the CME MDP Premium Market Data Handler, v" << toStr(Version::current())
<< ". "
<< std::endl
<< std::endl;
}
private:
const Configuration& configuration_;
ScopedPtr<ReplayManager> replayManager_;
FileList files_;
// Prohibits copy construction.
Application(const Application&);
// No re-assignments for this time.
Application& operator=(const Application&);
};
int main(int qty, char** args)
{
try
{
Application::identify();
const Configuration configuration(qty, args);
if (configuration.show())
{
configuration.show(std::cout);
}
else
{
Application(configuration).run();
}
return 0;
}
catch (const std::exception& ex)
{
std::cerr << std::endl << "ERROR: " << ex.what() << std::endl;
return 1;
}
}