1 #ifndef SUBORDINATION_PPL_BASIC_PIPELINE_HH 2 #define SUBORDINATION_PPL_BASIC_PIPELINE_HH 9 #include <unistdx/base/simple_lock> 10 #include <unistdx/base/spin_mutex> 11 #include <unistdx/ipc/process> 12 #include <unistdx/ipc/thread_semaphore> 13 #include <unistdx/util/system> 15 #include <subordination/base/container_traits.hh> 16 #include <subordination/base/queue_popper.hh> 17 #include <subordination/base/queue_pusher.hh> 18 #include <subordination/base/thread_name.hh> 19 #include <subordination/kernel/kernel_type.hh> 20 #include <subordination/ppl/pipeline_base.hh> 21 #include <subordination/ppl/thread_context.hh> 26 graceful_shutdown(
int ret);
34 class Traits=queue_traits<Kernels>,
36 class Mutex=sys::spin_mutex,
37 class Lock=sys::simple_lock<Mutex>,
38 class Semaphore=sys::thread_semaphore
43 typedef T kernel_type;
44 typedef Kernels kernel_pool;
45 typedef Threads thread_pool;
46 typedef Mutex mutex_type;
47 typedef Lock lock_type;
48 typedef Semaphore sem_type;
49 typedef Traits traits_type;
58 mutable mutex_type _mutex;
59 mutable sem_type _semaphore;
67 _threads(std::max(1u, concurrency)),
74 _kernels(std::move(rhs._kernels)),
75 _threads(std::move(rhs._threads)),
85 this->collect_kernels(std::back_inserter(sack));
94 send(kernel_type* k) {
96 this->log(
"send _", *k);
98 lock_type lock(this->_mutex);
99 traits_type::push(this->_kernels, k);
100 this->_semaphore.notify_one();
104 send(kernel_type** kernels,
size_t n) {
105 lock_type lock(this->_mutex);
111 std::mem_fn(&kernel_type::moves_downstream)
115 std::copy_n(kernels, n, queue_pusher(this->_kernels));
116 this->_semaphore.notify_one();
121 this->setstate(pipeline_state::started);
122 unsigned thread_no = this->_number;
125 [
this,thread_no] () {
127 sys::this_process::name(this->_name);
130 this_thread::name = this->_name;
131 this_thread::number = thread_no;
132 this->run(&this_thread::context);
141 lock_type lock(this->_mutex);
143 const size_t nthreads = this->_threads.size();
144 for (
size_t i=0; i<nthreads; ++i) {
145 this->_semaphore.notify_one();
155 if (thr.joinable()) {
162 concurrency()
const noexcept {
163 return this->_threads.size();
170 this->setstate(pipeline_state::stopped);
179 Thread_context_guard lock(*context);
180 context->register_thread();
187 this->collect_kernels(*context);
195 collect_kernels(It sack) {
200 [sack] (kernel_type* rhs) { rhs->mark_as_deleted(sack); }
210 Thread_context_guard lock(context);
215 this->collect_kernels(std::back_inserter(sack));
227 #endif // vim:filetype=cpp
Definition: basic_pipeline.hh:40
Definition: pipeline_base.hh:17
Definition: thread_context.hh:10
Input iterator that removes element from the container on increment.
Definition: queue_popper.hh:20