OnixS C++ CME iLink 3 Binary Order Entry Handler 1.19.0
API Documentation
Loading...
Searching...
No Matches
Session.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
22#include <limits.h>
23
33
34namespace OnixS
35{
36namespace CME
37{
38namespace iLink3
39{
40// Forward declarations
41class TcpDirectStack;
42namespace AdHoc { struct SessionHelper; }
43
48{
49public:
50 static const UInt64 UndefinedUuid = 0;
51
65 SessionStorage *storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
66
67 Session(SessionReactor & stack, const SessionSettings & settings, int marketSegmentId,
69 SessionStorage * storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
70
72
74 UInt64 uuid() const noexcept;
75
77 std::string id() const;
78
80 int marketSegmentId() const noexcept;
81
83 std::string customKey() const;
84
86 bool negotiated() const;
87
92
95
98
107 Session& connect(const std::string &host, Port port);
108
115 Threading::SharedFuture<void> connectAsync(const std::string & host, Port port);
116
126 Session& disconnect(const std::string &reason = "");
127
133 Threading::SharedFuture<void> disconnectAsync(const std::string & reason = "");
134
140
144 Threading::SharedFuture<void> breakConnectionAsync();
145
156 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
157 Session& send(
158 Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg,
159 Messaging::Timestamp sendingTime = UtcWatch::now());
160
161#if defined (ONIXS_ILINK3_CXX11)
162
172 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
173 Session& send(
175 Messaging::Timestamp sendingTime = UtcWatch::now());
176
177#endif
178
180 static const size_t CmeMaxPacketSize = 1420;
181
182#if defined (ONIXS_ILINK3_CXX11)
183
199 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
200 Session& send(
202 Messaging::Timestamp sendingTime = UtcWatch::now(),
203 size_t maxPacketSize = CmeMaxPacketSize);
204
222 Messaging::Timestamp sendingTime = UtcWatch::now(),
223 size_t maxPacketSize = CmeMaxPacketSize);
224
225#endif
226
243 NetworkMessage* begin, NetworkMessage* end,
244 Messaging::Timestamp sendingTime = UtcWatch::now(),
245 size_t maxPacketSize = CmeMaxPacketSize) ONIXS_ILINK3_NONULL;
246
263 NetMessagesRange range,
264 Messaging::Timestamp sendingTime = UtcWatch::now(),
265 size_t maxPacketSize = CmeMaxPacketSize);
266
279
291 template <typename SbeMessageType, size_t MaxMessageSize>
294 Messaging::Timestamp sendingTime = UtcWatch::now(),
295 int warmupFlags = 0);
296
297
298#if defined (ONIXS_ILINK3_CXX11)
299
315 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
318 Messaging::Timestamp sendingTime = UtcWatch::now(),
319 size_t maxPacketSize = CmeMaxPacketSize,
320 int warmupFlags = 0);
321
339 size_t maxPacketSize = CmeMaxPacketSize, int warmupFlags = 0);
340#endif
341
360 size_t maxPacketSize = CmeMaxPacketSize, int warmupFlags = 0) ONIXS_ILINK3_NONULL;
361
378 NetMessagesRange range, Messaging::Timestamp sendingTime = UtcWatch::now(),
379 size_t maxPacketSize = CmeMaxPacketSize, int warmupFlags = 0);
380
381
389
396 size_t tryThrottle();
397
406 Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
407
409 std::string remoteHost() const;
410
412 std::string remoteIpAddress() const;
413
416
418 std::pair<Port, Port> localPortRange() const noexcept;
419
421 Session& localPortRange(std::pair<Port, Port> portRange);
422
424 std::string localNetworkInterface() const;
425
427 Session& localNetworkInterface(const std::string &value);
428
431
433 bool tcpNoDelayOption() const;
434
437
440
443
444 static const int UndefinedAffinity = -1;
445
448
451
454
457
460
463
466
469
472
475
478
481
484
487
489 unsigned receiveSpinningTimeout() const noexcept;
490
492 Session& receiveSpinningTimeout(unsigned timeoutInUs);
493
495 unsigned sendSpinningTimeout() const noexcept;
496
498 Session& sendSpinningTimeout(unsigned timeoutInUs);
499
504
509
511 unsigned messageGrouping() const noexcept;
512
523 Session& messageGrouping(unsigned numberOfMessagesToGroup);
524
527
529 SessionStorageType::Enum storageType() const noexcept;
530
532 const std::string &storageId() const noexcept;
533
535 SeqNumber outSeqNum() const noexcept;
536
538 Session& outSeqNum(SeqNumber nextOutSeqNum);
539
542
544 Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
545
551 SeqNumber previousSeqNo() const noexcept;
552
555
562 Messaging::UInt64 previousUuid() const noexcept;
563
566
576 Session& reset(bool startOfWeek = false);
577
579 SessionStateId::Enum state() const;
580
583
586
588 unsigned reconnectAttempts() const;
589
592
594 unsigned reconnectInterval() const;
595
597 Session& reconnectInterval(unsigned seconds);
598
600 bool localTimeUsage() const;
601
603 Session& localTimeUsage(bool useLocalTime);
604
606 const std::string &storageDirectory() const;
607
609 bool logBeforeSending() const;
610
613
616
619
621 const Session& log(const std::string &message) const;
622
624 std::string toString() const;
625
628
631
634
637
640
645
648
655
657 static std::string version();
658
659private:
660 Session(const Session& );
661 Session& operator=(const Session& );
662
663 void send(NetworkMessage msg, Messaging::Timestamp ts);
664 void send(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, Messaging::Timestamp ts);
665 void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
666 void warmUp(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
667
668 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
669 void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
670
671 void validateVersion(Messaging::SchemaVersion version) const;
672
673 Session& sendRetransmitRequest(UInt64 uuid, SeqNumber from, SeqNumber to);
674
675 struct Impl;
676 Impl *const impl_;
677
678friend struct AdHoc::SessionHelper;
679};
680
685{
686public:
699 SessionStorage *storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
700
712 CgwSession(SessionReactor & stack, const SessionSettings & settings,
714 SessionStorage * storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
715
716private:
717 int marketSegmentId() const noexcept;
718 static const int marketSegmentId_ = INT_MAX;
719};
720
721template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
723Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
724{
725 assert(ts != Messaging::Timestamp());
726
727#ifndef NDEBUG
728 validate(msg);
729#endif
730
731 send(msg.toNetworkMessage(), ts);
732
733 return *this;
734}
735
736template <typename SbeMessageType, size_t MaxMessageSize>
740 Messaging::Timestamp sendingTime,
741 int warmupFlags)
742{
743 warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
744 return *this;
745}
746
747#if defined (ONIXS_ILINK3_CXX11)
748
749template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
758
759#endif
760
761#if defined (ONIXS_ILINK3_CXX11)
762
763template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
768 size_t maxPacketSize)
769{
770 assert(ts != Messaging::Timestamp());
771
772 if ONIXS_ILINK3_UNLIKELY(msgs.netMsgs_.empty()) {
773 return *this;
774 }
775
776 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
777 NetworkMessage* const end = begin + msgs.netMsgs_.size();
778
779 send(begin, end, maxPacketSize, ts);
780 return *this;
781}
782
783template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
787 Messaging::Timestamp ts, size_t maxPacketSize, int flags)
788{
789 assert(ts != Messaging::Timestamp());
790
791 if ONIXS_ILINK3_UNLIKELY(msgs.netMsgs_.empty())
792 {
793 return *this;
794 }
795
796 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
797 NetworkMessage* const end = begin + msgs.netMsgs_.size();
798
799 warmUp(begin, end, maxPacketSize, flags, ts);
800 return *this;
801}
802
803#endif
804
805template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
807{
808 Messaging::validate(holder);
809 validateVersion(holder->version());
810}
811
812}
813}
814}
#define ONIXS_ILINK3_UNLIKELY(cond)
Definition Compiler.h:158
#define ONIXS_ILINK3_HOTPATH
Definition Compiler.h:148
#define ONIXS_ILINK3_EXPORTED
Definition Compiler.h:145
CgwSession(SessionReactor &stack, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
Constructor.
CgwSession(const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
Constructor.
The class can be used to combine messages with different types to the batch for sending.
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
The time point without the time-zone information.
Definition Time.h:468
Session's network stack reactor interface.
static const UInt64 UndefinedUuid
Definition Session.h:50
Messaging::Timestamp licenseExpirationDate() const
unsigned messageGrouping() const noexcept
std::string localNetworkInterface() const
std::string customKey() const
Threading::SharedFuture< void > breakConnectionAsync()
Breaks the connection non-gracefully asynchronously.
friend struct AdHoc::SessionHelper
Definition Session.h:678
static const int UndefinedAffinity
Definition Session.h:444
static const size_t CmeMaxPacketSize
CME states: Packet size maximum behavior based on MTU (Maximum Transmission Unit) - 1420 bytes,...
Definition Session.h:180
std::string remoteIpAddress() const
Session & throttle()
Performs the throttling of a session that must be called before each send function call.
Session(SessionReactor &stack, const SessionSettings &settings, int marketSegmentId, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
std::string toString() const
Session & send(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize) ONIXS_ILINK3_NONULL
Sends messages.
Session & breakConnection()
Breaks the connection non-gracefully.
std::string remoteHost() const
const SocketOptions & socketOptions() const
unsigned reconnectAttempts() const
bool reportNewMessagesWhileWaitingForMissedMessages() const
Session & throttlingLimit(size_t messagesCount, size_t intervalInMs=1000)
Sets throttling limit parameters.
Messaging::FTI::Enum faultToleranceIndicator() const
SeqNumber outSeqNum() const noexcept
Messaging::SchemaVersion messagingVersion() const noexcept
std::pair< Port, Port > localPortRange() const noexcept
const std::string & storageDirectory() const
Session & send(NetMessagesRange range, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize)
Sends messages.
size_t incomingMessageGapQueueMaximumSize() const
bool getTcpInfo(TcpInfo &)
Gets information about the underlying TCP connection.
bool detectMessageGapForPreviousUuid() const
Session & sendSequenceMessage(Messaging::KeepAliveLapsed::Enum keepAliveLapsed=Messaging::KeepAliveLapsed::Lapsed)
Send the Sequence message.
unsigned receiveSpinningTimeout() const noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Definition Session.h:738
Handle socketHandle()
Returns the socket handle which the session uses to transmit data.
Session & receivingThreadAffinity(CpuIndex cpuIndex)
Sets the receiving thread CPU affinity.
int marketSegmentId() const noexcept
Session & warmUp(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize, int warmupFlags=0) ONIXS_ILINK3_NONULL
Warms up the sending path.
ThreadingModel::Enum threadingModel() const
unsigned reconnectInterval() const
SessionStorageType::Enum storageType() const noexcept
Messaging::UInt64 previousUuid() const noexcept
UInt64 uuid() const noexcept
size_t outboundQueueBytes()
Returns the total number of bytes in the outbound queue.
const std::string & storageId() const noexcept
Session & send(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize)
Sends messages.
static std::string version()
size_t tryThrottle()
Checks the throttling of a session that must be called before each send function call.
static const Handle InvalidSocketHandle
Represents invalid value of socket handle.
Definition Session.h:639
Session & disconnect(const std::string &reason="")
Terminates the connection.
Messaging::Timestamp creationTime() const noexcept
Session & reset(bool startOfWeek=false)
Backups the current log files, resets the sequence numbers to 1 and generates a new UUID.
SessionStateId::Enum state() const
void flushSessionStorage()
Flushes all internal buffers of the session storage.
SeqNumber inSeqNum() const
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
Definition Session.h:723
int receivingThreadPriority() const
const Session & log(const std::string &message) const
Write the given user's message to the Handler's log file using the session formatted Id.
Session & sendingThreadAffinity(CpuIndex cpuIndex)
Sets the sending thread CPU affinity.
Session & connect(const std::string &host, Port port)
Establishes the connection.
SeqNumber previousSeqNo() const noexcept
Session & warmUp(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize, int warmupFlags=0)
Warms up the sending path.
unsigned sendSpinningTimeout() const noexcept
Session(const SessionSettings &settings, int marketSegmentId, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
Constructor.
int sendingThreadPolicy() const
Sets the sending thread policy.
Threading::SharedFuture< void > disconnectAsync(const std::string &reason="")
Terminates the connection asynchronously.
Threading::SharedFuture< void > connectAsync(const std::string &host, Port port)
Establishes the connection asynchronously.
A high-level wrapper over the TCPDirect network stack.
void validate(const Message &)
Definition Validation.h:32
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
std::pair< NetworkMessage *, NetworkMessage * > NetMessagesRange
unsigned short Port
Definition Defines.h:44
std::vector< SocketOption > SocketOptions
Socket options.
Definition Defines.h:77
int Handle
Type alias for socket handle.
Definition Defines.h:56
std::set< CpuIndex > CpuIndexes
Definition Defines.h:73
Messaging::UInt32 SeqNumber
Definition Messaging.h:60
@ FileBased
File-based Session Storage.
TCP state information.
Definition TcpInfo.h:32