// Constructor fragment: takes the list of endpoint URLs and a shared logger,
// then sets up the event handler, the libcurl multi handle, the shared HTTP
// header list and the JSON-RPC payload template.
// (The start of the signature and some intermediate lines are not shown here.)
28 std::vector<std::string> endpoints,
30 std::shared_ptr<logging::log> log)
// Both arguments are taken by value and moved into the members.
31 : m_endpoints(std::move(endpoints)),
33 m_log(std::move(log)) {
// Two event-handler implementations are assigned below; presumably the choice
// is made by platform preprocessor conditionals on lines not shown — confirm.
36 m_ev_handler = std::make_unique<kqueue_event_handler>();
39 m_ev_handler = std::make_unique<epoll_event_handler>();
// init() failure is detected here; the handling is on lines not shown.
41 if(!m_ev_handler->init()) {
// Create the multi handle and register the timer and socket callbacks, passing
// `this` as the user-data pointer for both. The callback functions themselves
// are named on lines not shown.
44 m_multi_handle = curl_multi_init();
45 curl_multi_setopt(m_multi_handle,
46 CURLMOPT_TIMERFUNCTION,
48 curl_multi_setopt(m_multi_handle, CURLMOPT_TIMERDATA,
this);
49 curl_multi_setopt(m_multi_handle,
50 CURLMOPT_SOCKETFUNCTION,
52 curl_multi_setopt(m_multi_handle, CURLMOPT_SOCKETDATA,
this);
// Build the header list stored in m_headers (assignment target of the first
// append is on a line not shown).
54 = curl_slist_append(m_headers,
"Content-Type: application/json");
55 m_headers = curl_slist_append(m_headers,
"charsets: utf-8");
// Fixed JSON-RPC version field; m_payload is a member, so this value stays in
// place while other fields are set elsewhere.
57 m_payload[
"jsonrpc"] =
"2.0";
// Teardown fragment (appears to be the destructor; the signature is on a line
// not shown). Cancels every in-flight transfer, destroys the multi handle,
// releases the pooled easy handles and frees the header list.
// For each outstanding transfer: detach its easy handle from the multi handle,
// destroy the handle, and notify the caller with std::nullopt.
61 for(
auto& [handle, t] : m_transfers) {
62 if(curl_multi_remove_handle(m_multi_handle, handle) != CURLM_OK) {
63 m_log->fatal(
"Error removing handle");
65 curl_easy_cleanup(handle);
66 t->m_cb(std::nullopt);
// The multi handle is cleaned up only after the easy handles were removed above.
68 if(curl_multi_cleanup(m_multi_handle) != CURLM_OK) {
69 m_log->fatal(
"Error cleaning up multi_handle");
// Drain the pool of idle easy handles; the matching pop is presumably on a
// line not shown — confirm, otherwise this loop would not terminate.
71 while(!m_handles.empty()) {
72 auto* handle = m_handles.front();
73 curl_easy_cleanup(handle);
// Release the curl_slist held in m_headers.
76 curl_slist_free_all(m_headers);
// Request-submission fragment (signature on lines not shown): obtains an easy
// handle, records a transfer for it, serialises the JSON payload and hands the
// handle to the multi handle.
// No idle handle available: create and configure a new one.
83 if(m_handles.empty()) {
84 handle = curl_easy_init();
// NOTE(review): libcurl documents CURLOPT_NOSIGNAL as taking a long; the
// literal 1 is an int passed through varargs — consider 1L.
85 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
// Passes the endpoint at m_lb_idx to the handle (the option name is on a line
// not shown; presumably CURLOPT_URL), then advances the index so successive
// new handles cycle through m_endpoints.
86 curl_easy_setopt(handle,
88 m_endpoints[m_lb_idx].c_str());
89 m_lb_idx = (m_lb_idx + 1) % m_endpoints.size();
90 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, write_data);
91 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, m_headers);
92 curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, m_timeout);
// NOTE(review): CURLOPT_CONNECTTIMEOUT is in seconds and is documented as
// taking a long; the literal 3 is an int — consider 3L.
93 curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, 3);
// Otherwise (presumably the else branch; the removal from m_handles is on a
// line not shown) reuse an idle handle from the pool.
96 handle = m_handles.front();
// Register a transfer record keyed by the easy handle and store the caller's
// completion callback in it.
100 auto it = m_transfers.emplace(handle, std::make_unique<transfer>());
101 auto& tf = it.first->second;
102 tf->m_cb = std::move(result_fn);
// The transfer record is the user pointer handed to the write callback.
104 curl_easy_setopt(handle, CURLOPT_WRITEDATA, tf.get());
// Fill in the per-request fields of the shared m_payload object.
106 m_payload[
"method"] = method;
107 m_payload[
"params"] = std::move(params);
// The serialised body is stored in the transfer record; CURLOPT_POSTFIELDS does
// not copy the data, so the string must stay alive for the duration of the
// transfer.
109 tf->m_payload = Json::writeString(m_builder, m_payload);
110 curl_easy_setopt(handle, CURLOPT_POSTFIELDS, tf->m_payload.c_str());
// Hand the handle to the multi handle; failure is logged as fatal.
112 if(curl_multi_add_handle(m_multi_handle, handle) != CURLM_OK) {
113 m_log->fatal(
"Error adding handle");
// Event-pump fragment (signature on lines not shown): polls the event handler,
// feeds the resulting events to libcurl, then processes completed transfers and
// invokes their callbacks.
131 auto maybe_events = m_ev_handler->poll();
// An empty optional from poll() is logged as an error; what happens next is on
// lines not shown.
132 if(!maybe_events.has_value()) {
133 m_log->error(
"Polling error");
136 auto& events = maybe_events.value();
// Each event is a (fd, is_timeout) pair. Two curl_multi_socket_action calls
// follow; presumably one handles the timeout case and the other the socket
// case (only the second visibly passes fd) — the branch is on lines not shown.
142 for(
auto& [fd, is_timeout] : events) {
144 curl_multi_socket_action(m_multi_handle,
151 curl_multi_socket_action(m_multi_handle,
152 static_cast<curl_socket_t
>(fd),
// Read one message from the multi handle; q_depth receives the number of
// messages still queued and drives the loop condition at the end of this span.
// Any null check on `m` is on lines not shown.
159 auto* m = curl_multi_info_read(m_multi_handle, &q_depth);
// extract() removes the transfer's node from m_transfers; unless it is
// re-inserted below, the transfer is destroyed when `it` goes out of scope.
164 auto it = m_transfers.extract(m->easy_handle);
166 auto& tf = it.mapped();
// Any message other than CURLMSG_DONE is reported to the caller as a failure.
168 if(m->msg != CURLMSG_DONE) {
169 tf->m_cb(std::nullopt);
// Transport-level failure: log it, then remove and re-add the easy handle to
// the multi handle and put the transfer node back into m_transfers, i.e. the
// request is retried instead of reported.
// NOTE(review): no retry limit is visible in this span — confirm one exists.
171 if(m->data.result != CURLE_OK) {
172 m_log->warn(
"CURL error:",
173 curl_easy_strerror(m->data.result));
174 auto* handle = m->easy_handle;
175 if(curl_multi_remove_handle(m_multi_handle, handle)
177 m_log->error(
"Error removing multi handle");
180 if(curl_multi_add_handle(m_multi_handle, handle)
182 m_log->error(
"Error adding multi handle");
185 m_transfers.insert(std::move(it));
// Fetch the HTTP status (http_code is declared on a line not shown); anything
// outside 2xx is logged and reported as a failure.
190 curl_easy_getinfo(m->easy_handle,
191 CURLINFO_RESPONSE_CODE,
194 if(http_code / 100 != 2) {
195 m_log->warn(
"Bad return code:", http_code);
196 tf->m_cb(std::nullopt);
// Parse the accumulated response body as JSON; the third argument disables
// comment collection.
// NOTE(review): Json::Reader is deprecated in JsonCpp in favour of
// Json::CharReaderBuilder.
198 auto res = Json::Value();
199 auto r = Json::Reader();
200 auto success = r.parse(tf->m_result.str(), res,
false);
// Presumably guarded by `success` (condition on a line not shown): deliver the
// parsed value, otherwise log the parser errors and report failure.
203 tf->m_cb(std::move(res));
205 m_log->warn(r.getFormattedErrorMessages(),
209 tf->m_result.str().size(),
211 tf->m_cb(std::nullopt);
// Return the easy handle to the idle pool and detach it from the multi handle.
216 m_handles.push(m->easy_handle);
217 if(curl_multi_remove_handle(m_multi_handle, m->easy_handle)
219 m_log->error(
"Error removing multi handle");
222 }
// Keep reading while libcurl reports more queued messages.
while(q_depth > 0);