OnixS C++ CME MDP Premium Market Data Handler  5.8.3
API Documentation
FeedEngineThreadPool.h
Go to the documentation of this file.
1 // Copyright Onix Solutions Limited [OnixS]. All rights reserved.
2 //
3 // This software owned by Onix Solutions Limited [OnixS] and is
4 // protected by copyright law and international copyright treaties.
5 //
6 // Access to and use of the software is governed by the terms of the applicable
7 // OnixS Software Services Agreement (the Agreement) and Customer end user license
8 // agreements granting a non-assignable, non-transferable and non-exclusive license
9 // to use the software for it's own data processing purposes under the terms defined
10 // in the Agreement.
11 //
12 // Except as otherwise granted within the terms of the Agreement, copying or
13 // reproduction of any part of this source code or associated reference material
14 // to any other location for further reproduction or redistribution, and any
15 // amendments to this copyright notice, are expressly prohibited.
16 //
17 // Any reproduction or redistribution for sale or hiring of the Software not in
18 // accordance with the terms of the Agreement is a violation of copyright law.
19 //
20 
21 #pragma once
22 
23 #include <vector>
24 
25 #include <OnixS/CME/MDH/String.h>
26 #include <OnixS/CME/MDH/Integral.h>
27 #include <OnixS/CME/MDH/TinySet.h>
28 
30 
31 ONIXS_CMEMDH_EXPORTED_CLASS_DECL(FeedEngineThreadPool);
32 
33 /// Listener for thread-related events.
34 ///
35 /// Members of this classes are invoked to reflect
36 /// various life-time events of threads spawned and
37 /// used by feed engines while processing market data.
39 {
40  /// Invoked by the thread before entering the processing loop.
41  ///
42  /// @note This callback allows the subscriber to perform thread-specific tuning (e.g., setting thread affinity or priority).
44 
45  /// Invoked by the thread when it is about to end.
46  ///
47  /// @note This callback allows the subscriber to perform thread-specific cleanup.
49 
50  /// Invoked when the thread is idle.
51  ///
52  /// @return `true` if the thread should sleep for some time to reduce the CPU load, and `false` - to resume processing without delay.
54  {
55  return false;
56  }
57 
58  /// Invoked when the thread experiences an issue while processing its tasks.
59  /// Once invoked, the thread continues performing its tasks.
60  virtual void onFeedEngineThreadIssue(const FeedEngineThreadPool&, const Char*){}
61 };
62 
63 /// Zero-based index of CPU.
64 typedef Int32 CpuIndex;
65 
66 /// Alias for collection of CPU indices.
68 
69 /// Represents a set of CPU indices.
71 {
72 public:
73  /// Initializes the empty set.
75 
76  /// Initializes as a copy of the other set.
78  : indices_(other.indices_)
79  {
80  }
81 
82  /// Utilizes internal resources.
84 
85  /// Indicates whether affinity is defined.
86  operator bool() const
87  {
88  return indices_.empty();
89  }
90 
91  /// Read-only access to the indices.
92  const CpuIndices& cpus() const
93  {
94  return indices_;
95  }
96 
97  /// Collection of the CPU indices.
99  {
100  return indices_;
101  }
102 
103 private:
104  CpuIndices indices_;
105 };
106 
107 /// Serializes thread affinity into a string.
109 void toStr(std::string&, const ThreadAffinity&);
110 
111 /// Serializes thread affinity into a string.
112 inline std::string toStr(const ThreadAffinity& affinity)
113 {
114  std::string str;
115 
116  toStr(str, affinity);
117 
118  return str;
119 }
120 
121 //
122 
123 /// The collection of parameters affecting behavior
124 /// of a thread pool. The given class serves as a base for
125 /// defining and manipulating thread pool related parameters.
127 {
128 public:
129  /// Assigns the default values for the given
130  /// instance of the thread pool settings.
132  : affinity_()
133  , size_(1)
134  {
135  }
136 
137  /// Cleans everything up.
139 
140  /// Defines set of CPUs allowed for each working
141  /// thread to be executed on while running tasks.
142  ///
143  /// @note By default set is empty thus allowing threads
144  /// to be executed on any CPU available in the system.
145  const ThreadAffinity& affinity() const
146  {
147  return affinity_;
148  }
149 
150  /// Defines set of CPUs allowed for each working
151  /// thread to be executed on while running tasks.
152  ///
153  /// @note By default set is empty thus allowing threads
154  /// to be executed on any CPU available in the system.
156  {
157  return affinity_;
158  }
159 
160  /// Number of working threads in the pool.
161  ///
162  /// @note Default value is '1'.
163  UInt32 size() const
164  {
165  return size_;
166  }
167 
168  /// Defines the number of working threads in the pool.
170  {
171  size_ = value;
172  return *this;
173  }
174 
175 private:
176  ThreadAffinity affinity_;
177  UInt32 size_;
178 };
179 
180 /// Serializes thread pool settings into a string.
182 void toStr(std::string&, const ThreadPoolSettings&);
183 
184 /// Serializes thread pool settings into a string.
185 inline std::string toStr(const ThreadPoolSettings& settings)
186 {
187  std::string str;
188 
189  toStr(str, settings);
190 
191  return str;
192 }
193 
194 /// The collection of parameters affecting behavior
195 /// of a Feed Engine thread pool.
197 {
198 public:
199  /// Assigns the default values for the given
200  /// instance of the thread pool settings.
202  : loopBeforeIdle_(1)
203  {
204  }
205 
206  /// Settings related to the thread pool.
208  {
209  return pool_;
210  }
211 
212  /// Settings related to the thread pool.
213  const ThreadPoolSettings& pool() const
214  {
215  return pool_;
216  }
217 
218  /// Defines amount of cycles a working thread keeps
219  /// cycling before entering the idle state if no useful
220  /// activity can be done.
221  ///
222  /// Value is measured in number of processing cycles.
223  ///
224  /// @note Default value is '1'.
225  ///
226  /// @warning The given parameter may direct influence
227  /// onto CPU load.
229  {
230  return loopBeforeIdle_;
231  }
232 
233  /// Updates amount of cycles a working thread keeps
234  /// cycling before entering the idle state if no useful
235  /// activity can be done.
236  ///
237  /// @see loopBeforeIdle.
239  {
240  loopBeforeIdle_ = value;
241  }
242 
243 private:
244  ThreadPoolSettings pool_;
245  UInt32 loopBeforeIdle_;
246 };
247 
248 /// Serializes settings defining behavior
249 /// of feed engine threads into a string.
251 void toStr(std::string&, const FeedEngineThreadSettings&);
252 
253 /// Serializes settings defining behavior
254 /// of feed engine threads into a string.
255 inline std::string toStr(const FeedEngineThreadSettings& settings)
256 {
257  std::string str;
258 
259  toStr(str, settings);
260 
261  return str;
262 }
263 
265 
266 /// A pool of threads executing feed engine tasks.
268 {
269 public:
270  /// Initializes working threads and spawns
271  /// feed engine process() routine invocation.
273 
274  /// Stops all tasks and destructs the working threads.
276 
277 private:
278  class Workhorse;
279  Workhorse* workhorse_;
280 };
281 
282 
283 /// Current thread related tasks.
285 {
286 public:
287  /// Sets the processor affinity mask for the current thread.
288  static void affinity(const ThreadAffinity&);
289  static void affinity(CpuIndex);
290 };
291 
ThreadPoolSettings & size(UInt32 value)
Defines the number of working threads in the pool.
Int32 Int32
int32.
Definition: Fields.h:60
TinySet< CpuIndex > CpuIndices
Alias for collection of CPU indices.
A pool of threads executing feed engine tasks.
ThreadAffinity(const ThreadAffinity &other)
Initializes as a copy of the other set.
UInt32 UInt32
uInt32.
Definition: Fields.h:192
#define ONIXS_CMEMDH_NULLPTR
Definition: Compiler.h:145
const CpuIndices & cpus() const
Read-only access to the indices.
virtual void onFeedEngineThreadEnd(const FeedEngineThreadPool &)
Invoked by the thread when it is about to end.
ThreadAffinity & affinity()
Defines set of CPUs allowed for each working thread to be executed on while running tasks...
Current thread related tasks.
Int32 CpuIndex
Zero-based index of CPU.
virtual void onFeedEngineThreadIssue(const FeedEngineThreadPool &, const Char *)
Invoked when the thread experiences an issue while processing its tasks.
The collection of parameters affecting behavior of a Feed Engine thread pool.
#define ONIXS_CMEMDH_EXPORTED_CLASS_DECL(typeName)
Definition: Bootstrap.h:35
#define ONIXS_CMEMDH_LTWT
Definition: Bootstrap.h:46
ThreadPoolSettings & pool()
Settings related to the thread pool.
Abstraction for the Feed Engine machinery.
Definition: FeedEngine.h:101
Represents a set of CPU indices.
~ThreadPoolSettings()
Cleans everything up.
ThreadAffinity()
Initializes the empty set.
char Char
Character type alias.
Definition: String.h:36
bool value(Number &number, const MultiContainer &container, Tag tag)
Finds a tag-value entry in the given collection by the given tag and returns its value component tran...
FeedEngineThreadSettings()
Assigns the default values for the given instance of the thread pool settings.
const ThreadPoolSettings & pool() const
Settings related to the thread pool.
ThreadPoolSettings()
Assigns the default values for the given instance of the thread pool settings.
#define ONIXS_CMEMDH_NAMESPACE_BEGIN
Definition: Bootstrap.h:67
std::string toStr(const FeedEngineThreadSettings &settings)
Serializes settings defining behavior of feed engine threads into a string.
UInt32 size() const
Number of working threads in the pool.
#define ONIXS_CMEMDH_EXPORTED
Definition: Compiler.h:135
const ThreadAffinity & affinity() const
Defines set of CPUs allowed for each working thread to be executed on while running tasks...
CpuIndices & cpus()
Collection of the CPU indices.
The collection of parameters affecting behavior of a thread pool.
Listener for thread-related events.
virtual void onFeedEngineThreadBegin(const FeedEngineThreadPool &)
Invoked by the thread before entering the processing loop.
UInt32 loopBeforeIdle() const
Defines amount of cycles a working thread keeps cycling before entering the idle state if no useful a...
virtual bool onFeedEngineThreadIdle(const FeedEngineThreadPool &)
Invoked when the thread is idle.
~ThreadAffinity()
Utilizes internal resources.
void loopBeforeIdle(UInt32 value)
Updates amount of cycles a working thread keeps cycling before entering the idle state if no useful a...
#define ONIXS_CMEMDH_NAMESPACE_END
Definition: Bootstrap.h:68