OnixS C++ Eurex T7 Market and Reference Data (EMDI, MDI, RDI, EOBI) Handlers  17.0.1
API documentation
FeedEngine.h
Go to the documentation of this file.
1 /*
2 * Copyright Onix Solutions Limited [OnixS]. All rights reserved.
3 *
4 * This software owned by Onix Solutions Limited [OnixS] and is protected by copyright law
5 * and international copyright treaties.
6 *
7 * Access to and use of the software is governed by the terms of the applicable ONIXS Software
8 * Services Agreement (the Agreement) and Customer end user license agreements granting
9 * a non-assignable, non-transferable and non-exclusive license to use the software
10 * for it's own data processing purposes under the terms defined in the Agreement.
11 *
12 * Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
13 * of this source code or associated reference material to any other location for further reproduction
14 * or redistribution, and any amendments to this copyright notice, are expressly prohibited.
15 *
16 * Any reproduction or redistribution for sale or hiring of the Software not in accordance with
17 * the terms of the Agreement is a violation of copyright law.
18 */
19 
20 #pragma once
21 
22 #include <string>
23 #include <set>
24 
29 
30 namespace OnixS
31 {
32  namespace Eurex
33  {
34  namespace MarketData
35  {
36  /// Designed to reflect various aspects of feed engine processing flow.
38  {
39  public:
40  /// Aliases integral type whose bits are used to indicate flag presence.
41  typedef UInt32 Flags;
42 
43  ///
45  {
46  value_.raw_ = flags;
47  }
48 
49  /// Indicates whether feed-related events like data reception or absence have been dispatched.
51  {
52  return 0 != value_.bits_.eventsDispatched_;
53  }
54 
55  /// Indicates whether feed-related events like data reception or absence have been dispatched.
57  {
58  value_.bits_.eventsDispatched_ = state ? 1 : 0;
59  }
60 
61  /// Indicates whether processing had to sleep in kernel while checking data availability.
63  {
64  return 0 != value_.bits_.ioWaited_;
65  }
66 
67  /// Indicates whether processing had to sleep in kernel while checking data availability.
69  {
70  value_.bits_.ioWaited_ = state ? 1 : 0;
71  }
72 
73  /// Reserved (unused) flags.
75  {
76  return value_.bits_.reserved_;
77  }
78 
79  /// Reserved (unused) flags.
81  {
82  value_.bits_.reserved_ = flags;
83  }
84 
85  private:
86  struct Bitset
87  {
88  Flags eventsDispatched_ : 1;
89  Flags ioWaited_ : 1;
90  Flags reserved_ : 30;
91  };
92 
93  union Value
94  {
95  Bitset bits_;
96  Flags raw_;
97  };
98 
99  Value value_;
100  };
101 
102  /// The Feed Engine machinery
103  class ONIXS_EUREX_EMDI_API FeedEngine
104  {
105  public:
106  /// Carries out pending actions like data retrieval and event dispatching.
107  /// This function must be invoked after each handler-associated action (i.e., start, stop).
109 
110  /// Provides information about the actual implementation of the feed engine.
111  std::string info();
112 
113  ///
114  virtual ~FeedEngine();
115 
116  protected:
117  explicit FeedEngine(void* impl);
118 
119  private:
120  FeedEngine(const FeedEngine&);
121  FeedEngine& operator=(const FeedEngine&);
122 
123  void* const impl_;
124  friend struct FeHelper;
125  };
126 
127  /// Carries out pending actions like
128  /// data retrieval and event dispatching.
129  ///
130  /// The returned value indicates whether any
131  /// events have been handled by the engine.
132  inline bool process(FeedEngine& engine)
133  {
134  return engine.process().eventsDispatched();
135  }
136 
137  /// The given class implements feed engine concept using pool of working threads and standard socket API.
138  class ONIXS_EUREX_EMDI_API SocketFeedEngine : public FeedEngine
139  {
140  public:
141  /// Constructor
142  /// \param socketBufferSize Defines size of receiving buffer in bytes for sockets.
143  /// \param dataWaitTime Defines amount of time Feed Engine spends on
144  /// waiting for I/O while running master processing loop. Time is measured in milliseconds.
145  ///
146  /// \param watch Watch service to be used by Feed Engine.
147  ///
148  /// Watch is used by Feed Engine to assign time
149  /// points to packets received from the feeds.
150  ///
151  /// @note By default, Utc watch service is used.
152  explicit SocketFeedEngine(
153  UInt32 dataWaitTime = 10, UInt32 socketBufferSize = 8 * 1024 * 1024, WatchService& watch = UtcWatch::service());
154 
155  ///
157  };
158 
159  /// The given class implements the Feed Engine concept using the Solarlfare ef_vi SDK.
160  class ONIXS_EUREX_EMDI_API EfViFeedEngine : public FeedEngine
161  {
162  public:
163  /// Constructor
164  /// \param receiveRingSize Number of buffers in a virtual interface receive ring.
165  /// Single buffer is used for a single network packet.
166  /// Thus the receive ring must be capacious enough to
167  /// place incoming packets during market data bursts.
168  ///
169  /// \param watch Watch service to be used by Feed Engine.
170  ///
171  /// Watch is used by Feed Engine to assign time
172  /// points to packets received from the feeds.
173  ///
174  /// @note By default, NIC watch service is used.
175  explicit EfViFeedEngine(UInt32 receiveRingSize = 4095, WatchService& watch = NicWatch::service());
176 
177  ///
179  };
180 
181 
182  /// Zero-based index of CPU.
183  typedef size_t CpuIndex;
184 
185  /// Represents set of CPU indices.
186  class ONIXS_EUREX_EMDI_API ThreadAffinity
187  {
188  typedef std::set<CpuIndex> CpuIndexes;
189 
190  public:
191  /// Initializes empty set.
192  ThreadAffinity();
193 
194  /// Initializes as copy of other set.
196 
197  /// Utilizes all the resources.
198  ~ThreadAffinity();
199 
200  /// Indicates whether is empty.
201  bool empty() const;
202 
203  /// Copies set into another set.
204  void copyTo(CpuIndexes&) const;
205 
206  /// Adds CPU index into set.
207  bool insert(CpuIndex index);
208 
209  /// Removes CPU index from the set.
210  bool erase(CpuIndex index);
211 
212  /// Makes set empty.
213  void clear();
214 
215  /// Re-initializes instance as copy of other set.
216  ThreadAffinity& operator = (const ThreadAffinity&);
217 
218  /// Returns the string representation.
219  std::string toString() const;
220 
221  private:
222  /// System index.
223  CpuIndexes* indices_;
224  };
225 
226  /// Identifies reasons feed engine threads becomes idle.
228  {
229  enum Reason
230  {
231  /// Thread waited for incoming data using corresponding
232  /// I/O operations (like 'select') and exited waiting with
233  /// no data availability signs.
235 
236  /// Thread entered idle state due to absence of any data and
237  /// while other threads are waiting for new incoming data.
238  Redundant
239  };
240  };
241 
242  ///
244  {
246  : threadAffinity_()
247  , spinBeforeIdleTime_(1)
248  , threadCount_(1)
249  {
250  }
251 
252  /// Defines set of CPUs allowed for each working thread
253  /// to be executed on while processing market data by Handlers
254  /// bound to Feed Engine instance which is configured by given
255  /// settings.
256  ///
257  /// @note By default set is empty thus allowing threads
258  /// to be executed on any CPU available in the system.
260  {
261  return threadAffinity_;
262  }
263 
264  /// Defines set of CPUs allowed for each working thread
265  /// to be executed on while processing market data by Handlers
266  /// bound to Feed Engine instance which is configured by given
267  /// settings.
268  ///
269  /// @note By default set is empty thus allowing threads
270  /// to be executed on any CPU available in the system.
272  {
273  return threadAffinity_;
274  }
275 
276  /// Number of working threads to be used by feed engine.
277  ///
278  /// @note Default value is '1'.
280  {
281  return threadCount_;
282  }
283 
284  /// Sets threadsCount. @see threadsCount.
285  void threadCount(UInt32 value)
286  {
287  threadCount_ = value;
288  }
289 
290  /// Defines amount of time Feed Engine keeps cycling before
291  /// going to sleep when no useful activity can be done.
292  ///
293  /// Time is measured in milliseconds.
294  ///
295  /// @note Default value is '1'.
296  ///
297  /// @warning Given parameter has direct influence onto CPU load!
299  {
300  return spinBeforeIdleTime_;
301  }
302 
303  /// Sets redundancySpinTime. @see redundancySpinTime.
305  {
306  spinBeforeIdleTime_ = value;
307  }
308 
309  private:
310  ThreadAffinity threadAffinity_;
311  UInt32 spinBeforeIdleTime_;
312  UInt32 threadCount_;
313  };
314 
315  class ONIXS_EUREX_EMDI_API FeedEngineThreadPool;
316 
317  /// Listener for thread-related events.
318  ///
319  /// Members of this classes are invoked to reflect
320  /// various life-time events of threads spawned and
321  /// used by the feed engine while processing market data.
322  struct ONIXS_EUREX_EMDI_API FeedEngineThreadPoolListener
323  {
324  /// Member invoked by feed engine when
325  /// a new processing thread is spawned.
326  ///
327  /// @note Invocation is done within newly
328  /// started thread.
329  virtual void onFeedEngineThreadBegin(const FeedEngineThreadPool&) {}
330 
331  /// Member is invoked by feed engine when
332  /// processing thread is about to end.
333  ///
334  /// @note Invocation is done within the
335  /// thread that is about to end.
336  virtual void onFeedEngineThreadEnd(const FeedEngineThreadPool&) {}
337 
338  /// Is called when feed engine's thread is idle.
339  ///
340  /// Integer parameter-variable defines amount of time feed engine
341  /// suggest for thread to sleep in kernel after invoking given member.
342  virtual void onFeedEngineThreadIdle(const FeedEngineThreadPool&, FeedEngineThreadIdle::Reason, UInt32&) {}
343 
344  ///
346  };
347 
348  /// A pool of threads executing feed engine tasks.
349  class ONIXS_EUREX_EMDI_API FeedEngineThreadPool
350  {
351  public:
352  FeedEngineThreadPool(
354 
355  ~FeedEngineThreadPool();
356 
357  const FeedEngineThreadPoolSettings settings() const;
358 
359  private:
360  FeedEngineThreadPool(const FeedEngineThreadPool&);
361  FeedEngineThreadPool& operator=(const FeedEngineThreadPool&);
362 
363  const FeedEngineThreadPoolSettings settings_;
364  class FeedEngineThreadPoolImpl;
365  FeedEngineThreadPoolImpl* impl_;
366  };
367 
368  /// Current thread related tasks.
369  class ONIXS_EUREX_EMDI_API ThisThread
370  {
371  public:
372  /// Sets the processor affinity mask for the current thread.
373  static void affinity(const ThreadAffinity&);
374  static void affinity(CpuIndex);
375  };
376  }
377  }
378 }
virtual void onFeedEngineThreadEnd(const FeedEngineThreadPool &)
Definition: FeedEngine.h:336
The Feed Engine machinery.
Definition: FeedEngine.h:103
Represents set of CPU indices.
Definition: FeedEngine.h:186
#define ONIXS_EUREX_EMDI_NULLPTR
Definition: Compiler.h:143
virtual void onFeedEngineThreadBegin(const FeedEngineThreadPool &)
Definition: FeedEngine.h:329
#define ONIXS_EUREX_EMDI_NOEXCEPT
Definition: Compiler.h:130
#define ONIXS_EUREX_EMDI_OVERRIDE
Definition: Compiler.h:134
void reserved(Flags flags) ONIXS_EUREX_EMDI_NOEXCEPT
Reserved (unused) flags.
Definition: FeedEngine.h:80
void spinBeforeIdleTime(UInt32 value)
Sets redundancySpinTime.
Definition: FeedEngine.h:304
bool ioWaited() const ONIXS_EUREX_EMDI_NOEXCEPT
Indicates whether processing had to sleep in kernel while checking data availability.
Definition: FeedEngine.h:62
void ioWaited(bool state) ONIXS_EUREX_EMDI_NOEXCEPT
Indicates whether processing had to sleep in kernel while checking data availability.
Definition: FeedEngine.h:68
void eventsDispatched(bool state) ONIXS_EUREX_EMDI_NOEXCEPT
Indicates whether feed-related events like data reception or absence have been dispatched.
Definition: FeedEngine.h:56
unsigned int UInt32
Definition: Numeric.h:41
Definition: Defines.h:30
size_t CpuIndex
Zero-based index of CPU.
Definition: FeedEngine.h:183
Designed to reflect various aspects of feed engine processing flow.
Definition: FeedEngine.h:37
bool process(FeedEngine &engine)
Definition: FeedEngine.h:132
Identifies reasons feed engine threads becomes idle.
Definition: FeedEngine.h:227
The given class implements feed engine concept using pool of working threads and standard socket API...
Definition: FeedEngine.h:138
static UtcWatch & service()
Returns watch service.
const ThreadAffinity & threadAffinity() const
Definition: FeedEngine.h:259
NetFeedEngineProcessResult(UInt32 flags) ONIXS_EUREX_EMDI_NOEXCEPT
Definition: FeedEngine.h:44
UInt32 Flags
Aliases integral type whose bits are used to indicate flag presence.
Definition: FeedEngine.h:41
Flags reserved() const ONIXS_EUREX_EMDI_NOEXCEPT
Reserved (unused) flags.
Definition: FeedEngine.h:74
static NicWatch & service()
Returns watch service.
NetFeedEngineProcessResult process()
bool eventsDispatched() const ONIXS_EUREX_EMDI_NOEXCEPT
Indicates whether feed-related events like data reception or absence have been dispatched.
Definition: FeedEngine.h:50
virtual void onFeedEngineThreadIdle(const FeedEngineThreadPool &, FeedEngineThreadIdle::Reason, UInt32 &)
Definition: FeedEngine.h:342
The given class implements the Feed Engine concept using the Solarlfare ef_vi SDK.
Definition: FeedEngine.h:160
void threadCount(UInt32 value)
Sets threadsCount.
Definition: FeedEngine.h:285
Current thread related tasks.
Definition: FeedEngine.h:369