OnixS C++ B3 Binary UMDF Market Data Handler  1.4.2
API documentation
FeedEngine.h
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #pragma once
21 
27 
28 #include <string>
29 #include <vector>
30 
31 namespace OnixS
32 {
33  namespace B3
34  {
35  namespace MarketData
36  {
37  namespace UMDF
38  {
39  /// Designed to reflect various aspects of feed engine processing flow.
41  {
42  public:
43  /// Aliases integral type whose bits are used to indicate flag presence.
44  typedef UInt32 Flags;
45 
46  ///
48  {
49  value_.raw_ = flags;
50  }
51 
52  /// Indicates whether feed-related events like data reception or absence have been dispatched.
54  {
55  return 0 != value_.bits_.eventsDispatched_;
56  }
57 
58  /// Indicates whether feed-related events like data reception or absence have been dispatched.
60  {
61  value_.bits_.eventsDispatched_ = state ? 1 : 0;
62  }
63 
64  /// Indicates whether processing had to sleep in kernel while checking data availability.
66  {
67  return 0 != value_.bits_.ioWaited_;
68  }
69 
70  /// Indicates whether processing had to sleep in kernel while checking data availability.
72  {
73  value_.bits_.ioWaited_ = state ? 1 : 0;
74  }
75 
76  /// Reserved (unused) flags.
78  {
79  return value_.bits_.reserved_;
80  }
81 
82  /// Reserved (unused) flags.
84  {
85  value_.bits_.reserved_ = flags;
86  }
87 
88  private:
89  struct Bitset
90  {
91  Flags eventsDispatched_ : 1;
92  Flags ioWaited_ : 1;
93  Flags reserved_ : 30;
94  };
95 
96  union Value
97  {
98  Bitset bits_;
99  Flags raw_;
100  };
101 
102  Value value_;
103  };
104 
105  /// The Feed Engine machinery
106  class ONIXS_B3_UMDF_MD_API FeedEngine
107  {
108  public:
109  /// Carries out pending actions like data retrieval and event dispatching.
110  /// This function must be invoked after each handler-associated action (i.e., start, stop).
112 
113  /// Provides information about the actual implementation of the feed engine.
114  std::string info();
115 
116  ///
117  virtual ~FeedEngine();
118 
119  protected:
120  explicit FeedEngine(void* impl);
121 
122  private:
123  FeedEngine(const FeedEngine&);
124  FeedEngine& operator=(const FeedEngine&);
125 
126  void* const impl_;
127  friend struct FeHelper;
128  };
129 
130  /// Carries out pending actions like
131  /// data retrieval and event dispatching.
132  ///
133  /// The returned value indicates whether any
134  /// events have been handled by the engine.
135  inline bool process(FeedEngine& engine)
136  {
137  return engine.process().eventsDispatched();
138  }
139 
140  /// The given class implements the feed engine concept using a pool of working threads and the standard socket API.
141  class ONIXS_B3_UMDF_MD_API SocketFeedEngine : public FeedEngine
142  {
143  public:
144  /// Constructor
145  /// \param loggerSettings Defines feed engine logging settings.
146  /// \param socketBufferSize Defines size of receiving buffer in bytes for sockets.
147  /// \param dataWaitTime Defines amount of time Feed Engine spends on
148  /// waiting for I/O while running master processing loop. Time is measured in milliseconds.
149  ///
150  /// \param watch Watch service to be used by Feed Engine.
151  ///
152  /// Watch is used by Feed Engine to assign time
153  /// points to packets received from the feeds.
154  ///
155  /// @note By default, Utc watch service is used.
157  const LoggerSettings& loggerSettings, UInt32 dataWaitTime = 10,
158  UInt32 socketBufferSize = 8 * 1024 * 1024, WatchService& watch = UtcWatch::service());
159 
160  ///
161  explicit SocketFeedEngine(UInt32 dataWaitTime = 10, UInt32 socketBufferSize = 8 * 1024 * 1024,
162  WatchService& watch = UtcWatch::service());
163 
164  ///
166  };
167 
168  /// The given class implements the Feed Engine concept using the Solarflare ef_vi SDK.
169  class ONIXS_B3_UMDF_MD_API EfViFeedEngine : public FeedEngine
170  {
171  public:
172  /// Constructor
173  /// \param loggerSettings Defines feed engine logging settings.
174  /// \param receiveRingSize Number of buffers in a virtual interface receive ring.
175  /// Single buffer is used for a single network packet.
176  /// Thus the receive ring must be capacious enough to
177  /// place incoming packets during market data bursts.
178  ///
179  /// \param watch Watch service to be used by Feed Engine.
180  ///
181  /// Watch is used by Feed Engine to assign time
182  /// points to packets received from the feeds.
183  ///
184  /// @note By default, NIC watch service is used.
185  EfViFeedEngine(const LoggerSettings& loggerSettings,
186  UInt32 receiveRingSize = 4095, WatchService& watch = NicWatch::service());
187 
188  ///
189  explicit EfViFeedEngine(UInt32 receiveRingSize = 4095, WatchService& watch = NicWatch::service());
190 
191  ///
193  };
194 
195  /// Identifies the reasons feed engine threads become idle.
197  {
198  enum Reason
199  {
200  /// Thread waited for incoming data using corresponding
201  /// I/O operations (like 'select') and exited waiting with
202  /// no data availability signs.
204 
205  /// Thread entered idle state due to absence of any data and
206  /// while other threads are waiting for new incoming data.
207  Redundant
208  };
209  };
210 
211  /// Settings for a thread pool executing feed engine tasks.
213  {
215  : threadAffinity_()
216  , spinBeforeIdleTime_(1)
217  , threadCount_(1)
218  {
219  }
220 
221  /// Defines set of CPUs allowed for each working thread
222  /// to be executed on while processing market data by Handlers
223  /// bound to Feed Engine instance which is configured by given
224  /// settings.
225  ///
226  /// @note By default set is empty thus allowing threads
227  /// to be executed on any CPU available in the system.
229  {
230  return threadAffinity_;
231  }
232 
233  /// Defines set of CPUs allowed for each working thread
234  /// to be executed on while processing market data by Handlers
235  /// bound to Feed Engine instance which is configured by given
236  /// settings.
237  ///
238  /// @note By default set is empty thus allowing threads
239  /// to be executed on any CPU available in the system.
241  {
242  return threadAffinity_;
243  }
244 
245  /// Number of working threads to be used by feed engine.
246  ///
247  /// @note Default value is '1'.
249  {
250  return threadCount_;
251  }
252 
253  /// Sets threadCount. @see threadCount.
254  void threadCount(UInt32 value)
255  {
256  threadCount_ = value;
257  }
258 
259  /// Defines amount of time Feed Engine keeps cycling before
260  /// going to sleep when no useful activity can be done.
261  ///
262  /// Time is measured in milliseconds.
263  ///
264  /// @note Default value is '1'.
265  ///
266  /// @warning Given parameter has direct influence on CPU load!
268  {
269  return spinBeforeIdleTime_;
270  }
271 
272  /// Sets spinBeforeIdleTime. @see spinBeforeIdleTime.
274  {
275  spinBeforeIdleTime_ = value;
276  }
277 
278  private:
279  System::ThreadAffinity threadAffinity_;
280  UInt32 spinBeforeIdleTime_;
281  UInt32 threadCount_;
282  };
283 
284  class ONIXS_B3_UMDF_MD_API FeedEngineThreadPool;
285 
286  /// Listener for thread-related events.
287  ///
288  /// Members of this class are invoked to reflect
289  /// various life-time events of threads spawned and
290  /// used by the feed engine while processing market data.
291  struct ONIXS_B3_UMDF_MD_API FeedEngineThreadPoolListener
292  {
293  /// Member invoked by feed engine when
294  /// a new processing thread is spawned.
295  ///
296  /// @note Invocation is done within newly
297  /// started thread.
299 
300  /// Member is invoked by feed engine when
301  /// processing thread is about to end.
302  ///
303  /// @note Invocation is done within the
304  /// thread that is about to end.
306 
307  /// Is called when feed engine's thread is idle.
308  ///
309  /// Integer parameter-variable defines the amount of time the feed engine
310  /// suggests for the thread to sleep in kernel after invoking the given member.
312 
313  ///
314  virtual ~FeedEngineThreadPoolListener() = 0;
315  };
316 
317  /// A pool of threads executing feed engine tasks.
318  class ONIXS_B3_UMDF_MD_API FeedEngineThreadPool
319  {
320  public:
323 
325  const FeedEngineThreadPoolSettings&, const std::vector<FeedEngine*>&, FeedEngineThreadPoolListener* = ONIXS_B3_UMDF_MD_NULLPTR);
326 
328 
329  const FeedEngineThreadPoolSettings& settings() const;
330 
331  private:
333  FeedEngineThreadPool& operator=(const FeedEngineThreadPool&);
334 
335  const FeedEngineThreadPoolSettings settings_;
336  class FeedEngineThreadPoolImpl;
337  FeedEngineThreadPoolImpl* const impl_;
338  };
339 
340  namespace System
341  {
342  /// Current thread related tasks.
343  class ONIXS_B3_UMDF_MD_API ThisThread
344  {
345  public:
346  /// Sets the processor affinity mask for the current thread.
347  static void affinity(const System::ThreadAffinity&);
348  static void affinity(System::CpuIndex);
349  };
350  }
351  }
352  }
353  }
354 }
#define ONIXS_B3_UMDF_MD_NOTHROW
Definition: Compiler.h:114
A pool of threads executing feed engine tasks.
Definition: FeedEngine.h:318
UInt32 spinBeforeIdleTime() const
Defines amount of time Feed Engine keeps cycling before going to sleep when no useful activity can be...
Definition: FeedEngine.h:267
bool ioWaited() const
Indicates whether processing had to sleep in kernel while checking data availability.
Definition: FeedEngine.h:65
#define ONIXS_B3_UMDF_MD_NULLPTR
Definition: Compiler.h:128
UInt32 Flags
Aliases integral type whose bits are used to indicate flag presence.
Definition: FeedEngine.h:44
Messaging::UInt32 UInt32
Definition: Integral.h:37
bool process(FeedEngine &engine)
Carries out pending actions like data retrieval and event dispatching.
Definition: FeedEngine.h:135
Designed to reflect various aspects of feed engine processing flow.
Definition: FeedEngine.h:40
The given class implements the feed engine concept using a pool of working threads and the standard socket API...
Definition: FeedEngine.h:141
size_t CpuIndex
Zero-based index of CPU.
Definition: Defines.h:95
Thread waited for incoming data using corresponding I/O operations (like 'select') and exited waiting...
Definition: FeedEngine.h:203
Definition: Handler.h:31
virtual void onFeedEngineThreadBegin(const FeedEngineThreadPool &)
Member invoked by feed engine when a new processing thread is spawned.
Definition: FeedEngine.h:298
void ioWaited(bool state)
Indicates whether processing had to sleep in kernel while checking data availability.
Definition: FeedEngine.h:71
System::ThreadAffinity & threadAffinity()
Defines set of CPUs allowed for each working thread to be executed on while processing market data by...
Definition: FeedEngine.h:240
bool eventsDispatched() const
Indicates whether feed-related events like data reception or absence have been dispatched.
Definition: FeedEngine.h:53
#define ONIXS_B3_UMDF_MD_OVERRIDE
Definition: Compiler.h:119
class ONIXS_B3_UMDF_MD_API FeedEngineThreadPool
Definition: FeedEngine.h:284
Current thread related tasks.
Definition: FeedEngine.h:343
UInt32 threadCount() const
Number of working threads to be used by feed engine.
Definition: FeedEngine.h:248
NetFeedEngineProcessResult process()
Carries out pending actions like data retrieval and event dispatching.
Represents set of CPU indices.
Definition: Defines.h:98
void reserved(Flags flags)
Reserved (unused) flags.
Definition: FeedEngine.h:83
void threadCount(UInt32 value)
Sets threadCount.
Definition: FeedEngine.h:254
virtual void onFeedEngineThreadEnd(const FeedEngineThreadPool &)
Member is invoked by feed engine when processing thread is about to end.
Definition: FeedEngine.h:305
static UtcWatch & service()
Returns watch service.
Identifies the reasons feed engine threads become idle.
Definition: FeedEngine.h:196
static NicWatch & service()
Returns watch service.
void eventsDispatched(bool state)
Indicates whether feed-related events like data reception or absence have been dispatched.
Definition: FeedEngine.h:59
The Feed Engine machinery.
Definition: FeedEngine.h:106
const System::ThreadAffinity & threadAffinity() const
Defines set of CPUs allowed for each working thread to be executed on while processing market data by...
Definition: FeedEngine.h:228
The given class implements the Feed Engine concept using the Solarflare ef_vi SDK.
Definition: FeedEngine.h:169
Flags reserved() const
Reserved (unused) flags.
Definition: FeedEngine.h:77
Settings for a thread pool executing feed engine tasks.
Definition: FeedEngine.h:212
void spinBeforeIdleTime(UInt32 value)
Sets spinBeforeIdleTime.
Definition: FeedEngine.h:273
virtual void onFeedEngineThreadIdle(const FeedEngineThreadPool &, FeedEngineThreadIdle::Reason, UInt32 &)
Is called when feed engine's thread is idle.
Definition: FeedEngine.h:311