1 #ifndef SUBORDINATION_PPL_BASIC_SOCKET_PIPELINE_HH 2 #define SUBORDINATION_PPL_BASIC_SOCKET_PIPELINE_HH 10 #include <unordered_map> 13 #include <unistdx/base/log_message> 17 #include <unistdx/io/fildesbuf> 18 #include <unistdx/io/poller> 19 #include <unistdx/net/pstream> 21 #include <subordination/base/container_traits.hh> 22 #include <subordination/base/static_lock.hh> 23 #include <subordination/kernel/kstream.hh> 24 #include <subordination/ppl/basic_handler.hh> 25 #include <subordination/ppl/basic_pipeline.hh> 31 class Traits=queue_traits<Kernels>,
33 using Proxy_pipeline_base = basic_pipeline<T, Kernels, Traits, Threads,
48 typedef typename handler_container_type::const_iterator
49 handler_const_iterator;
51 typedef typename handler_type::time_point time_point;
52 typedef typename handler_type::duration duration;
55 using typename base_pipeline::kernel_type;
56 using typename base_pipeline::mutex_type;
57 using typename base_pipeline::lock_type;
58 using typename base_pipeline::sem_type;
59 using typename base_pipeline::kernel_pool;
65 mutex_type* _othermutex =
nullptr;
69 duration _start_timeout = duration::zero();
75 _start_timeout(rhs._start_timeout) { }
79 this->emplace_notify_handler(std::make_shared<basic_handler>());
90 set_other_mutex(mutex_type* rhs) noexcept {
91 this->_othermutex = rhs;
95 other_mutex() noexcept {
96 return this->_othermutex;
101 return &this->_mutex;
108 return this->_semaphore;
111 inline const sem_type&
112 poller()
const noexcept {
113 return this->_semaphore;
120 this->log(
"add _, ev=_", *ptr, ev);
121 this->_handlers[ev.fd()] = ptr;
122 this->poller().insert(ev);
128 this->emplace_handler(ev, std::static_pointer_cast<handler_type>(ptr));
133 sys::fd_type fd = this->poller().pipe_in();
134 this->log(
"add _", *ptr);
135 this->_handlers.
emplace(fd, ptr);
141 this->emplace_notify_handler(
142 std::static_pointer_cast<handler_type>(ptr)
149 while (!this->has_stopped()) {
150 bool timeout =
false;
151 if (this->_start_timeout > duration::zero()) {
152 handler_const_iterator result =
153 this->handler_with_min_start_time_point();
154 if (result != this->_handlers.
end()) {
156 const time_point tp = result->second->start_time_point()
157 + this->_start_timeout;
158 this->poller().wait_until(lock, tp);
161 auto process = [
this,timeout] () {
162 this->process_kernels();
163 this->handle_events();
164 this->flush_buffers(timeout);
165 return this->has_stopped();
170 this->poller().wait(lock, process);
190 set_start_timeout(
const duration& rhs) noexcept {
191 this->_start_timeout = rhs;
195 process_kernels() = 0;
200 flush_buffers(
bool timeout) {
201 const time_point now = timeout
203 : time_point(duration::zero());
204 handler_const_iterator first = this->_handlers.
begin();
205 handler_const_iterator last = this->_handlers.
end();
206 while (first != last) {
207 handler_type& h = *first->second;
208 if (h.has_stopped() || (timeout && this->is_timed_out(
212 this->log(
"remove _ (_)", h, h.has_stopped() ?
"stop" :
"timeout");
213 h.remove(this->poller());
214 first = this->_handlers.
erase(first);
216 first->second->flush();
222 handler_const_iterator
223 handler_with_min_start_time_point() const noexcept {
224 handler_const_iterator first = this->_handlers.
begin();
225 handler_const_iterator last = this->_handlers.
end();
226 handler_const_iterator result = last;
227 while (first != last) {
228 handler_type& h = *first->second;
229 if (h.is_starting() && h.has_start_time_point()) {
230 if (result == last) {
233 const auto old_t = result->second->start_time_point();
234 const auto new_t = h.start_time_point();
242 if (result != last) {
243 this->log(
"min _", *result->second);
250 for (
const sys::epoll_event& ev : this->poller()) {
251 auto result = this->_handlers.
find(ev.fd());
252 if (result == this->_handlers.
end()) {
253 this->log(
"unable to process fd _", ev.fd());
255 handler_type& h = *result->second;
260 this->log(
"failed to process fd _: _", ev.fd(), err.
what());
263 this->log(
"remove _ (bad event _)", h, ev);
264 h.remove(this->poller());
265 this->_handlers.
erase(result);
272 is_timed_out(
const handler_type& rhs,
const time_point& now) {
273 return rhs.is_starting() &&
274 rhs.start_time_point() + this->_start_timeout <= now;
281 #endif // vim:filetype=cpp
Definition: basic_pipeline.hh:40
Definition: basic_handler.hh:13
Definition: basic_socket_pipeline.hh:41
void run(Thread_context *) override
Definition: basic_socket_pipeline.hh:181
Definition: thread_context.hh:10
Definition: static_lock.hh:7