OnixS C++ CME MDP Conflated TCP Handler  1.3.1
API Documentation
Gateway.h
Go to the documentation of this file.
1 // Copyright Onix Solutions Limited [OnixS]. All rights reserved.
2 //
3 // This software owned by Onix Solutions Limited [OnixS] and is
4 // protected by copyright law and international copyright treaties.
5 //
6 // Access to and use of the software is governed by the terms of the applicable
7 // OnixS Software Services Agreement (the Agreement) and Customer end user license
8 // agreements granting a non-assignable, non-transferable and non-exclusive license
9 // to use the software for it's own data processing purposes under the terms defined
10 // in the Agreement.
11 //
12 // Except as otherwise granted within the terms of the Agreement, copying or
13 // reproduction of any part of this source code or associated reference material
14 // to any other location for further reproduction or redistribution, and any
15 // amendments to this copyright notice, are expressly prohibited.
16 //
17 // Any reproduction or redistribution for sale or hiring of the Software not in
18 // accordance with the terms of the Agreement is a violation of copyright law.
19 
20 #pragma once
21 
23 
24 #if defined(ONIXS_CONFLATEDTCP_HAS_GATEWAY_EMULATOR)
25 
28 #include <OnixS/CME/ConflatedTCP/testing/ClientMessageListener.h>
29 
30 #include <chrono>
31 
32 namespace OnixS {
33 namespace CME {
34 namespace ConflatedTCP {
35 namespace Testing {
36 
37 // Assume NegotiationResponse202::credentials is empty
38 constexpr
40 {
41  using Message = NegotiationResponse202;
42 
43  return
44  Message::blockLength(Message::Schema::Version)
46  + Message::getMinimalVariableFieldsSize(Message::Schema::Version);
47 }
48 
49 // Type aliases
55 
61 
62 /// CME Conflated TCP Gateway Emulator.
64 {
65 public:
66  explicit
67  Gateway(int port, const char* host = "127.0.0.1", const std::chrono::seconds& timeout = std::chrono::seconds(10));
68 
69  ///
70  virtual ~Gateway();
71 
72  /// Deleted.
73  Gateway(const Gateway&) = delete;
74  Gateway& operator=(const Gateway&) = delete;
75 
76  /// Move semantic.
77  Gateway(Gateway&&) noexcept;
78  Gateway& operator=(Gateway&&) noexcept;
79 
80  /// Swap with other.
81  void swap(Gateway&) noexcept;
82 
83  /// Accepts an incoming connection.
84  Gateway& acceptConnection();
85 
86  /// Accepts an incoming Conflated TCP session.
87  Gateway& acceptSession(SeqNumber outgoingSequenceNumber = 1, UInt64 previousUuid = 0, UInt32 previousSeqNo = 0);
88 
89  /// Waits until the Terminate203 message is received.
90  ///
91  /// If the listener is provided, then receives and reports application-level messages via the corresponding callback.
92  /// Responds to the Sequence506 message if received.
93  Terminate203Ptr waitUntilTerminate(ClientMessageListener* listener = nullptr);
94 
95  /// Closes the accepted connection.
96  Gateway& disconnect();
97 
98  /// \return the listen port.
100  int port() const noexcept;
101 
102  /// Enables logging.
103  Gateway& enableLogger(const std::string& logFileName);
104 
105  /// Sends the given message.
106  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
108 
109  /// Sends the given message.
110  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
112 
113  /// Sets the given sequence number and send the message.
114  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
116 
117  /// Sets the given sequence number and send the message.
118  template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
120 
121  /// Receives the message of the particular type.
122  ///
123  /// \throw std::logic_error if an unexpected type is received.
124  template <typename TMsg>
125  MessagePtr<TMsg> receiveTypedMessage();
126 
127  /// Accepts an incoming connection and establishes the session.
128  ///
129  /// - Receives and reports application-level messages via the corresponding callback.
130  /// - Sends Sequence messages.
131  Gateway& run(ClientMessageListener& listener);
132 
133  /// \return the sequence number of the next outgoing message.
135  SeqNumber outSeqNum() const;
136 
137  /// Sets the sequence number of the next outgoing message.
138  Gateway& outSeqNum(SeqNumber nextOutSeqNum);
139 
140  /// \return UUUD.
142  UInt64 uuid() const noexcept
143  {
144  return uId_;
145  }
146 
147  /// Sets the value of the option to improve latency at the expense of message throughput (the `TCP_NODELAY` socket option).
148  Gateway& tcpNoDelayOption(bool noDelay);
149 
150  /// \return the value of the option to improve latency at the expense of message throughput (the `TCP_NODELAY` socket option).
151  ///
152  /// The default value is SessionSettings::Default::TcpNoDelayOption.
154  bool tcpNoDelayOption() const;
155 
156  /// \return the socket receive buffer size.
157  ///
158  /// The default value is SessionSettings::Default::SocketReceiveBufferSize.
160  int socketReceiveBufferSize() const;
161 
162  /// Sets the socket receive buffer size.
163  ///
164  /// \param bufferSize the size of the socket receive buffer size.
165  /// If SessionSettings::UseOsDefaultSocketBufferSize then the default operating system value is used.
166  Gateway& socketReceiveBufferSize(int bufferSize);
167 
168  /// \return the size of the socket send buffer size.
169  ///
170  /// The default value is SessionSettings::Default::SocketSedndBufferSize.
172  int socketSendBufferSize() const;
173 
174  /// Sets the size of the socket send buffer size.
175  ///
176  /// \param bufferSize the size of the socket send buffer size.
177  /// If SessionSettings::UseOsDefaultSocketBufferSize then the default operating system value is used.
178  Gateway& socketSendBufferSize(int bufferSize);
179 
180 protected:
181  /// Checks whether the connection is closed by the counterparty.
183  bool isConnectionClosed(const std::chrono::seconds& timeout) const;
184 
185  /// Checks whether the connection is closed by the counterparty
186  /// using the default timeout (provided in the constructor).
188  bool isConnectionClosed() const;
189 
190  /// Receives a message.
191  MessageBasePtr receive() const;
192 
193  /// Receives the Negotiate200 message.
194  ///
195  /// \throw std::logic_error if an unexpected type is received.
196  Negotiate200Ptr receiveNegotiate();
197 
198  /// Receives the Negotiate200 and responds with the NegotiationResponse202 message.
199  ///
200  /// \throw std::logic_error if an unexpected type is received.
201  Negotiate200Ptr acceptNegotiate(UInt32 previousSeqNo = 0);
202 
203  /// Receives the Negotiate200 message and responds with the NegotiationReject201 message.
204  ///
205  /// \throw std::logic_error if an unexpected type is received.
206  Negotiate200Ptr rejectNegotiate(ErrorCodes::Enum errorCodes = ErrorCodes::Enum(), const std::string & reason = std::string());
207 
208  /// Receive the Terminate203 message.
209  ///
210  /// \throw std::logic_error if an unexpected type is received.
211  Terminate203Ptr receiveTerminate();
212 
213  /// Receive the Terminate203 message and responds with the Terminate203 message.
214  ///
215  /// \throw std::logic_error if an unexpected type is received.
216  Terminate203Ptr acceptTerminate();
217 
218  /// Sends the Terminate203 message.
219  Gateway& sendTerminate(const std::string& reason = {}, ErrorCodes::Enum errCode = ErrorCodes::Other);
220 
221  /// Sends the Terminate203 meassage and waist until the response is received.
222  Terminate203Ptr terminate(const std::string& reason = {}, ErrorCodes::Enum errCode = ErrorCodes::Other);
223 
224  /// Creates a NegotiationResponse202 message
225  NegotiationResponse202Container createNegotiationResponse(const Negotiate200Ptr& request, UInt32 previousSeqNo);
226 
227  /// Creates a NegotiationReject201 message.
228  NegotiationReject201Container createNegotiationReject(const Negotiate200Ptr& request, ErrorCodes::Enum errorCodes, const std::string & reason);
229 
230  /// Creates a RequestAck206 message.
231  template <typename TRequestMsg>
232  RequestAck206Container createRequestAck(const TRequestMsg& request);
233 
234  /// Creates a RequestReject207 message.
235  template <typename TRequestMsg>
236  RequestReject207Container createRequestReject(const TRequestMsg& request);
237 
238  /// Creates a Terminate203 message.
239  Terminate203Container createTerminate(const Terminate203Ptr& request);
240 
241  /// Creates a Terminate203 message.
242  Terminate203Container createTerminate(const std::string& reason, ErrorCodes::Enum errCode);
243 
244  /// Sends the given message.
246 
247  /// Sends the given data
248  Gateway& sendData(const void* data, size_t size);
249 
250  /// UUID.
251  UInt64 uId_{0};
252 
253 private:
254  template<typename MessageType>
255  void invokeCallback(void(ClientMessageListener::*Callback)(const MessageType &, Gateway *), ClientMessageListener * listener, const MessageBasePtr & msg)
256  {
257  if(listener)
258  (listener->*Callback)(cast<MessageType>(msg), this);
259  }
260 
261  class Impl;
262  Impl* impl_;
263 };
264 
265 template <typename TRequestMsg>
267 {
269 
270  response.message()
271  .setReqId(request->reqId())
272  .setSubscriptionReqType(request->subscriptionReqType())
273  .setReqIdStatus(RequestIDStatus::FullAck);
274 
275  return response;
276 }
277 
278 template <typename TRequestMsg>
280 {
282 
283  response.message()
284  .setReqId(request->reqId())
285  .setReqRejReason(ReqRejReason::Other);
286 
287  return response;
288 }
289 
290 template <typename TMsg>
292 {
293  return cast<TMsg>(receive());
294 }
295 
296 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
298 {
299  const auto messageSize = msg.setHeader();
300  auto* const header = msg.header();
301  assert(header);
302 
303  return send(Messaging::SbeMessage(header + 1, messageSize), *header);
304 }
305 
306 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
308 {
309  return send(msg);
310 }
311 
312 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
314 {
315  setSeqNum(msg, num);
316  return send(msg);
317 }
318 
319 template <typename SbeMessageType, size_t MaxMessageSize, typename MessageInitializer>
321 {
322  setSeqNum(msg, num);
323  return send(msg);
324 }
325 
326 }}}}
327 
328 #endif
Gateway & send(ConflatedTCP::Messaging::MessageHolder< SbeMessageType, MaxMessageSize, MessageInitializer > &msg)
Sends the given message.
Definition: Gateway.h:297
Requested subscription scope is fully acknowledged .
Definition: Fields.h:113
RequestReject207Container createRequestReject(const TRequestMsg &request)
Creates a RequestReject207 message.
Definition: Gateway.h:279
RequestAck206Container createRequestAck(const TRequestMsg &request)
Creates a RequestAck206 message.
Definition: Gateway.h:266
Messaging::UInt32 SeqNumber
Definition: Messaging.h:58
Contains the SimpleOpenFramingHeader, the SBE message, and the data buffer.
#define ONIXS_CONFLATEDTCP_EXPORTED
Definition: Compiler.h:187
Unexpected request, see Reason field for details.
Definition: Fields.h:66
Definition: Defines.h:40
CME Conflated TCP Gateway Emulator.
Definition: Gateway.h:63
constexpr UInt16 calculateNegotiationResponse202MaxSize()
Definition: Gateway.h:39
#define ONIXS_CONFLATEDTCP_NODISCARD
Definition: Compiler.h:198
MessageSize setHeader() noexcept
Calculates the binary size of the message and updates the Simple Open Framing Header accordingly...
const SimpleOpenFramingHeader * header() const noexcept
MessagePtr< TMsg > receiveTypedMessage()
Receives the message of the particular type.
Definition: Gateway.h:291