OnixS C++ B3 BOE Binary Order Entry 1.4.0
Users' manual and API documentation
Loading...
Searching...
No Matches
Session.h
Go to the documentation of this file.
1/*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable OnixS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19
20#pragma once
21
22#include <limits.h>
23
24#include <OnixS/B3/BOE/ABI.h>
33
34namespace OnixS
35{
36namespace B3
37{
38namespace BOE
39{
40// Forward declarations
41class TcpDirectStack;
42namespace AdHoc { struct SessionHelper; };
43
48{
49public:
50 static const UInt64 UndefinedSessionVerID = 0;
51
65 SessionStorage *storage = ONIXS_B3_BOE_NULLPTR, UInt64 sessionVerId = UndefinedSessionVerID, const std::string &customKey = "");
66
68 SessionStorage * storage = ONIXS_B3_BOE_NULLPTR, UInt64 sessionVerId = UndefinedSessionVerID, const std::string &customKey = "");
69
71
74
76 std::string id() const;
77
80
82 std::string customKey() const;
83
85 bool negotiated() const;
86
91
100 Session& connect(const std::string &host, Port port);
101
108 Threading::SharedFuture<void> connectAsync(const std::string & host, Port port);
109
118
122 Threading::SharedFuture<void> disconnectAsync();
123
134 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
135 Session& send(
136 Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg,
137 Messaging::Timestamp sendingTime = UtcWatch::now());
138
139#if defined (ONIXS_B3_BOE_CXX11)
140
150 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
151 Session& send(
153 Messaging::Timestamp sendingTime = UtcWatch::now());
154
155#endif
156
158 static const size_t B3BOEMaxPacketSize = 1420;
159
160#if defined (ONIXS_B3_BOE_CXX11)
161
177 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
178 Session& send(
180 Messaging::Timestamp sendingTime = UtcWatch::now(),
181 size_t maxPacketSize = B3BOEMaxPacketSize);
182
200 Messaging::Timestamp sendingTime = UtcWatch::now(),
201 size_t maxPacketSize = B3BOEMaxPacketSize);
202
203#endif
204
215
227 template <typename SbeMessageType, size_t MaxMessageSize>
230 Messaging::Timestamp sendingTime = UtcWatch::now(),
231 int warmupFlags = 0);
232
233
234#if defined (ONIXS_B3_BOE_CXX11)
235
251 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
254 Messaging::Timestamp sendingTime = UtcWatch::now(),
255 size_t maxPacketSize = B3BOEMaxPacketSize,
256 int warmupFlags = 0);
257
275 size_t maxPacketSize = B3BOEMaxPacketSize, int warmupFlags = 0);
276#endif
277
285
292 size_t tryThrottle();
293
302 Session & throttlingLimit(size_t messagesCount, size_t intervalInMs = 1000);
303
305 std::string remoteHost() const;
306
308 std::string remoteIpAddress() const;
309
312
314 std::pair<Port, Port> localPortRange() const ONIXS_B3_BOE_NOTHROW;
315
317 Session& localPortRange(std::pair<Port, Port> portRange);
318
320 std::string localNetworkInterface() const;
321
323 Session& localNetworkInterface(const std::string &value);
324
327
329 bool tcpNoDelayOption() const;
330
333
336
339
340 static const int UndefinedAffinity = -1;
341
344
347
350
353
356
359
362
365
368
371
374
377
380
383
386
388 Session& receiveSpinningTimeout(unsigned timeoutInUs);
389
392
394 Session& sendSpinningTimeout(unsigned timeoutInUs);
395
399
403
406
417 Session& messageGrouping(unsigned numberOfMessagesToGroup);
418
421
424
426 const std::string &storageId() const ONIXS_B3_BOE_NOTHROW;
427
430
432 Session& outSeqNum(SeqNumber nextOutSeqNum);
433
436
438 Session& inSeqNum(SeqNumber nextExpectedInSeqNum);
439
448
450 SessionStateId::Enum state() const;
451
454
457
459 unsigned reconnectAttempts() const;
460
463
465 unsigned reconnectInterval() const;
466
468 Session& reconnectInterval(unsigned seconds);
469
471 bool localTimeUsage() const;
472
474 Session& localTimeUsage(bool useLocalTime);
475
477 const std::string &storageDirectory() const;
478
480 bool logBeforeSending() const;
481
484
487
490
492 const Session& log(const std::string &message) const;
493
495 std::string toString() const;
496
499
502
505
510
513
520
522 static const std::string& version() ONIXS_B3_BOE_NOTHROW;
523
524private:
525 Session(const Session& );
526 Session& operator=(const Session& );
527
528 void send(NetworkMessage msg, Messaging::Timestamp ts);
529 void send(NetMessages& msgs, size_t maxPacketSize, Messaging::Timestamp ts);
530 void warmUp(NetworkMessage msg, int warmupFlags, Messaging::Timestamp ts);
531 void warmUp(NetMessages& msgs, size_t maxPacketSize, int warmupFlags, Messaging::Timestamp ts);
532
533 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
534 void validate(const Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg) const;
535
536 void validateVersion(Messaging::SchemaVersion version) const;
537
538 Session& sendRetransmitRequest(UInt64 sessionVerId, SeqNumber from, SeqNumber to);
539
540 struct Impl;
541 Impl *const impl_;
542
543friend struct AdHoc::SessionHelper;
544};
545
546template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
548Session& Session::send(Messaging::MessageHolder<SbeMessageType, MaxMessageSize, MessageInitializer> &msg, Messaging::Timestamp ts)
549{
550 assert(ts != Messaging::Timestamp());
551
552#ifndef NDEBUG
553 validate(msg);
554#endif
555
556 send(msg.toNetworkMessage(), ts);
557
558 return *this;
559}
560
561template <typename SbeMessageType, size_t MaxMessageSize>
565 Messaging::Timestamp sendingTime,
566 int warmupFlags)
567{
568 warmUp(msg.toNetworkMessage(), warmupFlags, sendingTime);
569
570 return *this;
571}
572
573#if defined (ONIXS_B3_BOE_CXX11)
574
575template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
584
585#endif
586
587#if defined (ONIXS_B3_BOE_CXX11)
588
589template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
594 size_t maxPacketSize)
595{
596 assert(ts != Messaging::Timestamp());
597
598 send(msgs.netMsgs_, maxPacketSize, ts);
599 return *this;
600}
601
602template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
606 Messaging::Timestamp ts, size_t maxPacketSize, int flags)
607{
608 assert(ts != Messaging::Timestamp());
609
610 warmUp(msgs.netMsgs_, maxPacketSize, flags, ts);
611 return *this;
612}
613
614#endif
615
616template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
618{
619 validateVersion(holder->version());
620}
621
622}
623}
624}
#define ONIXS_B3_BOE_NULLPTR
Definition Compiler.h:188
#define ONIXS_B3_BOE_EXPORTED
Definition Compiler.h:181
#define ONIXS_B3_BOE_NOTHROW
Definition Compiler.h:182
#define ONIXS_B3_BOE_HOTPATH
Definition Compiler.h:193
NetworkMessage toNetworkMessage() noexcept
The time point without the time-zone information.
Definition Time.h:468
Session's network stack reactor interface.
bool localTimeUsage() const
Messaging::Timestamp licenseExpirationDate() const
unsigned messageGrouping() const noexcept
std::string localNetworkInterface() const
std::string customKey() const
static const std::string & version() noexcept
friend struct AdHoc::SessionHelper
Definition Session.h:543
static const int UndefinedAffinity
Definition Session.h:340
std::string remoteIpAddress() const
std::string toString() const
static const UInt64 UndefinedSessionVerID
Definition Session.h:50
std::string remoteHost() const
const SocketOptions & socketOptions() const
Session & warmUp(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=B3BOEMaxPacketSize, int warmupFlags=0)
unsigned reconnectAttempts() const
bool reportNewMessagesWhileWaitingForMissedMessages() const
Session & throttlingLimit(size_t messagesCount, size_t intervalInMs=1000)
SeqNumber outSeqNum() const noexcept
Threading::SharedFuture< void > disconnectAsync()
Messaging::SchemaVersion messagingVersion() const noexcept
std::pair< Port, Port > localPortRange() const noexcept
const std::string & storageDirectory() const
int receivingThreadPolicy() const
size_t incomingMessageGapQueueMaximumSize() const
bool getTcpInfo(TcpInfo &)
unsigned receiveSpinningTimeout() const noexcept
Messaging::SessionID sessionId() const noexcept
Session & warmUp(Messaging::MessageHolder< SbeMessageType, MaxMessageSize > &msg, Messaging::Timestamp sendingTime=UtcWatch::now(), int warmupFlags=0)
Definition Session.h:563
Session & receivingThreadAffinity(CpuIndex cpuIndex)
Sets the receiving thread CPU affinity.
ThreadingModel::Enum threadingModel() const
unsigned reconnectInterval() const
SessionStorageType::Enum storageType() const noexcept
int sendingThreadPriority() const
static const size_t B3BOEMaxPacketSize
Packet size maximum behavior based on MTU (Maximum Transmission Unit) 1420 bytes.
Definition Session.h:158
size_t outboundQueueBytes()
Returns the total number of bytes in the outbound queue.
const std::string & storageId() const noexcept
static const Handle InvalidSocketHandle
Represents invalid value of socket handle.
Definition Session.h:504
bool logBeforeSending() const
Session & sendSequenceMessage()
Session(const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 sessionVerId=UndefinedSessionVerID, const std::string &customKey="")
Session(SessionReactor &stack, const SessionSettings &settings, SessionListener *listener, SessionStorageType::Enum storageType=SessionStorageType::FileBased, SessionStorage *storage=nullptr, UInt64 sessionVerId=UndefinedSessionVerID, const std::string &customKey="")
Messaging::Timestamp creationTime() const noexcept
UInt64 sessionVerId() const noexcept
SessionStateId::Enum state() const
void flushSessionStorage()
Flushes all internal buffers of the session storage.
SeqNumber inSeqNum() const
Session & send(Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg, Messaging::Timestamp sendingTime=UtcWatch::now())
Definition Session.h:548
Session & send(MessageBatchCombiner &msgs, Messaging::Timestamp sendingTime=UtcWatch::now(), size_t maxPacketSize=B3BOEMaxPacketSize)
int receivingThreadPriority() const
const Session & log(const std::string &message) const
Write the given user's message to the Handler's log file using the session formatted Id.
Session & sendingThreadAffinity(CpuIndex cpuIndex)
Sets the sending thread CPU affinity.
Session & connect(const std::string &host, Port port)
unsigned sendSpinningTimeout() const noexcept
int sendingThreadPolicy() const
Sets the sending thread policy.
bool tcpNoDelayOption() const
Threading::SharedFuture< void > connectAsync(const std::string &host, Port port)
MessageHeader::Version SchemaVersion
SBE-encoded data version type.
std::vector< NetworkMessage > NetMessages
unsigned short Port
Definition Defines.h:44
std::vector< SocketOption > SocketOptions
Socket options.
Definition Defines.h:77
size_t CpuIndex
Definition Defines.h:72
int Handle
Type alias for socket handle.
Definition Defines.h:56
std::set< CpuIndex > CpuIndexes
Definition Defines.h:73
Messaging::UInt32 SeqNumber
Definition Messaging.h:52
@ FileBased
File-based Session Storage.
TCP state information.
Definition TcpInfo.h:31