14 bool attempt_reconnect)
15 : m_sock(std::move(sock)),
16 m_attempt_reconnect(attempt_reconnect),
17 m_recv_cb(std::move(cb)) {
27 void peer::send(
const std::shared_ptr<cbdc::buffer>& data) {
29 m_send_queue.push(data);
35 m_reconnect_cv.notify_one();
36 if(m_reconnect_thread.joinable()) {
37 m_reconnect_thread.join();
43 return !m_shut_down && m_running && m_sock->connected();
46 void peer::do_send() {
47 m_send_thread = std::thread([&]() {
49 std::shared_ptr<cbdc::buffer> pkt;
50 if(!m_send_queue.pop(pkt)) {
56 const auto result = m_sock->send(*pkt);
66 void peer::do_recv() {
67 m_recv_thread = std::thread([&]() {
69 auto pkt = std::make_shared<cbdc::buffer>();
70 if(!m_sock->receive(*pkt)) {
75 m_recv_cb(std::move(pkt));
80 void peer::do_reconnect() {
81 m_reconnect_thread = std::thread([&]() {
84 std::unique_lock<std::mutex> l(m_reconnect_mut);
85 m_reconnect_cv.wait(l, [&]() {
86 return m_reconnect || m_shut_down;
93 if(m_attempt_reconnect) {
95 while(!m_shut_down && !m_sock->reconnect()) {
96 static constexpr auto retry_delay
97 = std::chrono::seconds(3);
98 std::unique_lock<std::mutex> l(m_reconnect_mut);
99 m_reconnect_cv.wait_for(l, retry_delay, [&]() ->
bool {
111 m_send_queue.clear();
120 m_sock->disconnect();
121 m_send_queue.clear();
122 if(m_send_thread.joinable()) {
123 m_send_thread.join();
125 if(m_recv_thread.joinable()) {
126 m_recv_thread.join();
128 m_send_queue.reset();
131 void peer::signal_reconnect() {
133 std::lock_guard<std::mutex> l(m_reconnect_mut);
136 m_reconnect_cv.notify_one();
auto connected() const -> bool
Indicates whether the TCP socket is currently connected.
peer(std::unique_ptr< tcp_socket > sock, callback_type cb, bool attempt_reconnect)
Constructor.
std::function< void(std::shared_ptr< cbdc::buffer >)> callback_type
Type for the packet receipt callback function.
void send(const std::shared_ptr< cbdc::buffer > &data)
Sends buffered data.
void shutdown()
Clears any packets in the pending send queue.
~peer()
Destructor. Calls shutdown().