22 #include <OnixS/HandlerCore/HandlerLogger.h> 30 ONIXS_HANDLER_NAMESPACE_BEGIN
36 DataSource createDataSource(
const std::string& sessionId)
38 DataSource dataSource;
39 dataSource.origin = DataSource::glimpse;
40 dataSource.packetMessageCount = 1;
41 dataSource.packetMessageNumber = 0;
42 dataSource.packetReceptionTime = Timestamp();
45 dataSource.messageSeqNum = 0;
46 dataSource.session =
reinterpret_cast<const Byte*
>(sessionId.c_str());
51 OnixS::ItchCore::SoupBinTCP::ReconnectableSessionSettings createSessionSettings(
const HandlerSettings& settings)
53 OnixS::ItchCore::SoupBinTCP::ReconnectableSessionSettings sessionSettings;
54 sessionSettings.receiveTimeoutMs = 2000;
55 sessionSettings.sendTimeoutMs = 900;
57 sessionSettings.reconnectIntervalMs = settings.minReconnectingTimeout * 1000;
58 sessionSettings.reconnectAttempts = settings.maxConnectingAttempts;
59 sessionSettings.resetSeqNumWhenReconnect =
true;
61 return sessionSettings;
65 OnixS::ItchCore::SoupBinTCP::ConnectivitySettings createSessionConnectivitySettings(
66 const HandlerSettings& settings,
67 const SessionType& session)
69 OnixS::ItchCore::SoupBinTCP::ConnectivitySettings connectivitySettings;
71 connectivitySettings.connectionA =
72 OnixS::ItchCore::SoupBinTCP::ValidatedNetFeedConnection(
73 settings.glimpseFeed.serviceA.address,
74 settings.glimpseFeed.serviceA.port,
75 settings.networkInterfaceForTcpServices);
77 connectivitySettings.connectionB =
78 OnixS::ItchCore::SoupBinTCP::ValidatedNetFeedConnection(
79 settings.glimpseFeed.serviceB.address,
80 settings.glimpseFeed.serviceB.port,
81 settings.networkInterfaceForTcpServices);
84 connectivitySettings.authenticationSettings.userName = settings.glimpseUsername;
85 connectivitySettings.authenticationSettings.password = settings.glimpsePassword;
86 connectivitySettings.authenticationSettings.session = std::string(reinterpret_cast<const char*>(session));
87 connectivitySettings.initialSequenceNumber = 1;
89 return connectivitySettings;
94 GlimpseService::GlimpseService(
96 const Logging::LogFacility* parent,
97 HandlerCore::Common::HandlerLogger* logger)
98 : Logging::LogFacility(
"GlimpseService", parent,
OnixS::Logging::LOG_LEVEL_DEBUG)
100 , sessionSettings_(createSessionSettings(settings_))
101 , sessionConnectivitySettings_()
111 void GlimpseService::invokeOnFailure(
const std::string& what)
113 if (onFailureCallback_)
114 onFailureCallback_(what);
117 void GlimpseService::invokeOnWarning(
const std::string& what)
119 if (onWarningCallback_)
120 onWarningCallback_(what);
125 if (onMessageCallback_)
126 onMessageCallback_(ds, &msg);
129 void GlimpseService::invokeOnRestarted()
131 if (onRestartedCallback_)
132 onRestartedCallback_();
137 onFailureCallback_ = callback;
142 onWarningCallback_ = callback;
147 onMessageCallback_ = callback;
152 onRestartedCallback_ = callback;
162 log(ONIXS_LOG_DEBUG[
this] <<
"Stopping.");
164 Guard guard(sessionLock_);
168 session_->disconnect(!wait);
175 log(ONIXS_LOG_INFO[
this] <<
" SnapshotRecoveryRequested");
177 sessionConnectivitySettings_ = createSessionConnectivitySettings(settings_, session);
179 Guard guard(sessionLock_);
183 log(ONIXS_LOG_DEBUG[
this] <<
"Requested glimpse sequence: " << requestedSequence);
186 new OnixS::ItchCore::SoupBinTCP::ReconnectableSession(sessionSettings_, sessionConnectivitySettings_,
this,
this)
189 session_->setLogger(logger_);
194 void GlimpseService::onError(OnixS::ItchCore::SoupBinTCP::ReconnectableSession&,
const std::string& what)
196 log(ONIXS_LOG_ERROR[
this] << what);
198 invokeOnFailure(what);
201 void GlimpseService::onWarning(OnixS::ItchCore::SoupBinTCP::ReconnectableSession&,
const std::string& what)
203 log(ONIXS_LOG_WARN[
this] << what);
205 invokeOnWarning(what);
208 void GlimpseService::onStateChange(OnixS::ItchCore::SoupBinTCP::ReconnectableSession& session,
209 OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Enum oldState,
210 OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Enum newState)
212 if (OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Disconnected == newState)
217 OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Connected == oldState &&
218 OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Connecting == newState
222 log(ONIXS_LOG_INFO[
this] <<
" SnapshotRecoveryRestarted");
224 log(ONIXS_LOG_WARN[
this] <<
"Reconnect detected, resetting message repository.");
230 if (OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Connected == newState)
234 sessionId_ = session.sessionId();
238 log(ONIXS_LOG_WARN[
this] <<
"Error during retrieving session id");
243 void GlimpseService::onData(OnixS::ItchCore::SoupBinTCP::ReconnectableSession& session,
const void* data, UInt16 size)
245 BOOST_ASSERT(data !=
nullptr);
246 BOOST_ASSERT(size != 0);
250 processData(session, data, size);
254 log(ONIXS_LOG_WARN[
this] <<
"Exception during message processing.");
258 void GlimpseService::processData(OnixS::ItchCore::SoupBinTCP::ReconnectableSession& session,
const void* data, UInt16 size)
260 const DataSource dataSource = createDataSource(sessionId_);
263 invokeOnMessage(dataSource, message);
267 session.disconnect(
true);
272 ONIXS_HANDLER_NAMESPACE_END
boost::function< void(const DataSource &, const IncomingMessage *)> OnMessageCallback
UInt64 SequenceNumber
Alias for Sequence Number type.
boost::function< void(const std::string &)> OnFailureCallback
Log binary data of received packets, applied only for Info log level and below.
void request(const OnixS::ItchCore::SessionType &session, SequenceNumber requestedSequence)
MessageType::Enum type() const
Type.
boost::function< void()> OnRestartedCallback
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
void subscribeOnWarning(OnWarningCallback callback)
UInt8 Byte
Alias for Byte.
void subscribeOnMessage(OnMessageCallback callback)
boost::function< void(const std::string &)> OnWarningCallback
virtual ~GlimpseService()
void subscribeOnFailure(OnFailureCallback callback)
Handler configuration settings.
void subscribeOnRestarted(OnRestartedCallback callback)