OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
json_rpc_http_client.cpp
Go to the documentation of this file.
1// Copyright (c) 2022 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
7
8#ifdef __APPLE__
10#endif
11
12#ifdef __linux__
14#endif
15
16#include <cassert>
17
18namespace cbdc::rpc {
20 curl_global_init(CURL_GLOBAL_ALL);
21 }
22
24 curl_global_cleanup();
25 }
26
28 std::vector<std::string> endpoints,
29 long timeout,
30 std::shared_ptr<logging::log> log)
31 : m_endpoints(std::move(endpoints)),
32 m_timeout(timeout),
33 m_log(std::move(log)) {
34// TODO: find a way to do this without the preprocessor
35#ifdef __APPLE__
36 m_ev_handler = std::make_unique<kqueue_event_handler>();
37#endif
38#ifdef __linux__
39 m_ev_handler = std::make_unique<epoll_event_handler>();
40#endif
41 if(!m_ev_handler->init()) {
42 return;
43 }
44 m_multi_handle = curl_multi_init();
45 curl_multi_setopt(m_multi_handle,
46 CURLMOPT_TIMERFUNCTION,
47 timer_callback);
48 curl_multi_setopt(m_multi_handle, CURLMOPT_TIMERDATA, this);
49 curl_multi_setopt(m_multi_handle,
50 CURLMOPT_SOCKETFUNCTION,
51 socket_callback);
52 curl_multi_setopt(m_multi_handle, CURLMOPT_SOCKETDATA, this);
53 m_headers
54 = curl_slist_append(m_headers, "Content-Type: application/json");
55 m_headers = curl_slist_append(m_headers, "charsets: utf-8");
56 m_payload["id"] = 1;
57 m_payload["jsonrpc"] = "2.0";
58 }
59
61 for(auto& [handle, t] : m_transfers) {
62 if(curl_multi_remove_handle(m_multi_handle, handle) != CURLM_OK) {
63 m_log->fatal("Error removing handle");
64 }
65 curl_easy_cleanup(handle);
66 t->m_cb(std::nullopt);
67 }
68 if(curl_multi_cleanup(m_multi_handle) != CURLM_OK) {
69 m_log->fatal("Error cleaning up multi_handle");
70 }
71 while(!m_handles.empty()) {
72 auto* handle = m_handles.front();
73 curl_easy_cleanup(handle);
74 m_handles.pop();
75 }
76 curl_slist_free_all(m_headers);
77 }
78
79 void json_rpc_http_client::call(const std::string& method,
80 Json::Value params,
81 callback_type result_fn) {
82 CURL* handle{};
83 if(m_handles.empty()) {
84 handle = curl_easy_init();
85 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
86 curl_easy_setopt(handle,
87 CURLOPT_URL,
88 m_endpoints[m_lb_idx].c_str());
89 m_lb_idx = (m_lb_idx + 1) % m_endpoints.size();
90 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, write_data);
91 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, m_headers);
92 curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, m_timeout);
93 curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, 3);
94 // curl_easy_setopt(handle, CURLOPT_TIMEOUT, 5);
95 } else {
96 handle = m_handles.front();
97 m_handles.pop();
98 }
99
100 auto it = m_transfers.emplace(handle, std::make_unique<transfer>());
101 auto& tf = it.first->second;
102 tf->m_cb = std::move(result_fn);
103
104 curl_easy_setopt(handle, CURLOPT_WRITEDATA, tf.get());
105
106 m_payload["method"] = method;
107 m_payload["params"] = std::move(params);
108
109 tf->m_payload = Json::writeString(m_builder, m_payload);
110 curl_easy_setopt(handle, CURLOPT_POSTFIELDS, tf->m_payload.c_str());
111
112 if(curl_multi_add_handle(m_multi_handle, handle) != CURLM_OK) {
113 m_log->fatal("Error adding handle");
114 }
115
116 // m_requests_started++;
117 // m_log->trace("requests started:", m_requests_started);
118 }
119
120 auto json_rpc_http_client::write_data(void* ptr,
121 size_t size,
122 size_t nmemb,
123 struct transfer* t) -> size_t {
124 auto total_sz = size * nmemb;
125 t->m_result.write(static_cast<char*>(ptr),
126 static_cast<std::streamsize>(total_sz));
127 return total_sz;
128 }
129
131 auto maybe_events = m_ev_handler->poll();
132 if(!maybe_events.has_value()) {
133 m_log->error("Polling error");
134 return false;
135 }
136 auto& events = maybe_events.value();
137 if(events.empty()) {
138 return true;
139 }
140
141 int running{};
142 for(auto& [fd, is_timeout] : events) {
143 if(is_timeout) {
144 curl_multi_socket_action(m_multi_handle,
145 CURL_SOCKET_TIMEOUT,
146 0,
147 &running);
148 continue;
149 }
150
151 curl_multi_socket_action(m_multi_handle,
152 static_cast<curl_socket_t>(fd),
153 0,
154 &running);
155 }
156
157 int q_depth{};
158 do {
159 auto* m = curl_multi_info_read(m_multi_handle, &q_depth);
160 if(m == nullptr) {
161 break;
162 }
163
164 auto it = m_transfers.extract(m->easy_handle);
165 assert(!it.empty());
166 auto& tf = it.mapped();
167
168 if(m->msg != CURLMSG_DONE) {
169 tf->m_cb(std::nullopt);
170 } else {
171 if(m->data.result != CURLE_OK) {
172 m_log->warn("CURL error:",
173 curl_easy_strerror(m->data.result));
174 auto* handle = m->easy_handle;
175 if(curl_multi_remove_handle(m_multi_handle, handle)
176 != CURLM_OK) {
177 m_log->error("Error removing multi handle");
178 return false;
179 }
180 if(curl_multi_add_handle(m_multi_handle, handle)
181 != CURLM_OK) {
182 m_log->error("Error adding multi handle");
183 return false;
184 }
185 m_transfers.insert(std::move(it));
186 continue;
187 }
188
189 long http_code = 0;
190 curl_easy_getinfo(m->easy_handle,
191 CURLINFO_RESPONSE_CODE,
192 &http_code);
193
194 if(http_code / 100 != 2) {
195 m_log->warn("Bad return code:", http_code);
196 tf->m_cb(std::nullopt);
197 } else {
198 auto res = Json::Value();
199 auto r = Json::Reader();
200 auto success = r.parse(tf->m_result.str(), res, false);
201 if(success) {
202 // TODO: sanity check the value of res
203 tf->m_cb(std::move(res));
204 } else {
205 m_log->warn(r.getFormattedErrorMessages(),
206 "res:",
207 tf->m_result.str(),
208 "(",
209 tf->m_result.str().size(),
210 ")");
211 tf->m_cb(std::nullopt);
212 }
213 }
214 }
215
216 m_handles.push(m->easy_handle);
217 if(curl_multi_remove_handle(m_multi_handle, m->easy_handle)
218 != CURLM_OK) {
219 m_log->error("Error removing multi handle");
220 return false;
221 }
222 } while(q_depth > 0);
223
224 return true;
225 }
226
227 auto json_rpc_http_client::socket_callback(CURL* /* handle */,
228 curl_socket_t s,
229 int what,
231 void* /* socketp */) -> int {
233 switch(what) {
234 case CURL_POLL_REMOVE:
236 break;
237 case CURL_POLL_INOUT:
239 break;
240 case CURL_POLL_IN:
242 break;
243 case CURL_POLL_OUT:
245 break;
246 }
247
248 c->m_ev_handler->register_fd(s, et);
249
250 return 0;
251 }
252
253 auto json_rpc_http_client::timer_callback(CURLM* /* multi_handle */,
254 long timeout_ms,
255 json_rpc_http_client* c) -> int {
256 c->m_ev_handler->set_timeout(timeout_ms);
257 return 0;
258 }
259}
~curl_initializer()
Deinitializes libcurl.
curl_initializer()
Initializes libcurl.
event_type
Type of event to register interest in.
@ remove
Remove file descriptor.
@ inout
Read and write events.
Asynchronous HTTP JSON-RPC client implemented using libcurl.
void call(const std::string &method, Json::Value params, callback_type result_fn)
Calls the requested JSON-RPC method with the given parameters and returns the response asynchronously...
std::function< void(std::optional< Json::Value >)> callback_type
Type alias for the response callback function.
std::shared_ptr< cbdc::logging::log > m_log
~json_rpc_http_client()
Cancels any existing requests and stops the client.
auto pump() -> bool
Process events raised by the underlying libcurl implementation.
json_rpc_http_client(std::vector< std::string > endpoints, long timeout, std::shared_ptr< logging::log > log)
Construct a new client.