OnixS C++ CME iLink 3 Binary Order Entry Handler 1.19.4
Users' manual and API documentation
Loading...
Searching...
No Matches
Session.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
22#include <limits.h>
23
33
34namespace OnixS
35{
36namespace CME
37{
38namespace iLink3
39{
40// Forward declarations
41class TcpDirectStack;
42namespace AdHoc { struct SessionHelper; }
43
48{
49public:
50 static const UInt64 UndefinedUuid = 0;
51
65 SessionStorage *storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
66
67 Session(SessionReactor & stack, const SessionSettings & settings, int marketSegmentId,
69 SessionStorage * storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
70
72
74 UInt64 uuid() const noexcept;
75
77 std::string id() const;
78
80 int marketSegmentId() const noexcept;
81
83 std::string customKey() const;
84
86 bool negotiated() const;
87
92
95
98
107 Session& connect(const std::string &host, Port port);
108
115 Threading::SharedFuture<void> connectAsync(const std::string & host, Port port);
116
126 Session& disconnect(const std::string &reason = "");
127
133 Threading::SharedFuture<void> disconnectAsync(const std::string & reason = "");
134
140
144 Threading::SharedFuture<void> breakConnectionAsync();
145
156 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
157 Session& send(
158 Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg,
159 Messaging::Timestamp sendingTime = UtcWatch::now());
160
161#if defined (ONIXS_ILINK3_CXX11)
162
172 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
173 Session& send(
175 Messaging::Timestamp sendingTime = UtcWatch::now());
176
177#endif
178
180 static const size_t CmeMaxPacketSize = 1420;
181
182#if defined (ONIXS_ILINK3_CXX11)
183
199 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
200 Session& send(
202 Messaging::Timestamp sendingTime = UtcWatch::now(),
203 size_t maxPacketSize = CmeMaxPacketSize);
204
222 Messaging::Timestamp sendingTime = UtcWatch::now(),
223 size_t maxPacketSize = CmeMaxPacketSize);
224
225#endif
226
243 NetworkMessage* begin, NetworkMessage* end,
244 Messaging::Timestamp sendingTime = UtcWatch::now(),
245 size_t maxPacketSize = CmeMaxPacketSize) ONIXS_ILINK3_NONULL;
246
263 NetMessagesRange range,
264 Messaging::Timestamp sendingTime = UtcWatch::now(),
265 size_t maxPacketSize = CmeMaxPacketSize);
266
279
291 template <typename SbeMessageType, size_t MaxMessageSize>
294 Messaging::Timestamp sendingTime = UtcWatch::now(),
295 int warmupFlags = 0);
296
297
298#if defined (ONIXS_ILINK3_CXX11)
299
315 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
318 Messaging::Timestamp sendingTime = UtcWatch::now(),
319 size_t maxPacketSize = CmeMaxPacketSize,
320 int warmupFlags = 0);
321
339 size_t maxPacketSize = CmeMaxPacketSize, int warmupFlags = 0);
340#endif
341
360 size_t maxPacketSize = CmeMaxPacketSize, int warmupFlags = 0) ONIXS_ILINK3_NONULL;
361
378 NetMessagesRange range, Messaging::Timestamp sendingTime = UtcWatch::now(),
379 size_t maxPacketSize = CmeMaxPacketSize, int warmupFlags = 0);
380
381
389
396 size_t tryThrottle();
397
406 Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
407
409 std::string remoteHost() const;
410
412 std::string remoteIpAddress() const;
413
416
418 std::pair<Port, Port> localPortRange() const noexcept;
419
421 Session& localPortRange(std::pair<Port, Port> portRange);
422
424 std::string localNetworkInterface() const;
425
427 Session& localNetworkInterface(const std::string &value);
428
431
433 bool tcpNoDelayOption() const;
434
437
440
443
444 static const int UndefinedAffinity = -1;
445
448
451
454
457
460
463
466
469
472
475
478
481
484
487
489 unsigned receiveSpinningTimeout() const noexcept;
490
492 Session& receiveSpinningTimeout(unsigned timeoutInUs);
493
495 unsigned sendSpinningTimeout() const noexcept;
496
498 Session& sendSpinningTimeout(unsigned timeoutInUs);
499
504
509
511 unsigned messageGrouping() const noexcept;
512
523 Session& messageGrouping(unsigned numberOfMessagesToGroup);
524
527
529 SessionStorageType::Enum storageType() const noexcept;
530
532 const std::string &storageId() const noexcept;
533
535 SeqNumber outSeqNum() const noexcept;
536
538 Session& outSeqNum(SeqNumber nextOutSeqNum);
539
542
544 Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
545
551 SeqNumber previousSeqNo() const noexcept;
552
555
562 Messaging::UInt64 previousUuid() const noexcept;
563
566
576 Session& reset(bool startOfWeek = false);
577
579 SessionStateId::Enum state() const;
580
583
586
588 unsigned reconnectAttempts() const;
589
592
594 unsigned reconnectInterval() const;
595
597 Session& reconnectInterval(unsigned seconds);
598
600 bool localTimeUsage() const;
601
603 Session& localTimeUsage(bool useLocalTime);
604
606 const std::string &storageDirectory() const;
607
609 bool logBeforeSending() const;
610
613
616
619
621 const Session& log(const std::string &message) const;
622
624 std::string toString() const;
625
628
631
634
637
640
645
648
655
657 static std::string version();
658
659private:
660 Session(const Session& );
661 Session& operator=(const Session& );
662
663 void send(NetworkMessage msg, Messaging::Timestamp ts);
664 void send(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, Messaging::Timestamp ts);
665 void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
666 void warmUp(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
667
668 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
669 void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
670
671 void validateVersion(Messaging::SchemaVersion version) const;
672
673 Session& sendRetransmitRequest(UInt64 uuid, SeqNumber from, SeqNumber to);
674
675 struct Impl;
676 Impl *const impl_;
677
678friend struct AdHoc::SessionHelper;
679};
680
685{
686public:
699 SessionStorage *storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
700
712 CgwSession(SessionReactor & stack, const SessionSettings & settings,
714 SessionStorage * storage = nullptr, UInt64 uuid = UndefinedUuid, const std::string &customKey = "");
715
716private:
717 int marketSegmentId() const noexcept;
718 static const int marketSegmentId_ = INT_MAX;
719};
720
721template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
723Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
724{
725 assert(ts != Messaging::Timestamp());
726
727#ifndef NDEBUG
728 validate(msg);
729#endif
730
731 send(msg.toNetworkMessage(), ts);
732
733 return *this;
734}
735
736template <typename SbeMessageType, size_t MaxMessageSize>
740 Messaging::Timestamp sendingTime,
741 int warmupFlags)
742{
743 warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
744 return *this;
745}
746
747#if defined (ONIXS_ILINK3_CXX11)
748
749template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
758
759#endif
760
761#if defined (ONIXS_ILINK3_CXX11)
762
763template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
768 size_t maxPacketSize)
769{
770 assert(ts != Messaging::Timestamp());
771
772 if ONIXS_ILINK3_UNLIKELY(msgs.netMsgs_.empty()) {
773 return *this;
774 }
775
776 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
777 NetworkMessage* const end = begin + msgs.netMsgs_.size();
778
779 send(begin, end, maxPacketSize, ts);
780 return *this;
781}
782
783template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
787 Messaging::Timestamp ts, size_t maxPacketSize, int flags)
788{
789 assert(ts != Messaging::Timestamp());
790
791 if ONIXS_ILINK3_UNLIKELY(msgs.netMsgs_.empty())
792 {
793 return *this;
794 }
795
796 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
797 NetworkMessage* const end = begin + msgs.netMsgs_.size();
798
799 warmUp(begin, end, maxPacketSize, flags, ts);
800 return *this;
801}
802
803#endif
804
805template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
807{
808 Messaging::validate(holder);
809 validateVersion(holder->version());
810}
811
812}
813}
814}
#define ONIXS_ILINK3_UNLIKELY(cond)
Definition Compiler.h:158
#define ONIXS_ILINK3_HOTPATH
Definition Compiler.h:148
#define ONIXS_ILINK3_EXPORTED
Definition Compiler.h:145
CgwSession(SessionReactor &stack, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
CgwSession(const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
The time point without the time-zone information.
Definition Time.h:468
Session's network stack reactor interface.
static const UInt64 UndefinedUuid
Definition Session.h:50
Messaging::Timestamp licenseExpirationDate() const
unsigned messageGrouping() const noexcept
std::string localNetworkInterface() const
std::string customKey() const
Threading::SharedFuture< void > breakConnectionAsync()
friend struct AdHoc::SessionHelper
Definition Session.h:678
static const int UndefinedAffinity
Definition Session.h:444
static const size_t CmeMaxPacketSize
CME states: Packet size maximum behavior based on MTU (Maximum Transmission Unit) - 1420 bytes,...
Definition Session.h:180
std::string remoteIpAddress() const
Session(SessionReactor &stack, const SessionSettings &settings, int marketSegmentId, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
std::string toString() const
Session & send(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize) ONIXS_ILINK3_NONULL
std::string remoteHost() const
const SocketOptions & socketOptions() const
unsigned reconnectAttempts() const
bool reportNewMessagesWhileWaitingForMissedMessages() const
Session & throttlingLimit(size_t messagesCount, size_t intervalInMs=1000)
Messaging::FTI::Enum faultToleranceIndicator() const
SeqNumber outSeqNum() const noexcept
Messaging::SchemaVersion messagingVersion() const noexcept
std::pair< Port, Port > localPortRange() const noexcept
const std::string & storageDirectory() const
Session & send(NetMessagesRange range, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize)
size_t incomingMessageGapQueueMaximumSize() const
bool getTcpInfo(TcpInfo &)
bool detectMessageGapForPreviousUuid() const
Session & sendSequenceMessage(Messaging::KeepAliveLapsed::Enum keepAliveLapsed=Messaging::KeepAliveLapsed::Lapsed)
unsigned receiveSpinningTimeout() const noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Definition Session.h:738
Session & receivingThreadAffinity(CpuIndex cpuIndex)
Sets the receiving thread CPU affinity.
int marketSegmentId() const noexcept
Session & warmUp(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize, int warmupFlags=0) ONIXS_ILINK3_NONULL
ThreadingModel::Enum threadingModel() const
unsigned reconnectInterval() const
SessionStorageType::Enum storageType() const noexcept
Messaging::UInt64 previousUuid() const noexcept
UInt64 uuid() const noexcept
size_t outboundQueueBytes()
Returns the total number of bytes in the outbound queue.
const std::string & storageId() const noexcept
Session & send(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize)
static std::string version()
static const Handle InvalidSocketHandle
Represents invalid value of socket handle.
Definition Session.h:639
Session & disconnect(const std::string &reason="")
Messaging::Timestamp creationTime() const noexcept
Session & reset(bool startOfWeek=false)
SessionStateId::Enum state() const
void flushSessionStorage()
Flushes all internal buffers of the session storage.
SeqNumber inSeqNum() const
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Definition Session.h:723
int receivingThreadPriority() const
const Session & log(const std::string &message) const
Write the given user's message to the Handler's log file using the session formatted Id.
Session & sendingThreadAffinity(CpuIndex cpuIndex)
Sets the sending thread CPU affinity.
Session & connect(const std::string &host, Port port)
SeqNumber previousSeqNo() const noexcept
Session & warmUp(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=CmeMaxPacketSize, int warmupFlags=0)
unsigned sendSpinningTimeout() const noexcept
Session(const SessionSettings &settings, int marketSegmentId, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 uuid=UndefinedUuid, const std::string &customKey="")
int sendingThreadPolicy() const
Sets the sending thread policy.
Threading::SharedFuture< void > disconnectAsync(const std::string &reason="")
Threading::SharedFuture< void > connectAsync(const std::string &host, Port port)
void validate(const Message &)
Definition Validation.h:32
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
std::pair< NetworkMessage *, NetworkMessage * > NetMessagesRange
unsigned short Port
Definition Defines.h:44
std::vector< SocketOption > SocketOptions
Socket options.
Definition Defines.h:84
int Handle
Type alias for socket handle.
Definition Defines.h:56
std::set< CpuIndex > CpuIndexes
Definition Defines.h:80
Messaging::UInt32 SeqNumber
Definition Messaging.h:60
@ FileBased
File-based Session Storage.
TCP state information.
Definition TcpInfo.h:32