OnixS C++ FIX Engine  4.12.0
API Documentation
Pluggable Storage Sample

Memory-based session storage is typically used to maintain a high-performance FIX session when persisting the session's state and messages to the file system is not required. This sample demonstrates how to use pluggable session storage.


Source code:


MySessionStorage.h:

/// Sample in-memory session storage plugged into the engine through
/// OnixS::FIX::ISessionStorage.
///
/// The storage keeps the inbound/outbound sequence numbers, the session
/// creation time, the termination flag and raw copies of outbound messages in
/// plain data members; nothing is written to disk by this class.
class MySessionStorage : public OnixS::FIX::ISessionStorage
{
public:
    /// Creates a storage with both sequence numbers set to 0 and the
    /// termination flag cleared.
    MySessionStorage();

    /// Resets both sequence numbers to 0 and drops all stored outbound messages.
    void clear() ONIXS_FIXENGINE_OVERRIDE;

    /// Optionally "backs up" (only logs in this sample) and, unless
    /// keepSequenceNumbers is set, clears the storage.
    void close(bool keepSequenceNumbers, bool doBackup) ONIXS_FIXENGINE_OVERRIDE;

    /// Replays stored outbound messages whose sequence numbers fall into
    /// [beginSequenceNumber, endSequenceNumber] to the given listener.
    void getOutbound(SequenceNumber beginSequenceNumber, SequenceNumber endSequenceNumber, ISessionStorageListener * listener) ONIXS_FIXENGINE_OVERRIDE;

    /// Returns the last recorded inbound sequence number.
    SequenceNumber inSeqNum() ONIXS_FIXENGINE_OVERRIDE;

    /// Overwrites the recorded inbound sequence number.
    void inSeqNum(SequenceNumber messageSequenceNumber) ONIXS_FIXENGINE_OVERRIDE;

    /// Records the inbound sequence number; the raw message is only logged, not kept.
    void storeInbound(const Message & message, SequenceNumber messageSequenceNumber, const RawMessagePointer& pointer, bool logMessage) ONIXS_FIXENGINE_OVERRIDE;
    void storeInbound(const FlatMessage & message, SequenceNumber messageSequenceNumber, const RawMessagePointer & pointer, bool logMessage) ONIXS_FIXENGINE_OVERRIDE;

    /// Raw bytes of a single serialized message.
    typedef std::vector<char> RawMessage;

    /// Outbound messages keyed by their sequence number.
    typedef std::map<SequenceNumber, RawMessage> MessageSequenceNumber2RawMessage;

    // NOTE(review): this member and the typedefs above are publicly accessible
    // as declared here; presumably they were meant to be private - confirm
    // before changing the access level.
    MessageSequenceNumber2RawMessage outgoingMessages_;

    /// Records the outbound sequence number and, when logMessage is set,
    /// logs the message and keeps a raw copy of it for later replay.
    void storeOutbound(const Message &, const RawMessagePointer & pointer, bool logMessage) ONIXS_FIXENGINE_OVERRIDE;
    void storeOutbound(const FlatMessage &, SequenceNumber sequenceNumber, const RawMessagePointer & pointer, bool logMessage) ONIXS_FIXENGINE_OVERRIDE;

    /// Stores the session termination flag.
    void setSessionTerminationFlag(bool terminated) ONIXS_FIXENGINE_OVERRIDE;

    /// Returns the last recorded outbound sequence number.
    SequenceNumber outSeqNum() ONIXS_FIXENGINE_OVERRIDE;

    /// Overwrites the recorded outbound sequence number.
    void outSeqNum(SequenceNumber messageSequenceNumber) ONIXS_FIXENGINE_OVERRIDE;

    /// Stores the session creation time.
    void sessionCreationTime(Timestamp) ONIXS_FIXENGINE_OVERRIDE;

    /// Returns the stored session creation time.
    Timestamp sessionCreationTime() ONIXS_FIXENGINE_OVERRIDE;

    /// Nothing is buffered by this storage, so flushing is a no-op.
    void flush() ONIXS_FIXENGINE_OVERRIDE {}

    /// This sample always reports a fixed resending queue size of 1000.
    size_t resendingQueueSize() const ONIXS_FIXENGINE_OVERRIDE { return 1000; }

    /// Requests to change the resending queue size are ignored.
    void resendingQueueSize(size_t) ONIXS_FIXENGINE_OVERRIDE {}

private:
    bool isSessionTerminated_;
    SequenceNumber inSeqNum_;
    SequenceNumber outSeqNum_;
    Timestamp sessionCreationTime_;
};

MySessionStorage.cpp:

#include "MySessionStorage.h"
// A fresh storage starts with the termination flag cleared and both sequence
// numbers at zero; the remaining members are default-constructed.
MySessionStorage::MySessionStorage()
    : isSessionTerminated_(false), inSeqNum_(0), outSeqNum_(0)
{
}
{
std::clog << "Clear the session storage." << std::endl;
inSeqNum_ = 0;
outSeqNum_ = 0;
outgoingMessages_.clear();
}
// Closes the storage.
//   keepSequenceNumbers - when false, the storage is cleared on close.
//   doBackup            - when true, a backup is requested; this sample only
//                         logs the request.
void MySessionStorage::close(bool keepSequenceNumbers, bool doBackup)
{
    if(doBackup)
    {
        // A real implementation would back up the stored data here.
        std::clog << "Back up the stored data." << std::endl;
    }

    if(keepSequenceNumbers)
        return;

    clear();
}
// Replays every stored outbound message whose sequence number lies in
// [beginSequenceNumber, endSequenceNumber], in ascending order, by passing a
// pointer to its raw bytes to listener->onReplayedMessage().
void MySessionStorage::getOutbound(SequenceNumber beginSequenceNumber, SequenceNumber endSequenceNumber, ISessionStorageListener * listener)
{
    std::clog << "getOutbound(" << beginSequenceNumber << ", " << endSequenceNumber << ",..)" << std::endl;

    // Walk the ordered map directly from the first key >= begin instead of
    // probing each sequence number with find(): this visits only the messages
    // that exist and cannot spin forever when endSequenceNumber is the
    // largest representable value.
    for(MessageSequenceNumber2RawMessage::const_iterator it = outgoingMessages_.lower_bound(beginSequenceNumber);
        it != outgoingMessages_.end() && it->first <= endSequenceNumber;
        ++it)
    {
        listener->onReplayedMessage(ISessionStorage::RawMessagePointer(&it->second[0], it->second.size()));
    }
}
{
return inSeqNum_;
}
// Logs the requested value and overwrites the recorded inbound sequence number.
void MySessionStorage::inSeqNum(SequenceNumber messageSequenceNumber)
{
    std::clog << "inSeqNum("
              << messageSequenceNumber
              << ")"
              << std::endl;

    this->inSeqNum_ = messageSequenceNumber;
}
// Records the sequence number of an inbound message. The raw message bytes
// are not kept; they are only written to the log when logMessage is set.
void MySessionStorage::storeInbound(const Message &, SequenceNumber messageSequenceNumber, const RawMessagePointer& pointer, bool logMessage)
{
    // The sequence number is tracked regardless of the logging flag.
    inSeqNum_ = messageSequenceNumber;

    if(!logMessage)
        return;

    const std::string rawText(reinterpret_cast<const char*>(pointer.buffer_), pointer.length_);
    std::clog << "storeInbound(" << messageSequenceNumber << ", " << rawText << ')' << std::endl;
}
// FlatMessage counterpart of storeInbound: records the inbound sequence
// number and, when logMessage is set, writes the raw message to the log.
void MySessionStorage::storeInbound(const FlatMessage &, SequenceNumber messageSequenceNumber, const RawMessagePointer & pointer, bool logMessage)
{
    // The sequence number is tracked regardless of the logging flag.
    inSeqNum_ = messageSequenceNumber;

    if(!logMessage)
        return;

    const std::string rawText(reinterpret_cast<const char*>(pointer.buffer_), pointer.length_);
    std::clog << "storeInbound(" << messageSequenceNumber << ", " << rawText << ')' << std::endl;
}
void MySessionStorage::storeOutbound(const Message & message, const RawMessagePointer & pointer, bool logMessage)
{
OnixS::FIX::Int32 messageSequenceNumber = message.seqNum();
outSeqNum_ = messageSequenceNumber;
if(logMessage)
{
std::clog << "storeOutbound(" << messageSequenceNumber << ", " << std::string(
reinterpret_cast<const char*>(pointer.buffer_), pointer.length_)
<< ')' << std::endl;
outgoingMessages_.insert(make_pair(messageSequenceNumber, RawMessage(pointer.buffer_,
pointer.buffer_ + pointer.length_)));
}
}
void MySessionStorage::storeOutbound(const FlatMessage &, SequenceNumber sequenceNumber, const RawMessagePointer & pointer, bool logMessage)
{
outSeqNum_ = sequenceNumber;
if(logMessage)
{
std::clog << "storeOutbound(" << sequenceNumber << ", " << std::string(
reinterpret_cast<const char*>(pointer.buffer_), pointer.length_)
<< ')' << std::endl;
outgoingMessages_.insert(make_pair(sequenceNumber, RawMessage(pointer.buffer_,
pointer.buffer_ + pointer.length_)));
}
}
{
std::clog << "setSessionTerminationFlag(" << terminated << ")" << std::endl;
isSessionTerminated_ = terminated;
}
{
return outSeqNum_;
}
// Logs the requested value and overwrites the recorded outbound sequence number.
void MySessionStorage::outSeqNum(SequenceNumber messageSequenceNumber)
{
    std::clog << "outSeqNum("
              << messageSequenceNumber
              << ")"
              << std::endl;

    this->outSeqNum_ = messageSequenceNumber;
}
// Logs the textual form of the timestamp and stores it as the session
// creation time.
void MySessionStorage::sessionCreationTime(Timestamp timestamp)
{
    std::clog << "sessionCreationTime("
              << timestamp.toString()
              << ")"
              << std::endl;

    this->sessionCreationTime_ = timestamp;
}
{
return sessionCreationTime_;
}

PluggableStorage.cpp:

#include "MySessionStorage.h"
#include "../../Common/Helpers.h"
#include "../../Common/Settings.h"
using namespace Settings;
namespace {

// Session listener used by this sample; inbound application-level messages
// are deliberately ignored.
class MySessionListener : public ISessionListener
{
public:
    void onInboundApplicationMsg(Message &, Session *) ONIXS_FIXENGINE_OVERRIDE {}
};

} // namespace
int main()
{
try {
EngineSettings settings;
settings.licenseStore(LicenseStore);
Engine::init(settings);
MySessionListener listener;
MySessionStorage storage;
Session session("MySessionStorage_Initiator", "MySessionStorage_Acceptor", FixProtocolVersion, true, &listener, SessionStorageType::Pluggable, &storage);
Message order(MsgType::Order_Single, FixProtocolVersion);
session.send(&order);
MessagePtr foundMessage(session.findSentMessage(1));
if(foundMessage.get())
std::clog << std::endl << "Message found: " << *foundMessage << std::endl;
else
std::clog << std::endl << "Message not found!" << std::endl;
}
catch(const std::exception & ex) {
processSampleException(ex.what());
}
Engine::shutdown();
return 0;
}