14 unsigned short port) ->
bool {
15 if(!m_listener.listen(host, port)) {
18 if(!m_listen_selector.init()) {
21 return m_listen_selector.add(m_listener);
26 if(!m_listen_selector.wait()) {
29 auto sock = std::make_unique<tcp_socket>();
30 if(m_listener.accept(*sock)) {
31 add(std::move(sock),
false);
42 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
43 for(
const auto&
peer : m_peers) {
50 std::vector<message_t> pkts;
53 std::unique_lock<std::mutex> l(m_async_recv_mut);
54 m_async_recv_cv.wait(l, [&]() {
59 return m_async_recv_data;
61 m_async_recv_data =
false;
63 for(
auto&& q : m_async_recv_queues) {
65 pkts.push_back(std::move(q.front()));
71 m_async_recv_data =
true;
83 const auto peer_id = m_next_peer_id++;
86 std::lock_guard<std::mutex> l(m_async_recv_mut);
87 m_async_recv_queues.emplace_back();
88 q_idx = m_async_recv_queues.size() - 1;
90 auto recv_cb = [&, q_idx, peer_id](std::shared_ptr<buffer> pkt) {
92 std::lock_guard<std::mutex> ll(m_async_recv_mut);
93 m_async_recv_queues[q_idx].emplace(
95 m_async_recv_data =
true;
97 m_async_recv_cv.notify_one();
101 std::unique_lock<std::shared_mutex> l(m_peer_mutex);
102 auto p = std::make_unique<peer>(std::move(sock),
106 m_peers.emplace_back(std::move(p), peer_id);
114 const std::vector<endpoint_t>& endpoints,
115 bool error_fatal) ->
bool {
116 for(
const auto& ep : endpoints) {
117 auto ep_sock = std::make_unique<tcp_socket>();
118 if(!ep_sock->connect(ep)) {
123 add(std::move(ep_sock));
129 const std::vector<endpoint_t>& endpoints,
131 if(!cluster_connect(endpoints)) {
135 return start_handler(handler);
140 -> std::optional<std::thread> {
141 if(!listen(listen_endpoint.first, listen_endpoint.second)) {
145 return std::thread{[
this, handler]() {
146 auto l_thread = start_server_listener();
147 auto h_thread = start_handler(handler);
154 return std::thread{[
this]() {
163 return std::thread{[
this, handler]() {
165 auto pkts = handle_messages();
167 for(
auto&& pkt : pkts) {
172 auto pid = pkt.m_peer_id;
173 auto res = handler(std::move(pkt));
175 if(res.has_value()) {
176 send(std::make_shared<buffer>(std::move(res.value())),
189 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
190 for(
auto&&
peer : m_peers) {
195 std::unique_lock<std::shared_mutex> l(m_peer_mutex);
198 m_async_recv_cv.notify_all();
203 std::shared_ptr<peer>
peer;
205 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
206 for(
const auto& p : m_peers) {
207 if(p.m_peer_id == peer_id) {
220 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
221 return m_peers.size();
230 std::lock_guard<std::mutex> l(m_async_recv_mut);
231 m_async_recv_queues.clear();
232 m_async_recv_data =
false;
240 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
244 = std::uniform_int_distribution<size_t>(0, m_peers.size() - 1);
245 auto offset = dist(m_rnd);
246 for(
size_t i = 0; i < m_peers.size(); i++) {
247 auto idx = (i + offset) % m_peers.size();
248 const auto& p = m_peers[idx];
249 if(p.m_peer->connected()) {
250 p.m_peer->send(data);
259 connection_manager::m_peer_t::m_peer_t(std::unique_ptr<peer>
peer,
261 : m_peer(std::move(
peer)),
262 m_peer_id(peer_id) {}
266 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
267 for(
const auto& p : m_peers) {
268 if(p.m_peer_id == peer_id) {
269 return p.m_peer->connected();
278 std::shared_lock<std::shared_mutex> l(m_peer_mutex);
279 for(
const auto& p : m_peers) {
280 if(p.m_peer->connected()) {
auto start_handler(const packet_handler_t &handler) -> std::thread
Starts a thread to handle messages from established connections using the specified handler function.
auto pump() -> bool
Listens for and accepts inbound connections.
auto start_server(const endpoint_t &listen_endpoint, const packet_handler_t &handler) -> std::optional< std::thread >
Establishes a server at the specified endpoint which handles inbound traffic with the specified handl...
void reset()
Resets the network instance to a fresh state.
auto connected(peer_id_t peer_id) -> bool
Determines whether the given peer ID is connected.
auto cluster_connect(const std::vector< endpoint_t > &endpoints, bool error_fatal=true) -> bool
Establishes connections to the provided list of endpoints.
auto start_cluster_handler(const std::vector< endpoint_t > &endpoints, const packet_handler_t &handler) -> std::optional< std::thread >
Connects to the provided endpoints and calls the provided handler for packets received from those end...
void close()
Shuts down the network listener and all existing peer connections.
auto start_server_listener() -> std::thread
Launches a thread that listens for and accepts inbound connections.
auto add(std::unique_ptr< tcp_socket > sock, bool attempt_reconnect=true) -> peer_id_t
Registers the provided socket as a peer to which messages can be sent or broadcast.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
auto peer_count() -> size_t
Returns the number of peers connected to this network.
auto listen(const ip_address &host, unsigned short port) -> bool
Starts a listener to listen for inbound connections on the specified IP address and port.
auto handle_messages() -> std::vector< message_t >
Collects and return unhandled packets received from connected peers.
void send(const std::shared_ptr< buffer > &data, peer_id_t peer_id)
Sends the provided data to the specified peer.
auto connected_to_one() -> bool
Determines if the network is connected to at least one peer.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
void send(const std::shared_ptr< cbdc::buffer > &data)
Sends buffered data.
void shutdown()
Clears any packets in the pending send queue.
void unblock()
Unblocks a blocked wait() call.
void close()
Stops the listener and unblocks any blocking calls associated with this listener.
size_t peer_id_t
Peer IDs within a connection_manager.
std::function< std::optional< buffer >(message_t &&)> packet_handler_t
Function type for packet handler callbacks.
std::string ip_address
An IP addresses.
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].