1 #ifndef SUBORDINATION_PPL_BASIC_SOCKET_PIPELINE_HH     2 #define SUBORDINATION_PPL_BASIC_SOCKET_PIPELINE_HH    10 #include <unordered_map>    13 #include <unistdx/base/log_message>    17 #include <unistdx/io/fildesbuf>    18 #include <unistdx/io/poller>    19 #include <unistdx/net/pstream>    21 #include <subordination/base/container_traits.hh>    22 #include <subordination/base/static_lock.hh>    23 #include <subordination/kernel/kstream.hh>    24 #include <subordination/ppl/basic_handler.hh>    25 #include <subordination/ppl/basic_pipeline.hh>    31              class Traits=queue_traits<Kernels>,
    // Alias template over basic_pipeline, forwarding T, Kernels, Traits and
    // Threads. The argument list continues past this line; the remaining
    // arguments are not visible in this listing.
    33     using Proxy_pipeline_base = basic_pipeline<T, Kernels, Traits, Threads,
    // Read-only iterator type over the handler container.
    48         typedef typename handler_container_type::const_iterator
    49             handler_const_iterator;
    // Time point and duration types are taken from the handler type.
    51         typedef typename handler_type::time_point time_point;
    52         typedef typename handler_type::duration duration;
    // Type names made visible from the base pipeline class.
    55         using typename base_pipeline::kernel_type;
    56         using typename base_pipeline::mutex_type;
    57         using typename base_pipeline::lock_type;
    58         using typename base_pipeline::sem_type;
    59         using typename base_pipeline::kernel_pool;
    // Non-owning pointer to a second mutex; null by default and assigned by
    // set_other_mutex().
    65         mutex_type* _othermutex = 
nullptr;
    // Start timeout, zero by default. The event loop takes its timeout path
    // only when this value is greater than duration::zero().
    69         duration _start_timeout = duration::zero();
    // Tail of a constructor's member-initialiser list: takes the start timeout
    // from `rhs`; the constructor body is empty. The constructor header is not
    // visible in this listing.
    75         _start_timeout(rhs._start_timeout) { }
    // Registers a freshly created basic_handler through
    // emplace_notify_handler(). The enclosing function header is not visible in
    // this listing — presumably a constructor; confirm.
    79             this->emplace_notify_handler(std::make_shared<basic_handler>());
    // Stores `rhs` as the second mutex pointer. The pointer is kept as-is
    // (no copy of the mutex is made); passing nullptr clears it.
    90         set_other_mutex(mutex_type* rhs) noexcept {
    91             this->_othermutex = rhs;
    // Returns the second mutex pointer as last set by set_other_mutex()
    // (nullptr if it was never set).
    95         other_mutex() noexcept {
    96             return this->_othermutex;
   // Returns the address of this pipeline's own _mutex member. The accessor's
   // name line is not visible in this listing.
   101             return &this->_mutex;
   // Returns the _semaphore member. The accessor's header is not visible in
   // this listing — presumably the non-const overload of poller(); confirm.
   108             return this->_semaphore;
   // Read-only access to the poller, which is stored in the _semaphore member.
   111         inline const sem_type&
   112         poller() 
const noexcept {
   113             return this->_semaphore;
   // Registers `ptr` as the handler for the descriptor carried by `ev`.
   // The function header is not visible in this listing — presumably
   // emplace_handler(ev, ptr); confirm.
   120             this->log(
"add _, ev=_", *ptr, ev);
   // operator[] assignment: for a standard map type this overwrites any handler
   // already stored under the same fd.
   121             this->_handlers[ev.fd()] = ptr;
   // Hand the event to the poller.
   122             this->poller().insert(ev);
   // Forwarding overload body: converts `ptr` to a shared pointer to
   // handler_type and delegates to emplace_handler(ev, ...).
   128             this->emplace_handler(ev, std::static_pointer_cast<handler_type>(ptr));
   // Registers `ptr` under the descriptor returned by the poller's pipe_in().
   // The function header is not visible in this listing — presumably
   // emplace_notify_handler(ptr); confirm.
   133             sys::fd_type fd = this->poller().pipe_in();
   134             this->log(
"add _", *ptr);
   // NOTE(review): emplace() is used here whereas the event-handler path uses
   // operator[]; with standard map semantics emplace() keeps an existing entry
   // for the same fd instead of replacing it — confirm this is intended.
   135             this->_handlers.
emplace(fd, ptr);
   // Forwarding overload body: converts `ptr` to a shared pointer to
   // handler_type and delegates to emplace_notify_handler().
   141             this->emplace_notify_handler(
   142                 std::static_pointer_cast<handler_type>(ptr)
   // Main event loop: repeats until has_stopped() returns true. The enclosing
   // function header and the declaration of `lock` are not visible in this
   // listing.
   149             while (!this->has_stopped()) {
   150                 bool timeout = 
false;
   // The start-timeout path is taken only when a positive timeout is configured.
   151                 if (this->_start_timeout > duration::zero()) {
   152                     handler_const_iterator result =
   153                         this->handler_with_min_start_time_point();
   154                     if (result != this->_handlers.
end()) {
   // Deadline = start time point of the selected handler plus the configured
   // start timeout; wait on the poller until that deadline.
   156                         const time_point tp = result->second->start_time_point()
   157                                               + this->_start_timeout;
   158                         this->poller().wait_until(lock, tp);
   // `timeout` is captured by value, so the closure sees the value the flag has
   // at this point. NOTE(review): no visible line sets `timeout` to true —
   // presumably done on a line missing from this listing; confirm.
   161                 auto process = [
this,timeout] () {
   // One round of work, in this order: kernels, poller events, then flushing
   // buffers and dropping stopped/timed-out handlers.
   162                     this->process_kernels();
   163                     this->handle_events();
   164                     this->flush_buffers(timeout);
   // The closure's result reports whether the pipeline has stopped.
   165                     return this->has_stopped();
   // The closure is handed to the poller's wait() together with the lock.
   170                     this->poller().wait(lock, process);
   // Stores the start timeout. The event loop compares it against
   // duration::zero(), so a non-positive value leaves the timeout path disabled.
   190         set_start_timeout(
const duration& rhs) noexcept {
   191             this->_start_timeout = rhs;
   // Pure virtual hook that derived pipelines must implement; the event loop's
   // `process` closure calls it before handle_events().
   195         process_kernels() = 0;
   // Walks all handlers: removes those that have stopped or, when `timeout` is
   // true, those that is_timed_out() reports; calls flush() on the others.
   200         flush_buffers(
bool timeout) {
   // `now` is only consulted when `timeout` is true (see the test below). The
   // true-branch of this conditional is not visible in this listing.
   201             const time_point now = timeout
   203                                    : time_point(duration::zero());
   204             handler_const_iterator first = this->_handlers.
begin();
   205             handler_const_iterator last = this->_handlers.
end();
   206             while (first != last) {
   207                 handler_type& h = *first->second;
   // has_stopped() is tested first, so the timeout test is skipped for handlers
   // that have already stopped.
   208                 if (h.has_stopped() || (timeout && this->is_timed_out(
   212                     this->log(
"remove _ (_)", h, h.has_stopped() ? 
"stop" : 
"timeout");
   // Detach the handler from the poller before dropping it from the container.
   213                     h.remove(this->poller());
   // Continue the walk from the iterator returned by erase().
   214                     first = this->_handlers.
erase(first);
   // Handler is kept: flush it. NOTE(review): the `else` and the iterator
   // increment are not visible in this listing; confirm.
   216                     first->second->flush();
   // Scans all handlers, considering only those that are starting and have a
   // start time point, and logs the selected one ("min _") if any was found.
   // NOTE(review): the comparison between old_t and new_t and the return
   // statement are not visible in this listing — presumably the handler with
   // the earliest start time point is returned, or end() if none qualifies;
   // confirm.
   222         handler_const_iterator
   223         handler_with_min_start_time_point() const noexcept {
   224             handler_const_iterator first = this->_handlers.
begin();
   225             handler_const_iterator last = this->_handlers.
end();
   // `result == last` means "no candidate selected yet".
   226             handler_const_iterator result = last;
   227             while (first != last) {
   228                 handler_type& h = *first->second;
   229                 if (h.is_starting() && h.has_start_time_point()) {
   230                     if (result == last) {
   // A candidate already exists: fetch both start time points for comparison.
   233                         const auto old_t = result->second->start_time_point();
   234                         const auto new_t = h.start_time_point();
   242             if (result != last) {
   243                 this->log(
"min _", *result->second);
   // Dispatches each event reported by the poller to the handler registered
   // for its file descriptor. The function header is not visible in this
   // listing — presumably handle_events(); confirm.
   250             for (
const sys::epoll_event& ev : this->poller()) {
   251                 auto result = this->_handlers.
find(ev.fd());
   252                 if (result == this->_handlers.
end()) {
   // No handler is registered for this descriptor: the event is only logged.
   253                     this->log(
"unable to process fd _", ev.fd());
   255                     handler_type& h = *result->second;
   // Error path: `err` is declared on a line not visible in this listing —
   // presumably a caught exception; confirm.
   260                         this->log(
"failed to process fd _: _", ev.fd(), err.
what());
   // Bad-event path (its condition is not visible in this listing): detach the
   // handler from the poller, then drop it from the container.
   263                         this->log(
"remove _ (bad event _)", h, ev);
   264                         h.remove(this->poller());
   265                         this->_handlers.
erase(result);
   // True when `rhs` is still starting and its start time point plus the
   // configured start timeout is not later than `now`.
   // NOTE(review): unlike handler_with_min_start_time_point(), this does not
   // check has_start_time_point(); confirm that start_time_point() is always
   // set while is_starting() holds.
   272         is_timed_out(
const handler_type& rhs, 
const time_point& now) {
   273             return rhs.is_starting() &&
   274                    rhs.start_time_point() + this->_start_timeout <= now;
   281 #endif // vim:filetype=cpp 
Definition: basic_pipeline.hh:40
 
Definition: basic_handler.hh:13
 
Definition: basic_socket_pipeline.hh:41
 
void run(Thread_context *) override
Definition: basic_socket_pipeline.hh:181
 
Definition: thread_context.hh:10
 
Definition: static_lock.hh:7