OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
connection_manager.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
7
8namespace cbdc::network {
12
14 unsigned short port) -> bool {
15 if(!m_listener.listen(host, port)) {
16 return false;
17 }
18 if(!m_listen_selector.init()) {
19 return false;
20 }
21 return m_listen_selector.add(m_listener);
22 }
23
24 auto connection_manager::pump() -> bool {
25 while(m_running) {
26 if(!m_listen_selector.wait()) {
27 continue;
28 }
29 auto sock = std::make_unique<tcp_socket>();
30 if(m_listener.accept(*sock)) {
31 add(std::move(sock), false);
32 } else {
33 return false;
34 }
35 }
36
37 return true;
38 }
39
40 void connection_manager::broadcast(const std::shared_ptr<buffer>& data) {
41 {
42 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
43 for(const auto& peer : m_peers) {
44 peer.m_peer->send(data);
45 }
46 }
47 }
48
49 auto connection_manager::handle_messages() -> std::vector<message_t> {
50 std::vector<message_t> pkts;
51
52 {
53 std::unique_lock<std::mutex> l(m_async_recv_mut);
54 m_async_recv_cv.wait(l, [&]() {
55 if(!m_running) {
56 return true;
57 }
58
59 return m_async_recv_data;
60 });
61 m_async_recv_data = false;
62
63 for(auto&& q : m_async_recv_queues) {
64 if(!q.empty()) {
65 pkts.push_back(std::move(q.front()));
66 q.pop();
67
68 // The queue still has packets so don't block
69 // next call.
70 if(!q.empty()) {
71 m_async_recv_data = true;
72 }
73 }
74 }
75 }
76
77 return pkts;
78 }
79
80 auto connection_manager::add(std::unique_ptr<tcp_socket> sock,
81 bool attempt_reconnect) -> peer_id_t {
82 size_t q_idx{};
83 const auto peer_id = m_next_peer_id++;
84 {
85 // Register the async recv callback with the peer
86 std::lock_guard<std::mutex> l(m_async_recv_mut);
87 m_async_recv_queues.emplace_back();
88 q_idx = m_async_recv_queues.size() - 1;
89 }
90 auto recv_cb = [&, q_idx, peer_id](std::shared_ptr<buffer> pkt) {
91 {
92 std::lock_guard<std::mutex> ll(m_async_recv_mut);
93 m_async_recv_queues[q_idx].emplace(
94 message_t{std::move(pkt), peer_id});
95 m_async_recv_data = true;
96 }
97 m_async_recv_cv.notify_one();
98 };
99
100 {
101 std::unique_lock<std::shared_mutex> l(m_peer_mutex);
102 auto p = std::make_unique<peer>(std::move(sock),
103 recv_cb,
104 attempt_reconnect);
105 if(m_running) {
106 m_peers.emplace_back(std::move(p), peer_id);
107 }
108 }
109
110 return peer_id;
111 }
112
114 const std::vector<endpoint_t>& endpoints,
115 bool error_fatal) -> bool {
116 for(const auto& ep : endpoints) {
117 auto ep_sock = std::make_unique<tcp_socket>();
118 if(!ep_sock->connect(ep)) {
119 if(error_fatal) {
120 return false;
121 }
122 }
123 add(std::move(ep_sock));
124 }
125 return true;
126 }
127
129 const std::vector<endpoint_t>& endpoints,
130 const packet_handler_t& handler) -> std::optional<std::thread> {
131 if(!cluster_connect(endpoints)) {
132 return std::nullopt;
133 }
134
135 return start_handler(handler);
136 }
137
138 auto connection_manager::start_server(const endpoint_t& listen_endpoint,
139 const packet_handler_t& handler)
140 -> std::optional<std::thread> {
141 if(!listen(listen_endpoint.first, listen_endpoint.second)) {
142 return std::nullopt;
143 }
144
145 return std::thread{[this, handler]() {
146 auto l_thread = start_server_listener();
147 auto h_thread = start_handler(handler);
148 h_thread.join();
149 l_thread.join();
150 }};
151 }
152
154 return std::thread{[this]() {
155 if(!pump()) {
156 m_running = false;
157 }
158 }};
159 }
160
162 -> std::thread {
163 return std::thread{[this, handler]() {
164 while(m_running) {
165 auto pkts = handle_messages();
166
167 for(auto&& pkt : pkts) {
168 if(!pkt.m_pkt) {
169 continue;
170 }
171
172 auto pid = pkt.m_peer_id;
173 auto res = handler(std::move(pkt));
174
175 if(res.has_value()) {
176 send(std::make_shared<buffer>(std::move(res.value())),
177 pid);
178 }
179 }
180 }
181 }};
182 }
183
185 m_running = false;
186 m_listen_selector.unblock();
187 m_listener.close();
188 {
189 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
190 for(auto&& peer : m_peers) {
191 peer.m_peer->shutdown();
192 }
193 }
194 {
195 std::unique_lock<std::shared_mutex> l(m_peer_mutex);
196 m_peers.clear();
197 }
198 m_async_recv_cv.notify_all();
199 }
200
201 void connection_manager::send(const std::shared_ptr<buffer>& data,
202 peer_id_t peer_id) {
203 std::shared_ptr<peer> peer;
204 {
205 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
206 for(const auto& p : m_peers) {
207 if(p.m_peer_id == peer_id) {
208 peer = p.m_peer;
209 break;
210 }
211 }
212 }
213
214 if(peer) {
215 peer->send(data);
216 }
217 }
218
220 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
221 return m_peers.size();
222 }
223
225 close();
226 assert(!m_running);
227 m_running = true;
228 m_next_peer_id = 0;
229 {
230 std::lock_guard<std::mutex> l(m_async_recv_mut);
231 m_async_recv_queues.clear();
232 m_async_recv_data = false;
233 }
234 }
235
236 auto connection_manager::send_to_one(const std::shared_ptr<buffer>& data)
237 -> bool {
238 bool sent{false};
239 {
240 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
241 // Start at a random place in the peers vector to balance load
242 // across connections
243 auto dist
244 = std::uniform_int_distribution<size_t>(0, m_peers.size() - 1);
245 auto offset = dist(m_rnd);
246 for(size_t i = 0; i < m_peers.size(); i++) {
247 auto idx = (i + offset) % m_peers.size();
248 const auto& p = m_peers[idx];
249 if(p.m_peer->connected()) {
250 p.m_peer->send(data);
251 sent = true;
252 break;
253 }
254 }
255 }
256 return sent;
257 }
258
259 connection_manager::m_peer_t::m_peer_t(std::unique_ptr<peer> peer,
260 peer_id_t peer_id)
261 : m_peer(std::move(peer)),
262 m_peer_id(peer_id) {}
263
265 {
266 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
267 for(const auto& p : m_peers) {
268 if(p.m_peer_id == peer_id) {
269 return p.m_peer->connected();
270 }
271 }
272 }
273 return false;
274 }
275
277 {
278 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
279 for(const auto& p : m_peers) {
280 if(p.m_peer->connected()) {
281 return true;
282 }
283 }
284 }
285 return false;
286 }
287}
auto start_handler(const packet_handler_t &handler) -> std::thread
Starts a thread to handle messages from established connections using the specified handler function.
auto pump() -> bool
Listens for and accepts inbound connections.
auto start_server(const endpoint_t &listen_endpoint, const packet_handler_t &handler) -> std::optional< std::thread >
Establishes a server at the specified endpoint which handles inbound traffic with the specified handler.
void reset()
Resets the network instance to a fresh state.
auto connected(peer_id_t peer_id) -> bool
Determines whether the given peer ID is connected.
auto cluster_connect(const std::vector< endpoint_t > &endpoints, bool error_fatal=true) -> bool
Establishes connections to the provided list of endpoints.
auto start_cluster_handler(const std::vector< endpoint_t > &endpoints, const packet_handler_t &handler) -> std::optional< std::thread >
Connects to the provided endpoints and calls the provided handler for packets received from those endpoints.
void close()
Shuts down the network listener and all existing peer connections.
auto start_server_listener() -> std::thread
Launches a thread that listens for and accepts inbound connections.
auto add(std::unique_ptr< tcp_socket > sock, bool attempt_reconnect=true) -> peer_id_t
Registers the provided socket as a peer to which messages can be sent or broadcast.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
auto peer_count() -> size_t
Returns the number of peers connected to this network.
auto listen(const ip_address &host, unsigned short port) -> bool
Starts a listener to listen for inbound connections on the specified IP address and port.
auto handle_messages() -> std::vector< message_t >
Collects and returns unhandled packets received from connected peers.
void send(const std::shared_ptr< buffer > &data, peer_id_t peer_id)
Sends the provided data to the specified peer.
auto connected_to_one() -> bool
Determines if the network is connected to at least one peer.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
Maintains a TCP socket.
Definition peer.hpp:21
void send(const std::shared_ptr< cbdc::buffer > &data)
Sends buffered data.
Definition peer.cpp:27
void shutdown()
Clears any packets in the pending send queue.
Definition peer.cpp:33
void unblock()
Unblocks a blocked wait() call.
void close()
Stops the listener and unblocks any blocking calls associated with this listener.
size_t peer_id_t
Peer IDs within a connection_manager.
std::function< std::optional< buffer >(message_t &&)> packet_handler_t
Function type for packet handler callbacks.
std::string ip_address
An IP address.
Definition socket.hpp:15
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].
Definition socket.hpp:19
Received message type.