Subordination
A framework for distributed programming
socket_pipeline.hh
1 #ifndef SUBORDINATION_PPL_SOCKET_PIPELINE_HH
2 #define SUBORDINATION_PPL_SOCKET_PIPELINE_HH
3 
4 #include <iosfwd>
5 #include <unordered_map>
6 #include <vector>
7 
8 #include <unistdx/base/log_message>
9 #include <unistdx/it/field_iterator>
10 #include <unistdx/net/socket_address>
11 #include <unistdx/net/interface_address>
12 
13 #include <subordination/kernel/kernel_instance_registry.hh>
14 #include <subordination/kernel/kstream.hh>
15 #include <subordination/ppl/basic_socket_pipeline.hh>
16 #include <subordination/ppl/local_server.hh>
17 
18 namespace sbn {
19 
/// Forward declaration of the local_server class template, which is
/// befriended by the pipeline class further down in this header.
20  template <class K, class S, class R>
21  class local_server;
22 
/// Forward declaration of the remote_client class template, which is
/// befriended by the pipeline class further down in this header.
23  template <class K, class S, class R>
24  class remote_client;
25 
26  template <class K, class S, class R>
28 
29  template<class K, class S, class R>
31 
32  public:
/// Socket type (template parameter S).
33  typedef S socket_type;
/// Router type (template parameter R).
34  typedef R router_type;
/// Address type used by this pipeline: an IPv4 address.
35  typedef sys::ipv4_address addr_type;
/// Interface address parameterised with addr_type.
36  typedef sys::interface_address<addr_type> ifaddr_type;
41 
/// Bring dependent type names of base_pipeline into this class so that
/// they can be used without qualification.
42  using typename base_pipeline::kernel_type;
43  using typename base_pipeline::mutex_type;
44  using typename base_pipeline::lock_type;
45  using typename base_pipeline::sem_type;
46  using typename base_pipeline::kernel_pool;
47  using typename base_pipeline::duration;
48 
49  private:
/// Address traits for addr_type.
52  typedef sys::ipaddr_traits<addr_type> traits_type;
/// Mutable iterator over the server container.
54  typedef typename server_container_type::iterator server_iterator;
/// Read-only iterator over the server container.
55  typedef typename server_container_type::const_iterator
56  server_const_iterator;
/// Mutable iterator over the client container.
59  typedef typename client_container_type::iterator client_iterator;
/// Alias for ifaddr_type::rep_type.
60  typedef ifaddr_type::rep_type rep_type;
/// Identifier type taken from mobile_kernel.
61  typedef mobile_kernel::id_type id_type;
/// Field iterator over server_const_iterator with field index 0.
/// NOTE(review): presumably walks the interface-address key of every
/// server entry — confirm against unistdx field_iterator.
62  typedef sys::field_iterator<server_const_iterator,0> ifaddr_iterator;
/// Unsigned 32-bit type used for client weights.
/// NOTE(review): uint32_t is used although <cstdint> is not among the
/// includes of this header; it is expected to arrive transitively.
65  typedef uint32_t weight_type;
66 
67  private:
/// Container of servers.
68  server_container_type _servers;
/// Container of clients.
69  client_container_type _clients;
/// Position of the current client inside _clients. It starts at end(),
/// which end_reached() reports as "no current client".
72  client_iterator _iterator = this->_clients.end();
/// Weight counter; set back to zero whenever the client iterator is
/// reset or advanced.
75  weight_type _weightcnt = 0;
/// Port used when socket addresses are built by this class (default 33333).
76  sys::port_type _port = 33333;
/// Counter of type id_type, initially zero.
/// NOTE(review): presumably the source of kernel identifiers handed out
/// by ensure_identity() — confirm in socket_pipeline.cc.
78  id_type _counter = 0;
/// Flag controlled by use_localhost(); enabled by default. Its effect is
/// implemented outside this header.
79  bool _uselocalhost = true;
80 
81  public:
82 
84 
/// Compiler-generated destructor.
85  ~socket_pipeline() = default;
86 
/// Copy construction is disabled.
87  socket_pipeline(const socket_pipeline&) = delete;
88 
/// Move construction is disabled.
89  socket_pipeline(socket_pipeline&&) = delete;
90 
92  operator=(const socket_pipeline&) = delete;
93 
95  operator=(socket_pipeline&&) = delete;
96 
/// Adds a client for the socket address \p addr.
/// A lock_type guard on this->_mutex is held while the request is
/// delegated to do_add_client().
97  void
98  add_client(const sys::socket_address& addr) {
99  lock_type lock(this->_mutex);
100  this->do_add_client(addr);
101  }
102 
/// Stops the client identified by \p addr.
/// Declaration only; the behaviour is defined in the implementation file.
103  void
104  stop_client(const sys::socket_address& addr);
105 
/// Assigns \p new_weight to the client identified by \p addr.
/// Declaration only; the behaviour is defined in the implementation file.
106  void
107  set_client_weight(const sys::socket_address& addr, weight_type new_weight);
108 
/// Convenience overload of add_server() taking an interface address.
/// Builds a socket address from the address of \p rhs and the configured
/// port (_port), then forwards it together with the netmask of \p rhs to
/// the two-argument add_server().
109  void
110  add_server(const ifaddr_type& rhs) {
111  this->add_server(
112  sys::socket_address(rhs.address(), this->_port),
113  rhs.netmask()
114  );
115  }
116 
/// Adds a server for socket address \p rhs with the given \p netmask.
/// Declaration only; the behaviour is defined in the implementation file.
117  void
118  add_server(const sys::socket_address& rhs, addr_type netmask);
119 
/// Forwards the foreign kernel \p hdr.
/// NOTE(review): destination selection and ownership of \p hdr are not
/// visible in this header — confirm in socket_pipeline.cc.
120  void
121  forward(foreign_kernel* hdr);
122 
/// Sets the port (_port) to \p rhs. No lock is taken here.
123  inline void
124  set_port(sys::port_type rhs) noexcept {
125  this->_port = rhs;
126  }
127 
/// Returns the currently configured port (_port).
128  inline sys::port_type
129  port() const noexcept {
130  return this->_port;
131  }
132 
/// Returns a read-only iterator to the first element of _servers.
133  inline server_const_iterator
134  servers_begin() const noexcept {
135  return this->_servers.begin();
136  }
137 
/// Returns the read-only past-the-end iterator of _servers.
138  inline server_const_iterator
139  servers_end() const noexcept {
140  return this->_servers.end();
141  }
142 
/// Stores \p b in the _uselocalhost flag. How the flag is interpreted is
/// decided by code outside this header.
143  inline void
144  use_localhost(bool b) noexcept {
145  this->_uselocalhost = b;
146  }
147 
/// Removes the server associated with \p interface_address.
/// Declaration only; the behaviour is defined in the implementation file.
148  void
149  remove_server(const ifaddr_type& interface_address);
150 
/// Writes a description of the pipeline state to \p out.
/// NOTE(review): the exact output format is not visible in this header.
151  void
152  print_state(std::ostream& out);
153 
154  private:
155 
/// Removes the client registered under the address \p vaddr.
/// Declaration only; defined in the implementation file.
156  void
157  remove_client(const sys::socket_address& vaddr);
158 
/// Removes the client that \p result points to.
/// Declaration only; defined in the implementation file.
159  void
160  remove_client(client_iterator result);
161 
/// Removes the server that \p result points to.
/// Declaration only; defined in the implementation file.
162  void
163  remove_server(server_iterator result);
164 
/// Looks up a server by interface address and returns an iterator to it.
/// NOTE(review): presumably returns _servers.end() when nothing
/// matches — confirm in socket_pipeline.cc.
165  server_iterator
166  find_server(const ifaddr_type& interface_address);
167 
/// Looks up a server by file descriptor and returns an iterator to it.
168  server_iterator
169  find_server(sys::fd_type fd);
170 
/// Looks up a server for the destination address \p dest and returns an
/// iterator to it.
/// NOTE(review): the matching rule (exact address or subnet) is not
/// visible in this header.
171  server_iterator
172  find_server(const sys::socket_address& dest);
173 
/// NOTE(review): presumably makes sure that kernel \p k carries an
/// identifier before it is sent to \p dest — confirm in socket_pipeline.cc.
174  void
175  ensure_identity(kernel_type* k, const sys::socket_address& dest);
176 
/// NOTE(review): presumably moves _iterator to the next client that
/// should receive a kernel — confirm in socket_pipeline.cc.
178  void
179  find_next_client();
180 
/// Returns true when the client iterator equals _clients.end(), i.e. when
/// there is no current client.
181  inline bool
182  end_reached() const noexcept {
183  return this->_iterator == this->_clients.end();
184  }
185 
/// Puts the client iterator back to _clients.end() and clears the weight
/// counter.
186  inline void
187  reset_iterator() noexcept {
188  this->_iterator = this->_clients.end();
189  this->_weightcnt = 0;
190  }
191 
/// Returns a read-only reference to the client the iterator points to,
/// obtained by dereferencing the mapped value (_iterator->second).
/// The iterator is not checked here: callers must make sure that
/// end_reached() is false.
192  inline const client_type&
193  current_client() const noexcept {
194  return *this->_iterator->second;
195  }
196 
/// Returns a mutable reference to the client the iterator points to,
/// obtained by dereferencing the mapped value (_iterator->second).
/// The iterator is not checked here: callers must make sure that
/// end_reached() is false.
197  inline client_type&
198  current_client() noexcept {
199  return *this->_iterator->second;
200  }
201 
/// Moves the client iterator one position forward and clears the weight
/// counter. No end-of-container check is performed before incrementing.
202  inline void
203  advance_client_iterator() noexcept {
204  ++this->_iterator;
205  this->_weightcnt = 0;
206  }
207 
/// Registers the event handler \p s as the client for address \p vaddr.
/// NOTE(review): presumably inserts into _clients — confirm in
/// socket_pipeline.cc.
208  void
209  emplace_client(const sys::socket_address& vaddr, const event_handler_ptr& s);
210 
/// Maps \p addr to the address under which it is handled by this class.
/// Addresses of the unix family are returned unchanged; for every other
/// family a socket address is constructed from \p addr and this
/// pipeline's port (_port).
/// NOTE(review): presumably the two-argument constructor replaces the
/// port of \p addr — confirm against unistdx socket_address.
211  inline sys::socket_address
212  virtual_addr(const sys::socket_address& addr) const {
213  return addr.family() == sys::family_type::unix
214  ? addr
215  : sys::socket_address(addr, this->_port);
216  }
217 
/// Overrides the kernel-processing hook inherited from a base class.
/// Declaration only; defined in the implementation file.
218  void
219  process_kernels() override;
220 
/// Processes the single kernel \p k.
/// Declaration only; defined in the implementation file.
221  void
222  process_kernel(kernel_type* k);
223 
225  find_or_create_client(const sys::socket_address& addr);
226 
228  do_add_client(const sys::socket_address& addr);
229 
231  do_add_client(socket_type&& sock, sys::socket_address vaddr);
232 
/// Every specialisation of the class templates below is granted access to
/// the private members of this class.
233  template <class K1, class S1, class R1>
234  friend class local_server;
235 
236  template <class K1, class S1, class R1>
237  friend class remote_client;
238 
239  template <class K1, class S1, class R1>
240  friend class socket_notify_handler;
241 
242  };
243 
244 }
245 
246 #endif // vim:filetype=cpp
Definition: socket_pipeline.hh:30
T end(T... args)
Definition: basic_socket_pipeline.hh:41
Definition: socket_pipeline.cc:174
Definition: socket_pipeline.cc:52
Definition: socket_pipeline.hh:27
T begin(T... args)
Definition: foreign_kernel.hh:9