OnixS C++ B3 BOE Binary Order Entry 1.3.0
API Documentation
Loading...
Searching...
No Matches
Session.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
22#include <limits.h>
23
24#include <OnixS/B3/BOE/ABI.h>
33
34namespace OnixS
35{
36namespace B3
37{
38namespace BOE
39{
40// Forward declarations
41class TcpDirectStack;
42namespace AdHoc { struct SessionHelper; };
43
48{
49public:
50 static const UInt64 UndefinedSessionVerID = 0;
51
63 SessionStorage *storage = ONIXS_B3_BOE_NULLPTR, UInt64 sessionVerId = UndefinedSessionVerID, const std::string &customKey = "");
64
66 SessionStorage * storage = ONIXS_B3_BOE_NULLPTR, UInt64 sessionVerId = UndefinedSessionVerID, const std::string &customKey = "");
67
69
72
74 std::string id() const;
75
78
80 std::string customKey() const;
81
83 bool negotiated() const;
84
89
98 Session& connect(const std::string &host, Port port);
99
106 Threading::SharedFuture<void> connectAsync(const std::string & host, Port port);
107
116
120 Threading::SharedFuture<void> disconnectAsync();
121
132 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
133 Session& send(
134 Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg,
135 Messaging::Timestamp sendingTime = UtcWatch::now());
136
137#if defined (ONIXS_B3_BOE_CXX11)
138
148 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
149 Session& send(
151 Messaging::Timestamp sendingTime = UtcWatch::now());
152
153#endif
154
156 static const size_t B3BOEMaxPacketSize = 1420;
157
158#if defined (ONIXS_B3_BOE_CXX11)
159
175 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
176 Session& send(
178 Messaging::Timestamp sendingTime = UtcWatch::now(),
179 size_t maxPacketSize = B3BOEMaxPacketSize);
180
198 Messaging::Timestamp sendingTime = UtcWatch::now(),
199 size_t maxPacketSize = B3BOEMaxPacketSize);
200
201#endif
202
213
225 template <typename SbeMessageType, size_t MaxMessageSize>
228 Messaging::Timestamp sendingTime = UtcWatch::now(),
229 int warmupFlags = 0);
230
231
232#if defined (ONIXS_B3_BOE_CXX11)
233
249 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
252 Messaging::Timestamp sendingTime = UtcWatch::now(),
253 size_t maxPacketSize = B3BOEMaxPacketSize,
254 int warmupFlags = 0);
255
273 size_t maxPacketSize = B3BOEMaxPacketSize, int warmupFlags = 0);
274#endif
275
283
290 size_t tryThrottle();
291
300 Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
301
303 std::string remoteHost() const;
304
306 std::string remoteIpAddress() const;
307
310
312 std::pair<Port, Port> localPortRange() const ONIXS_B3_BOE_NOTHROW;
313
315 Session& localPortRange(std::pair<Port, Port> portRange);
316
318 std::string localNetworkInterface() const;
319
321 Session& localNetworkInterface(const std::string &value);
322
325
327 bool tcpNoDelayOption() const;
328
331
334
337
338 static const int UndefinedAffinity = -1;
339
342
345
348
351
354
357
360
363
366
369
372
375
378
381
384
386 Session& receiveSpinningTimeout(unsigned timeoutInUs);
387
390
392 Session& sendSpinningTimeout(unsigned timeoutInUs);
393
397
401
404
415 Session& messageGrouping(unsigned numberOfMessagesToGroup);
416
419
421 const std::string &storageId() const ONIXS_B3_BOE_NOTHROW;
422
425
427 Session& outSeqNum(SeqNumber nextOutSeqNum);
428
431
433 Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
434
443
445 SessionStateId::Enum state() const;
446
449
452
454 unsigned reconnectAttempts() const;
455
458
460 unsigned reconnectInterval() const;
461
463 Session& reconnectInterval(unsigned seconds);
464
466 bool localTimeUsage() const;
467
469 Session& localTimeUsage(bool useLocalTime);
470
472 const std::string &storageDirectory() const;
473
475 bool logBeforeSending() const;
476
479
482
485
487 const Session& log(const std::string &message) const;
488
490 std::string toString() const;
491
494
497
500
505
508
515
517 static const std::string& version() ONIXS_B3_BOE_NOTHROW;
518
519private:
520 Session(const Session& );
521 Session& operator=(const Session& );
522
523 void send(NetworkMessage msg, Messaging::Timestamp ts);
524 void send(NetMessages& msgs, size_t maxPacketSize, Messaging::Timestamp ts);
525 void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
526 void warmUp(NetMessages& msgs, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
527
528 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
529 void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
530
531 void validateVersion(Messaging::SchemaVersion version) const;
532
533 Session& sendRetransmitRequest(UInt64 sessionVerId, SeqNumber from, SeqNumber to);
534
535 struct Impl;
536 Impl *const impl_;
537
538friend struct AdHoc::SessionHelper;
539};
540
// Sends the single message held by `msg`, using `ts` as the sending time.
//
// msg - holder of the SBE message to send; it is converted with toNetworkMessage().
// ts  - sending time; must not be equal to a default-constructed Messaging::Timestamp.
//
// Returns a reference to this session, so that calls can be chained.
541template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
543Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
544{
 // Programmer-error check: a default-constructed timestamp is rejected.
 // Like every assert, this check is compiled out when NDEBUG is defined.
545 assert(ts != Messaging::Timestamp());
546
547#ifndef NDEBUG
 // The message is validated only in builds where NDEBUG is not defined.
548 validate(msg);
549#endif
550
 // Delegate to the private send(NetworkMessage, Messaging::Timestamp) overload.
551 send(msg.toNetworkMessage(), ts);
552
553 return *this;
554}
555
556template <typename SbeMessageType, size_t MaxMessageSize>
560 Messaging::Timestamp sendingTime,
561 int warmupFlags)
562{
563 warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
564
565 return *this;
566}
567
568#if defined (ONIXS_B3_BOE_CXX11)
569
570template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
579
580#endif
581
582#if defined (ONIXS_B3_BOE_CXX11)
583
584template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
589 size_t maxPacketSize)
590{
591 assert(ts != Messaging::Timestamp());
592
593 send(msgs.netMsgs_, maxPacketSize, ts);
594 return *this;
595}
596
597template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
601 Messaging::Timestamp ts, size_t maxPacketSize, int flags)
602{
603 assert(ts != Messaging::Timestamp());
604
605 warmUp(msgs.netMsgs_, maxPacketSize, flags, ts);
606 return *this;
607}
608
609#endif
610
611template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
613{
614 validateVersion(holder->version());
615}
616
617}
618}
619}
#define ONIXS_B3_BOE_NULLPTR
Definition Compiler.h:188
#define ONIXS_B3_BOE_EXPORTED
Definition Compiler.h:181
#define ONIXS_B3_BOE_NOTHROW
Definition Compiler.h:182
#define ONIXS_B3_BOE_HOTPATH
Definition Compiler.h:193
The class can be used to combine messages of different types into a batch for sending.
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
NetworkMessage toNetworkMessage() noexcept
The time point without the time-zone information.
Definition Time.h:468
Session's network stack reactor interface.
bool localTimeUsage() const
Messaging::Timestamp licenseExpirationDate() const
unsigned messageGrouping() const noexcept
std::string localNetworkInterface() const
std::string customKey() const
static const std::string & version() noexcept
friend struct AdHoc::SessionHelper
Definition Session.h:538
static const int UndefinedAffinity
Definition Session.h:338
std::string remoteIpAddress() const
Session & throttle()
Performs the throttling of a session; it must be called before each send function call.
std::string toString() const
static const UInt64 UndefinedSessionVerID
Definition Session.h:50
std::string remoteHost() const
const SocketOptions & socketOptions() const
Session & warmUp(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=B3BOEMaxPacketSize, int warmupFlags=0)
Warms up the sending path.
Session & disconnect()
Terminates the connection.
unsigned reconnectAttempts() const
bool reportNewMessagesWhileWaitingForMissedMessages() const
Session & throttlingLimit(size_t messagesCount, size_t intervalInMs=1000)
Sets throttling limit parameters.
SeqNumber outSeqNum() const noexcept
Threading::SharedFuture< void > disconnectAsync()
Terminates the connection asynchronously.
Messaging::SchemaVersion messagingVersion() const noexcept
std::pair< Port, Port > localPortRange() const noexcept
const std::string & storageDirectory() const
int receivingThreadPolicy() const
size_t incomingMessageGapQueueMaximumSize() const
bool getTcpInfo(TcpInfo &)
Gets information about the underlying TCP connection.
unsigned receiveSpinningTimeout() const noexcept
Messaging::SessionID sessionId() const noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Warms up the sending path.
Definition Session.h:558
Handle socketHandle()
Returns the socket handle which the session uses to transmit data.
Session & receivingThreadAffinity(CpuIndex cpuIndex)
Sets the receiving thread CPU affinity.
unsigned reconnectInterval() const
SessionStorageType::Enum storageType() const noexcept
int sendingThreadPriority() const
static const size_t B3BOEMaxPacketSize
Maximum packet size, based on the MTU (Maximum Transmission Unit): 1420 bytes.
Definition Session.h:156
size_t outboundQueueBytes()
Returns the total number of bytes in the outbound queue.
const std::string & storageId() const noexcept
size_t tryThrottle()
Checks the throttling of a session; it must be called before each send function call.
static const Handle InvalidSocketHandle
Represents invalid value of socket handle.
Definition Session.h:499
bool logBeforeSending() const
Session & sendSequenceMessage()
Send the Sequence message.
Session(const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 sessionVerId=UndefinedSessionVerID, const std::string &customKey="")
Constructor.
Session(SessionReactor &stack, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 sessionVerId=UndefinedSessionVerID, const std::string &customKey="")
Messaging::Timestamp creationTime() const noexcept
UInt64 sessionVerId() const noexcept
SessionStateId::Enum state() const
void flushSessionStorage()
Flushes all internal buffers of the session storage.
SeqNumber inSeqNum() const
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Sends the message.
Definition Session.h:543
Session & send(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=B3BOEMaxPacketSize)
Sends messages.
Session & reset()
Backs up the current log files, resets the sequence numbers to 1, and generates a new sessionVerId.
int receivingThreadPriority() const
const Session & log(const std::string &message) const
Writes the given user message to the Handler's log file using the session's formatted ID.
Session & sendingThreadAffinity(CpuIndex cpuIndex)
Sets the sending thread CPU affinity.
Session & connect(const std::string &host, Port port)
Establishes the connection.
unsigned sendSpinningTimeout() const noexcept
int sendingThreadPolicy() const
Returns the sending thread policy.
bool tcpNoDelayOption() const
Threading::SharedFuture< void > connectAsync(const std::string &host, Port port)
Establishes the connection asynchronously.
A high-level wrapper over the TCPDirect network stack.
UInt32 SessionID
Client connection identification on the gateway assigned by B3.
Definition Fields.h:103
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
std::vector< NetworkMessage > NetMessages
unsigned short Port
Definition Defines.h:44
std::vector< SocketOption > SocketOptions
Socket options.
Definition Defines.h:77
size_t CpuIndex
Definition Defines.h:72
int Handle
Type alias for socket handle.
Definition Defines.h:56
std::set< CpuIndex > CpuIndexes
Definition Defines.h:73
Messaging::UInt32 SeqNumber
Definition Messaging.h:52
@ FileBased
File-based Session Storage.
TCP state information.
Definition TcpInfo.h:31