14                                    unsigned short port) -> 
bool {
 
   15        if(!m_listener.listen(host, port)) {
 
   18        if(!m_listen_selector.init()) {
 
   21        return m_listen_selector.add(m_listener);
 
 
   26            if(!m_listen_selector.wait()) {
 
   29            auto sock = std::make_unique<tcp_socket>();
 
   30            if(m_listener.accept(*sock)) {
 
   31                add(std::move(sock), 
false);
 
 
   42            std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
   43            for(
const auto& 
peer : m_peers) {
 
 
   50        std::vector<message_t> pkts;
 
   53            std::unique_lock<std::mutex> l(m_async_recv_mut);
 
   54            m_async_recv_cv.wait(l, [&]() {
 
   59                return m_async_recv_data;
 
   61            m_async_recv_data = 
false;
 
   63            for(
auto&& q : m_async_recv_queues) {
 
   65                    pkts.push_back(std::move(q.front()));
 
   71                        m_async_recv_data = 
true;
 
 
   83        const auto peer_id = m_next_peer_id++;
 
   86            std::lock_guard<std::mutex> l(m_async_recv_mut);
 
   87            m_async_recv_queues.emplace_back();
 
   88            q_idx = m_async_recv_queues.size() - 1;
 
   90        auto recv_cb = [&, q_idx, peer_id](std::shared_ptr<buffer> pkt) {
 
   92                std::lock_guard<std::mutex> ll(m_async_recv_mut);
 
   93                m_async_recv_queues[q_idx].emplace(
 
   95                m_async_recv_data = 
true;
 
   97            m_async_recv_cv.notify_one();
 
  101            std::unique_lock<std::shared_mutex> l(m_peer_mutex);
 
  102            auto p = std::make_unique<peer>(std::move(sock),
 
  106                m_peers.emplace_back(std::move(p), peer_id);
 
 
  114        const std::vector<endpoint_t>& endpoints,
 
  115        bool error_fatal) -> 
bool {
 
  116        for(
const auto& ep : endpoints) {
 
  117            auto ep_sock = std::make_unique<tcp_socket>();
 
  118            if(!ep_sock->connect(ep)) {
 
  123            add(std::move(ep_sock));
 
 
  129        const std::vector<endpoint_t>& endpoints,
 
  131        if(!cluster_connect(endpoints)) {
 
  135        return start_handler(handler);
 
 
  140        -> std::optional<std::thread> {
 
  141        if(!listen(listen_endpoint.first, listen_endpoint.second)) {
 
  145        return std::thread{[
this, handler]() {
 
  146            auto l_thread = start_server_listener();
 
  147            auto h_thread = start_handler(handler);
 
 
  154        return std::thread{[
this]() {
 
 
  163        return std::thread{[
this, handler]() {
 
  165                auto pkts = handle_messages();
 
  167                for(
auto&& pkt : pkts) {
 
  172                    auto pid = pkt.m_peer_id;
 
  173                    auto res = handler(std::move(pkt));
 
  175                    if(res.has_value()) {
 
  176                        send(std::make_shared<buffer>(std::move(res.value())),
 
 
  189            std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
  190            for(
auto&& 
peer : m_peers) {
 
  195            std::unique_lock<std::shared_mutex> l(m_peer_mutex);
 
  198        m_async_recv_cv.notify_all();
 
 
  203        std::shared_ptr<peer> 
peer;
 
  205            std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
  206            for(
const auto& p : m_peers) {
 
  207                if(p.m_peer_id == peer_id) {
 
 
  220        std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
  221        return m_peers.size();
 
 
  230            std::lock_guard<std::mutex> l(m_async_recv_mut);
 
  231            m_async_recv_queues.clear();
 
  232            m_async_recv_data = 
false;
 
 
  240            std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
  244                = std::uniform_int_distribution<size_t>(0, m_peers.size() - 1);
 
  245            auto offset = dist(m_rnd);
 
  246            for(
size_t i = 0; i < m_peers.size(); i++) {
 
  247                auto idx = (i + offset) % m_peers.size();
 
  248                const auto& p = m_peers[idx];
 
  249                if(p.m_peer->connected()) {
 
  250                    p.m_peer->send(data);
 
 
  259    connection_manager::m_peer_t::m_peer_t(std::unique_ptr<peer> 
peer,
 
  261        : m_peer(std::move(
peer)),
 
  262          m_peer_id(peer_id) {}
 
  266            std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
  267            for(
const auto& p : m_peers) {
 
  268                if(p.m_peer_id == peer_id) {
 
  269                    return p.m_peer->connected();
 
 
  278            std::shared_lock<std::shared_mutex> l(m_peer_mutex);
 
  279            for(
const auto& p : m_peers) {
 
  280                if(p.m_peer->connected()) {
 
 
 
auto start_handler(const packet_handler_t &handler) -> std::thread
Starts a thread to handle messages from established connections using the specified handler function.
auto pump() -> bool
Listens for and accepts inbound connections.
auto start_server(const endpoint_t &listen_endpoint, const packet_handler_t &handler) -> std::optional< std::thread >
Establishes a server at the specified endpoint which handles inbound traffic with the specified handl...
void reset()
Resets the network instance to a fresh state.
auto connected(peer_id_t peer_id) -> bool
Determines whether the given peer ID is connected.
auto cluster_connect(const std::vector< endpoint_t > &endpoints, bool error_fatal=true) -> bool
Establishes connections to the provided list of endpoints.
auto start_cluster_handler(const std::vector< endpoint_t > &endpoints, const packet_handler_t &handler) -> std::optional< std::thread >
Connects to the provided endpoints and calls the provided handler for packets received from those end...
void close()
Shuts down the network listener and all existing peer connections.
auto start_server_listener() -> std::thread
Launches a thread that listens for and accepts inbound connections.
auto add(std::unique_ptr< tcp_socket > sock, bool attempt_reconnect=true) -> peer_id_t
Registers the provided socket as a peer to which messages can be sent or broadcast.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
auto peer_count() -> size_t
Returns the number of peers connected to this network.
auto listen(const ip_address &host, unsigned short port) -> bool
Starts a listener to listen for inbound connections on the specified IP address and port.
auto handle_messages() -> std::vector< message_t >
Collects and return unhandled packets received from connected peers.
void send(const std::shared_ptr< buffer > &data, peer_id_t peer_id)
Sends the provided data to the specified peer.
auto connected_to_one() -> bool
Determines if the network is connected to at least one peer.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
void send(const std::shared_ptr< cbdc::buffer > &data)
Sends buffered data.
void shutdown()
Clears any packets in the pending send queue.
void unblock()
Unblocks a blocked wait() call.
void close()
Stops the listener and unblocks any blocking calls associated with this listener.
size_t peer_id_t
Peer IDs within a connection_manager.
std::function< std::optional< buffer >(message_t &&)> packet_handler_t
Function type for packet handler callbacks.
std::string ip_address
An IP addresses.
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].