libcopp  1.1.0
task_manager.h
Go to the documentation of this file.
1 /*
2  * task_manager.h
3  *
4  * Created on: 2014年6月16日
5  * Author: owent
6  *
7  * Released under the MIT license
8  */
9 
10 #ifndef COTASK_TASK_MANAGER_H
11 #define COTASK_TASK_MANAGER_H
12 
13 #pragma once
14 
15 #include <algorithm>
16 #include <assert.h>
17 #include <ctime>
18 #include <map>
19 #include <stdint.h>
20 #include <vector>
21 
22 #include <libcotask/task_macros.h>
23 
24 
25 namespace cotask {
26 
27  namespace detail {
28  struct tickspec_t {
29  time_t tv_sec; /* Seconds. */
30  int tv_nsec; /* Nanoseconds. */
31 
32  friend bool operator<(const tickspec_t &l, const tickspec_t &r) {
33  return (l.tv_sec != r.tv_sec) ? l.tv_sec < r.tv_sec : l.tv_nsec < r.tv_nsec;
34  }
35 
36  friend bool operator==(const tickspec_t &l, const tickspec_t &r) { return l.tv_sec == r.tv_sec && l.tv_nsec == r.tv_nsec; }
37 
38  friend bool operator<=(const tickspec_t &l, const tickspec_t &r) {
39  return (l.tv_sec != r.tv_sec) ? l.tv_sec <= r.tv_sec : l.tv_nsec <= r.tv_nsec;
40  }
41  };
42  } // namespace detail
43 
44  template <typename TTask>
45  struct task_mgr_node {
46  typedef typename TTask::ptr_t task_ptr_t;
47 
49  task_ptr_t task_;
50  };
51 
55  template <typename TTask, typename TTaskContainer = std::map<typename TTask::id_t, task_mgr_node<TTask> > >
56  class task_manager {
57  public:
58  typedef TTask task_t;
59  typedef TTaskContainer container_t;
60  typedef typename task_t::id_t id_t;
61  typedef typename task_t::ptr_t task_ptr_t;
63  typedef std::shared_ptr<self_t> ptr_t;
64 
65  struct flag_t {
66  enum type {
67  EN_TM_NONE = 0x00,
68  EN_TM_IN_TICK = 0x01,
69  EN_TM_IN_RESET = 0x02,
70  };
71  };
72 
73  private:
74  struct flag_guard_t {
75  int * data_;
76  typename flag_t::type flag_;
77  inline flag_guard_t(int *flags, typename flag_t::type v) : data_(flags), flag_(v) {
78  if (NULL == data_ || (*data_ & flag_)) {
79  flag_ = flag_t::EN_TM_NONE;
80  data_ = NULL;
81  } else {
82  (*data_) |= flag_;
83  }
84  }
85  inline ~flag_guard_t() {
86  if (*this) {
87  (*data_) &= ~flag_;
88  }
89  }
90 
91  inline operator bool() { return NULL != data_ && flag_t::EN_TM_NONE != flag_; }
92  };
93 
94  public:
95  task_manager() : flags_(0) {
96  last_tick_time_.tv_sec = 0;
97  last_tick_time_.tv_nsec = 0;
98  }
99 
101  // safe remove all task
102  reset();
103  }
104 
105  void reset() {
106  flag_guard_t reset_flag(&flags_, flag_t::EN_TM_IN_RESET);
107  if (!reset_flag) {
108  return;
109  }
110 
111  std::vector<task_ptr_t> all_tasks;
112  // first, lock and reset all data
113  {
114 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
115  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
116 #endif
117 
118  for (typename container_t::iterator iter = tasks_.begin(); iter != tasks_.end(); ++iter) {
119  all_tasks.push_back(iter->second.task_);
120  }
121 
122  tasks_.clear();
123  task_timeout_checkpoints_.clear();
124  flags_ = 0;
125  last_tick_time_.tv_sec = 0;
126  last_tick_time_.tv_nsec = 0;
127  }
128 
129  // then, kill all tasks
130  for (typename std::vector<task_ptr_t>::iterator iter = all_tasks.begin(); iter != all_tasks.end(); ++iter) {
131  (*iter)->kill(EN_TS_KILLED);
132  }
133  }
134 
139  static ptr_t create() { return std::make_shared<self_t>(); }
140 
154  int add_task(const task_ptr_t &task, time_t timeout_sec, int timeout_nsec) {
155  if (!task) {
156  assert(task);
158  }
159 
160  if (flags_ & flag_t::EN_TM_IN_RESET) {
161  return copp::COPP_EC_IN_RESET;
162  }
163 
164  // try to cast type
165  typedef typename container_t::value_type pair_type;
166  task_mgr_node<task_t> task_node;
167  task_node.task_ = task;
168  task_node.expired_time_.tv_sec = last_tick_time_.tv_sec + timeout_sec;
169  task_node.expired_time_.tv_nsec = last_tick_time_.tv_nsec + timeout_nsec;
170 
171  if (!task_node.task_) {
172  assert(task_node.task_);
174  }
175 
176  // lock before we will operator tasks_
177 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
178  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
179 #endif
180 
181  id_t task_id = task->get_id();
182  if (tasks_.end() != tasks_.find(task_id)) {
184  }
185 
186  // try to insert to container
187  if (false == tasks_.insert(pair_type(task_id, task_node)).second) {
189  }
190 
191  // add timeout controller
192  if (0 != timeout_sec || 0 != timeout_nsec) {
193  typedef typename std::multimap<detail::tickspec_t, id_t>::value_type pair_type;
194  task_timeout_checkpoints_.insert(pair_type(task_node.expired_time_, task_id));
195  }
196 
197  return copp::COPP_EC_SUCCESS;
198  }
199 
208  int add_task(const task_ptr_t &task) { return add_task(task, 0, 0); }
209 
215  int remove_task(id_t id) {
216  if (flags_ & flag_t::EN_TM_IN_RESET) {
217  return copp::COPP_EC_IN_RESET;
218  }
219 
220  task_ptr_t task_inst;
221  {
222 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
223  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
224 #endif
225 
226  typedef typename container_t::iterator iter_type;
227  iter_type iter = tasks_.find(id);
228  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
229 
230  // make sure running task be killed first
231  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
232  tasks_.erase(iter);
233  }
234 
235  if (task_inst) {
236  EN_TASK_STATUS task_status = task_inst->get_status();
237  if (task_status > EN_TS_CREATED && task_status < EN_TS_DONE) {
238  return task_inst->kill(EN_TS_KILLED, NULL);
239  }
240  }
241 
242  return copp::COPP_EC_SUCCESS;
243  }
244 
250  task_ptr_t find_task(id_t id) {
251  if (flags_ & flag_t::EN_TM_IN_RESET) {
252  return task_ptr_t();
253  }
254 
255 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
256  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
257 #endif
258 
259  typedef typename container_t::iterator iter_type;
260  iter_type iter = tasks_.find(id);
261  if (tasks_.end() == iter) return task_ptr_t();
262 
263  return iter->second.task_;
264  }
265 
266  // int add_scheduler();
267  // int scheduling_once();
268  // int scheduling_loop();
269 
270  int start(id_t id, void *priv_data = NULL) {
271  if (flags_ & flag_t::EN_TM_IN_RESET) {
272  return copp::COPP_EC_IN_RESET;
273  }
274 
275  task_ptr_t task_inst;
276  {
277 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
278  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
279 #endif
280 
281  typedef typename container_t::iterator iter_type;
282  iter_type iter = tasks_.find(id);
283  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
284 
285  task_inst = iter->second.task_;
286  }
287 
288  // unlock and then run start
289  if (task_inst) {
290  int ret = task_inst->start(priv_data);
291 
292  // if task is finished, remove it
293  if (task_inst->get_status() >= EN_TS_DONE) {
294  // lock again and prepare to remove from tasks_
295 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
296  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
297 #endif
298  tasks_.erase(id);
299  }
300 
301  return ret;
302  } else {
304  }
305  }
306 
307  int resume(id_t id, void *priv_data = NULL) {
308  if (flags_ & flag_t::EN_TM_IN_RESET) {
309  return copp::COPP_EC_IN_RESET;
310  }
311 
312  task_ptr_t task_inst;
313  {
314 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
315  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
316 #endif
317 
318  typedef typename container_t::iterator iter_type;
319  iter_type iter = tasks_.find(id);
320  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
321 
322  task_inst = iter->second.task_;
323  }
324 
325  // unlock and then run resume
326  if (task_inst) {
327  int ret = task_inst->resume(priv_data);
328 
329  // if task is finished, remove it
330  if (task_inst->get_status() >= EN_TS_DONE) {
331  // lock again and prepare to remove from tasks_
332 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
333  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
334 #endif
335  tasks_.erase(id);
336  }
337 
338  return ret;
339  } else {
341  }
342  }
343 
344  int cancel(id_t id, void *priv_data = NULL) {
345  if (flags_ & flag_t::EN_TM_IN_RESET) {
346  return copp::COPP_EC_IN_RESET;
347  }
348 
349  task_ptr_t task_inst;
350  {
351 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
352  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
353 #endif
354 
355  typedef typename container_t::iterator iter_type;
356  iter_type iter = tasks_.find(id);
357  if (tasks_.end() == iter) {
359  }
360 
361  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
362  tasks_.erase(iter); // remove from container
363  }
364 
365  // unlock and then run cancel
366  if (task_inst) {
367  return task_inst->cancel(priv_data);
368  } else {
370  }
371  }
372 
373  int kill(id_t id, enum EN_TASK_STATUS status, void *priv_data = NULL) {
374  if (flags_ & flag_t::EN_TM_IN_RESET) {
375  return copp::COPP_EC_IN_RESET;
376  }
377 
378  task_ptr_t task_inst;
379  {
380 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
381  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
382 #endif
383 
384  typedef typename container_t::iterator iter_type;
385  iter_type iter = tasks_.find(id);
386  if (tasks_.end() == iter) {
388  }
389 
390  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
391  tasks_.erase(iter); // remove from container
392  }
393 
394  // unlock and then run kill
395  if (task_inst) {
396  return task_inst->kill(status, priv_data);
397  } else {
399  }
400  }
401 
402  int kill(id_t id, void *priv_data = NULL) { return kill(id, EN_TS_KILLED, priv_data); }
403 
412  int tick(time_t sec, int nsec = 0) {
413  detail::tickspec_t now_tick_time;
414  now_tick_time.tv_sec = sec;
415  now_tick_time.tv_nsec = nsec;
416 
417  // we will ignore tick when in a recursive call
418  flag_guard_t tick_flag(&flags_, flag_t::EN_TM_IN_TICK);
419  if (!tick_flag) {
420  return copp::COPP_EC_SUCCESS;
421  }
422 
423  if (flags_ & flag_t::EN_TM_IN_RESET) {
424  return copp::COPP_EC_IN_RESET;
425  }
426 
427  // first tick, init and reset task timeout
428  if (0 == last_tick_time_.tv_sec && 0 == last_tick_time_.tv_nsec) {
429  // hold lock
430 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
431  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
432 #endif
433 
434  std::multimap<detail::tickspec_t, id_t> real_checkpoints;
435  for (typename std::multimap<detail::tickspec_t, id_t>::iterator iter = task_timeout_checkpoints_.begin();
436  task_timeout_checkpoints_.end() != iter; ++iter) {
437  detail::tickspec_t new_checkpoint;
438  new_checkpoint.tv_sec = iter->first.tv_sec + sec;
439  new_checkpoint.tv_nsec = iter->first.tv_nsec + nsec;
440  typedef typename std::multimap<detail::tickspec_t, id_t>::value_type pair_type;
441  real_checkpoints.insert(pair_type(new_checkpoint, iter->second));
442  }
443 
444  real_checkpoints.swap(task_timeout_checkpoints_);
445  last_tick_time_ = now_tick_time;
446  return copp::COPP_EC_SUCCESS;
447  }
448 
449  // remove timeout tasks
450  while (false == task_timeout_checkpoints_.empty()) {
451  task_ptr_t task_inst;
452 
453  {
454  // hold lock
455 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
456  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
457 #endif
458 
459  typename std::multimap<detail::tickspec_t, id_t>::value_type &task_node = *task_timeout_checkpoints_.begin();
460  // all tasks those expired time less than now are timeout
461  if (now_tick_time <= task_node.first) {
462  break;
463  }
464 
465  // check expire time(may be changed)
466  typedef typename container_t::iterator iter_type;
467  iter_type iter = tasks_.find(task_node.second);
468  if (tasks_.end() != iter && iter->second.expired_time_ < now_tick_time) {
469  // task may be removed before
470  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
471  tasks_.erase(iter); // remove from container
472  }
473 
474  // remove timeout checkpoint
475  task_timeout_checkpoints_.erase(task_timeout_checkpoints_.begin());
476  }
477 
478  // task call can not be used when lock is on
479  if (task_inst) {
480  task_inst->kill(EN_TS_TIMEOUT);
481  }
482  }
483 
484  last_tick_time_ = now_tick_time;
485  return copp::COPP_EC_SUCCESS;
486  }
487 
492  size_t get_tick_checkpoint_size() const UTIL_CONFIG_NOEXCEPT { return task_timeout_checkpoints_.size(); }
493 
498  size_t get_task_size() const UTIL_CONFIG_NOEXCEPT { return tasks_.size(); }
499 
504  detail::tickspec_t get_last_tick_time() const UTIL_CONFIG_NOEXCEPT { return last_tick_time_; }
505 
510  inline const container_t &get_container() const UTIL_CONFIG_NOEXCEPT { return tasks_; }
511 
516  inline const std::multimap<detail::tickspec_t, id_t> &get_checkpoints() const UTIL_CONFIG_NOEXCEPT {
517  return task_timeout_checkpoints_;
518  }
519 
520  private:
521  container_t tasks_;
523  std::multimap<detail::tickspec_t, id_t> task_timeout_checkpoints_;
524 
525 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
527 #endif
528  int flags_;
529  };
530 } // namespace cotask
531 
532 
533 #endif /* TASK_MANAGER_H_ */
COPP_EC_ALREADY_EXIST.
Definition: errno.h:29
int kill(id_t id, enum EN_TASK_STATUS status, void *priv_data=NULL)
Definition: task_manager.h:373
COPP_EC_NOT_FOUND.
Definition: errno.h:28
size_t get_tick_checkpoint_size() const UTIL_CONFIG_NOEXCEPT
get timeout checkpoint number in this manager
Definition: task_manager.h:492
int tick(time_t sec, int nsec=0)
active tick event and deal with clock
Definition: task_manager.h:412
task_ptr_t find_task(id_t id)
find task by id
Definition: task_manager.h:250
int cancel(id_t id, void *priv_data=NULL)
Definition: task_manager.h:344
COPP_EC_SUCCESS.
Definition: errno.h:12
detail::tickspec_t expired_time_
Definition: task_manager.h:48
COPP_EC_EXTERNAL_INSERT_FAILED.
Definition: errno.h:15
EN_TASK_STATUS
Definition: task_impl.h:28
const container_t & get_container() const UTIL_CONFIG_NOEXCEPT
task container, this api is just used for provide information to users
Definition: task_manager.h:510
const std::multimap< detail::tickspec_t, id_t > & get_checkpoints() const UTIL_CONFIG_NOEXCEPT
get all task checkpoints, this api is just used for provide information to users
Definition: task_manager.h:516
std::shared_ptr< self_t > ptr_t
Definition: task_manager.h:63
detail::tickspec_t last_tick_time_
Definition: task_manager.h:522
friend bool operator==(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:36
#define COPP_MACRO_STD_MOVE(x)
Definition: features.h:185
int kill(id_t id, void *priv_data=NULL)
Definition: task_manager.h:402
TTaskContainer container_t
Definition: task_manager.h:59
int add_task(const task_ptr_t &task)
add task to manager please make the task has method of get_id() and will return a unique id ...
Definition: task_manager.h:208
flag_guard_t(int *flags, typename flag_t::type v)
Definition: task_manager.h:77
int add_task(const task_ptr_t &task, time_t timeout_sec, int timeout_nsec)
add task to manager please make the task has method of get_id() and will return a unique id ...
Definition: task_manager.h:154
task_manager< task_t, container_t > self_t
Definition: task_manager.h:62
int start(id_t id, void *priv_data=NULL)
Definition: task_manager.h:270
task manager
Definition: task_manager.h:56
util::lock::spin_lock action_lock_
Definition: task_manager.h:526
COPP_EC_CAST_FAILED.
Definition: errno.h:31
friend bool operator<=(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:38
COPP_EC_IN_RESET.
Definition: errno.h:17
std::multimap< detail::tickspec_t, id_t > task_timeout_checkpoints_
Definition: task_manager.h:523
detail::tickspec_t get_last_tick_time() const UTIL_CONFIG_NOEXCEPT
get last tick time
Definition: task_manager.h:504
task_t::ptr_t task_ptr_t
Definition: task_manager.h:61
TTask::ptr_t task_ptr_t
Definition: task_manager.h:46
int resume(id_t id, void *priv_data=NULL)
Definition: task_manager.h:307
task_t::id_t id_t
Definition: task_manager.h:60
static ptr_t create()
create a new task manager
Definition: task_manager.h:139
size_t get_task_size() const UTIL_CONFIG_NOEXCEPT
get task number in this manager
Definition: task_manager.h:498
std::shared_ptr< cli::cmd_option_value > value_type
Definition: cmd_option.h:53
friend bool operator<(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:32
int remove_task(id_t id)
remove task in this manager
Definition: task_manager.h:215
COPP_EC_ARGS_ERROR.
Definition: errno.h:30