pion  5.0.6
scheduler.hpp
1 // ---------------------------------------------------------------------
2 // pion: a Boost C++ framework for building lightweight HTTP interfaces
3 // ---------------------------------------------------------------------
4 // Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_SCHEDULER_HEADER__
11 #define __PION_SCHEDULER_HEADER__
12 
13 #include <vector>
14 #include <boost/asio.hpp>
15 #include <boost/assert.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/function/function0.hpp>
18 #include <boost/cstdint.hpp>
19 #include <boost/shared_ptr.hpp>
20 #include <boost/noncopyable.hpp>
21 #include <boost/thread/thread.hpp>
22 #include <boost/thread/mutex.hpp>
23 #include <boost/thread/xtime.hpp>
24 #include <boost/thread/condition.hpp>
25 #include <pion/config.hpp>
26 #include <pion/logger.hpp>
27 
28 
29 namespace pion { // begin namespace pion
30 
34 class PION_API scheduler :
35  private boost::noncopyable
36 {
37 public:
38 
40  scheduler(void)
41  : m_logger(PION_GET_LOGGER("pion.scheduler")),
42  m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
43  {}
44 
46  virtual ~scheduler() {}
47 
49  virtual void startup(void) {}
50 
52  virtual void shutdown(void);
53 
55  void join(void);
56 
60  void add_active_user(void);
61 
63  void remove_active_user(void);
64 
66  inline bool is_running(void) const { return m_is_running; }
67 
69  inline void set_num_threads(const boost::uint32_t n) { m_num_threads = n; }
70 
72  inline boost::uint32_t get_num_threads(void) const { return m_num_threads; }
73 
75  inline void set_logger(logger log_ptr) { m_logger = log_ptr; }
76 
78  inline logger get_logger(void) { return m_logger; }
79 
81  virtual boost::asio::io_service& get_io_service(void) = 0;
82 
88  virtual void post(boost::function0<void> work_func) {
89  get_io_service().post(work_func);
90  }
91 
98  void keep_running(boost::asio::io_service& my_service,
99  boost::asio::deadline_timer& my_timer);
100 
107  inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
108  boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
109  boost::thread::sleep(wakeup_time);
110  }
111 
121  template <typename ConditionType, typename LockType>
122  inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
123  boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
124  {
125  boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
126  wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
127  }
128 
129 
131  void process_service_work(boost::asio::io_service& service);
132 
133 
134 protected:
135 
144  static boost::system_time get_wakeup_time(boost::uint32_t sleep_sec,
145  boost::uint32_t sleep_nsec);
146 
148  virtual void stop_services(void) {}
149 
151  virtual void stop_threads(void) {}
152 
154  virtual void finish_services(void) {}
155 
157  virtual void finish_threads(void) {}
158 
159 
161  static const boost::uint32_t DEFAULT_NUM_THREADS;
162 
164  static const boost::uint32_t NSEC_IN_SECOND;
165 
167  static const boost::uint32_t MICROSEC_IN_SECOND;
168 
170  static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS;
171 
172 
174  boost::mutex m_mutex;
175 
178 
180  boost::condition m_no_more_active_users;
181 
183  boost::condition m_scheduler_has_stopped;
184 
186  boost::uint32_t m_num_threads;
187 
189  boost::uint32_t m_active_users;
190 
193 };
194 
195 
199 class PION_API multi_thread_scheduler :
200  public scheduler
201 {
202 public:
203 
206 
209 
210 
211 protected:
212 
214  virtual void stop_threads(void) {
215  if (! m_thread_pool.empty()) {
216  PION_LOG_DEBUG(m_logger, "Waiting for threads to shutdown");
217 
218  // wait until all threads in the pool have stopped
219  boost::thread current_thread;
220  for (ThreadPool::iterator i = m_thread_pool.begin();
221  i != m_thread_pool.end(); ++i)
222  {
223  // make sure we do not call join() for the current thread,
224  // since this may yield "undefined behavior"
225  if (**i != current_thread) (*i)->join();
226  }
227  }
228  }
229 
231  virtual void finish_threads(void) { m_thread_pool.clear(); }
232 
233 
235  typedef std::vector<boost::shared_ptr<boost::thread> > ThreadPool;
236 
237 
239  ThreadPool m_thread_pool;
240 };
241 
242 
246 class PION_API single_service_scheduler :
248 {
249 public:
250 
253  : m_service(), m_timer(m_service)
254  {}
255 
257  virtual ~single_service_scheduler() { shutdown(); }
258 
260  virtual boost::asio::io_service& get_io_service(void) { return m_service; }
261 
263  virtual void startup(void);
264 
265 
266 protected:
267 
269  virtual void stop_services(void) { m_service.stop(); }
270 
272  virtual void finish_services(void) { m_service.reset(); }
273 
274 
276  boost::asio::io_service m_service;
277 
279  boost::asio::deadline_timer m_timer;
280 };
281 
282 
286 class PION_API one_to_one_scheduler :
288 {
289 public:
290 
293  : m_service_pool(), m_next_service(0)
294  {}
295 
297  virtual ~one_to_one_scheduler() { shutdown(); }
298 
300  virtual boost::asio::io_service& get_io_service(void) {
301  boost::mutex::scoped_lock scheduler_lock(m_mutex);
302  while (m_service_pool.size() < m_num_threads) {
303  boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type());
304  m_service_pool.push_back(service_ptr);
305  }
306  if (++m_next_service >= m_num_threads)
307  m_next_service = 0;
308  BOOST_ASSERT(m_next_service < m_num_threads);
309  return m_service_pool[m_next_service]->first;
310  }
311 
318  virtual boost::asio::io_service& get_io_service(boost::uint32_t n) {
319  BOOST_ASSERT(n < m_num_threads);
320  BOOST_ASSERT(n < m_service_pool.size());
321  return m_service_pool[n]->first;
322  }
323 
325  virtual void startup(void);
326 
327 
328 protected:
329 
331  virtual void stop_services(void) {
332  for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
333  (*i)->first.stop();
334  }
335  }
336 
338  virtual void finish_services(void) { m_service_pool.clear(); }
339 
340 
343  service_pair_type(void) : first(), second(first) {}
344  boost::asio::io_service first;
345  boost::asio::deadline_timer second;
346  };
347 
349  typedef std::vector<boost::shared_ptr<service_pair_type> > service_pool_type;
350 
351 
353  service_pool_type m_service_pool;
354 
356  boost::uint32_t m_next_service;
357 };
358 
359 
360 } // end namespace pion
361 
362 #endif
virtual void post(boost::function0< void > work_func)
Definition: scheduler.hpp:88
virtual ~multi_thread_scheduler()
virtual destructor
Definition: scheduler.hpp:208
boost::condition m_scheduler_has_stopped
condition triggered when the scheduler has stopped
Definition: scheduler.hpp:183
virtual void finish_threads(void)
finishes all threads used to perform work
Definition: scheduler.hpp:231
virtual void stop_threads(void)
stops all threads used to perform work
Definition: scheduler.hpp:151
virtual void stop_services(void)
stops all services used to schedule work
Definition: scheduler.hpp:148
virtual ~one_to_one_scheduler()
virtual destructor
Definition: scheduler.hpp:297
void set_logger(logger log_ptr)
sets the logger to be used
Definition: scheduler.hpp:75
bool m_is_running
true if the thread scheduler is running
Definition: scheduler.hpp:192
static const boost::uint32_t DEFAULT_NUM_THREADS
default number of worker threads in the thread pool
Definition: scheduler.hpp:161
boost::asio::deadline_timer m_timer
timer used to periodically check for shutdown
Definition: scheduler.hpp:279
single_service_scheduler(void)
constructs a new single_service_scheduler
Definition: scheduler.hpp:252
multi_thread_scheduler(void)
constructs a new multi_thread_scheduler
Definition: scheduler.hpp:205
virtual void finish_services(void)
finishes all services used to schedule work
Definition: scheduler.hpp:154
virtual void stop_services(void)
stops all services used to schedule work
Definition: scheduler.hpp:269
static void sleep(ConditionType &wakeup_condition, LockType &wakeup_lock, boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
Definition: scheduler.hpp:122
boost::condition m_no_more_active_users
condition triggered when there are no more active users
Definition: scheduler.hpp:180
virtual void finish_services(void)
finishes all services used to schedule work
Definition: scheduler.hpp:272
bool is_running(void) const
returns true if the scheduler is running
Definition: scheduler.hpp:66
boost::asio::io_service m_service
service used to manage async I/O events
Definition: scheduler.hpp:276
static const boost::uint32_t MICROSEC_IN_SECOND
number of microseconds in one full second (10 ^ 6)
Definition: scheduler.hpp:167
boost::uint32_t m_num_threads
total number of worker threads in the pool
Definition: scheduler.hpp:186
virtual boost::asio::io_service & get_io_service(void)
returns an async I/O service used to schedule work
Definition: scheduler.hpp:300
ThreadPool m_thread_pool
pool of threads used to perform work
Definition: scheduler.hpp:239
virtual ~scheduler()
virtual destructor
Definition: scheduler.hpp:46
virtual boost::asio::io_service & get_io_service(void)
returns an async I/O service used to schedule work
Definition: scheduler.hpp:260
boost::uint32_t get_num_threads(void) const
returns the number of threads currently in use
Definition: scheduler.hpp:72
scheduler(void)
constructs a new scheduler
Definition: scheduler.hpp:40
boost::mutex m_mutex
mutex to make class thread-safe
Definition: scheduler.hpp:174
static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
Definition: scheduler.hpp:107
static const boost::uint32_t NSEC_IN_SECOND
number of nanoseconds in one full second (10 ^ 9)
Definition: scheduler.hpp:164
logger m_logger
primary logging interface used by this class
Definition: scheduler.hpp:177
boost::uint32_t m_active_users
the scheduler will not shutdown until there are no more active users
Definition: scheduler.hpp:189
void set_num_threads(const boost::uint32_t n)
sets the number of threads to be used (these are shared by all servers)
Definition: scheduler.hpp:69
static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS
number of seconds a timer should wait for to keep the IO services running
Definition: scheduler.hpp:170
std::vector< boost::shared_ptr< service_pair_type > > service_pool_type
typedef for a pool of IO services
Definition: scheduler.hpp:349
boost::uint32_t m_next_service
the next service to use for scheduling work
Definition: scheduler.hpp:356
std::vector< boost::shared_ptr< boost::thread > > ThreadPool
typedef for a pool of worker threads
Definition: scheduler.hpp:235
service_pair_type
typedef for a pair object where first is an IO service and second is a deadline timer ...
Definition: scheduler.hpp:342
virtual void finish_services(void)
finishes all services used to schedule work
Definition: scheduler.hpp:338
virtual boost::asio::io_service & get_io_service(boost::uint32_t n)
Definition: scheduler.hpp:318
virtual void stop_services(void)
stops all services used to schedule work
Definition: scheduler.hpp:331
virtual ~single_service_scheduler()
virtual destructor
Definition: scheduler.hpp:257
logger get_logger(void)
returns the logger currently in use
Definition: scheduler.hpp:78
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
Definition: scheduler.hpp:49
virtual void stop_threads(void)
stops all threads used to perform work
Definition: scheduler.hpp:214
service_pool_type m_service_pool
pool of IO services used to schedule work
Definition: scheduler.hpp:353
virtual void finish_threads(void)
finishes all threads used to perform work
Definition: scheduler.hpp:157
one_to_one_scheduler(void)
constructs a new one_to_one_scheduler
Definition: scheduler.hpp:292