scc  2022.4.0
SystemC components library
thread_pool.h
1 /*******************************************************************************
2  * Copyright 2021-2022 MINRES Technologies GmbH
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *******************************************************************************/
16 
17 #ifndef _COMMON_UTIL_THREAD_POOL_H_
18 #define _COMMON_UTIL_THREAD_POOL_H_
19 
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
25 
31 namespace util {
// A simple thread pool: worker threads pull tasks from a shared FIFO queue.
//
// Typical use: start(N) to launch workers, enqueue(...) to submit work and
// obtain a future for its result, then finish() (or let the destructor run)
// to drain the queue and join the workers.
struct thread_pool {
    // The mutex, condition variable and deque form a single thread-safe
    // triggered queue of tasks: `m` guards `work`, `v` signals new entries.
    std::mutex m;
    std::condition_variable v;
    // Note that a packaged_task<void()> can store a packaged_task<R()>; a
    // default-constructed (invalid) entry is the "stop this worker" message.
    std::deque<std::packaged_task<void()>> work;
    // One future per worker thread; each becomes ready when its worker exits.
    std::vector<std::future<void>> finished;

    // Queue the call f(args...) for execution on a worker thread.
    //
    // The callable and its arguments are stored by (decayed) copy/move via
    // std::bind, so wrap an argument in std::ref/std::cref to pass it by
    // reference. The returned future delivers the call's result, or rethrows
    // the exception it raised. If no workers are running, the task stays
    // queued until start() is called.
    template <class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        // Bind the arguments now so the queued task is a nullary callable;
        // the packaged task splits execution from delivery of the result.
        std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        auto result = task.get_future(); // must be taken before the task is handed off
        {
            std::lock_guard<std::mutex> lock(m);
            work.emplace_back(std::move(task)); // stored as a task<void()>
        }
        v.notify_one(); // wake one worker to pick the task up
        return result;
    }

    // Start N additional worker threads in the pool.
    void start(std::size_t N = 1) {
        for(std::size_t i = 0; i < N; ++i) {
            // Each worker is a std::async running this->thread_task(); the
            // future it returns is kept so finish() can wait for the worker.
            finished.push_back(std::async(std::launch::async, [this] { thread_task(); }));
        }
    }

    // abort() cancels all non-started tasks, tells every worker thread to
    // stop running, and waits for them to finish up.
    void abort() {
        cancel_pending();
        finish();
    }

    // cancel_pending() merely discards all non-started tasks. Destroying a
    // queued task abandons its shared state, so the futures handed out for
    // the discarded tasks report std::future_errc::broken_promise.
    void cancel_pending() {
        std::lock_guard<std::mutex> lock(m);
        work.clear();
    }

    // finish() enqueues a "stop the thread" message for every worker, then
    // waits for the workers. The stop messages go to the back of the queue,
    // so tasks that are already pending are still executed first.
    void finish() {
        {
            std::lock_guard<std::mutex> lock(m);
            for(std::size_t i = 0; i < finished.size(); ++i) {
                work.emplace_back(); // invalid task == stop message
            }
        }
        v.notify_all();
        // Futures obtained from std::async(std::launch::async, ...) block in
        // their destructor until the thread is done, so this joins workers.
        finished.clear();
    }

    // Drains the queue and joins all workers (see finish()).
    ~thread_pool() { finish(); }

private:
    // Worker loop: block until a task is available, run it, repeat; return
    // as soon as a stop message (an invalid task) is dequeued.
    void thread_task() {
        while(true) {
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(m);
                // The predicate form returns immediately if work is already
                // queued and also guards against spurious wake-ups.
                v.wait(lock, [this] { return !work.empty(); });
                task = std::move(work.front());
                work.pop_front();
            }
            if(!task.valid())
                return;
            task(); // run outside the lock so other workers can proceed
        }
    }
};
108 } // namespace util
110 #endif /* _COMMON_UTIL_THREAD_POOL_H_ */
SCC common utilities.
Definition: bit_field.h:30
a simple thread pool
Definition: thread_pool.h:33