OnixS C++ CME MDP Conflated TCP Handler  1.2.1
API Documentation
Session.h
Go to the documentation of this file.
1 /*
2  * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3  *
4  * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5  * and international copyright treaties.
6  *
7  * Access to and use of the software is governed by the terms of the applicable OnixS Software
8  * Services Agreement (the Agreement) and Customer end user license agreements granting
9  * a non-assignable, non-transferable and non-exclusive license to use the software
10  * for it's own data processing purposes under the terms defined in the Agreement.
11  *
12  * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13  * of this source code or associated reference material to any other location for further reproduction
14  * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15  *
16  * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17  * the terms of the Agreement is a violation of copyright law.
18  */
19 
20 #pragma once
21 
28 
29 namespace OnixS
30 {
31 namespace CME
32 {
33 namespace ConflatedTCP
34 {
35 /**
36 * A Conflated TCP Session.
37 */
39 {
40 public:
41  static const UInt64 UndefinedUuid = 0;
42 
43  /**
44  * Constructor.
45  *
46  * \param settings Session settings.
47  * \param marketSegmentId Market segment ID.
48  * \param sessionListener Session listener.
49  * \param marketDataListener Market Data listener.
50  * \param storageType Session storage type.
51  * \param storage Session storage.
52  * \param uuid Universally Unique Identifier (UUID).
53  * \param customKey Custom key that could be used to distinguish sessions with the same UUID, Session ID, FirmId, and Market Segment ID values.
54  */
55  Session(const SessionSettings &settings, int marketSegmentId,
56  SessionListener* sessionListener, MDP::MarketDataListener* marketDataListener, SessionStorageType::Enum storageType = SessionStorageType::FileBased,
57  SessionStorage *storage = ONIXS_CONFLATEDTCP_NULLPTR, UInt64 uuid = UndefinedUuid, const std::string &customKey = "" );
58 
59  ~Session();
60 
61  /// \return the Universally Unique Identifier (UUID).
62  UInt64 uuid() const;
63 
64  /// \return Session ID (assigned by the exchange).
65  const std::string id() const;
66 
67  /// \return Market Segment ID.
68  int marketSegmentId() const;
69 
70  /// \return Custom key that could be used to distinguish sessions with the same UUID, Session ID, FirmId, and Market Segment ID values.
71  const std::string customKey() const;
72 
73  /**
74  * Establishes the connection.
75  *
76  * \param host Market Segment Gateway host.
77  * \param port Market Segment Gateway port.
78  *
79  * \return This session.
80  */
81  Session& connect(const std::string &host, Port port);
82 
83  /**
84  * Terminates the connection.
85  *
86  * \param reason Terminate reason details.
87  *
88  * \note The method blocks until the Terminate message is received in reply, or the timeout elapses.
89  *
90  * \return This session.
91  */
92  Session& disconnect(const std::string &reason = "Terminate");
93 
94  /**
95  * Sends the message.
96  *
97  * \note This call is thread-safe.
98  *
99  * \return This session.
100  */
101  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
103 
104 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_RVALUE_REFERENCES
105 
106  /**
107  * Sends the message.
108  *
109  * \note This call is thread-safe.
110  *
111  * \return This session.
112  */
113  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
115 
116 #endif
117 
118 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_ALIAS_TEMPLATES
119 
120  /**
121  * Sends messages.
122  *
123  * \note This call is thread-safe.
124  *
125  * \return This session.
126  */
127  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
129 
130 #endif
131 
132 #if defined(ONIXS_CONFLATEDTCP_CXX11)
133 
134  /**
135  * Sends messages.
136  *
137  * \note This call is thread-safe.
138  *
139  * \return This session.
140  */
141  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
143 
144 #endif
145 
146  /**
147  * Warms up the sending path.
148  *
149  * \param msg SBE message to warm up the sending path.
150  *
151  * \note This call is thread-safe.
152  *
153  * \return This session.
154  */
155  template <typename SbeMessageType, size_t MaxMessageSize>
157  {
158  warmUp(msg.message(), *msg.header());
159  return *this;
160  }
161 
162  /// \return the host name the session is connected to.
163  std::string remoteHost() const;
164 
165  /// \return the IP address the session is connected to.
166  std::string remoteIpAddress() const;
167 
168  /// \return the port number the session is connected to.
169  Port remotePort() const;
170 
171  // TODO: move to SessionSettings
172  std::pair<Port, Port> localPortRange() const;
173 
174  // TODO: move to SessionSettings
175  Session& localPortRange(std::pair<Port, Port> portRange);
176 
177  /// \return the IP Address or name of the local network interface.
178  std::string localNetworkInterface() const;
179 
180  /// Sets the IP Address or name of the local network interface.
181  Session& localNetworkInterface(const std::string &value);
182 
183  /// \return the local network port number.
184  Port localPort() const;
185 
186  /// \return the value of the `TCP_NODELAY` option (improve latency at the expense of message throughput).
187  bool tcpNoDelayOption() const;
188 
189  /// Sets the `TCP_NODELAY` option (improve latency at the expense of message throughput).
190  Session& tcpNoDelayOption(bool value);
191 
192  /// \return Socket options.
193  const SocketOptions &socketOptions() const;
194 
195  /// Sets socket options.
196  Session& socketOptions(const SocketOptions &options);
197 
198  static const int UndefinedAffinity = -1;
199 
200  /// Sets the receiving thread CPU affinity.
201  Session& receivingThreadAffinity(CpuIndex cpuIndex);
202 
203  /// Sets the receiving thread CPU affinity.
204  Session& receivingThreadAffinity(const CpuIndexes &cpuIndexes);
205 
206  /// \return the receiving thread CPU affinity.
207  const CpuIndexes &receivingThreadAffinity() const;
208 
209  /// Sets the sending thread CPU affinity.
210  Session& sendingThreadAffinity(CpuIndex cpuIndex);
211 
212  /// Sets the sending thread CPU affinity.
213  Session& sendingThreadAffinity(const CpuIndexes &cpuIndexes);
214 
215  /// \return the auxiliary sending thread CPU affinity.
216  const CpuIndexes &sendingThreadAffinity() const;
217 
218  /// \return the scheduling priority of the receiving thread.
219  int receivingThreadPriority() const;
220 
221  /// Sets the scheduling priority of the receiving thread.
222  Session& receivingThreadPriority(int priority);
223 
224  /// \return the scheduling priority of the sending thread.
225  int sendingThreadPriority() const;
226 
227  /// Sets the scheduling priority of the sending thread.
228  Session& sendingThreadPriority(int priority);
229 
230  /// \return the receiving thread policy.
231  int receivingThreadPolicy() const;
232 
233  /// Sets the receiving thread policy.
234  Session& receivingThreadPolicy(int policy);
235 
236  /// \return the sending thread policy.
237  int sendingThreadPolicy() const;
238 
239  /// Sets the sending thread policy.
240  Session& sendingThreadPolicy(int policy);
241 
242  /// \return the receive spinning timeout value (in microseconds).
243  unsigned receiveSpinningTimeout() const;
244 
245  /// Sets the receive spinning timeout value (in microseconds).
246  Session& receiveSpinningTimeout(unsigned timeoutInUs);
247 
248  /// \return the send spinning timeout value (in microseconds).
249  unsigned sendSpinningTimeout() const;
250 
251  /// Sets the send spinning timeout value (in microseconds).
252  Session& sendSpinningTimeout(unsigned timeoutInUs);
253 
254  /// \return `true` if new messages are reported even when the message gap is detected,
255  /// and the reply on the [Retransmit Request](https://www.cmegroup.com/confluence/display/EPICSANDBOX/Retransmit+Request)
256  /// message is expected, otherwise - `false`.
257  bool reportNewMessagesWhileWaitingForMissedMessages() const;
258 
259  /// Sets the option to report new messages even when the message gap is detected,
260  /// and the reply on the [Retransmit Request](https://www.cmegroup.com/confluence/display/EPICSANDBOX/Retransmit+Request)
261  /// message is expected.
262  Session& reportNewMessagesWhileWaitingForMissedMessages(bool value);
263 
264  /// \return `true` if incoming message sequence numbers are validated, otherwise - `false`.
265  bool validateSequenceNumbers() const;
266 
267  /// Turns on/off the validation of incoming message sequence numbers.
268  Session& validateSequenceNumbers(bool validate);
269 
270  /// \return the number of messages to be sent together.
271  unsigned messageGrouping() const;
272 
273  /**
274  * Sets the number of messages to be sent together.
275  *
276  * \param numberOfMessagesToGroup If the parameter is zero (by default) or one, the Handler tries to send
277  * an outgoing application-level message in the context of the thread that calls the OnixS::CME::ConflatedTCP::Session::send method.
278  * If the message cannot be sent immediately, it is stored in the queue for the subsequent sending by the sending thread.
279  * If this parameter is larger than one, the Handler stores outgoing SBE messages in the queue for the subsequent sending by the sending thread.
280  *
281  * \return This session.
282  */
283  Session& messageGrouping(unsigned numberOfMessagesToGroup);
284 
285  /// \return Session storage type.
286  SessionStorageType::Enum storageType() const;
287 
288  /// \return Session storage ID.
289  const std::string &storageId() const;
290 
291  /// \return the sequence number of the next outgoing message.
292  SeqNumber outSeqNum() const;
293 
294  /// Sets the sequence number of the next outgoing message.
295  Session& outSeqNum(SeqNumber nextOutSeqNum);
296 
297  /// \return the expected sequence number of the next inbound message.
298  SeqNumber inSeqNum() const;
299 
300  /// Sets the expected sequence number of the next inbound message.
301  Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
302 
303  /**
304  * \return the sequence number of the last business message published by CME with the PreviousUUID.
305  *
306  * If no business message was published, the value is zero.
307  */
308  SeqNumber previousSeqNo() const;
309 
310  /// Sets the sequence number of the last business message published by CME with the PreviousUUID.
311  Session& previousSeqNo(SeqNumber value);
312 
313  /**
314  * \return The UUID from the previously Established session.
315  *
316  * - This can be the CME assigned default UUID=0 for messages published by CME before first Negotiation of customer at the beginning of the week.
317  * - This can be the last UUID as used by the customer from the previously Established session.
318  */
319  Messaging::UInt64 previousUuid() const;
320 
321  /// Sets the UUID from the previously Established session.
322  Session& previousUuid(Messaging::UInt64 value);
323 
324  /**
325  * Backs up the current log files, resets the sequence numbers to 1 and generates a new UUID.
326  *
327  * \warning Can be called only when the session is disconnected.
328  *
329  * \param startOfWeek if `true` then the PreviousUUID and PreviousSeqNo are reset to 0.
330  *
331  * \return This session.
332  */
333  Session& reset(bool startOfWeek = false);
334 
335  /// \return Session's current state.
336  SessionStateId::Enum state();
337 
338  /// \return the time when the session was created or the last reset operation was performed.
339  Messaging::Timestamp creationTime() const;
340 
341  /// Flushes all internal buffers of the session storage.
342  void flushSessionStorage();
343 
344  /// \return the maximum number of tries to restore the telecommunications link.
345  unsigned reconnectAttempts() const;
346 
347  /// Sets the maximum number of tries to restore the telecommunications link.
348  Session& reconnectAttempts(unsigned reconnectAttempts);
349 
350  /// \return the time interval between retries to restore the telecommunications link (seconds).
351  unsigned reconnectInterval() const;
352 
353  /// Sets the time interval between retries to restore the telecommunications link (seconds).
354  Session& reconnectInterval(unsigned seconds);
355 
356  /// \return the usage of local time in Handler events and log files.
357  bool localTimeUsage() const;
358 
359  /// Specifies the usage of local time in Handler events and log files.
360  Session& localTimeUsage(bool useLocalTime);
361 
362  /// \return The session storage directory.
363  const std::string &storageDirectory() const;
364 
365  /// \return `true` if logging of an outgoing message to the session storage is performed before sending it to the wire, otherwise - `false`.
366  bool logBeforeSending() const;
367 
368  /// Sets the option to log an outgoing message to the session storage before sending it to the wire.
369  Session& logBeforeSending(bool value);
370 
371  // @TODO: document this method.
372  size_t incomingMessageGapQueueMaximumSize() const;
373 
374  // @TODO: document this method.
375  Session& incomingMessageGapQueueMaximumSize(size_t value);
376 
377  /// Writes the given user message to the Handler's log file using the session's formatted ID.
378  const Session& log(const std::string &message) const;
379 
380  /// \return the human-friendly description.
381  std::string toString() const;
382 
383 private:
384  Session(const Session& );
385  Session& operator=(const Session& );
386 
388  void send(std::vector<Messaging::SbeMessage> msgs, std::vector<Messaging::SimpleOpenFramingHeader *> headers);
390 
391  struct Impl;
392  Impl *const impl_;
393 };
394 
395 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
397 {
398  const Messaging::MessageSize messageSize = msg.setHeader();
399  Messaging::SimpleOpenFramingHeader *const header = msg.header();
400  send(Messaging::SbeMessage(header + 1, messageSize, Messaging::SbeMessage::NoCheck()), *header);
401 
402  return *this;
403 }
404 
405 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_RVALUE_REFERENCES
406 
407 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
409 {
410  send(msg);
411 
412  return *this;
413 }
414 
415 #endif
416 
417 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_ALIAS_TEMPLATES
418 
419 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
421 {
422  std::vector<Messaging::SbeMessage> messages(msgs.size());
423  std::vector<Messaging::SimpleOpenFramingHeader *> headers(msgs.size());
424 
425  for (size_t i = 0; i < msgs.size(); ++i)
426  {
427  const Messaging::MessageSize messageSize = msgs[i].setHeader();
428  Messaging::SimpleOpenFramingHeader *const header = msgs[i].header();
429  messages[i] = Messaging::SbeMessage(header + 1, messageSize, Messaging::SbeMessage::NoCheck());
430  headers[i] = header;
431  }
432 
433  send(messages, headers);
434 
435  return *this;
436 }
437 
438 #endif
439 
440 #if defined(ONIXS_CONFLATEDTCP_CXX11)
441 
442 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
444 {
446  return send(messages);
447 }
448 
449 #endif
450 
451 
452 }
453 }
454 }
unsigned short Port
Definition: Defines.h:44
A Conflated TCP Session.
Definition: Session.h:38
Messaging::UInt32 SeqNumber
Definition: Messaging.h:58
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
#define ONIXS_CONFLATEDTCP_EXPORTED
Definition: Compiler.h:187
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg)
Sends the message.
Definition: Session.h:396
std::vector< SocketOption > SocketOptions
Socket options.
Definition: Defines.h:77
Definition: Defines.h:40
std::set< CpuIndex > CpuIndexes
Definition: Defines.h:74
The time point without the time-zone information.
Definition: Time.h:467
MessageSize setHeader() noexcept
Calculates the binary size of the message and updates the Simple Open Framing Header accordingly...
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg)
Warms up the sending path.
Definition: Session.h:156
UInt16 MessageSize
Message length type.
Definition: Aliases.h:29
Callbacks invoked by Handler during market data processing to reflect various stages of processing...
std::vector< MessageHolder< MessageTypeT, MaxMessageSize, MessageInitializer > > MessageHolders
const SimpleOpenFramingHeader * header() const noexcept