OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
tcp_client.hpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#ifndef OPENCBDC_TX_SRC_RPC_TCP_CLIENT_H_
7#define OPENCBDC_TX_SRC_RPC_TCP_CLIENT_H_
8
9#include "client.hpp"
12
13#include <future>
14#include <unordered_map>
15
16namespace cbdc::rpc {
22 template<typename Request, typename Response>
23 class tcp_client : public client<Request, Response> {
24 public:
/// Constructor. Only stores the endpoints; no connection is attempted here,
/// the endpoints are handed to the network layer later by init().
/// \param server_endpoints endpoints of the RPC servers to connect to.
27 explicit tcp_client(std::vector<network::endpoint_t> server_endpoints)
28 : m_server_endpoints(std::move(server_endpoints)) {}
29
// Move assignment, copy construction and copy assignment are all deleted.
31 auto operator=(tcp_client&&) -> tcp_client& = delete;
32 tcp_client(const tcp_client&) = delete;
33 auto operator=(const tcp_client&) -> tcp_client& = delete;
34
37
40 ~tcp_client() override {
41 m_net.close();
42 if(m_handler_thread.joinable()) {
43 m_handler_thread.join();
44 }
45 {
46 std::unique_lock<std::mutex> l(m_responses_mut);
47 for(auto& [request_id, action] : m_responses) {
48 set_response_value(action, std::nullopt);
49 }
50 m_responses.clear();
51 }
52 }
53
71 [[nodiscard]] auto init(std::optional<bool> error_fatal = std::nullopt)
72 -> bool {
73 if(!error_fatal) {
74 error_fatal = m_server_endpoints.size() <= 1;
75 }
76 if(!m_net.cluster_connect(m_server_endpoints,
77 error_fatal.value())) {
78 return false;
79 }
80
81 m_handler_thread = m_net.start_handler(
82 [&](network::message_t&& msg) -> std::optional<buffer> {
83 return response_handler(std::move(msg));
84 });
85
86 return true;
87 }
88
89 private:
// Server endpoints supplied to the constructor; passed to cluster_connect
// by init().
91 std::vector<network::endpoint_t> m_server_endpoints;
// Thread returned by start_handler in init(); joined by the destructor.
92 std::thread m_handler_thread;
93
// NOTE(review): the right-hand side of this alias is not visible in this
// listing — presumably it names the raw_callback_type declared by
// client<Request, Response>; confirm against the original header.
94 using raw_callback_type =
96
// Promise through which a blocking call receives its response, or
// std::nullopt when no response is delivered.
97 using promise_type = std::promise<std::optional<response_type>>;
// How a pending request is completed: by fulfilling a promise or by
// invoking a callback.
98 using response_action_type
99 = std::variant<promise_type, raw_callback_type>;
100
// Guards m_responses.
101 std::mutex m_responses_mut;
// Actions for requests that have been registered and not yet completed,
// keyed by request ID.
102 std::unordered_map<request_id_type, response_action_type> m_responses;
103
104 auto send_request(cbdc::buffer request_buf,
105 request_id_type request_id,
106 response_action_type response_action) -> bool {
107 {
108 std::unique_lock<std::mutex> l(m_responses_mut);
109 assert(m_responses.find(request_id) == m_responses.end());
110 m_responses[request_id] = std::move(response_action);
111 }
112 auto pkt = std::make_shared<buffer>(std::move(request_buf));
113 return m_net.send_to_one(pkt);
114 }
115
116 void set_response_value(response_action_type& response_action,
117 std::optional<response_type> value) {
118 std::visit(overloaded{[&](promise_type& p) {
119 p.set_value(std::move(value));
120 },
121 [&](raw_callback_type& cb) {
122 cb(std::move(value));
123 }},
124 response_action);
125 }
126
127 auto call_raw(cbdc::buffer request_buf,
128 request_id_type request_id,
129 std::chrono::milliseconds timeout)
130 -> std::optional<response_type> override {
131 auto response_promise = promise_type();
132 auto response_future = response_promise.get_future();
133
134 if(!send_request(std::move(request_buf),
135 request_id,
136 std::move(response_promise))) {
137 set_response(request_id, std::nullopt);
138 return std::nullopt;
139 }
140
141 if(timeout != std::chrono::milliseconds::zero()) {
142 auto res = response_future.wait_for(timeout);
143 if(res == std::future_status::timeout) {
144 set_response(request_id, std::nullopt);
145 return std::nullopt;
146 }
147 }
148
149 return response_future.get();
150 }
151
/// Handler run for each message passed to the callback that init()
/// registers with start_handler. If a response was obtained from the
/// message, completes the pending request whose ID matches the response
/// header.
/// \param msg received message.
/// \return always std::nullopt.
152 auto response_handler(network::message_t&& msg)
153 -> std::optional<buffer> {
// NOTE(review): the initializer of resp is not visible in this listing —
// presumably the result of deserialize_response on the message payload;
// confirm against the original header.
154 auto resp
156 if(resp.has_value()) {
157 set_response(resp.value().m_header.m_request_id,
158 std::move(resp.value()));
159 }
160 return std::nullopt;
161 }
162
163 void set_response(request_id_type request_id,
164 std::optional<response_type> value) {
165 auto response_node = [&]() {
166 std::unique_lock<std::mutex> l(m_responses_mut);
167 return m_responses.extract(request_id);
168 }();
169
170 if(!response_node.empty()) {
171 set_response_value(response_node.mapped(), std::move(value));
172 }
173 }
174
175 auto call_raw(cbdc::buffer request_buf,
176 request_id_type request_id,
177 raw_callback_type response_callback) -> bool override {
178 if(!send_request(std::move(request_buf),
179 request_id,
180 std::move(response_callback))) {
181 {
182 std::unique_lock<std::mutex> l(m_responses_mut);
183 m_responses.erase(request_id);
184 }
185 return false;
186 }
187
188 return true;
189 }
190 };
191}
192
193#endif
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
auto start_handler(const packet_handler_t &handler) -> std::thread
Starts a thread to handle messages from established connections using the specified handler function.
auto cluster_connect(const std::vector< endpoint_t > &endpoints, bool error_fatal=true) -> bool
Establishes connections to the provided list of endpoints.
void close()
Shuts down the network listener and all existing peer connections.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
Generic RPC client.
auto deserialize_response(cbdc::buffer &response_buf) -> std::optional< response_type >
Deserializes a response object from the given buffer.
std::function< void(std::optional< response_type >)> raw_callback_type
Response callback function type for handling an RPC response.
Implements an RPC client over TCP sockets.
~tcp_client() override
Destructor.
auto operator=(tcp_client &&) -> tcp_client &=delete
tcp_client(tcp_client &&)=delete
auto operator=(const tcp_client &) -> tcp_client &=delete
auto init(std::optional< bool > error_fatal=std::nullopt) -> bool
Initializes the client.
typename client< Request, Response >::response_type response_type
tcp_client(std::vector< network::endpoint_t > server_endpoints)
Constructor.
tcp_client(const tcp_client &)=delete
uint64_t request_id_type
Definition header.hpp:12
overloaded(Ts...) -> overloaded< Ts... >
Received message type.
RPC response message.