OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
connection_manager.hpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#ifndef OPENCBDC_TX_SRC_NETWORK_CONNECTION_MANAGER_H_
7#define OPENCBDC_TX_SRC_NETWORK_CONNECTION_MANAGER_H_
8
9#include "peer.hpp"
10#include "socket_selector.hpp"
11#include "tcp_listener.hpp"
12#include "tcp_socket.hpp"
14
15#include <atomic>
16#include <cassert>
17#include <condition_variable>
18#include <mutex>
19#include <queue>
20#include <random>
21#include <shared_mutex>
22#include <sys/socket.h>
23#include <thread>
24
25namespace cbdc::network {
27 using peer_id_t = size_t;
28
33 struct message_t {
35 std::shared_ptr<buffer> m_pkt;
38 };
39
45 using packet_handler_t = std::function<std::optional<buffer>(message_t&&)>;
46
54 public:
55 connection_manager() = default;
56
59 -> connection_manager& = delete;
60
63
65
71 [[nodiscard]] auto listen(const ip_address& host, unsigned short port)
72 -> bool;
73
76 [[nodiscard]] auto pump() -> bool;
77
81 void broadcast(const std::shared_ptr<buffer>& data);
82
86 template<typename Ta>
87 void broadcast(const Ta& data) {
88 auto pkt = make_shared_buffer(data);
89 return broadcast(pkt);
90 }
91
95 [[nodiscard]] auto handle_messages() -> std::vector<message_t>;
96
103 auto add(std::unique_ptr<tcp_socket> sock,
104 bool attempt_reconnect = true) -> peer_id_t;
105
110 auto cluster_connect(const std::vector<endpoint_t>& endpoints,
111 bool error_fatal = true) -> bool;
112
120 [[nodiscard]] auto
121 start_cluster_handler(const std::vector<endpoint_t>& endpoints,
122 const packet_handler_t& handler)
123 -> std::optional<std::thread>;
124
133 [[nodiscard]] auto start_server(const endpoint_t& listen_endpoint,
134 const packet_handler_t& handler)
135 -> std::optional<std::thread>;
136
140 [[nodiscard]] auto start_server_listener() -> std::thread;
141
146 [[nodiscard]] auto start_handler(const packet_handler_t& handler)
147 -> std::thread;
148
150 void close();
151
155 void send(const std::shared_ptr<buffer>& data, peer_id_t peer_id);
156
161 template<typename Ta>
162 void send(const Ta& data, peer_id_t peer_id) {
163 auto pkt = make_shared_buffer(data);
164 return send(pkt, peer_id);
165 }
166
169 [[nodiscard]] auto peer_count() -> size_t;
170
174 void reset();
175
179 [[nodiscard]] auto send_to_one(const std::shared_ptr<buffer>& data)
180 -> bool;
181
186 template<typename T>
187 [[nodiscard]] auto send_to_one(const T& data) -> bool {
188 auto pkt = make_shared_buffer(data);
189 return send_to_one(pkt);
190 }
191
195 [[nodiscard]] auto connected(peer_id_t peer_id) -> bool;
196
199 [[nodiscard]] auto connected_to_one() -> bool;
200
201 private:
202 tcp_listener m_listener;
203
204 struct m_peer_t {
205 m_peer_t() = delete;
206
207 m_peer_t(std::unique_ptr<peer> peer, peer_id_t peer_id);
208 ~m_peer_t() = default;
209
210 auto operator=(const m_peer_t& other) -> m_peer_t& = default;
211 m_peer_t(const m_peer_t& other) = default;
212
213 auto operator=(m_peer_t&& other) noexcept -> m_peer_t& = default;
214 m_peer_t(m_peer_t&& other) noexcept = default;
215
216 std::shared_ptr<peer> m_peer;
217 peer_id_t m_peer_id;
218 };
219
220 std::vector<m_peer_t> m_peers;
221 std::atomic<peer_id_t> m_next_peer_id{0};
222
223 std::shared_mutex m_peer_mutex;
224
225 std::atomic_bool m_running{true};
226
227 std::mutex m_async_recv_mut;
228 std::condition_variable m_async_recv_cv;
229 std::vector<std::queue<message_t>> m_async_recv_queues;
230 bool m_async_recv_data{false};
231
232 socket_selector m_listen_selector;
233
234 std::random_device m_r{cbdc::config::random_source};
235 std::default_random_engine m_rnd{m_r()};
236 };
237}
238
239#endif // OPENCBDC_TX_SRC_NETWORK_CONNECTION_MANAGER_H_
auto start_handler(const packet_handler_t &handler) -> std::thread
Starts a thread to handle messages from established connections using the specified handler function.
auto pump() -> bool
Listens for and accepts inbound connections.
auto start_server(const endpoint_t &listen_endpoint, const packet_handler_t &handler) -> std::optional< std::thread >
Establishes a server at the specified endpoint which handles inbound traffic with the specified handl...
connection_manager(connection_manager &&)=delete
auto send_to_one(const T &data) -> bool
Serialize and send the provided data to an online peer managed by this network.
void reset()
Resets the network instance to a fresh state.
auto connected(peer_id_t peer_id) -> bool
Determines whether the given peer ID is connected.
auto cluster_connect(const std::vector< endpoint_t > &endpoints, bool error_fatal=true) -> bool
Establishes connections to the provided list of endpoints.
void broadcast(const Ta &data)
Serialize the data and broadcast it to all peers.
auto start_cluster_handler(const std::vector< endpoint_t > &endpoints, const packet_handler_t &handler) -> std::optional< std::thread >
Connects to the provided endpoints and calls the provided handler for packets received from those end...
void close()
Shuts down the network listener and all existing peer connections.
auto start_server_listener() -> std::thread
Launches a thread that listens for and accepts inbound connections.
auto operator=(connection_manager &&) -> connection_manager &=delete
auto add(std::unique_ptr< tcp_socket > sock, bool attempt_reconnect=true) -> peer_id_t
Registers the provided socket as a peer to which messages can be sent or broadcast.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
connection_manager(const connection_manager &)=delete
auto peer_count() -> size_t
Returns the number of peers connected to this network.
auto listen(const ip_address &host, unsigned short port) -> bool
Starts a listener to listen for inbound connections on the specified IP address and port.
auto operator=(const connection_manager &) -> connection_manager &=delete
auto handle_messages() -> std::vector< message_t >
Collects and return unhandled packets received from connected peers.
void send(const std::shared_ptr< buffer > &data, peer_id_t peer_id)
Sends the provided data to the specified peer.
void send(const Ta &data, peer_id_t peer_id)
Serialize the data and transmit it in a packet to the remote host at the specified peer ID.
auto connected_to_one() -> bool
Determines if the network is connected to at least one peer.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
Maintains a TCP socket.
Definition peer.hpp:21
Listens for incoming TCP connections on a given endpoint.
Tools for reading options from a configuration file and building application-specific parameter sets ...
size_t peer_id_t
Peer IDs within a connection_manager.
std::function< std::optional< buffer >(message_t &&)> packet_handler_t
Function type for packet handler callbacks.
std::string ip_address
An IP addresses.
Definition socket.hpp:15
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].
Definition socket.hpp:19
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
Received message type.
std::shared_ptr< buffer > m_pkt
Packet data.
peer_id_t m_peer_id
Peer ID that sent packet.