OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
json_rpc_http_server.cpp
Go to the documentation of this file.
1// Copyright (c) 2022 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
7
8#include <arpa/inet.h>
9#include <iostream>
10#include <thread>
11
12namespace cbdc::rpc {
14 bool enable_cors)
15 : m_host(endpoint.first),
16 m_port(endpoint.second),
17 m_enable_cors(enable_cors) {}
18
20 // Set running flag
21 m_running = false;
22
23 // Stop accepting incoming connections
24 auto sock = MHD_quiesce_daemon(m_daemon);
25
26 // Wait for existing connections to drain
27 for(;;) {
28 const auto* inf
29 = MHD_get_daemon_info(m_daemon,
30 MHD_DAEMON_INFO_CURRENT_CONNECTIONS);
31 if(inf->num_connections == 0) {
32 break;
33 }
34 // std::cout << "Waiting for " << inf->num_connections << "
35 // connections to close" << std::endl;
36 constexpr auto wait_time = std::chrono::milliseconds(100);
37 std::this_thread::sleep_for(wait_time);
38 }
39
40 // std::cout << "All connections closed" << std::endl;
41
42 // Stop HTTP daemon
43 MHD_stop_daemon(m_daemon);
44
45 // Close listening socket
46 if(sock != -1) {
47 close(sock);
48 }
49 }
50
52 auto has_epoll
53 = (MHD_is_feature_supported(MHD_FEATURE_EPOLL) == MHD_YES);
54 auto use_flag
55 = has_epoll ? MHD_USE_EPOLL_INTERNALLY : MHD_USE_POLL_INTERNALLY;
56 auto connection_limit = has_epoll ? 65536 : FD_SETSIZE - 4;
57 auto addr = sockaddr_in{};
58 addr.sin_family = AF_INET;
59 addr.sin_port = htons(m_port);
60 inet_aton(m_host.c_str(),
61 reinterpret_cast<in_addr*>(&addr.sin_addr.s_addr));
62 m_daemon = MHD_start_daemon(use_flag | MHD_ALLOW_SUSPEND_RESUME
63 | MHD_USE_DEBUG,
64 m_port,
65 nullptr,
66 nullptr,
67 callback,
68 this,
69 MHD_OPTION_NOTIFY_COMPLETED,
70 request_complete,
71 this,
72 MHD_OPTION_THREAD_POOL_SIZE,
73 std::thread::hardware_concurrency(),
74 MHD_OPTION_CONNECTION_TIMEOUT,
75 3,
76 MHD_OPTION_CONNECTION_LIMIT,
77 connection_limit,
78 MHD_OPTION_SOCK_ADDR,
79 &addr,
80 MHD_OPTION_END);
81 return m_daemon != nullptr;
82 }
83
84 auto json_rpc_http_server::callback(void* cls,
85 struct MHD_Connection* connection,
86 const char* /* url */,
87 const char* method,
88 const char* /* version */,
89 const char* upload_data,
90 size_t* upload_data_size,
91 void** con_cls) -> MHD_Result {
92 if(*con_cls == nullptr) {
93 auto new_req = std::make_unique<request>();
94 new_req->m_connection = connection;
95 auto* server = static_cast<json_rpc_http_server*>(cls);
96 new_req->m_server = server;
97 new_req->m_origin = MHD_lookup_connection_value(connection,
98 MHD_HEADER_KIND,
99 "Origin");
100 *con_cls = new_req.get();
101 {
102 std::unique_lock l(server->m_requests_mut);
103 server->m_requests.emplace(new_req.get(), std::move(new_req));
104 }
105 // server->m_requests_started++;
106 return MHD_YES;
107 }
108
109 auto* req = static_cast<request*>(*con_cls);
110
111 // cors response if enabled
112 if(method == std::string("OPTIONS") && req->m_server->m_enable_cors) {
113 send_cors_response(req);
114 return MHD_YES;
115 }
116
117 if(method != std::string("POST")) {
118 req->m_code = MHD_HTTP_METHOD_NOT_ALLOWED;
119 send_response("HTTP method not allowed", req);
120 return MHD_YES;
121 }
122
123 if(*upload_data_size != 0) {
124 req->m_request.write(
125 upload_data,
126 static_cast<std::streamsize>(*upload_data_size));
127 *upload_data_size = 0;
128 return MHD_YES;
129 }
130
131 if(!req->m_server->m_running) {
132 req->m_code = MHD_HTTP_SERVICE_UNAVAILABLE;
133 send_response("Server is shutting down", req);
134 return MHD_NO;
135 }
136
137 // std::cout << "Handling HTTP request with m_running " <<
138 // req->m_server->m_running << std::endl;
139 auto success = req->m_server->handle_request(req);
140 if(!success) {
141 req->m_code = MHD_HTTP_BAD_REQUEST;
142 send_response("Invalid request payload", req);
143 }
144
145 return MHD_YES;
146 }
147
148 auto json_rpc_http_server::send_cors_response(request* request_info)
149 -> bool {
150 std::string response = "";
151 auto* result = MHD_create_response_from_buffer(
152 response.size(),
153 static_cast<void*>(response.data()),
154 MHD_RESPMEM_MUST_COPY);
155 if(!request_info->m_origin) {
156 request_info->m_origin = "*";
157 }
158 MHD_add_response_header(result,
159 "Access-Control-Allow-Origin",
160 request_info->m_origin);
161
162 MHD_add_response_header(result,
163 "Access-Control-Allow-Methods",
164 "POST");
165 MHD_add_response_header(result,
166 "Access-Control-Allow-Headers",
167 "Content-Type");
168 MHD_add_response_header(result, "Access-Control-Max-Age", "600");
169 MHD_add_response_header(result, "Vary", "Origin");
170 MHD_add_response_header(result,
171 "Vary",
172 "Access-Control-Request-Method");
173 MHD_add_response_header(result,
174 "Vary",
175 "Access-Control-Request-Headers");
176 auto ret = MHD_queue_response(request_info->m_connection, 200, result);
177 MHD_destroy_response(result);
178 const auto* inf = MHD_get_connection_info(
179 request_info->m_connection,
180 MHD_CONNECTION_INFO_CONNECTION_SUSPENDED);
181 if(inf->suspended == MHD_YES) {
182 MHD_resume_connection(request_info->m_connection);
183 }
184 return ret == MHD_YES;
185 }
186
187 auto json_rpc_http_server::send_response(std::string response,
188 request* request_info) -> bool {
189 auto* result = MHD_create_response_from_buffer(
190 response.size(),
191 static_cast<void*>(response.data()),
192 MHD_RESPMEM_MUST_COPY);
193
194 if(request_info->m_server->m_enable_cors) {
195 if(!request_info->m_origin) {
196 request_info->m_origin = "*";
197 }
198 MHD_add_response_header(result,
199 "Access-Control-Allow-Origin",
200 request_info->m_origin);
201 MHD_add_response_header(result, "Vary", "Origin");
202 }
203
204 MHD_add_response_header(result, "Content-Type", "application/json");
205 auto ret = MHD_queue_response(request_info->m_connection,
206 request_info->m_code,
207 result);
208 MHD_destroy_response(result);
209 const auto* inf = MHD_get_connection_info(
210 request_info->m_connection,
211 MHD_CONNECTION_INFO_CONNECTION_SUSPENDED);
212 if(inf->suspended == MHD_YES) {
213 MHD_resume_connection(request_info->m_connection);
214 }
215 return ret == MHD_YES;
216 }
217
218 auto json_rpc_http_server::handle_request(request* request_info) -> bool {
219 auto req = Json::Value();
220 auto r = Json::Reader();
221 auto success = r.parse(request_info->m_request.str(), req, false);
222 if(!success) {
223 return false;
224 }
225
226 if(!req.isMember("method")) {
227 return false;
228 }
229
230 if(!req["method"].isString()) {
231 return false;
232 }
233
234 auto method = req["method"].asString();
235 auto params = Json::Value();
236
237 if(req.isMember("params")) {
238 params = req["params"];
239 }
240
241 uint64_t id = 0;
242 if(req["id"].isUInt64()) {
243 id = req["id"].asUInt64();
244 }
245
246 MHD_suspend_connection(request_info->m_connection);
247
248 auto maybe_sent
249 = m_cb(method,
250 params,
251 [this, request_info, id](std::optional<Json::Value> resp) {
252 handle_response(id, request_info, std::move(resp));
253 });
254 return maybe_sent;
255 }
256
257 void
258 json_rpc_http_server::handle_response(uint64_t id,
259 request* request_info,
260 std::optional<Json::Value> resp) {
261 if(!resp.has_value()) {
262 request_info->m_code = MHD_HTTP_INTERNAL_SERVER_ERROR;
263 request_info->m_server->send_response("Error processing request",
264 request_info);
265 // std::cout << "request processing error" << std::endl;
266 return;
267 }
268
269 auto resp_payload = resp.value();
270 resp_payload["jsonrpc"] = "2.0";
271 resp_payload["id"] = id;
272
273 request_info->m_code = MHD_HTTP_OK;
274 auto resp_str = Json::writeString(m_builder, resp_payload);
275 request_info->m_server->send_response(resp_str, request_info);
276 }
277
279 handler_callback_type handler_callback) {
280 m_cb = std::move(handler_callback);
281 }
282
283 void json_rpc_http_server::request_complete(
284 void* cls,
285 struct MHD_Connection* /* connection */,
286 void** con_cls,
287 MHD_RequestTerminationCode /* toe */) {
288 if(*con_cls == nullptr) {
289 return;
290 }
291 auto* req = static_cast<request*>(*con_cls);
292 auto* server = static_cast<json_rpc_http_server*>(cls);
293 {
294 std::unique_lock l(server->m_requests_mut);
295 server->m_requests.erase(req);
296 // std::cout << "waiting on requests " << server->m_requests.size()
297 // << " " << server->m_requests_started << std::endl;
298 }
299 }
300}
Asynchronous HTTP JSON-RPC server implemented using libmicrohttpd and libjsoncpp.
json_rpc_http_server(network::endpoint_t endpoint, bool enable_cors=false)
Construct a new server.
void register_handler_callback(handler_callback_type handler_callback)
Register the application request handler function with the server.
std::function< bool(std::string, Json::Value, result_callback_type)> handler_callback_type
Callback function type provided by the application for processing requests.
auto init() -> bool
Start listening for incoming connections and processing requests.
Generic RPC server.
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].
Definition socket.hpp:19
interface::exec_return_type response
Agent RPC response type.