// Constructs a client from a list of server endpoints. The vector is taken
// by value and moved into m_server_endpoints, so callers may pass an rvalue
// to avoid a copy.
27 explicit tcp_client(std::vector<network::endpoint_t> server_endpoints)
28 : m_server_endpoints(std::move(server_endpoints)) {}
// NOTE(review): looks like the teardown path of the client; the enclosing
// signature is not visible here — confirm.
// Join the handler thread, but only if it is joinable.
42 if(m_handler_thread.joinable()) {
43 m_handler_thread.join();
// While holding m_responses_mut, complete every response action that is
// still registered in m_responses with std::nullopt (an empty response).
46 std::unique_lock<std::mutex> l(m_responses_mut);
47 for(
auto& [request_id, action] : m_responses) {
48 set_response_value(action, std::nullopt);
// Initializes the client. The result is marked [[nodiscard]], so callers
// are expected to inspect it. error_fatal is optional and defaults to
// std::nullopt.
71 [[nodiscard]]
auto init(std::optional<bool> error_fatal = std::nullopt)
// Fallback for error_fatal: true when at most one server endpoint is
// configured. NOTE(review): presumably applied only when the caller did not
// supply a value — the guarding condition is not visible here; confirm.
74 error_fatal = m_server_endpoints.size() <= 1;
77 error_fatal.value())) {
// Forwards msg to response_handler and returns its result.
// NOTE(review): presumably the body of a message callback — confirm.
83 return response_handler(std::move(msg));
// Server endpoints supplied to the constructor.
91 std::vector<network::endpoint_t> m_server_endpoints;
// Handler thread; it is joined (when joinable) elsewhere in this class.
92 std::thread m_handler_thread;
// Callback flavour of a response action. set_response_value invokes it with
// a std::optional<response_type>.
94 using raw_callback_type =
// Promise flavour of a response action: fulfilled with the response, or
// with std::nullopt when no response is delivered.
97 using promise_type = std::promise<std::optional<response_type>>;
// A registered request is completed through exactly one of the two
// flavours above.
98 using response_action_type
99 = std::variant<promise_type, raw_callback_type>;
// Guards m_responses; every access to the map in this class takes it.
101 std::mutex m_responses_mut;
// Response actions keyed by request id. Entries are added when a request is
// registered and removed when a response is delivered or the send fails.
102 std::unordered_map<request_id_type, response_action_type> m_responses;
106 response_action_type response_action) ->
bool {
// Register the response action for request_id while holding the mutex.
// The assert documents the precondition that the id is not already present
// in m_responses; in builds without asserts a duplicate id would overwrite
// the existing entry.
108 std::unique_lock<std::mutex> l(m_responses_mut);
109 assert(m_responses.find(request_id) == m_responses.end());
110 m_responses[request_id] = std::move(response_action);
// Move the request buffer into a shared_ptr-owned buffer.
// NOTE(review): presumably so the packet can outlive this call while it is
// being sent — confirm against the send path.
112 auto pkt = std::make_shared<buffer>(std::move(request_buf));
// Completes a response action with `value`. The visible branches either
// call set_value(value) on `p` or invoke the raw callback with value.
// NOTE(review): `p` is presumably the promise_type alternative of the
// variant — its declaration is not visible here; confirm.
// `value` is moved from in whichever branch runs.
116 void set_response_value(response_action_type& response_action,
117 std::optional<response_type> value) {
119 p.set_value(std::move(value));
121 [&](raw_callback_type& cb) {
122 cb(std::move(value));
129 std::chrono::milliseconds timeout)
130 -> std::optional<response_type>
override {
// Promise/future pair: the promise is handed to send_request as the
// response action, and the future is what this function waits on.
131 auto response_promise = promise_type();
132 auto response_future = response_promise.get_future();
// If sending fails, complete the registered action with std::nullopt.
134 if(!send_request(std::move(request_buf),
136 std::move(response_promise))) {
137 set_response(request_id, std::nullopt);
// A timed wait is performed only for a non-zero timeout.
// NOTE(review): a zero timeout presumably means "block until a response
// arrives" via the final get() — intervening lines are not visible; confirm.
141 if(timeout != std::chrono::milliseconds::zero()) {
142 auto res = response_future.wait_for(timeout);
143 if(res == std::future_status::timeout) {
// Timed out: complete the registered action with std::nullopt. If the
// entry has already been removed from m_responses this is a no-op.
144 set_response(request_id, std::nullopt);
// Returns whatever value the promise was fulfilled with.
149 return response_future.get();
// Handles an incoming network message. When `resp` holds a value, it is
// delivered to the response action registered under the request id found in
// its header.
// NOTE(review): how `resp` is derived from `msg` is not visible here.
152 auto response_handler(network::message_t&& msg)
153 -> std::optional<buffer> {
// NOTE(review): the request id is read from resp.value() in the same call
// expression that moves resp.value() into the second argument. The order in
// which function arguments are initialized is unspecified, so this relies on
// m_request_id still being readable after the move (true for a plain integer
// id) — confirm, or copy the id into a local first.
156 if(resp.has_value()) {
157 set_response(resp.value().m_header.m_request_id,
158 std::move(resp.value()));
164 std::optional<response_type> value) {
// Detach the entry for request_id from m_responses. The mutex is held only
// inside the lambda, so the response action below runs without the lock.
165 auto response_node = [&]() {
166 std::unique_lock<std::mutex> l(m_responses_mut);
167 return m_responses.extract(request_id);
// An empty node handle means nothing is registered under this id, in which
// case `value` is dropped.
170 if(!response_node.empty()) {
171 set_response_value(response_node.mapped(), std::move(value));
177 raw_callback_type response_callback) ->
bool override {
// Register the callback as the response action and send the request. On
// failure the registered entry is erased under the mutex; the callback is
// not invoked in the lines visible here.
178 if(!send_request(std::move(request_buf),
180 std::move(response_callback))) {
182 std::unique_lock<std::mutex> l(m_responses_mut);
183 m_responses.erase(request_id);