libcopp  1.2.0
task_manager.h
Go to the documentation of this file.
1 /*
2  * task_manager.h
3  *
4  * Created on: 2014-06-16
5  * Author: owent
6  *
7  * Released under the MIT license
8  */
9 
10 #ifndef COTASK_TASK_MANAGER_H
11 #define COTASK_TASK_MANAGER_H
12 
13 #pragma once
14 
15 #include <algorithm>
16 #include <assert.h>
17 #include <ctime>
18 #include <set>
19 #include <stdint.h>
20 #include <vector>
21 
22 #if (defined(__cplusplus) && __cplusplus >= 201103L) || (defined(_MSVC_LANG) && _MSVC_LANG >= 201103L)
23 #include <unordered_map>
24 #define COTASK_MACRO_MANAGER_USING_UNORDERED_MAP 1
25 #else
26 #include <map>
27 #endif
28 
29 #include <libcotask/task_macros.h>
30 
31 
32 namespace cotask {
33 
namespace detail {
    /**
     * @brief A time value split into whole seconds and nanoseconds.
     *
     * Comparison is lexicographic: tv_sec is compared first and tv_nsec only breaks ties.
     */
    struct tickspec_t {
        time_t tv_sec; /* Seconds.  */
        int tv_nsec;   /* Nanoseconds.  */

        friend bool operator==(const tickspec_t &l, const tickspec_t &r) { return l.tv_sec == r.tv_sec && l.tv_nsec == r.tv_nsec; }

        friend bool operator!=(const tickspec_t &l, const tickspec_t &r) { return l.tv_sec != r.tv_sec || l.tv_nsec != r.tv_nsec; }

        friend bool operator<(const tickspec_t &l, const tickspec_t &r) {
            return (l.tv_sec != r.tv_sec) ? l.tv_sec < r.tv_sec : l.tv_nsec < r.tv_nsec;
        }

        friend bool operator<=(const tickspec_t &l, const tickspec_t &r) {
            return (l.tv_sec != r.tv_sec) ? l.tv_sec <= r.tv_sec : l.tv_nsec <= r.tv_nsec;
        }
    };

    /**
     * @brief Timeout checkpoint: the moment a task expires plus the id of that task.
     *
     * Nodes are ordered by expiry time first and by task id second, so the first element of a
     * std::set of nodes is always the checkpoint that expires earliest.
     */
    template <typename TTask>
    struct task_timer_node {
        tickspec_t expired_time;       // when the task times out
        typename TTask::id_t task_id;  // task this checkpoint belongs to

        friend bool operator==(const task_timer_node &l, const task_timer_node &r) {
            return l.expired_time == r.expired_time && l.task_id == r.task_id;
        }

        friend bool operator!=(const task_timer_node &l, const task_timer_node &r) {
            return l.expired_time != r.expired_time || l.task_id != r.task_id;
        }

        friend bool operator<(const task_timer_node &l, const task_timer_node &r) {
            if (l.expired_time != r.expired_time) {
                return l.expired_time < r.expired_time;
            }

            return l.task_id < r.task_id;
        }

        friend bool operator<=(const task_timer_node &l, const task_timer_node &r) {
            if (l.expired_time != r.expired_time) {
                return l.expired_time <= r.expired_time;
            }

            return l.task_id <= r.task_id;
        }
    };

    /**
     * @brief Value stored per task in the manager's container.
     *
     * Holds the task pointer and an iterator to the task's checkpoint in the timer set.
     */
    template <typename TTask>
    struct task_manager_node {
        typedef typename TTask::ptr_t task_ptr_t;

        task_ptr_t task_;
        typename std::set<task_timer_node<TTask> >::iterator timer_node;
    };

} // namespace detail
91 
95  template <typename TTask,
96 #if defined(COTASK_MACRO_MANAGER_USING_UNORDERED_MAP)
97  typename TTaskContainer = std::unordered_map<typename TTask::id_t, detail::task_manager_node<TTask> >
98 #else
99  typename TTaskContainer = std::map<typename TTask::id_t, detail::task_manager_node<TTask> >
100 #endif
101  >
102  class task_manager {
103  public:
104  typedef TTask task_t;
105  typedef TTaskContainer container_t;
106  typedef typename task_t::id_t id_t;
107  typedef typename task_t::ptr_t task_ptr_t;
109  typedef std::shared_ptr<self_t> ptr_t;
110 
/// Bit values stored in the manager's flags_ member.
struct flag_t {
    enum type {
        EN_TM_NONE = 0,    ///< no flag set
        EN_TM_IN_TICK = 1, ///< set while tick() is running
        EN_TM_IN_RESET = 2 ///< set while reset() is running
    };
};
118 
119  private:
120  struct flag_guard_t {
121  int * data_;
122  typename flag_t::type flag_;
123  inline flag_guard_t(int *flags, typename flag_t::type v) : data_(flags), flag_(v) {
124  if (NULL == data_ || (*data_ & flag_)) {
125  flag_ = flag_t::EN_TM_NONE;
126  data_ = NULL;
127  } else {
128  (*data_) |= flag_;
129  }
130  }
131  inline ~flag_guard_t() {
132  if (*this) {
133  (*data_) &= ~flag_;
134  }
135  }
136 
137  inline operator bool() { return NULL != data_ && flag_t::EN_TM_NONE != flag_; }
138  };
139 
140  public:
141  task_manager() : flags_(0) {
142  last_tick_time_.tv_sec = 0;
143  last_tick_time_.tv_nsec = 0;
144  }
145 
147  // safe remove all task
148  reset();
149  }
150 
151  void reset() {
152  flag_guard_t reset_flag(&flags_, flag_t::EN_TM_IN_RESET);
153  if (!reset_flag) {
154  return;
155  }
156 
157  std::vector<task_ptr_t> all_tasks;
158  // first, lock and reset all data
159  {
160 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
161  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
162 #endif
163 
164  for (typename container_t::iterator iter = tasks_.begin(); iter != tasks_.end(); ++iter) {
165  all_tasks.push_back(iter->second.task_);
166  remove_timeout_timer(iter->second);
167  }
168 
169  tasks_.clear();
170  task_timeout_timer_.clear();
171  flags_ = 0;
172  last_tick_time_.tv_sec = 0;
173  last_tick_time_.tv_nsec = 0;
174  }
175 
176  // then, kill all tasks
177  for (typename std::vector<task_ptr_t>::iterator iter = all_tasks.begin(); iter != all_tasks.end(); ++iter) {
178  if (!(*iter)->is_exiting()) {
179  (*iter)->kill(EN_TS_KILLED);
180  }
181  }
182  }
183 
188  static ptr_t create() { return std::make_shared<self_t>(); }
189 
203  int add_task(const task_ptr_t &task, time_t timeout_sec, int timeout_nsec) {
204  if (!task) {
205  assert(task);
207  }
208 
209  if (flags_ & flag_t::EN_TM_IN_RESET) {
210  return copp::COPP_EC_IN_RESET;
211  }
212 
213  // try to cast type
214  typedef typename container_t::value_type pair_type;
216  task_node.task_ = task;
217  task_node.timer_node = task_timeout_timer_.end();
218 
219  if (!task_node.task_) {
220  assert(task_node.task_);
222  }
223 
224  // lock before we will operator tasks_
225 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
226  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
227 #endif
228 
229  id_t task_id = task->get_id();
230  if (tasks_.end() != tasks_.find(task_id)) {
232  }
233 
234  // try to insert to container
235  std::pair<typename container_t::iterator, bool> res = tasks_.insert(pair_type(task_id, task_node));
236  if (false == res.second) {
238  }
239 
240  // add timeout controller
241  set_timeout_timer(res.first->second, timeout_sec, timeout_nsec);
242  return copp::COPP_EC_SUCCESS;
243  }
244 
253  int add_task(const task_ptr_t &task) { return add_task(task, 0, 0); }
254 
269  int set_timeout(id_t id, time_t timeout_sec, int timeout_nsec) {
270  if (flags_ & flag_t::EN_TM_IN_RESET) {
271  return copp::COPP_EC_IN_RESET;
272  }
273 
274  {
275 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
276  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
277 #endif
278 
279  typedef typename container_t::iterator iter_type;
280  iter_type iter = tasks_.find(id);
281  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
282 
283  set_timeout_timer(iter->second, timeout_sec, timeout_nsec);
284  }
285 
286  return copp::COPP_EC_SUCCESS;
287  }
288 
294  int remove_task(id_t id) {
295  if (flags_ & flag_t::EN_TM_IN_RESET) {
296  return copp::COPP_EC_IN_RESET;
297  }
298 
299  task_ptr_t task_inst;
300  {
301 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
302  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
303 #endif
304 
305  typedef typename container_t::iterator iter_type;
306  iter_type iter = tasks_.find(id);
307  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
308 
309  // make sure running task be killed first
310  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
311 
312  remove_timeout_timer(iter->second);
313  tasks_.erase(iter);
314  }
315 
316  if (task_inst) {
317  EN_TASK_STATUS task_status = task_inst->get_status();
318  if (task_status > EN_TS_CREATED && task_status < EN_TS_DONE) {
319  return task_inst->kill(EN_TS_KILLED, NULL);
320  }
321  }
322 
323  return copp::COPP_EC_SUCCESS;
324  }
325 
331  task_ptr_t find_task(id_t id) {
332  if (flags_ & flag_t::EN_TM_IN_RESET) {
333  return task_ptr_t();
334  }
335 
336 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
337  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
338 #endif
339 
340  typedef typename container_t::iterator iter_type;
341  iter_type iter = tasks_.find(id);
342  if (tasks_.end() == iter) return task_ptr_t();
343 
344  return iter->second.task_;
345  }
346 
347  // int add_scheduler();
348  // int scheduling_once();
349  // int scheduling_loop();
350 
351  int start(id_t id, void *priv_data = NULL) {
352  if (flags_ & flag_t::EN_TM_IN_RESET) {
353  return copp::COPP_EC_IN_RESET;
354  }
355 
356  task_ptr_t task_inst;
357  {
358 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
359  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
360 #endif
361 
362  typedef typename container_t::iterator iter_type;
363  iter_type iter = tasks_.find(id);
364  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
365 
366  task_inst = iter->second.task_;
367  }
368 
369  // unlock and then run start
370  if (task_inst) {
371  int ret = task_inst->start(priv_data);
372 
373  // if task is finished, remove it
374  if (task_inst->get_status() >= EN_TS_DONE) {
375  remove_task(id);
376  }
377 
378  return ret;
379  } else {
381  }
382  }
383 
384  int resume(id_t id, void *priv_data = NULL) {
385  if (flags_ & flag_t::EN_TM_IN_RESET) {
386  return copp::COPP_EC_IN_RESET;
387  }
388 
389  task_ptr_t task_inst;
390  {
391 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
392  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
393 #endif
394 
395  typedef typename container_t::iterator iter_type;
396  iter_type iter = tasks_.find(id);
397  if (tasks_.end() == iter) return copp::COPP_EC_NOT_FOUND;
398 
399  task_inst = iter->second.task_;
400  }
401 
402  // unlock and then run resume
403  if (task_inst) {
404  int ret = task_inst->resume(priv_data);
405 
406  // if task is finished, remove it
407  if (task_inst->get_status() >= EN_TS_DONE) {
408  remove_task(id);
409  }
410 
411  return ret;
412  } else {
414  }
415  }
416 
417  int cancel(id_t id, void *priv_data = NULL) {
418  if (flags_ & flag_t::EN_TM_IN_RESET) {
419  return copp::COPP_EC_IN_RESET;
420  }
421 
422  task_ptr_t task_inst;
423  {
424 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
425  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
426 #endif
427 
428  typedef typename container_t::iterator iter_type;
429  iter_type iter = tasks_.find(id);
430  if (tasks_.end() == iter) {
432  }
433 
434  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
435 
436  remove_timeout_timer(iter->second);
437  tasks_.erase(iter); // remove from container
438  }
439 
440  // unlock and then run cancel
441  if (task_inst) {
442  return task_inst->cancel(priv_data);
443  } else {
445  }
446  }
447 
448  int kill(id_t id, enum EN_TASK_STATUS status, void *priv_data = NULL) {
449  if (flags_ & flag_t::EN_TM_IN_RESET) {
450  return copp::COPP_EC_IN_RESET;
451  }
452 
453  task_ptr_t task_inst;
454  {
455 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
456  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
457 #endif
458 
459  typedef typename container_t::iterator iter_type;
460  iter_type iter = tasks_.find(id);
461  if (tasks_.end() == iter) {
463  }
464 
465  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
466 
467  remove_timeout_timer(iter->second);
468  tasks_.erase(iter); // remove from container
469  }
470 
471  // unlock and then run kill
472  if (task_inst) {
473  return task_inst->kill(status, priv_data);
474  } else {
476  }
477  }
478 
479  int kill(id_t id, void *priv_data = NULL) { return kill(id, EN_TS_KILLED, priv_data); }
480 
489  int tick(time_t sec, int nsec = 0) {
490  detail::tickspec_t now_tick_time;
491  // time can not be back
492  if (sec < last_tick_time_.tv_sec || (sec == last_tick_time_.tv_sec && nsec <= last_tick_time_.tv_nsec)) {
493  return 0;
494  }
495 
496  now_tick_time.tv_sec = sec;
497  now_tick_time.tv_nsec = nsec;
498 
499  // we will ignore tick when in a recursive call
500  flag_guard_t tick_flag(&flags_, flag_t::EN_TM_IN_TICK);
501  if (!tick_flag) {
502  return copp::COPP_EC_SUCCESS;
503  }
504 
505  if (flags_ & flag_t::EN_TM_IN_RESET) {
506  return copp::COPP_EC_IN_RESET;
507  }
508 
509  // first tick, init and reset task timeout
510  if (0 == last_tick_time_.tv_sec && 0 == last_tick_time_.tv_nsec) {
511  // hold lock
512 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
513  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
514 #endif
515 
516  std::set<detail::task_timer_node<task_t> > real_checkpoints;
517  for (typename std::set<detail::task_timer_node<task_t> >::iterator iter = task_timeout_timer_.begin();
518  task_timeout_timer_.end() != iter; ++iter) {
519  detail::task_timer_node<task_t> new_checkpoint = (*iter);
520  new_checkpoint.expired_time.tv_sec += sec;
521  new_checkpoint.expired_time.tv_nsec += nsec;
522  real_checkpoints.insert(new_checkpoint);
523  }
524 
525  task_timeout_timer_.swap(task_timeout_timer_);
526  last_tick_time_ = now_tick_time;
527  return copp::COPP_EC_SUCCESS;
528  }
529 
530  // remove timeout tasks
531  while (false == task_timeout_timer_.empty()) {
532  task_ptr_t task_inst;
533 
534  {
535  // hold lock
536 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
537  util::lock::lock_holder<util::lock::spin_lock> lock_guard(action_lock_);
538 #endif
539 
540  const typename std::set<detail::task_timer_node<task_t> >::value_type &timer_node = *task_timeout_timer_.begin();
541  // all tasks those expired time less than now are timeout
542  if (now_tick_time <= timer_node.expired_time) {
543  break;
544  }
545 
546  // check expire time(may be changed)
547  typedef typename container_t::iterator iter_type;
548 
549  iter_type iter = tasks_.find(timer_node.task_id);
550 
551  if (tasks_.end() != iter) {
552  // task may be removed before
553  task_inst = COPP_MACRO_STD_MOVE(iter->second.task_);
554 
555  remove_timeout_timer(iter->second);
556  tasks_.erase(iter); // remove from container
557  }
558  }
559 
560  // task call can not be used when lock is on
561  if (task_inst && !task_inst->is_exiting()) {
562  task_inst->kill(EN_TS_TIMEOUT);
563  }
564  }
565 
566  last_tick_time_ = now_tick_time;
567  return copp::COPP_EC_SUCCESS;
568  }
569 
574  size_t get_tick_checkpoint_size() const UTIL_CONFIG_NOEXCEPT { return task_timeout_timer_.size(); }
575 
580  size_t get_task_size() const UTIL_CONFIG_NOEXCEPT { return tasks_.size(); }
581 
586  detail::tickspec_t get_last_tick_time() const UTIL_CONFIG_NOEXCEPT { return last_tick_time_; }
587 
592  inline const container_t &get_container() const UTIL_CONFIG_NOEXCEPT { return tasks_; }
593 
598  inline const std::set<detail::task_timer_node<task_t> > &get_checkpoints() const UTIL_CONFIG_NOEXCEPT {
599  return task_timeout_timer_;
600  }
601 
602  private:
603  void set_timeout_timer(detail::task_manager_node<task_t> &node, time_t timeout_sec, int timeout_nsec) {
604  remove_timeout_timer(node);
605 
606  if (timeout_sec <= 0 && timeout_nsec <= 0) {
607  return;
608  }
609 
610  if (!node.task_) {
611  return;
612  }
613 
615  timer_node.task_id = node.task_->get_id();
616  timer_node.expired_time.tv_sec = last_tick_time_.tv_sec + timeout_sec;
617  timer_node.expired_time.tv_nsec = last_tick_time_.tv_nsec + timeout_nsec;
618 
619  std::pair<typename std::set<detail::task_timer_node<task_t> >::iterator, bool> res = task_timeout_timer_.insert(timer_node);
620  if (res.second) {
621  node.timer_node = res.first;
622  }
623  }
624 
626  if (node.timer_node != task_timeout_timer_.end()) {
627  task_timeout_timer_.erase(node.timer_node);
628  node.timer_node = task_timeout_timer_.end();
629  }
630  }
631 
632  private:
633  container_t tasks_;
635  std::set<detail::task_timer_node<task_t> > task_timeout_timer_;
636 
637 #if !defined(PROJECT_DISABLE_MT) || !(PROJECT_DISABLE_MT)
639 #endif
640  int flags_;
641  };
642 } // namespace cotask
643 
644 
645 #endif /* COTASK_TASK_MANAGER_H */
COPP_EC_ALREADY_EXIST.
Definition: errno.h:29
std::set< detail::task_timer_node< task_t > > task_timeout_timer_
Definition: task_manager.h:635
friend bool operator==(const task_timer_node &l, const task_timer_node &r)
Definition: task_manager.h:57
int kill(id_t id, enum EN_TASK_STATUS status, void *priv_data=NULL)
Definition: task_manager.h:448
COPP_EC_NOT_FOUND.
Definition: errno.h:28
size_t get_tick_checkpoint_size() const UTIL_CONFIG_NOEXCEPT
get timeout checkpoint number in this manager
Definition: task_manager.h:574
int tick(time_t sec, int nsec=0)
active tick event and deal with clock
Definition: task_manager.h:489
void remove_timeout_timer(detail::task_manager_node< task_t > &node)
Definition: task_manager.h:625
task_ptr_t find_task(id_t id)
find task by id
Definition: task_manager.h:331
int cancel(id_t id, void *priv_data=NULL)
Definition: task_manager.h:417
COPP_EC_SUCCESS.
Definition: errno.h:12
COPP_EC_EXTERNAL_INSERT_FAILED.
Definition: errno.h:15
friend bool operator<=(const task_timer_node &l, const task_timer_node &r)
Definition: task_manager.h:73
EN_TASK_STATUS
Definition: task_impl.h:28
const container_t & get_container() const UTIL_CONFIG_NOEXCEPT
Task container; this API is only provided to expose information to users.
Definition: task_manager.h:592
std::set< task_timer_node< TTask > >::iterator timer_node
Definition: task_manager.h:87
std::shared_ptr< self_t > ptr_t
Definition: task_manager.h:109
detail::tickspec_t last_tick_time_
Definition: task_manager.h:634
void set_timeout_timer(detail::task_manager_node< task_t > &node, time_t timeout_sec, int timeout_nsec)
Definition: task_manager.h:603
friend bool operator==(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:39
#define COPP_MACRO_STD_MOVE(x)
Definition: features.h:185
int kill(id_t id, void *priv_data=NULL)
Definition: task_manager.h:479
TTaskContainer container_t
Definition: task_manager.h:105
int add_task(const task_ptr_t &task)
Add a task to the manager. Make sure the task has a get_id() method that returns a unique id ...
Definition: task_manager.h:253
flag_guard_t(int *flags, typename flag_t::type v)
Definition: task_manager.h:123
friend bool operator!=(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:41
const std::set< detail::task_timer_node< task_t > > & get_checkpoints() const UTIL_CONFIG_NOEXCEPT
Get all task checkpoints; this API is only provided to expose information to users.
Definition: task_manager.h:598
int add_task(const task_ptr_t &task, time_t timeout_sec, int timeout_nsec)
Add a task to the manager. Make sure the task has a get_id() method that returns a unique id ...
Definition: task_manager.h:203
task_manager< task_t, container_t > self_t
Definition: task_manager.h:108
int start(id_t id, void *priv_data=NULL)
Definition: task_manager.h:351
util::lock::spin_lock action_lock_
Definition: task_manager.h:638
COPP_EC_CAST_FAILED.
Definition: errno.h:31
friend bool operator!=(const task_timer_node &l, const task_timer_node &r)
Definition: task_manager.h:61
friend bool operator<=(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:47
COPP_EC_IN_RESET.
Definition: errno.h:17
detail::tickspec_t get_last_tick_time() const UTIL_CONFIG_NOEXCEPT
get last tick time
Definition: task_manager.h:586
task_t::ptr_t task_ptr_t
Definition: task_manager.h:107
int resume(id_t id, void *priv_data=NULL)
Definition: task_manager.h:384
static ptr_t create()
create a new task manager
Definition: task_manager.h:188
size_t get_task_size() const UTIL_CONFIG_NOEXCEPT
get task number in this manager
Definition: task_manager.h:580
std::shared_ptr< cli::cmd_option_value > value_type
Definition: cmd_option.h:53
int set_timeout(id_t id, time_t timeout_sec, int timeout_nsec)
set or update task timeout
Definition: task_manager.h:269
friend bool operator<(const tickspec_t &l, const tickspec_t &r)
Definition: task_manager.h:43
int remove_task(id_t id)
remove task in this manager
Definition: task_manager.h:294
friend bool operator<(const task_timer_node &l, const task_timer_node &r)
Definition: task_manager.h:65
COPP_EC_ARGS_ERROR.
Definition: errno.h:30