OnixS C++ ICE Binary Order Entry Handler 1.0.0
API Documentation
Loading...
Searching...
No Matches
Session.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
22#include <limits.h>
23
24#include <OnixS/ICE/BOE/ABI.h>
33
35
36// Forward declarations
37class TcpDirectStack;
38namespace AdHoc { struct SessionHelper; };
39
42{
43 enum Enum
44 {
47
50 };
51
53 ONIXS_ICEBOE_EXPORTED static const std::string toString(Enum state);
54};
55
56inline std::ostream& operator<<(std::ostream& o, SessionType::Enum type) {
57 return o << SessionType::toString(type);
58}
59
62{
63public:
74 Session(SessionType::Enum sessionType, const SessionSettings &settings,
76 SessionStorage *storage = nullptr, const std::string &customKey = {});
77
78 Session(SessionReactor & stack, SessionType::Enum sessionType, const SessionSettings & settings,
80 SessionStorage * storage = nullptr, const std::string &customKey = {});
81
83
85 std::string id() const;
86
88 int clientId() const noexcept;
89
91 std::string customKey() const;
92
100 Session& connect(const std::string &host, Port port);
101
108 Threading::SharedFuture<void> connectAsync(const std::string & host, Port port);
109
119 Session& disconnect(const std::string &reason = {});
120
126 Threading::SharedFuture<void> disconnectAsync(const std::string & reason = {});
127
133
138
149 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
150 Session& send(
152 Messaging::Timestamp sendingTime = UtcWatch::now());
153
163 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
164 Session& send(
166 Messaging::Timestamp sendingTime = UtcWatch::now());
167
168 static const size_t MaxPacketSize = 1420;
169
185 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
186 Session& send(
188 Messaging::Timestamp sendingTime = UtcWatch::now(),
189 size_t maxPacketSize = MaxPacketSize);
190
208 Messaging::Timestamp sendingTime = UtcWatch::now(),
209 size_t maxPacketSize = MaxPacketSize);
210
227 NetworkMessage* begin, NetworkMessage* end,
228 Messaging::Timestamp sendingTime = UtcWatch::now(),
229 size_t maxPacketSize = MaxPacketSize) ONIXS_ICEBOE_NONULL;
230
247 NetMessagesRange range,
248 Messaging::Timestamp sendingTime = UtcWatch::now(),
249 size_t maxPacketSize = MaxPacketSize);
250
260
272 template <typename SbeMessageType, size_t MaxMessageSize>
275 Messaging::Timestamp sendingTime = UtcWatch::now(),
276 int warmupFlags = 0);
277
278
294 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
297 Messaging::Timestamp sendingTime = UtcWatch::now(),
298 size_t maxPacketSize = MaxPacketSize,
299 int warmupFlags = 0);
300
318 size_t maxPacketSize = MaxPacketSize, int warmupFlags = 0);
319
338 size_t maxPacketSize = MaxPacketSize, int warmupFlags = 0) ONIXS_ICEBOE_NONULL;
339
356 NetMessagesRange range, Messaging::Timestamp sendingTime = UtcWatch::now(),
357 size_t maxPacketSize = MaxPacketSize, int warmupFlags = 0);
358
359
367
374 size_t tryThrottle();
375
384 Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
385
387 std::string remoteHost() const;
388
390 std::string remoteIpAddress() const;
391
394
396 std::pair<Port, Port> localPortRange() const noexcept;
397
399 Session& localPortRange(std::pair<Port, Port> portRange);
400
402 std::string localNetworkInterface() const;
403
405 Session& localNetworkInterface(const std::string &value);
406
409
411 bool tcpNoDelayOption() const;
412
415
418
421
422 static const int UndefinedAffinity = -1;
423
426
429
432
435
438
441
444
447
450
453
456
459
462
465
467 unsigned receiveSpinningTimeout() const noexcept;
468
470 Session& receiveSpinningTimeout(unsigned timeoutInUs);
471
473 unsigned sendSpinningTimeout() const noexcept;
474
476 Session& sendSpinningTimeout(unsigned timeoutInUs);
477
482
487
489 unsigned messageGrouping() const noexcept;
490
501 Session& messageGrouping(unsigned numberOfMessagesToGroup);
502
504 SessionStorageType::Enum storageType() const noexcept;
505
507 const std::string &storageId() const noexcept;
508
510 SeqNumber outSeqNum() const noexcept;
511
513 Session& outSeqNum(SeqNumber nextOutSeqNum);
514
517
519 Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
520
529
531 SessionStateId::Enum state() const;
532
535
538
540 unsigned reconnectAttempts() const;
541
544
546 unsigned reconnectInterval() const;
547
549 Session& reconnectInterval(unsigned seconds);
550
552 bool localTimeUsage() const;
553
555 Session& localTimeUsage(bool useLocalTime);
556
558 const std::string &storageDirectory() const;
559
561 bool logBeforeSending() const;
562
565
568
571
573 const Session& log(const std::string &message) const;
574
576 std::string toString() const;
577
580
583
586
591
594
601
603 static const char* version() noexcept;
604
605private:
606 Session(const Session& );
607 Session& operator=(const Session& );
608
609 void send(NetworkMessage msg, Messaging::Timestamp ts);
610 void send(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, Messaging::Timestamp ts);
611 void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
612 void warmUp(NetworkMessage* begin, NetworkMessage* end, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
613
614 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
615 void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
616
617 void validateVersion(Messaging::SchemaVersion version) const;
618
619 Session& sendRetransmitRequest(SeqNumber from, SeqNumber to);
620
621 struct Impl;
622 Impl *const impl_;
623
624friend struct AdHoc::SessionHelper;
625};
626
627template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
629Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
630{
631 assert(ts != Messaging::Timestamp());
632
633#ifndef NDEBUG
634 validate(msg);
635#endif
636
637 send(msg.toNetworkMessage(), ts);
638
639 return *this;
640}
641
642template <typename SbeMessageType, size_t MaxMessageSize>
646 Messaging::Timestamp sendingTime,
647 int warmupFlags)
648{
649 warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
650 return *this;
651}
652
653template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
662
663template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
668 size_t maxPacketSize)
669{
670 assert(ts != Messaging::Timestamp());
671
672 if ONIXS_ICEBOE_UNLIKELY(msgs.netMsgs_.empty())
673 return *this;
674
675 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
676 NetworkMessage* const end = begin + msgs.netMsgs_.size();
677
678 send(begin, end, maxPacketSize, ts);
679 return *this;
680}
681
682template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
686 Messaging::Timestamp ts, size_t maxPacketSize, int flags)
687{
688 assert(ts != Messaging::Timestamp());
689
690 if ONIXS_ICEBOE_UNLIKELY(msgs.netMsgs_.empty())
691 return *this;
692
693 NetworkMessage* const begin = &*msgs.netMsgs_.begin();
694 NetworkMessage* const end = begin + msgs.netMsgs_.size();
695
696 warmUp(begin, end, maxPacketSize, flags, ts);
697 return *this;
698}
699
700template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
702{
703 Messaging::validate(holder);
704 validateVersion(holder->version());
705}
706
#define ONIXS_ICEBOE_NAMESPACE_BEGIN
Definition ABI.h:94
#define ONIXS_ICEBOE_NAMESPACE_END
Definition ABI.h:98
#define ONIXS_ICEBOE_HOTPATH
Definition Compiler.h:155
#define ONIXS_ICEBOE_EXPORTED
Definition Compiler.h:153
This class can be used to combine messages of different types into a batch for sending.
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
NetworkMessage toNetworkMessage() noexcept
The time point without the time-zone information.
Definition Time.h:470
Session's network stack reactor interface.
Session(SessionReactor &stack, SessionType::Enum sessionType, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, const std::string &customKey={})
bool localTimeUsage() const
Messaging::Timestamp licenseExpirationDate() const
unsigned messageGrouping() const noexcept
std::string localNetworkInterface() const
std::string customKey() const
Threading::SharedFuture< void > breakConnectionAsync()
Breaks the connection non-gracefully asynchronously.
friend struct AdHoc::SessionHelper
Definition Session.h:624
static const int UndefinedAffinity
Definition Session.h:422
std::string remoteIpAddress() const
Session & throttle()
Performs session throttling; must be called before each send function call.
std::string toString() const
Session & breakConnection()
Breaks the connection non-gracefully.
std::string remoteHost() const
const SocketOptions & socketOptions() const
unsigned reconnectAttempts() const
std::string id() const
bool reportNewMessagesWhileWaitingForMissedMessages() const
Session & throttlingLimit(size_t messagesCount, size_t intervalInMs=1000)
Sets throttling limit parameters.
Session & warmUp(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize, int warmupFlags=0) ONIXS_ICEBOE_NONULL
Warms up the sending path.
SeqNumber outSeqNum() const noexcept
Messaging::SchemaVersion messagingVersion() const noexcept
Session & send(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize)
Sends messages.
std::pair< Port, Port > localPortRange() const noexcept
Session & send(NetworkMessage *begin, NetworkMessage *end, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize) ONIXS_ICEBOE_NONULL
Sends messages.
static const size_t MaxPacketSize
Definition Session.h:168
const std::string & storageDirectory() const
int receivingThreadPolicy() const
size_t incomingMessageGapQueueMaximumSize() const
bool getTcpInfo(TcpInfo &)
Gets information about the underlying TCP connection.
unsigned receiveSpinningTimeout() const noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Definition Session.h:644
Session & warmUp(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize, int warmupFlags=0)
Warms up the sending path.
Handle socketHandle()
Returns the socket handle which the session uses to transmit data.
Session & send(NetMessagesRange range, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=MaxPacketSize)
Sends messages.
Session & receivingThreadAffinity(CpuIndex cpuIndex)
Sets the receiving thread CPU affinity.
Threading::SharedFuture< void > disconnectAsync(const std::string &reason={})
Terminates the connection asynchronously.
unsigned reconnectInterval() const
SessionStorageType::Enum storageType() const noexcept
int sendingThreadPriority() const
size_t outboundQueueBytes()
Returns the total number of bytes in the outbound queue.
Session(SessionType::Enum sessionType, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, const std::string &customKey={})
Constructor.
const std::string & storageId() const noexcept
int clientId() const noexcept
size_t tryThrottle()
Checks session throttling; must be called before each send function call.
static const Handle InvalidSocketHandle
Represents invalid value of socket handle.
Definition Session.h:585
bool logBeforeSending() const
Session & sendSequenceMessage()
Sends the Sequence message.
Session & disconnect(const std::string &reason={})
Terminates the connection.
Messaging::Timestamp creationTime() const noexcept
SessionStateId::Enum state() const
void flushSessionStorage()
Flushes all internal buffers of the session storage.
SeqNumber inSeqNum() const
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
Definition Session.h:629
static const char * version() noexcept
Session & reset()
Backs up the current log files and resets the sequence numbers to 1.
int receivingThreadPriority() const
const Session & log(const std::string &message) const
Writes the given user's message to the Handler's log file using the session's formatted Id.
Session & sendingThreadAffinity(CpuIndex cpuIndex)
Sets the sending thread CPU affinity.
Session & connect(const std::string &host, Port port)
Establishes the connection.
unsigned sendSpinningTimeout() const noexcept
int sendingThreadPolicy() const
Returns the sending thread policy.
bool tcpNoDelayOption() const
Threading::SharedFuture< void > connectAsync(const std::string &host, Port port)
Establishes the connection asynchronously.
Represents a future result of an asynchronous operation - a result that will eventually appear in the...
Definition Future.h:175
void validate(const Message &)
Definition Validation.h:30
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
std::pair< NetworkMessage *, NetworkMessage * > NetMessagesRange
unsigned short Port
Definition Defines.h:40
std::vector< SocketOption > SocketOptions
Socket options.
Definition Defines.h:74
size_t CpuIndex
Definition Defines.h:69
int Handle
Type alias for socket handle.
Definition Defines.h:53
std::ostream & operator<<(std::ostream &o, SessionType::Enum type)
Definition Session.h:56
std::set< CpuIndex > CpuIndexes
Definition Defines.h:70
Messaging::UInt32 SeqNumber
Definition Messaging.h:51
@ FileBased
File-based Session Storage.
@ BUS
Binary Utility Service Gateway.
Definition Session.h:46
@ BGW
Binary Order Gateway.
Definition Session.h:49
static const std::string toString(Enum state)
TCP state information.
Definition TcpInfo.h:30