OnixS C++ ICE Binary Order Entry Handler 1.1.1
API Documentation
Loading...
Searching...
No Matches
Session.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
22#include <limits.h>
23
24#include <OnixS/ICE/BOE/ABI.h>
35#include <OnixS/ICE/BOE/Tools.h>
36
38
39// Forward declarations
40class TcpDirectStack;
41class ServiceFactory;
42
43namespace AdHoc { struct SessionHelper; }
44
47{
48 enum Enum
49 {
52
55 };
56
58 ONIXS_ICEBOE_EXPORTED static const std::string toString(Enum state) noexcept;
59};
60
61inline std::ostream& operator<<(std::ostream& o, SessionType::Enum type)
62{
63 return o << SessionType::toString(type);
64}
65
68{
69public:
80 Session(SessionType::Enum sessionType, const SessionSettings &settings,
82 SessionStorage *storage = nullptr, const std::string &customKey = {}, const Utils::BuildInfo & = Utils::consumerBuildInfo());
83
95 Session(SessionReactor & stack, SessionType::Enum sessionType, const SessionSettings & settings,
97 SessionStorage * storage = nullptr, const std::string &customKey = {}, const Utils::BuildInfo & = Utils::consumerBuildInfo());
98
99 Session(ServiceFactory & service, SessionType::Enum sessionType, const SessionSettings & settings,
101 SessionStorage * storage = nullptr, const std::string &customKey = {}, const Utils::BuildInfo & = Utils::consumerBuildInfo());
102
104
106 std::string id() const;
107
109 int clientId() const noexcept;
110
112 std::string userId() const noexcept;
113
115 std::string customKey() const;
116
125 Session& connect(const std::string& host, Port port, const std::string& ipSessionToken = {});
126
134 Threading::SharedFuture<void> connectAsync(const std::string& host, Port port, const std::string& ipSessionToken = {});
135
145 Session& disconnect(const std::string &reason = {});
146
152 Threading::SharedFuture<void> disconnectAsync(const std::string & reason = {});
153
159
164
175 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
176 Session& send(
178 Messaging::Timestamp sendingTime = UtcWatch::now());
179
189 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
190 Session& send(
192 Messaging::Timestamp sendingTime = UtcWatch::now());
193
194 static const size_t MaxPacketSize = 1420;
195
211 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
212 Session& send(
214 Messaging::Timestamp sendingTime = UtcWatch::now(),
215 size_t maxPacketSize = MaxPacketSize);
216
234 Messaging::Timestamp sendingTime = UtcWatch::now(),
235 size_t maxPacketSize = MaxPacketSize);
236
253 NetworkMessage* begin, NetworkMessage* end,
254 Messaging::Timestamp sendingTime = UtcWatch::now(),
255 size_t maxPacketSize = MaxPacketSize) ONIXS_ICEBOE_NONULL;
256
273 NetMessagesRange range,
274 Messaging::Timestamp sendingTime = UtcWatch::now(),
275 size_t maxPacketSize = MaxPacketSize);
276
286
298 template <typename SbeMessageType, size_t MaxMessageSize>
301 Messaging::Timestamp sendingTime = UtcWatch::now(),
302 int warmupFlags = 0);
303
304
320 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
323 Messaging::Timestamp sendingTime = UtcWatch::now(),
324 size_t maxPacketSize = MaxPacketSize,
325 int warmupFlags = 0);
326
344 size_t maxPacketSize = MaxPacketSize, int warmupFlags = 0);
345
364 size_t maxPacketSize = MaxPacketSize, int warmupFlags = 0) ONIXS_ICEBOE_NONULL;
365
382 NetMessagesRange range, Messaging::Timestamp sendingTime = UtcWatch::now(),
383 size_t maxPacketSize = MaxPacketSize, int warmupFlags = 0);
384
385
393
400 size_t tryThrottle();
401
410 Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
411
413 std::string remoteHost() const;
414
416 std::string remoteIpAddress() const;
417
420
422 std::pair<Port, Port> localPortRange() const noexcept;
423
425 Session& localPortRange(std::pair<Port, Port> portRange);
426
428 std::string localNetworkInterface() const;
429
431 Session& localNetworkInterface(const std::string &value);
432
435
437 bool tcpNoDelayOption() const;
438
441
444
447
448 static const int UndefinedAffinity = -1;
449
452
455
458
461
464
467
470
473
476
479
482
485
488
491
493 unsigned receiveSpinningTimeout() const noexcept;
494
496 Session& receiveSpinningTimeout(unsigned timeoutInUs);
497
499 unsigned sendSpinningTimeout() const noexcept;
500
502 Session& sendSpinningTimeout(unsigned timeoutInUs);
503
508
513
515 unsigned messageGrouping() const noexcept;
516
527 Session& messageGrouping(unsigned numberOfMessagesToGroup);
528
530 SessionStorageType::Enum storageType() const noexcept;
531
533 const std::string &storageId() const noexcept;
534
536 SeqNumber outSeqNum() const noexcept;
537
539 Session& outSeqNum(SeqNumber nextOutSeqNum);
540
543
545 Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
546
555
557 SessionStateId::Enum state() const;
558
561
564
566 unsigned reconnectAttempts() const;
567
570
572 unsigned reconnectInterval() const;
573
575 Session& reconnectInterval(unsigned seconds);
576
578 bool localTimeUsage() const;
579
581 Session& localTimeUsage(bool useLocalTime);
582
584 const std::string &storageDirectory() const;
585
587 bool logBeforeSending() const;
588
591
594
597
599 const Session& log(const std::string &message) const;
600
602 std::string toString() const;
603
606
609
612
617
620
627
629 static const char* version() noexcept;
630
631protected:
632 Threading::SharedFuture<BgwCredentials> getBgwCredentialsAsync(const std::string& userId, bool forceRequest);
633
634private:
635 Session(const Session& );
636 Session& operator=(const Session& );
637
638 void send(NetworkMessage msg, Messaging::Timestamp ts);
639 void send(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, Messaging::Timestamp ts);
640 void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
641 void warmUp(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
642
643 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
644 void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
645
646 void validateVersion(Messaging::SchemaVersion version) const;
647
648 Session& sendRetransmitRequest(SeqNumber from, SeqNumber to);
649
650 struct Impl;
651 Impl *const impl_;
652
653friend struct AdHoc::SessionHelper;
654friend struct SessionHelper;
655};
656
657template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
659Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
660{
661 assert(ts != Messaging::Timestamp());
662
663#ifndef NDEBUG
664 validate(msg);
665#endif
666
667 send(msg.toNetworkMessage(), ts);
668
669 return *this;
670}
671
672template <typename SbeMessageType, size_t MaxMessageSize>
676 Messaging::Timestamp sendingTime,
677 int warmupFlags)
678{
679 warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
680 return *this;
681}
682
683template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
692
693template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
698 size_t maxPacketSize)
699{
700 assert(ts != Messaging::Timestamp());
701
702 if ONIXS_ICEBOE_UNLIKELY(msgs.netMsgs_.empty())
703 return *this;
704
705 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
706 NetworkMessage* const end = begin + msgs.netMsgs_.size();
707
708 send(begin, end, maxPacketSize, ts);
709 return *this;
710}
711
712template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
716 Messaging::Timestamp ts, size_t maxPacketSize, int flags)
717{
718 assert(ts != Messaging::Timestamp());
719
720 if ONIXS_ICEBOE_UNLIKELY(msgs.netMsgs_.empty())
721 return *this;
722
723 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
724 NetworkMessage* const end = begin + msgs.netMsgs_.size();
725
726 warmUp(begin, end, maxPacketSize, flags, ts);
727 return *this;
728}
729
730template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
732{
733 Messaging::validate(holder);
734 validateVersion(holder->version());
735}
736
#define ONIXS_ICEBOE_NAMESPACE_BEGIN
Definition ABI.h:94
#define ONIXS_ICEBOE_NAMESPACE_END
Definition ABI.h:98
#define ONIXS_ICEBOE_HOTPATH
Definition Compiler.h:157
#define ONIXS_ICEBOE_EXPORTED
Definition Compiler.h:155
std::ostream & operator<<(std::ostream &os, const PerformanceCounter::Count &value)
BGW session connection credentials.
The class can be used to combine messages with different types to the batch for sending.
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
NetworkMessage toNetworkMessage() noexcept
The time point without the time-zone information.
Definition Time.h:470
Session's network stack reactor interface.
bool localTimeUsage() const
Messaging::Timestamp licenseExpirationDate() const
unsigned messageGrouping() const noexcept
std::string localNetworkInterface() const
Session(ServiceFactory &service, SessionType::Enum sessionType, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, const std::string &customKey={}, const Utils::BuildInfo &=Utils::consumerBuildInfo())
std::string customKey() const
Threading::SharedFuture< void > breakConnectionAsync()
Breaks the connection non-gracefully asynchronously.
friend struct AdHoc::SessionHelper
Definition Session.h:653
static const int UndefinedAffinity
Definition Session.h:448
std::string remoteIpAddress() const
Session & throttle()
Performs the throttling of a session that must be called before each send function call.
std::string userId() const noexcept
std::string toString() const
Session & breakConnection()
Breaks the connection non-gracefully.
std::string remoteHost() const
const SocketOptions & socketOptions() const
unsigned reconnectAttempts() const
Session(SessionReactor &stack, SessionType::Enum sessionType, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, const std::string &customKey={}, const Utils::BuildInfo &=Utils::consumerBuildInfo())
Constructor.
std::string id() const
bool reportNewMessagesWhileWaitingForMissedMessages() const
Session & throttlingLimit(size_t messagesCount, size_t intervalInMs=1000)
Sets throttling limit parameters.
Session & warmUp(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize, int warmupFlags=0) ONIXS_ICEBOE_NONULL
Warms up the sending path.
SeqNumber outSeqNum() const noexcept
Messaging::SchemaVersion messagingVersion() const noexcept
Session & send(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize)
Sends messages.
std::pair< Port, Port > localPortRange() const noexcept
Session & send(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize) ONIXS_ICEBOE_NONULL
Sends messages.
static const size_t MaxPacketSize
Definition Session.h:194
const std::string & storageDirectory() const
int receivingThreadPolicy() const
size_t incomingMessageGapQueueMaximumSize() const
bool getTcpInfo(TcpInfo &)
Gets information about the underlying TCP connection.
Threading::SharedFuture< BgwCredentials > getBgwCredentialsAsync(const std::string &userId, bool forceRequest)
Session & connect(const std::string &host, Port port, const std::string &ipSessionToken={})
Establishes the connection.
Threading::SharedFuture< void > connectAsync(const std::string &host, Port port, const std::string &ipSessionToken={})
Establishes the connection asynchronously.
unsigned receiveSpinningTimeout() const noexcept
Session(SessionType::Enum sessionType, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, const std::string &customKey={}, const Utils::BuildInfo &=Utils::consumerBuildInfo())
Constructor.
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Definition Session.h:674
Session & warmUp(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize, int warmupFlags=0)
Warms up the sending path.
Handle socketHandle()
Returns the socket handle which the session uses to transmit data.
Session & send(NetMessagesRange range, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize)
Sends messages.
Session & receivingThreadAffinity(CpuIndex cpuIndex)
Sets the receiving thread CPU affinity.
Threading::SharedFuture< void > disconnectAsync(const std::string &reason={})
Terminates the connection asynchronously.
unsigned reconnectInterval() const
SessionStorageType::Enum storageType() const noexcept
int sendingThreadPriority() const
size_t outboundQueueBytes()
Returns the total number of bytes in the outbound queue.
const std::string & storageId() const noexcept
int clientId() const noexcept
size_t tryThrottle()
Checks the throttling of a session that must be called before each send function call.
static const Handle InvalidSocketHandle
Represents invalid value of socket handle.
Definition Session.h:611
bool logBeforeSending() const
Session & sendSequenceMessage()
Send the Sequence message.
Session & disconnect(const std::string &reason={})
Terminates the connection.
Messaging::Timestamp creationTime() const noexcept
SessionStateId::Enum state() const
void flushSessionStorage()
Flushes all internal buffers of the session storage.
SeqNumber inSeqNum() const
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
Definition Session.h:659
static const char * version() noexcept
Session & reset()
Backups the current log files, resets the sequence numbers to 1.
int receivingThreadPriority() const
const Session & log(const std::string &message) const
Write the given user's message to the Handler's log file using the session formatted Id.
Session & sendingThreadAffinity(CpuIndex cpuIndex)
Sets the sending thread CPU affinity.
unsigned sendSpinningTimeout() const noexcept
int sendingThreadPolicy() const
Sets the sending thread policy.
bool tcpNoDelayOption() const
Represents a future result of an asynchronous operation - a result that will eventually appear in the...
Definition Future.h:175
void validate(const Message &)
Definition Validation.h:30
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
std::pair< NetworkMessage *, NetworkMessage * > NetMessagesRange
unsigned short Port
Definition Defines.h:41
std::vector< SocketOption > SocketOptions
Socket options.
Definition Defines.h:75
size_t CpuIndex
Definition Defines.h:70
int Handle
Type alias for socket handle.
Definition Defines.h:54
std::set< CpuIndex > CpuIndexes
Definition Defines.h:71
decltype(std::declval< const Messaging::SbeMessage & >().sequenceId()) SeqNumber
Definition Messaging.h:53
@ FileBased
File-based Session Storage.
@ BUS
Binary Utility Service Gateway.
Definition Session.h:51
@ BGW
Binary Order Gateway.
Definition Session.h:54
static const std::string toString(Enum state) noexcept
TCP state information.
Definition TcpInfo.h:30