OnixS C++ CME MDP Conflated TCP Handler  1.3.1
API Documentation
Session.h
Go to the documentation of this file.
1 /*
2  * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3  *
4  * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5  * and international copyright treaties.
6  *
7  * Access to and use of the software is governed by the terms of the applicable OnixS Software
8  * Services Agreement (the Agreement) and Customer end user license agreements granting
9  * a non-assignable, non-transferable and non-exclusive license to use the software
10  * for it's own data processing purposes under the terms defined in the Agreement.
11  *
12  * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13  * of this source code or associated reference material to any other location for further reproduction
14  * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15  *
16  * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17  * the terms of the Agreement is a violation of copyright law.
18  */
19 
20 #pragma once
21 
28 
29 namespace OnixS
30 {
31 namespace CME
32 {
33 namespace ConflatedTCP
34 {
35 /**
36 * A Conflated TCP Session.
37 */
39 {
40 public:
41  static const UInt64 UndefinedUuid = 0;
42 
43  /**
44  * Constructor.
45  *
46  * \param settings Session settings.
47  * \param marketSegmentId Market segment ID.
48  * \param sessionListener Session listener.
49  * \param marketDataListener Market Data listener.
50  * \param storageType Session storage type.
51  * \param storage Session storage.
52  * \param uuid Universally Unique Identifier (UUID).
53  * \param customKey Custom key that could be used to distinguish sessions with the same UUID, Session ID, FirmId, and Market Segment ID values.
54  */
55  Session(const SessionSettings &settings, int marketSegmentId,
56  SessionListener* sessionListener, MDP::MarketDataListener* marketDataListener, SessionStorageType::Enum storageType = SessionStorageType::FileBased,
57  SessionStorage *storage = ONIXS_CONFLATEDTCP_NULLPTR, UInt64 uuid = UndefinedUuid, const std::string &customKey = "" );
58 
59  ~Session();
60 
61  /// \return the Universally Unique Identifier (UUID).
62  UInt64 uuid() const;
63 
64  /// \return Session ID (assigned by the exchange).
65  const std::string id() const;
66 
67  /// \return Market Segment ID.
68  int marketSegmentId() const;
69 
70  /// \return Custom key that could be used to distinguish sessions with the same UUID, Session ID, FirmId, and Market Segment ID values.
71  const std::string customKey() const;
72 
73  /**
74  * Establishes the connection.
75  *
76  * \param host Market Segment Gateway host.
77  * \param port Market Segment Gateway port.
78  *
79  * \return This session.
80  */
81  Session& connect(const std::string &host, Port port);
82 
83  /**
84  * Terminates the connection.
85  *
86  * \param reason Terminate reason details.
87  *
88  * \note The method blocks until the Terminate message is received in reply, or the timeout elapses.
89  *
90  * \return This session.
91  */
92  Session& disconnect(const std::string &reason = "Terminate");
93 
94  /**
95  * Sends the message.
96  *
97  * \note This call is thread-safe.
98  *
99  * \return This session.
100  */
101  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
103 
104 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_RVALUE_REFERENCES
105 
106  /**
107  * Sends the message.
108  *
109  * \note This call is thread-safe.
110  *
111  * \return This session.
112  */
113  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
115 
116 #endif
117 
118 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_ALIAS_TEMPLATES
119 
120  /**
121  * Sends messages.
122  *
123  * \note This call is thread-safe.
124  *
125  * \return This session.
126  */
127  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
129 
130 #endif
131 
132 #if defined(ONIXS_CONFLATEDTCP_CXX11)
133 
134  /**
135  * Sends messages.
136  *
137  * \note This call is thread-safe.
138  *
139  * \return This session.
140  */
141  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
143 
144 #endif
145 
146  /// \return the host name the session is connected to.
147  std::string remoteHost() const;
148 
149  /// \return the IP address the session is connected to.
150  std::string remoteIpAddress() const;
151 
152  /// \return the port number the session is connected to.
153  Port remotePort() const;
154 
155  // TODO: move to SessionSettings
156  std::pair<Port, Port> localPortRange() const;
157 
158  // TODO: move to SessionSettings
159  Session& localPortRange(std::pair<Port, Port> portRange);
160 
161  /// \return the IP Address or name of the local network interface.
162  std::string localNetworkInterface() const;
163 
164  /// Sets the IP Address or name of the local network interface.
165  Session& localNetworkInterface(const std::string &value);
166 
167  /// \return the local network port number.
168  Port localPort() const;
169 
170  /// \return the value of the `TCP_NODELAY` option (improve latency at the expense of message throughput).
171  bool tcpNoDelayOption() const;
172 
173  /// Sets the `TCP_NODELAY` option (improve latency at the expense of message throughput).
174  Session& tcpNoDelayOption(bool value);
175 
176  /// \return Socket options.
177  const SocketOptions &socketOptions() const;
178 
179  /// Sets socket options.
180  Session& socketOptions(const SocketOptions &options);
181 
182  static const int UndefinedAffinity = -1;
183 
184  /// Sets the receiving thread CPU affinity.
185  Session& receivingThreadAffinity(CpuIndex cpuIndex);
186 
187  /// Sets the receiving thread CPU affinity.
188  Session& receivingThreadAffinity(const CpuIndexes &cpuIndexes);
189 
190  /// \return the receiving thread CPU affinity.
191  const CpuIndexes &receivingThreadAffinity() const;
192 
193  /// Sets the sending thread CPU affinity.
194  Session& sendingThreadAffinity(CpuIndex cpuIndex);
195 
196  /// Sets the sending thread CPU affinity.
197  Session& sendingThreadAffinity(const CpuIndexes &cpuIndexes);
198 
199  /// \return the auxiliary sending thread CPU affinity.
200  const CpuIndexes &sendingThreadAffinity() const;
201 
202  /// \return the scheduling priority of the receiving thread.
203  int receivingThreadPriority() const;
204 
205  /// Sets the scheduling priority of the receiving thread.
206  Session& receivingThreadPriority(int priority);
207 
208  /// \return the scheduling priority of the sending thread.
209  int sendingThreadPriority() const;
210 
211  /// Sets the scheduling priority of the sending thread.
212  Session& sendingThreadPriority(int priority);
213 
214  /// \return the receiving thread policy.
215  int receivingThreadPolicy() const;
216 
217  /// Sets the receiving thread policy.
218  Session& receivingThreadPolicy(int policy);
219 
220  /// \return the sending thread policy.
221  int sendingThreadPolicy() const;
222 
223  /// Sets the sending thread policy.
224  Session& sendingThreadPolicy(int policy);
225 
226  /// \return the receive spinning timeout value (in microseconds).
227  unsigned receiveSpinningTimeout() const;
228 
229  /// Sets the receive spinning timeout value (in microseconds).
230  Session& receiveSpinningTimeout(unsigned timeoutInUs);
231 
232  /// \return the send spinning timeout value (in microseconds).
233  unsigned sendSpinningTimeout() const;
234 
235  /// Sets the send spinning timeout value (in microseconds).
236  Session& sendSpinningTimeout(unsigned timeoutInUs);
237 
238  /// \return `true` if new messages are reported even when the message gap is detected,
239  /// and the reply on the [Retransmit Request](https://www.cmegroup.com/confluence/display/EPICSANDBOX/Retransmit+Request)
240  /// message is expected, otherwise - `false`.
241  bool reportNewMessagesWhileWaitingForMissedMessages() const;
242 
243  /// Sets the option to report new messages even when the message gap is detected,
244  /// and the reply on the [Retransmit Request](https://www.cmegroup.com/confluence/display/EPICSANDBOX/Retransmit+Request)
245  /// message is expected.
246  Session& reportNewMessagesWhileWaitingForMissedMessages(bool value);
247 
248  /// \return `true` if incoming message sequence numbers are validated, otherwise - `false`.
249  bool validateSequenceNumbers() const;
250 
251  /// Turns on/off the validation of incoming message sequence numbers.
252  Session& validateSequenceNumbers(bool validate);
253 
254  /// \return the number of messages to be sent together.
255  unsigned messageGrouping() const;
256 
257  /**
258  * Sets the number of messages to be sent together.
259  *
260  * \param numberOfMessagesToGroup If the parameter is zero (by default) or one, the Handler tries to send
261  * an outgoing application-level message in the context of the thread that calls the OnixS::CME::ConflatedTCP::Session::send method.
262  * If the message cannot be sent immediately, it is stored in the queue for the subsequent sending by the sending thread.
263  * If this parameter is larger than one, the Handler stores outgoing SBE messages in the queue for the subsequent sending by the sending thread.
264  *
265  * \return This session.
266  */
267  Session& messageGrouping(unsigned numberOfMessagesToGroup);
268 
269  /// \return Session storage type.
270  SessionStorageType::Enum storageType() const;
271 
272  /// \return Session storage ID.
273  const std::string &storageId() const;
274 
275  /// \return the sequence number of the next outgoing message.
276  SeqNumber outSeqNum() const;
277 
278  /// Sets the sequence number of the next outgoing message.
279  Session& outSeqNum(SeqNumber nextOutSeqNum);
280 
281  /// \return the expected sequence number of the next inbound message.
282  SeqNumber inSeqNum() const;
283 
284  /// Sets the expected sequence number of the next inbound message.
285  Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
286 
287  /**
288  * \return the sequence number of the last business message published by CME with the PreviousUUID.
289  *
290  * If no business message was published, the value is zero.
291  */
292  SeqNumber previousSeqNo() const;
293 
294  /// Sets the sequence number of the last business message published by CME with the PreviousUUID.
295  Session& previousSeqNo(SeqNumber value);
296 
297  /**
298  * \return The UUID from the previously Established session.
299  *
300  * - This can be the CME assigned default UUID=0 for messages published by CME before first Negotiation of customer at the beginning of the week.
301  * - This can be the last UUID as used by the customer from the previously Established session.
302  */
303  Messaging::UInt64 previousUuid() const;
304 
305  /// Sets the UUID from the previously Established session.
306  Session& previousUuid(Messaging::UInt64 value);
307 
308  /**
309  * Backs up the current log files, resets the sequence numbers to 1 and generates a new UUID.
310  *
311  * \warning Can be called only when the session is disconnected.
312  *
313  * \param startOfWeek if `true` then the PreviousUUID and PreviousSeqNum are reset to 0.
314  *
315  * \return This session.
316  */
317  Session& reset(bool startOfWeek = false);
318 
319  /// \return Session's current state.
320  SessionStateId::Enum state();
321 
322  /// \return the time when the session was created or the last reset operation was performed.
323  Messaging::Timestamp creationTime() const;
324 
325  /// Flushes all internal buffers of the session storage.
326  void flushSessionStorage();
327 
328  /// \return the maximum number of tries to restore the telecommunications link.
329  unsigned reconnectAttempts() const;
330 
331  /// Sets the maximum number of tries to restore the telecommunications link.
332  Session& reconnectAttempts(unsigned reconnectAttempts);
333 
334  /// \return the time interval between retries to restore the telecommunications link (seconds).
335  unsigned reconnectInterval() const;
336 
337  /// Sets the time interval between retries to restore the telecommunications link (seconds).
338  Session& reconnectInterval(unsigned seconds);
339 
340  /// \return the usage of local time in Handler events and log files.
341  bool localTimeUsage() const;
342 
343  /// Specifies the usage of local time in Handler events and log files.
344  Session& localTimeUsage(bool useLocalTime);
345 
346  /// \return The session storage directory.
347  const std::string &storageDirectory() const;
348 
349  /// \return `true` if logging of an outgoing message to the session storage is performed before sending it to the wire, otherwise - `false`.
350  bool logBeforeSending() const;
351 
352  /// Sets the option to log an outgoing message to the session storage before sending it to the wire.
353  Session& logBeforeSending(bool value);
354 
355  // @TODO: document this method.
356  size_t incomingMessageGapQueueMaximumSize() const;
357 
358  // @TODO: document this method.
359  Session& incomingMessageGapQueueMaximumSize(size_t value);
360 
361  /// Writes the given user message to the Handler's log file using the session's formatted ID.
362  const Session& log(const std::string &message) const;
363 
364  /// \return the human-friendly description.
365  std::string toString() const;
366 
367 private:
368  Session(const Session& );
369  Session& operator=(const Session& );
370 
372  void send(std::vector<Messaging::SbeMessage> msgs, std::vector<Messaging::SimpleOpenFramingHeader *> headers);
374 
375  struct Impl;
376  Impl *const impl_;
377 };
378 
379 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
381 {
382  const Messaging::MessageSize messageSize = msg.setHeader();
383  Messaging::SimpleOpenFramingHeader *const header = msg.header();
384  send(Messaging::SbeMessage(header + 1, messageSize, Messaging::SbeMessage::NoCheck()), *header);
385 
386  return *this;
387 }
388 
389 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_RVALUE_REFERENCES
390 
391 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
393 {
394  send(msg);
395 
396  return *this;
397 }
398 
399 #endif
400 
401 #if ONIXS_CONFLATEDTCP_COMPILER_CXX_ALIAS_TEMPLATES
402 
403 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
405 {
406  std::vector<Messaging::SbeMessage> messages(msgs.size());
407  std::vector<Messaging::SimpleOpenFramingHeader *> headers(msgs.size());
408 
409  for (size_t i = 0; i < msgs.size(); ++i)
410  {
411  const Messaging::MessageSize messageSize = msgs[i].setHeader();
412  Messaging::SimpleOpenFramingHeader *const header = msgs[i].header();
413  messages[i] = Messaging::SbeMessage(header + 1, messageSize, Messaging::SbeMessage::NoCheck());
414  headers[i] = header;
415  }
416 
417  send(messages, headers);
418 
419  return *this;
420 }
421 
422 #endif
423 
424 #if defined(ONIXS_CONFLATEDTCP_CXX11)
425 
426 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
428 {
430  return send(messages);
431 }
432 
433 #endif
434 
435 
436 }
437 }
438 }
unsigned short Port
Definition: Defines.h:44
A Conflated TCP Session.
Definition: Session.h:38
Messaging::UInt32 SeqNumber
Definition: Messaging.h:58
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
#define ONIXS_CONFLATEDTCP_EXPORTED
Definition: Compiler.h:187
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg)
Sends the message.
Definition: Session.h:380
std::vector< SocketOption > SocketOptions
Socket options.
Definition: Defines.h:77
Definition: Defines.h:40
std::set< CpuIndex > CpuIndexes
Definition: Defines.h:74
The time point without the time-zone information.
Definition: Time.h:467
MessageSize setHeader() noexcept
Calculates the binary size of the message and updates the Simple Open Framing Header accordingly...
UInt16 MessageSize
Message length type.
Definition: Aliases.h:29
Callbacks invoked by Handler during market data processing to reflect various stages of processing...
std::vector< MessageHolder< MessageTypeT, MaxMessageSize, MessageInitializer > > MessageHolders
const SimpleOpenFramingHeader * header() const noexcept