OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
peer.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "peer.hpp"
7
8#include <cassert>
9#include <utility>
10
11namespace cbdc::network {
12 peer::peer(std::unique_ptr<tcp_socket> sock,
14 bool attempt_reconnect)
15 : m_sock(std::move(sock)),
16 m_attempt_reconnect(attempt_reconnect),
17 m_recv_cb(std::move(cb)) {
18 do_send();
19 do_recv();
20 do_reconnect();
21 }
22
24 shutdown();
25 }
26
27 void peer::send(const std::shared_ptr<cbdc::buffer>& data) {
28 if(!m_shut_down) {
29 m_send_queue.push(data);
30 }
31 }
32
34 m_shut_down = true;
35 m_reconnect_cv.notify_one();
36 if(m_reconnect_thread.joinable()) {
37 m_reconnect_thread.join();
38 }
39 close();
40 }
41
42 auto peer::connected() const -> bool {
43 return !m_shut_down && m_running && m_sock->connected();
44 }
45
46 void peer::do_send() {
47 m_send_thread = std::thread([&]() {
48 while(m_running) {
49 std::shared_ptr<cbdc::buffer> pkt;
50 if(!m_send_queue.pop(pkt)) {
51 assert(!m_running);
52 break;
53 }
54
55 if(pkt) {
56 const auto result = m_sock->send(*pkt);
57 if(!result) {
58 signal_reconnect();
59 return;
60 }
61 }
62 }
63 });
64 }
65
66 void peer::do_recv() {
67 m_recv_thread = std::thread([&]() {
68 while(m_running) {
69 auto pkt = std::make_shared<cbdc::buffer>();
70 if(!m_sock->receive(*pkt)) {
71 signal_reconnect();
72 return;
73 }
74
75 m_recv_cb(std::move(pkt));
76 }
77 });
78 }
79
80 void peer::do_reconnect() {
81 m_reconnect_thread = std::thread([&]() {
82 while(!m_shut_down) {
83 {
84 std::unique_lock<std::mutex> l(m_reconnect_mut);
85 m_reconnect_cv.wait(l, [&]() {
86 return m_reconnect || m_shut_down;
87 });
88 m_reconnect = false;
89 }
90 if(m_shut_down) {
91 break;
92 }
93 if(m_attempt_reconnect) {
94 close();
95 while(!m_shut_down && !m_sock->reconnect()) {
96 static constexpr auto retry_delay
97 = std::chrono::seconds(3);
98 std::unique_lock<std::mutex> l(m_reconnect_mut);
99 m_reconnect_cv.wait_for(l, retry_delay, [&]() -> bool {
100 return m_shut_down;
101 });
102 }
103 if(!m_shut_down) {
104 m_running = true;
105 do_send();
106 do_recv();
107 }
108 } else {
109 m_shut_down = true;
110 close();
111 m_send_queue.clear();
112 return;
113 }
114 }
115 });
116 }
117
118 void peer::close() {
119 m_running = false;
120 m_sock->disconnect();
121 m_send_queue.clear();
122 if(m_send_thread.joinable()) {
123 m_send_thread.join();
124 }
125 if(m_recv_thread.joinable()) {
126 m_recv_thread.join();
127 }
128 m_send_queue.reset();
129 }
130
131 void peer::signal_reconnect() {
132 {
133 std::lock_guard<std::mutex> l(m_reconnect_mut);
134 m_reconnect = true;
135 }
136 m_reconnect_cv.notify_one();
137 }
138}
auto connected() const -> bool
Indicates whether the TCP socket is currently connected.
Definition peer.cpp:42
peer(std::unique_ptr< tcp_socket > sock, callback_type cb, bool attempt_reconnect)
Constructor.
Definition peer.cpp:12
std::function< void(std::shared_ptr< cbdc::buffer >)> callback_type
Type for the packet receipt callback function.
Definition peer.hpp:25
void send(const std::shared_ptr< cbdc::buffer > &data)
Sends buffered data.
Definition peer.cpp:27
void shutdown()
Shuts down the peer: stops the reconnect thread, closes the connection and clears any packets in the pending send queue.
Definition peer.cpp:33
~peer()
Destructor. Calls shutdown().
Definition peer.cpp:23