OnixS C++ B3 BOE Binary Order Entry  1.2.0
API Documentation
Session.h
Go to the documentation of this file.
1 /*
2  * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3  *
4  * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5  * and international copyright treaties.
6  *
7  * Access to and use of the software is governed by the terms of the applicable OnixS Software
8  * Services Agreement (the Agreement) and Customer end user license agreements granting
9  * a non-assignable, non-transferable and non-exclusive license to use the software
10  * for it's own data processing purposes under the terms defined in the Agreement.
11  *
12  * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13  * of this source code or associated reference material to any other location for further reproduction
14  * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15  *
16  * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17  * the terms of the Agreement is a violation of copyright law.
18  */
19 
20 #pragma once
21 
22 #include <limits.h>
23 
24 #include <OnixS/B3/BOE/ABI.h>
25 #include <OnixS/B3/BOE/Defines.h>
30 #include <OnixS/B3/BOE/TcpInfo.h>
33 
34 namespace OnixS
35 {
36 namespace B3
37 {
38 namespace BOE
39 {
40 // Forward declarations
41 class TcpDirectStack;
42 namespace AdHoc { struct SessionHelper; };
43 
44 /**
45 * A FIXP Session.
46 */
48 {
49 public:
50  static const UInt64 UndefinedSessionVerID = 0;
51 
52  /**
53  * Constructor.
54  *
55  * \param settings Session settings.
56  * \param listener Session listener.
57  * \param storageType Session storage type.
58  * \param storage Session storage.
59  * \param sessionVerId Session Version Identification (sessionVerId).
60  * \param customKey Custom key that could be used to distinguish sessions with the same sessionVerId, Session ID, FirmId, and Market Segment ID values.
61  */
63  SessionStorage *storage = ONIXS_B3_BOE_NULLPTR, UInt64 sessionVerId = UndefinedSessionVerID, const std::string &customKey = "");
64 
66  SessionStorage * storage = ONIXS_B3_BOE_NULLPTR, UInt64 sessionVerId = UndefinedSessionVerID, const std::string &customKey = "");
67 
68  ~Session();
69 
70  /// \return the Session Version Identification (sessionVerId).
71  UInt64 sessionVerId() const ONIXS_B3_BOE_NOTHROW;
72 
73  /// \return the string representation of the Session ID (assigned by the exchange).
74  std::string id() const;
75 
76  /// \return Session ID (assigned by the exchange).
77  Messaging::SessionID sessionId() const ONIXS_B3_BOE_NOTHROW;
78 
79  /// \return Custom key that could be used to distinguish sessions with the same sessionVerId, Session ID, FirmId, and Market Segment ID values.
80  std::string customKey() const;
81 
82  /// \return `true` if the session is negotiated (the Negotiation Response message has been received in reply to the Negotiation message); otherwise - `false`.
83  bool negotiated() const;
84 
85  /// Sets the "negotiated" status.
86  ///
87  /// The session is negotiated when the Negotiation Response message has been received in reply to the Negotiation message.
88  Session& negotiated(bool negotiated);
89 
90  /**
91  * Establishes the connection.
92  *
93  * \param host Market Segment Gateway host.
94  * \param port Market Segment Gateway port.
95  *
96  * \return This session.
97  */
98  Session& connect(const std::string &host, Port port);
99 
100  /**
101  * Establishes the connection asynchronously.
102  *
103  * \param host Market Segment Gateway host.
104  * \param port Market Segment Gateway port.
105  */
106  Threading::SharedFuture<void> connectAsync(const std::string & host, Port port);
107 
108  /**
109  * Terminates the connection.
110  *
111  * \note Method blocks until the Terminate message is received in reply, or the timeout is elapsed.
112  *
113  * \return This session.
114  */
115  Session& disconnect();
116 
117  /**
118  * Terminates the connection asynchronously.
119  */
120  Threading::SharedFuture<void> disconnectAsync();
121 
122  /**
123  * Sends the message.
124  *
125  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
126  * \param msg The message to be sent.
127  *
128  * \note This call is thread-safe, except for the TCPDirect mode.
129  *
130  * \return This session.
131  */
132  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
133  Session& send(
134  Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg,
135  Messaging::Timestamp sendingTime = UtcWatch::now());
136 
137 #if defined (ONIXS_B3_BOE_CXX11)
138 
139  /**
140  * Sends the message.
141  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
142  * \param msg The message to be sent.
143  *
144  * \note This call is thread-safe, except for the TCPDirect mode.
145  *
146  * \return This session.
147  */
148  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
149  Session& send(
151  Messaging::Timestamp sendingTime = UtcWatch::now());
152 
153 #endif
154 
155  /// Maximum packet size, based on an MTU (Maximum Transmission Unit) of 1420 bytes.
156  static const size_t B3BOEMaxPacketSize = 1420;
157 
158 #if defined (ONIXS_B3_BOE_CXX11)
159 
160  /**
161  * Sends messages.
162  *
163  * \note This call is thread-safe.
164  *
165  * \param msgs Message batch to send.
166  * \param maxPacketSize The maximum number of bytes written to the socket's send buffer together.
167  * This parameter could be used to reduce the probability that the operating system will fragment the message across multiple TCP packets.
168  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
169  *
170  * \note The maxPacketSize parameter should not be less than any message size in the batch.
171  * Otherwise, the method call can produce an error, and the session can close the connection.
172  *
173  * \return This session.
174  */
175  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
176  Session& send(
178  Messaging::Timestamp sendingTime = UtcWatch::now(),
179  size_t maxPacketSize = B3BOEMaxPacketSize);
180 
181  /**
182  * Sends messages.
183  *
184  * \note This call is thread-safe.
185  *
186  * \param msgs Message batch combiner to send.
187  * \param maxPacketSize The maximum number of bytes written to the socket's send buffer together.
188  * This parameter could be used to reduce the probability that the operating system will fragment the message across multiple TCP packets.
189  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
190  *
191  * \note The maxPacketSize parameter should not be less than any message size in the batch.
192  * Otherwise, the method call can produce an error, and the session can close the connection.
193  *
194  * \return This session.
195  */
196  Session& send(
197  MessageBatchCombiner & msgs,
198  Messaging::Timestamp sendingTime = UtcWatch::now(),
199  size_t maxPacketSize = B3BOEMaxPacketSize);
200 
201 #endif
202 
203  /**
204  * Send the Sequence message.
205  *
206  * \note The session sends Sequence messages automatically per the B3 BOE protocol.
207  * This method should be used only if an ad hoc Sequence message is required
208  * and the connection is established.
209  *
210  * \return This session.
211  */
212  Session& sendSequenceMessage();
213 
214  /**
215  * Warms up the sending path.
216  *
217  * \param msg SBE message to warm up the sending path.
218  * \param warmupFlags Specific flags which can be used to turn on the warmup feature for a specific NIC.
219  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
220  *
221  * \note This call is thread-safe, except for the TCPDirect mode.
222  *
223  * \return This session.
224  */
225  template <typename SbeMessageType, size_t MaxMessageSize>
226  Session& warmUp(
228  Messaging::Timestamp sendingTime = UtcWatch::now(),
229  int warmupFlags = 0);
230 
231 
232 #if defined (ONIXS_B3_BOE_CXX11)
233 
234  /**
235  * Warms up the sending path.
236  *
237  * \note This call is thread-safe.
238  *
239  * \param msgs Message batch to send.
240  * \param maxPacketSize The maximum number of bytes written to the socket's send buffer together during the subsequent `send` call.
241  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
242  * \param warmupFlags Specific flags which can be used to turn on the warmup feature for a specific NIC.
243  *
244  * \note The maxPacketSize parameter should not be less than any message size in the batch.
245  * Otherwise, the method call can produce an error, and the session can close the connection.
246  *
247  * \return This session.
248  */
249  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
250  Session& warmUp(
252  Messaging::Timestamp sendingTime = UtcWatch::now(),
253  size_t maxPacketSize = B3BOEMaxPacketSize,
254  int warmupFlags = 0);
255 
256  /**
257  * Warms up the sending path.
258  *
259  * \note This call is thread-safe.
260  *
261  * \param msgs Message batch combiner to send.
262  * \param maxPacketSize The maximum number of bytes written to the socket's send buffer together during the subsequent `send` call.
263  * \param sendingTime The time value to be used for a `sendingTimeEpoch` field of the message(s) being sent.
264  * \param warmupFlags Specific flags which can be used to turn on the warmup feature for a specific NIC.
265  *
266  * \note The maxPacketSize parameter should not be less than any message size in the batch.
267  * Otherwise, the method call can produce an error, and the session can close the connection.
268  *
269  * \return This session.
270  */
271  Session& warmUp(
273  size_t maxPacketSize = B3BOEMaxPacketSize, int warmupFlags = 0);
274 #endif
275 
276  /**
277  * Performs session throttling; must be called before each send function call.
278  * If the count of messages per time unit exceeds the throttling limit, the function blocks until the given time interval has passed.
279  *
280  * \note This call is thread-safe.
281  */
282  Session& throttle();
283 
284  /**
285  * Checks session throttling; must be called before each send function call.
286  * If the count of messages per time unit exceeds the throttling limit, the function returns the delay (in milliseconds) until sending becomes possible; otherwise, it returns 0.
287  *
288  * \note This call is thread-safe.
289  */
290  size_t tryThrottle();
291 
292  /**
293  * Sets throttling limit parameters.
294  *
295  * \param messagesCount Message limit per time unit.
296  * \param intervalInMs Time interval to limit messages.
297  *
298  * \note This call is thread-safe.
299  */
300  Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
301 
302  /// \return the host name the session is connected to.
303  std::string remoteHost() const;
304 
305  /// \return the IP address the session is connected to.
306  std::string remoteIpAddress() const;
307 
308  /// \return the port number the session is connected to.
309  Port remotePort() const;
310 
311  /// \return the local port range
312  std::pair<Port, Port> localPortRange() const ONIXS_B3_BOE_NOTHROW;
313 
314  /// Sets the local port range
315  Session& localPortRange(std::pair<Port, Port> portRange);
316 
317  /// \return the IP Address or name of the local network interface.
318  std::string localNetworkInterface() const;
319 
320  /// Sets the IP Address or name of the local network interface.
321  Session& localNetworkInterface(const std::string &value);
322 
323  /// \return the local network port number.
324  Port localPort() const;
325 
326  /// \return the value of the `TCP_NODELAY` option (improve latency at the expense of message throughput).
327  bool tcpNoDelayOption() const;
328 
329  /// Sets the `TCP_NODELAY` option (improve latency at the expense of message throughput).
330  Session& tcpNoDelayOption(bool value);
331 
332  /// \return Socket options.
333  const SocketOptions &socketOptions() const;
334 
335  /// Sets socket options.
336  Session& socketOptions(const SocketOptions &options);
337 
338  static const int UndefinedAffinity = -1;
339 
340  /// Sets the receiving thread CPU affinity.
341  Session& receivingThreadAffinity(CpuIndex cpuIndex);
342 
343  /// Sets the receiving thread CPU affinity.
344  Session& receivingThreadAffinity(const CpuIndexes &cpuIndexes);
345 
346  /// \return the receiving thread CPU affinity.
347  const CpuIndexes &receivingThreadAffinity() const;
348 
349  /// Sets the sending thread CPU affinity.
350  Session& sendingThreadAffinity(CpuIndex cpuIndex);
351 
352  /// Sets the sending thread CPU affinity.
353  Session& sendingThreadAffinity(const CpuIndexes &cpuIndexes);
354 
355  /// \return the auxiliary sending thread CPU affinity.
356  const CpuIndexes &sendingThreadAffinity() const;
357 
358  /// \return the scheduling priority of the receiving thread.
359  int receivingThreadPriority() const;
360 
361  /// Sets the scheduling priority of the receiving thread.
362  Session& receivingThreadPriority(int priority);
363 
364  /// \return the scheduling priority of the sending thread.
365  int sendingThreadPriority() const;
366 
367  /// Sets the scheduling priority of the sending thread.
368  Session& sendingThreadPriority(int priority);
369 
370  /// \return the receiving thread policy.
371  int receivingThreadPolicy() const;
372 
373  /// Sets the receiving thread policy.
374  Session& receivingThreadPolicy(int policy);
375 
376  /// \return the sending thread policy.
377  int sendingThreadPolicy() const;
378 
379  /// Sets the sending thread policy.
380  Session& sendingThreadPolicy(int policy);
381 
382  /// \return the receive spinning timeout value (in microseconds).
383  unsigned receiveSpinningTimeout() const ONIXS_B3_BOE_NOTHROW;
384 
385  /// Sets the receive spinning timeout value (in microseconds).
386  Session& receiveSpinningTimeout(unsigned timeoutInUs);
387 
388  /// \return the send spinning timeout value (in microseconds).
389  unsigned sendSpinningTimeout() const ONIXS_B3_BOE_NOTHROW;
390 
391  /// Sets the send spinning timeout value (in microseconds).
392  Session& sendSpinningTimeout(unsigned timeoutInUs);
393 
394  /// \return `true` if new messages are reported even when the message gap is detected,
395  /// and the reply on the `Retransmit Request` message is expected; otherwise - `false`.
396  bool reportNewMessagesWhileWaitingForMissedMessages() const;
397 
398  /// Sets the option to report new messages even when the message gap is detected,
399  /// and the reply on the `Retransmit Request` message is expected.
400  Session& reportNewMessagesWhileWaitingForMissedMessages(bool value);
401 
402  /// \return the number of messages to be sent together.
403  unsigned messageGrouping() const ONIXS_B3_BOE_NOTHROW;
404 
405  /**
406  * Sets the number of messages to be sent together.
407  *
408  * \param numberOfMessagesToGroup If the parameter is zero (by default) or one, the Handler tries to send
409  * an outgoing application-level message in the context of the thread that calls the OnixS::B3::BOE::Session::send method.
410  * If the message cannot be sent immediately, it is stored in the queue for the subsequent sending by the sending thread.
411  * If this parameter is larger than one, the Handler stores outgoing SBE messages in the queue for the subsequent sending by the sending thread.
412  *
413  * \return This session.
414  */
415  Session& messageGrouping(unsigned numberOfMessagesToGroup);
416 
417  /// \return Session storage type.
418  SessionStorageType::Enum storageType() const ONIXS_B3_BOE_NOTHROW;
419 
420  /// \return Session storage ID.
421  const std::string &storageId() const ONIXS_B3_BOE_NOTHROW;
422 
423  /// \return the sequence number of the next outgoing message.
424  SeqNumber outSeqNum() const ONIXS_B3_BOE_NOTHROW;
425 
426  /// Sets the sequence number of the next outgoing message.
427  Session& outSeqNum(SeqNumber nextOutSeqNum);
428 
429  /// \return the expected sequence number of the next inbound message.
430  SeqNumber inSeqNum() const;
431 
432  /// Sets the expected sequence number of the next inbound message.
433  Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
434 
435  /**
436  * Backs up the current log files, resets the sequence numbers to 1, and generates a new sessionVerId.
437  *
438  * \warning Can be called only when the session is disconnected.
439  *
440  * \return This session.
441  */
442  Session& reset();
443 
444  /// \return Session's current state.
445  SessionStateId::Enum state() const;
446 
447  /// \return the time when the session was created or the last reset operation was performed.
448  Messaging::Timestamp creationTime() const ONIXS_B3_BOE_NOTHROW;
449 
450  /// Flushes all internal buffers of the session storage.
451  void flushSessionStorage();
452 
453  /// \return the maximum number of tries to restore the telecommunications link.
454  unsigned reconnectAttempts() const;
455 
456  /// Sets the maximum number of tries to restore the telecommunications link.
457  Session& reconnectAttempts(unsigned reconnectAttempts);
458 
459  /// \return the time interval between retries to restore the telecommunications link (seconds).
460  unsigned reconnectInterval() const;
461 
462  /// Sets the time interval between retries to restore the telecommunications link (seconds).
463  Session& reconnectInterval(unsigned seconds);
464 
465  /// \return the usage of local time in Handler events and log files.
466  bool localTimeUsage() const;
467 
468  /// Specifies the usage of local time in Handler events and log files.
469  Session& localTimeUsage(bool useLocalTime);
470 
471  /// \return The session storage directory.
472  const std::string &storageDirectory() const;
473 
474  /// \return `true` if logging of an outgoing message to the session storage is performed before sending it to the wire; otherwise - `false`.
475  bool logBeforeSending() const;
476 
477  /// Sets the option to log an outgoing message to the session storage before sending it to the wire.
478  Session& logBeforeSending(bool value);
479 
480  /// \return the maximum size of the message queue used during message gap recovery.
481  size_t incomingMessageGapQueueMaximumSize() const;
482 
483  /// Sets the maximum size of the message queue used during message gap recovery.
484  Session& incomingMessageGapQueueMaximumSize(size_t value);
485 
486  /// Write the given user's message to the Handler's log file using the session formatted Id.
487  const Session& log(const std::string &message) const;
488 
489  /// \return the human-friendly description.
490  std::string toString() const;
491 
492  /// \return the license expiration date.
493  Messaging::Timestamp licenseExpirationDate() const;
494 
495  /// \return the version of the message schema.
496  Messaging::SchemaVersion messagingVersion() const ONIXS_B3_BOE_NOTHROW;
497 
498  /// Represents invalid value of socket handle.
499  static const Handle InvalidSocketHandle;
500 
501  /// Returns the socket handle which the session uses to transmit data.
502  ///
503  /// \note The TCPDirect mode does not support this method, so it always returns the `InvalidSocketHandle` value.
504  Handle socketHandle();
505 
506  /// Returns the total number of bytes in the outbound queue.
507  size_t outboundQueueBytes();
508 
509  /// Gets information about the underlying TCP connection.
510  ///
511  /// \return `true` when success; otherwise - `false`.
512  /// \note Available on Linux only.
513  /// \note Not available for the TcpDirect mode.
514  bool getTcpInfo(TcpInfo&);
515 
516  /// \return the version of the library.
517  static const std::string& version() ONIXS_B3_BOE_NOTHROW;
518 
519 private:
520  Session(const Session& );
521  Session& operator=(const Session& );
522 
523  void send(NetworkMessage msg, Messaging::Timestamp ts);
524  void send(NetMessages& msgs, size_t maxPacketSize, Messaging::Timestamp ts);
525  void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
526  void warmUp(NetMessages& msgs, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
527 
528  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
529  void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
530 
531  void validateVersion(Messaging::SchemaVersion version) const;
532 
533  Session& sendRetransmitRequest(UInt64 sessionVerId, SeqNumber from, SeqNumber to);
534 
535  struct Impl;
536  Impl *const impl_;
537 
538 friend struct AdHoc::SessionHelper;
539 };
540 
541 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
543 Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
544 {
545  assert(ts != Messaging::Timestamp());
546 
547 #ifndef NDEBUG
548  validate(msg);
549 #endif
550 
551  send(msg.toNetworkMessage(), ts);
552 
553  return *this;
554 }
555 
556 template <typename SbeMessageType, size_t MaxMessageSize>
560  Messaging::Timestamp sendingTime,
561  int warmupFlags)
562 {
563  warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
564 
565  return *this;
566 }
567 
568 #if defined (ONIXS_B3_BOE_CXX11)
569 
570 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
573 {
574  assert(ts != Messaging::Timestamp());
575 
576  send(msg, ts);
577  return *this;
578 }
579 
580 #endif
581 
582 #if defined (ONIXS_B3_BOE_CXX11)
583 
584 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
589  size_t maxPacketSize)
590 {
591  assert(ts != Messaging::Timestamp());
592 
593  send(msgs.netMsgs_, maxPacketSize, ts);
594  return *this;
595 }
596 
597 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
601  Messaging::Timestamp ts, size_t maxPacketSize, int flags)
602 {
603  assert(ts != Messaging::Timestamp());
604 
605  warmUp(msgs.netMsgs_, maxPacketSize, flags, ts);
606  return *this;
607 }
608 
609 #endif
610 
611 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
613 {
614  validateVersion(holder->version());
615 }
616 
617 }
618 }
619 }
The time point without the time-zone information.
Definition: Time.h:467
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
Definition: Session.h:543
#define ONIXS_B3_BOE_NULLPTR
Definition: Compiler.h:188
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
TCP state information.
Definition: TcpInfo.h:30
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
Definition: SchemaTraits.h:30
#define ONIXS_B3_BOE_NOTHROW
Definition: Compiler.h:182
#define ONIXS_B3_BOE_HOTPATH
Definition: Compiler.h:193
The message batch wrapper.
Definition: MessageBatch.h:53
Session's Listener.
#define ONIXS_B3_BOE_EXPORTED
Definition: Compiler.h:181
STL namespace.
Session's storage.
UInt32 SessionID
Client connection identification on the gateway assigned by B3.
Definition: Fields.h:103
std::vector< NetworkMessage > NetMessages
Definition: MessageBatch.h:42
Session's network stack reactor interface.
Definition: Defines.h:40
std::set< CpuIndex > CpuIndexes
Definition: Defines.h:73
Messaging::UInt32 SeqNumber
Definition: Messaging.h:52
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Definition: Session.h:558
A FIXP Session.
Definition: Session.h:47
unsigned short Port
Definition: Defines.h:44
std::vector< SocketOption > SocketOptions
Socket options.
Definition: Defines.h:77
size_t CpuIndex
Definition: Defines.h:72
int Handle
Type alias for socket handle.
Definition: Defines.h:56
NetworkMessage toNetworkMessage() noexcept
The class can be used to combine messages with different types to the batch for sending.
Definition: MessageBatch.h:139