OnixS C++ SGX Titan ITCH Market Data Handler  1.2.2
API documentation
GlimpseService.cpp
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
21 
22 #include <OnixS/HandlerCore/HandlerLogger.h>
23 
24 #include "HandlerLogger.h"
25 
27 #include "GlimpseService.h"
28 
29 
30 ONIXS_HANDLER_NAMESPACE_BEGIN
31 
32 using namespace OnixS::ItchCore;
33 
34 namespace
35 {
36  DataSource createDataSource(const std::string& sessionId)
37  {
38  DataSource dataSource;
39  dataSource.origin = DataSource::glimpse;
40  dataSource.packetMessageCount = 1;
41  dataSource.packetMessageNumber = 0;
42  dataSource.packetReceptionTime = Timestamp();
43 
44 
45  dataSource.messageSeqNum = 0;
46  dataSource.session = reinterpret_cast<const Byte*>(sessionId.c_str());
47 
48  return dataSource;
49  }
50 
51  OnixS::ItchCore::SoupBinTCP::ReconnectableSessionSettings createSessionSettings(const HandlerSettings& settings)
52  {
53  OnixS::ItchCore::SoupBinTCP::ReconnectableSessionSettings sessionSettings;
54  sessionSettings.receiveTimeoutMs = 2000;
55  sessionSettings.sendTimeoutMs = 900;
56 
57  sessionSettings.reconnectIntervalMs = settings.minReconnectingTimeout * 1000;
58  sessionSettings.reconnectAttempts = settings.maxConnectingAttempts;
59  sessionSettings.resetSeqNumWhenReconnect = true;
60 
61  return sessionSettings;
62  }
63 
64 
65  OnixS::ItchCore::SoupBinTCP::ConnectivitySettings createSessionConnectivitySettings(
66  const HandlerSettings& settings,
67  const SessionType& session)
68  {
69  OnixS::ItchCore::SoupBinTCP::ConnectivitySettings connectivitySettings;
70 
71  connectivitySettings.connectionA =
72  OnixS::ItchCore::SoupBinTCP::ValidatedNetFeedConnection(
73  settings.glimpseFeed.serviceA.address,
74  settings.glimpseFeed.serviceA.port,
75  settings.networkInterfaceForTcpServices);
76 
77  connectivitySettings.connectionB =
78  OnixS::ItchCore::SoupBinTCP::ValidatedNetFeedConnection(
79  settings.glimpseFeed.serviceB.address,
80  settings.glimpseFeed.serviceB.port,
81  settings.networkInterfaceForTcpServices);
82 
83 
84  connectivitySettings.authenticationSettings.userName = settings.glimpseUsername;
85  connectivitySettings.authenticationSettings.password = settings.glimpsePassword;
86  connectivitySettings.authenticationSettings.session = std::string(reinterpret_cast<const char*>(session));
87  connectivitySettings.initialSequenceNumber = 1;
88 
89  return connectivitySettings;
90  }
91 
92 }
93 
94 GlimpseService::GlimpseService(
95  const HandlerSettings& settings,
96  const Logging::LogFacility* parent,
97  HandlerCore::Common::HandlerLogger* logger)
98  : Logging::LogFacility("GlimpseService", parent, OnixS::Logging::LOG_LEVEL_DEBUG)
99  , settings_(settings)
100  , sessionSettings_(createSessionSettings(settings_))
101  , sessionConnectivitySettings_()
102  , logger_(logger)
103  , inProgress_(false)
104 {
105 }
106 
108 {
109 }
110 
111 void GlimpseService::invokeOnFailure(const std::string& what)
112 {
113  if (onFailureCallback_)
114  onFailureCallback_(what);
115 }
116 
117 void GlimpseService::invokeOnWarning(const std::string& what)
118 {
119  if (onWarningCallback_)
120  onWarningCallback_(what);
121 }
122 
123 void GlimpseService::invokeOnMessage(const DataSource& ds, const IncomingMessage& msg)
124 {
125  if (onMessageCallback_)
126  onMessageCallback_(ds, &msg);
127 }
128 
129 void GlimpseService::invokeOnRestarted()
130 {
131  if (onRestartedCallback_)
132  onRestartedCallback_();
133 }
134 
136 {
137  onFailureCallback_ = callback;
138 }
139 
141 {
142  onWarningCallback_ = callback;
143 }
144 
146 {
147  onMessageCallback_ = callback;
148 }
149 
151 {
152  onRestartedCallback_ = callback;
153 }
154 
156 {
157  return inProgress_;
158 }
159 
160 void GlimpseService::stop(bool wait)
161 {
162  log(ONIXS_LOG_DEBUG[this] << "Stopping.");
163 
164  Guard guard(sessionLock_);
165 
166  if (session_)
167  {
168  session_->disconnect(!wait);
169  }
170 }
171 
172 void GlimpseService::request(const SessionType& session, SequenceNumber requestedSequence)
173 {
174  if (settings_.logSettings & LogSettings::LogPackets)
175  log(ONIXS_LOG_INFO[this] << " SnapshotRecoveryRequested");
176 
177  sessionConnectivitySettings_ = createSessionConnectivitySettings(settings_, session);
178 
179  Guard guard(sessionLock_);
180 
181  inProgress_ = true;
182 
183  log(ONIXS_LOG_DEBUG[this] << "Requested glimpse sequence: " << requestedSequence);
184 
185  session_.reset(
186  new OnixS::ItchCore::SoupBinTCP::ReconnectableSession(sessionSettings_, sessionConnectivitySettings_, this, this)
187  );
188 
189  session_->setLogger(logger_);
190 
191  session_->connect();
192 }
193 
194 void GlimpseService::onError(OnixS::ItchCore::SoupBinTCP::ReconnectableSession&, const std::string& what)
195 {
196  log(ONIXS_LOG_ERROR[this] << what);
197 
198  invokeOnFailure(what);
199 }
200 
201 void GlimpseService::onWarning(OnixS::ItchCore::SoupBinTCP::ReconnectableSession&, const std::string& what)
202 {
203  log(ONIXS_LOG_WARN[this] << what);
204 
205  invokeOnWarning(what);
206 }
207 
208 void GlimpseService::onStateChange(OnixS::ItchCore::SoupBinTCP::ReconnectableSession& session,
209  OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Enum oldState,
210  OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Enum newState)
211 {
212  if (OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Disconnected == newState)
213  inProgress_ = false;
214 
215 
216  if (
217  OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Connected == oldState &&
218  OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Connecting == newState
219  )
220  {
221  if (settings_.logSettings & LogSettings::LogPackets)
222  log(ONIXS_LOG_INFO[this] << " SnapshotRecoveryRestarted");
223 
224  log(ONIXS_LOG_WARN[this] << "Reconnect detected, resetting message repository.");
225 
226  invokeOnRestarted();
227  }
228 
229 
230  if (OnixS::ItchCore::SoupBinTCP::ReconnectableSessionState::Connected == newState)
231  {
232  try
233  {
234  sessionId_ = session.sessionId();
235  }
236  catch (...)
237  {
238  log(ONIXS_LOG_WARN[this] << "Error during retrieving session id");
239  }
240  }
241 }
242 
243 void GlimpseService::onData(OnixS::ItchCore::SoupBinTCP::ReconnectableSession& session, const void* data, UInt16 size)
244 {
245  BOOST_ASSERT(data != nullptr);
246  BOOST_ASSERT(size != 0);
247 
248  try
249  {
250  processData(session, data, size);
251  }
252  catch (...)
253  {
254  log(ONIXS_LOG_WARN[this] << "Exception during message processing.");
255  }
256 }
257 
258 void GlimpseService::processData(OnixS::ItchCore::SoupBinTCP::ReconnectableSession& session, const void* data, UInt16 size)
259 {
260  const DataSource dataSource = createDataSource(sessionId_);
261  const IncomingMessage message = IncomingMessage(static_cast<const unsigned char*>(data), size);
262 
263  invokeOnMessage(dataSource, message);
264 
265  if (static_cast<GlimpseMessageType::Enum>(message.type()) == (GlimpseMessageType::EndOfSnapshot))
266  {
267  session.disconnect(true);
268  }
269 }
270 
271 
272 ONIXS_HANDLER_NAMESPACE_END
boost::function< void(const DataSource &, const IncomingMessage *)> OnMessageCallback
UInt64 SequenceNumber
Alias for Sequence Number type.
Definition: Defines.h:37
boost::function< void(const std::string &)> OnFailureCallback
Log binary data of received packets, applied only for Info log level and below.
Definition: LogSettings.h:78
void request(const OnixS::ItchCore::SessionType &session, SequenceNumber requestedSequence)
LogSettings::Enum logSettings
Combine LogSettings enum values to configure the logger.
void subscribeOnWarning(OnWarningCallback callback)
UInt8 Byte
Alias for Byte.
Definition: Memory.h:30
void subscribeOnMessage(OnMessageCallback callback)
boost::function< void(const std::string &)> OnWarningCallback
void subscribeOnFailure(OnFailureCallback callback)
void subscribeOnRestarted(OnRestartedCallback callback)