OnixS C++ CME MDP Streamlined Market Data Handler  1.2.0
API Documentation
FeedEngine.h
Go to the documentation of this file.
1 // Copyright Onix Solutions Limited [OnixS]. All rights reserved.
2 //
3 // This software owned by Onix Solutions Limited [OnixS] and is
4 // protected by copyright law and international copyright treaties.
5 //
6 // Access to and use of the software is governed by the terms of the applicable
7 // OnixS Software Services Agreement (the Agreement) and Customer end user license
8 // agreements granting a non-assignable, non-transferable and non-exclusive license
9 // to use the software for it's own data processing purposes under the terms defined
10 // in the Agreement.
11 //
12 // Except as otherwise granted within the terms of the Agreement, copying or
13 // reproduction of any part of this source code or associated reference material
14 // to any other location for further reproduction or redistribution, and any
15 // amendments to this copyright notice, are expressly prohibited.
16 //
17 // Any reproduction or redistribution for sale or hiring of the Software not in
18 // accordance with the terms of the Agreement is a violation of copyright law.
19 //
20 
21 #pragma once
22 
27 
29 
30 /// Zero-based index of CPU.
31 typedef Int32 CpuIndex;
32 
33 /// Represents set of CPU indices.
35 {
36 public:
37  /// Alias for collection of CPU indices.
38  typedef
41 
42  /// Initializes empty set.
44  {
45  }
46 
47  /// Initializes as copy of other set.
49  const ThreadAffinity& other)
50  : indices_(other.indices_)
51  {
52  }
53 
54  /// Utilizes all the resources.
56  {
57  }
58 
59  /// Read-only access to index collection.
60  const CpuIndices& cpus() const
61  {
62  return indices_;
63  }
64 
65  /// Collection of CPU indices.
67  {
68  return indices_;
69  }
70 
71 private:
72  CpuIndices indices_;
73 };
74 
75 /// Serializes thread affinity into a string.
77 void
78 toStr(
79  std::string&,
80  const ThreadAffinity&);
81 
82 /// Serializes thread affinity into a string.
83 inline
84 std::string
86  const ThreadAffinity& affinity)
87 {
88  std::string str;
89 
90  toStr(str, affinity);
91 
92  return str;
93 }
94 
95 /// Collection of settings affecting Feed Engine behavior.
97 {
98  ThreadAffinity threadAffinity_;
99  UInt32 threadCount_;
100 
101  WatchService* watch_;
102 
103  UInt32 dataWaitTime_;
104  UInt32 spinBeforeIdleTime_;
105 
106  UInt32 socketBufferSize_;
107 
108 public:
109  /// Initializes options with default values.
111  : threadAffinity_()
112  , threadCount_(1)
113  , watch_(
114  &UtcWatch::service())
115  , dataWaitTime_(10)
116  , spinBeforeIdleTime_(1)
117  , socketBufferSize_(
118  8 * 1024 * 1024)
119  {
120  }
121 
122  /// Cleans everything up.
124  {
125  }
126 
127  /// Defines set of CPUs allowed for each working thread
128  /// to be executed on while processing market data by Handlers
129  /// binded to Feed Engine instance which is configured by given
130  /// settings.
131  ///
132  /// @note By default set is empty thus allowing threads
133  /// to be executed on any CPU available in the system.
134  const
137  {
138  return threadAffinity_;
139  }
140 
141  /// Defines set of CPUs allowed for each working thread
142  /// to be executed on while processing market data by Handlers
143  /// binded to Feed Engine instance which is configured by given
144  /// settings.
145  ///
146  /// @note By default set is empty thus allowing threads
147  /// to be executed on any CPU available in the system.
149  {
150  return threadAffinity_;
151  }
152 
153  /// Number of working threads to be used by feed engine.
154  ///
155  /// @note Default value is '1'.
156  UInt32
157  threadCount() const
158  {
159  return threadCount_;
160  }
161 
162  /// Sets threadsCount. @see threadsCount.
163  void
165  UInt32 value)
166  {
167  threadCount_ = value;
168  }
169 
170  /// Watch service to be used by Feed Engine.
171  ///
172  /// Watch is used by Feed Engine to timestamp
173  /// packets received from the feeds.
174  ///
175  /// @note By default, UTC watch service is used.
177  {
178  return *watch_;
179  }
180 
181  /// Watch service to be used by Feed Engine.
182  ///
183  /// If no instance associated, UTC watch is used.
184  void
186  WatchService& watch)
187  {
188  watch_ = &watch;
189  }
190 
191  /// Defines amount of time Feed Engine spends on socket
192  /// waiting for I/O while running master processing loop.
193  ///
194  /// Time is measured in milliseconds.
195  ///
196  /// @note Default value is '10'.
197  ///
198  /// @warning Given parameter significantly affects
199  /// Handler's responsiveness and load onto CPU!
200  UInt32
201  dataWaitTime() const
202  {
203  return dataWaitTime_;
204  }
205 
206  /// Sets dataWaitTime. @see dataWaitTime.
207  void
209  UInt32 value)
210  {
211  dataWaitTime_ = value;
212  }
213 
214  /// Defines amount of time Feed Engine keeps cycling before
215  /// going to sleep when no useful activity can be done.
216  ///
217  /// Time is measured in milliseconds.
218  ///
219  /// @note Default value is '1'.
220  ///
221  /// @warning Given parameter has direct influence onto CPU load!
222  UInt32
224  {
225  return spinBeforeIdleTime_;
226  }
227 
228  /// Sets redundancySpinTime. @see redundancySpinTime.
229  void
231  UInt32 value)
232  {
233  spinBeforeIdleTime_ = value;
234  }
235 
236  /// Defines size of receiving buffer in bytes for sockets.
237  ///
238  /// @note Default value is 4 MiB.
239  UInt32
241  {
242  return socketBufferSize_;
243  }
244 
245  /// Sets udpSocketBufferSize. @see udpSocketBufferSize.
246  void
248  UInt32 value)
249  {
250  socketBufferSize_ = value;
251  }
252 };
253 
254 /// Serializes feed engine settings into a string.
256 void
257 toStr(
258  std::string&,
259  const FeedEngineSettings&);
260 
261 /// Serializes feed engine settings into a string.
262 inline
263 std::string
265  const FeedEngineSettings& settings)
266 {
267  std::string str;
268 
269  toStr(str, settings);
270 
271  return str;
272 }
273 
274 /// Identifies reasons feed engine threads becomes idle.
276 {
277  enum Reason
278  {
279  /// Thread waited for incoming data using corresponding
280  /// I/O operations (like 'select') and exited waiting with
281  /// no data availability signs.
283 
284  /// Thread entered idle state due to absence of any data and
285  /// while other threads are waiting for new incoming data.
286  Redundant
287  };
288 };
289 
290 //
291 
293 
294 /// Listener for thread-related events.
295 ///
296 /// Members of this classes are invoked to reflect
297 /// various life-time events of threads spawned and
298 /// used by the feed engine while processing market data.
300 {
301  /// Member invoked by feed engine when
302  /// a new processing thread is spawned.
303  ///
304  /// @note Invocation is done within newly
305  /// started thread.
306  virtual
307  void
309  const FeedEngine&)
310  {
311  }
312 
313  /// Member is invoked by feed engine when
314  /// processing thread is about to end.
315  ///
316  /// @note Invocation is done within the
317  /// thread that is about to end.
318  virtual
319  void
321  const FeedEngine&)
322  {
323  }
324 
325  /// Is called when feed engine's thread is idle.
326  ///
327  /// Thread becomes idle when either no data is received within
328  /// time interval defined by FeedEngineSettings::dataWaitTime
329  /// parameter or no pending data is available for processing.
330  /// In the first case, callback is invoked with 'DataWaitTimeout'
331  /// reason. In the second case, thread is considered as redundant
332  /// and thus callback is invoked with 'Redundant' reason.
333  /// After callback invocation threads may sleep in kernel to reduce
334  /// load onto CPU and racing between feed engine working threads.
335  ///
336  /// Integer parameter-variable defines amount of time feed engine
337  /// suggest for thread to sleep in kernel after invoking given member.
338  virtual
339  void
341  const FeedEngine&,
343  UInt32&)
344  {
345  }
346 
347  /// Invoked if working thread experiences an issue
348  /// while processing its tasks. Once invoked, working
349  /// thread continues performing its tasks.
350  virtual
351  void
353  const FeedEngine&,
354  const Char*)
355  {
356  }
357 };
358 
359 // Workhorse for the public Feed Engine.
361 (
362  MultithreadedFeedEngine
363 );
364 
365 /// Manages processing machinery for market data received from feeds.
367 {
368  // Current workhorse is multi-threaded
369  // feed engine using on socket API.
370  friend
372  (
373  MultithreadedFeedEngine
374  );
375 
376  MultithreadedFeedEngine* engine_;
377 
378  // Copying is not supposed for given class.
379 
380  FeedEngine(const FeedEngine&);
381  FeedEngine& operator =(const FeedEngine&);
382 
383 public:
384  /// Initializes engine with given configuration.
385  FeedEngine(
386  const FeedEngineSettings&,
388 
389  /// Destructs given instance.
390  ~FeedEngine();
391 
392  /// Settings used define behavior of given instance.
393  const
395  settings() const;
396 };
397 
UInt32 dataWaitTime() const
Defines amount of time Feed Engine spends on socket waiting for I/O while running master processing l...
Definition: FeedEngine.h:201
ThreadAffinity()
Initializes empty set.
Definition: FeedEngine.h:43
virtual void onFeedEngineThreadIssue(const FeedEngine &, const Char *)
Invoked if working thread experiences an issue while processing its tasks.
Definition: FeedEngine.h:352
UInt32 threadCount() const
Number of working threads to be used by feed engine.
Definition: FeedEngine.h:157
#define ONIXS_CMESTREAMLINEDMDH_EXPORTED_CLASS_DECL(typeName)
Definition: Bootstrap.h:55
Thread waited for incoming data using corresponding I/O operations (like 'select') and exited waiting...
Definition: FeedEngine.h:282
~ThreadAffinity()
Utilizes all the resources.
Definition: FeedEngine.h:55
ThreadAffinity(const ThreadAffinity &other)
Initializes as copy of other set.
Definition: FeedEngine.h:48
Manages processing machinery for market data received from feeds.
Definition: FeedEngine.h:366
TinySet< CpuIndex > CpuIndices
Alias for collection of CPU indices.
Definition: FeedEngine.h:40
virtual void onFeedEngineThreadBegin(const FeedEngine &)
Member invoked by feed engine when a new processing thread is spawned.
Definition: FeedEngine.h:308
UInt32 spinBeforeIdleTime() const
Defines amount of time Feed Engine keeps cycling before going to sleep when no useful activity can be...
Definition: FeedEngine.h:223
WatchService & watch() const
Watch service to be used by Feed Engine.
Definition: FeedEngine.h:176
Represents set of CPU indices.
Definition: FeedEngine.h:34
Collection of settings affecting Feed Engine behavior.
Definition: FeedEngine.h:96
Listener for thread-related events.
Definition: FeedEngine.h:299
Int32 CpuIndex
Zero-based index of CPU.
Definition: FeedEngine.h:31
virtual void onFeedEngineThreadEnd(const FeedEngine &)
Member is invoked by feed engine when processing thread is about to end.
Definition: FeedEngine.h:320
Identifies reasons feed engine threads becomes idle.
Definition: FeedEngine.h:275
#define ONIXS_CMESTREAMLINEDMDH_NAMESPACE_END
Definition: Bootstrap.h:173
#define ONIXS_CMESTREAMLINEDMDH_LTWT_CLASS
Definition: Bootstrap.h:111
#define ONIXS_CMESTREAMLINEDMDH_EXPORTED
Definition: Compiler.h:160
#define ONIXS_CMESTREAMLINEDMDH_INTERNAL_CLASS_DECL(typeName)
Definition: Bootstrap.h:71
UInt32 socketBufferSize() const
Defines size of receiving buffer in bytes for sockets.
Definition: FeedEngine.h:240
void threadCount(UInt32 value)
Sets threadsCount.
Definition: FeedEngine.h:164
char Char
Character type alias.
Definition: String.h:36
#define ONIXS_CMESTREAMLINEDMDH_EXPORTED_CLASS
Definition: Bootstrap.h:63
ThreadAffinity & threadAffinity()
Defines set of CPUs allowed for each working thread to be executed on while processing market data by...
Definition: FeedEngine.h:148
CpuIndices & cpus()
Collection of CPU indices.
Definition: FeedEngine.h:66
Abstract watch service.
Definition: Time.h:858
void socketBufferSize(UInt32 value)
Sets udpSocketBufferSize.
Definition: FeedEngine.h:247
void dataWaitTime(UInt32 value)
Sets dataWaitTime.
Definition: FeedEngine.h:208
const CpuIndices & cpus() const
Read-only access to index collection.
Definition: FeedEngine.h:60
void spinBeforeIdleTime(UInt32 value)
Sets redundancySpinTime.
Definition: FeedEngine.h:230
std::string toStr(const FeedEngineSettings &settings)
Serializes feed engine settings into a string.
Definition: FeedEngine.h:264
#define ONIXS_CMESTREAMLINEDMDH_EXPORTED_STRUCT
Definition: Bootstrap.h:67
#define ONIXS_CMESTREAMLINEDMDH_NULLPTR
Definition: Compiler.h:167
FeedEngineSettings()
Initializes options with default values.
Definition: FeedEngine.h:110
#define ONIXS_CMESTREAMLINEDMDH_LTWT_STRUCT
Definition: Bootstrap.h:115
virtual void onFeedEngineThreadIdle(const FeedEngine &, FeedEngineThreadIdle::Reason, UInt32 &)
Is called when feed engine&#39;s thread is idle.
Definition: FeedEngine.h:340
UInt32 UInt32
uInt32.
Definition: Fields.h:183
void watch(WatchService &watch)
Watch service to be used by Feed Engine.
Definition: FeedEngine.h:185
#define ONIXS_CMESTREAMLINEDMDH_NAMESPACE_BEGIN
Definition: Bootstrap.h:169
const ThreadAffinity & threadAffinity() const
Defines set of CPUs allowed for each working thread to be executed on while processing market data by...
Definition: FeedEngine.h:136