OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/atomizer/atomizer/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
8#include "atomizer_raft.hpp"
9#include "format.hpp"
11#include "util/raft/util.hpp"
13
14#include <utility>
15
16namespace cbdc::atomizer {
17 controller::controller(uint32_t atomizer_id,
18 const config::options& opts,
19 std::shared_ptr<logging::log> log)
20 : m_atomizer_id(atomizer_id),
21 m_opts(opts),
22 m_logger(std::move(log)),
23 m_raft_node(static_cast<uint32_t>(atomizer_id),
24 opts.m_atomizer_raft_endpoints,
25 m_opts.m_stxo_cache_depth,
26 m_logger,
27 m_opts,
28 [&](auto&& type, auto&& param) {
29 return raft_callback(
30 std::forward<decltype(type)>(type),
31 std::forward<decltype(param)>(param));
32 }) {}
33
35 m_raft_node.stop();
36 m_atomizer_network.close();
37
38 m_running = false;
39
40 if(m_tx_notify_thread.joinable()) {
41 m_tx_notify_thread.join();
42 }
43
44 if(m_atomizer_server.joinable()) {
45 m_atomizer_server.join();
46 }
47
48 if(m_main_thread.joinable()) {
49 m_main_thread.join();
50 }
51
52 m_notification_queue.clear();
53 for(auto& t : m_notification_threads) {
54 if(t.joinable()) {
55 t.join();
56 }
57 }
58 }
59
60 auto controller::init() -> bool {
61 if(!m_watchtower_network.cluster_connect(
62 m_opts.m_watchtower_internal_endpoints)) {
63 m_logger->warn("Failed to connect to watchtowers.");
64 }
65
66 auto raft_params = nuraft::raft_params();
67 raft_params.election_timeout_lower_bound_
68 = static_cast<int>(m_opts.m_election_timeout_lower);
69 raft_params.election_timeout_upper_bound_
70 = static_cast<int>(m_opts.m_election_timeout_upper);
71 raft_params.heart_beat_interval_
72 = static_cast<int>(m_opts.m_heartbeat);
73 raft_params.snapshot_distance_
74 = static_cast<int>(m_opts.m_snapshot_distance);
75 raft_params.max_append_size_
76 = static_cast<int>(m_opts.m_raft_max_batch);
77
78 if(!m_raft_node.init(raft_params)) {
79 return false;
80 }
81
82 m_tx_notify_thread = std::thread{[&] {
83 tx_notify_handler();
84 }};
85
86 m_main_thread = std::thread{[&] {
87 main_handler();
88 }};
89
90 auto n_threads = std::thread::hardware_concurrency();
91 for(size_t i = 0; i < n_threads; i++) {
92 m_notification_threads.emplace_back([&]() {
93 notification_consumer();
94 });
95 }
96
97 m_logger->info("Atomizer started...");
98
99 return true;
100 }
101
102 auto controller::server_handler(cbdc::network::message_t&& pkt)
103 -> std::optional<cbdc::buffer> {
104 if(!m_raft_node.is_leader()) {
105 return std::nullopt;
106 }
107
108 auto maybe_req = from_buffer<request>(*pkt.m_pkt);
109 if(!maybe_req.has_value()) {
110 m_logger->error("Invalid request packet");
111 return std::nullopt;
112 }
113
114 std::visit(
116 [&](tx_notify_request& notif) {
117 m_logger->trace("Received transaction notification",
118 to_string(notif.m_tx.m_id),
119 "with height",
120 notif.m_block_height);
121 m_notification_queue.push(notif);
122 },
123 [&](const prune_request& p) {
124 m_raft_node.make_request(p, nullptr);
125 },
126 [&](const get_block_request& g) {
127 auto result_fn = [&, peer_id = pkt.m_peer_id](
129 nuraft::ptr<std::exception>& err) {
130 if(err) {
131 m_logger->error("Exception handling log entry:",
132 err->what());
133 return;
134 }
135
136 const auto res = r.get();
137 if(!res) {
138 m_logger->error("Requested block not found.");
139 return;
140 }
141
142 auto maybe_resp
144 assert(maybe_resp.has_value());
145 assert(std::holds_alternative<get_block_response>(
146 maybe_resp.value()));
147 auto& resp
148 = std::get<get_block_response>(maybe_resp.value());
149 m_atomizer_network.send(resp.m_blk, peer_id);
150 };
151 m_raft_node.make_request(g, result_fn);
152 }},
153 maybe_req.value());
154
155 return std::nullopt;
156 }
157
158 void controller::tx_notify_handler() {
159 while(m_running) {
160 if(!m_raft_node.send_complete_txs(
161 [&, this](auto&& res, auto&& err) {
162 err_return_handler(std::forward<decltype(res)>(res),
163 std::forward<decltype(err)>(err));
164 })) {
165 static constexpr auto batch_send_delay
166 = std::chrono::milliseconds(20);
167 std::this_thread::sleep_for(batch_send_delay);
168 }
169 }
170 }
171
172 void controller::main_handler() {
173 auto last_time = std::chrono::high_resolution_clock::now();
174
175 while(m_running) {
176 const auto next_time
177 = last_time
178 + std::chrono::milliseconds(m_opts.m_target_block_interval);
179 std::this_thread::sleep_until(next_time);
180 last_time = std::chrono::high_resolution_clock::now();
181
182 if(m_raft_node.is_leader()) {
183 auto req = make_block_request();
184 auto res
185 = m_raft_node.make_request(req, [&](auto&& r, auto&& err) {
186 raft_result_handler(
187 std::forward<decltype(r)>(r),
188 std::forward<decltype(err)>(err));
189 });
190 if(!res && m_running) {
191 m_logger->error("Failed to make block at time",
192 last_time.time_since_epoch().count());
193 }
194 }
195 }
196 }
197
198 void controller::raft_result_handler(raft::result_type& r,
199 nuraft::ptr<std::exception>& err) {
200 if(err) {
201 return;
202 }
203
204 const auto res = r.get();
205 assert(res);
206 auto maybe_resp = from_buffer<state_machine::response>(*res);
207 assert(maybe_resp.has_value());
208 assert(
209 std::holds_alternative<make_block_response>(maybe_resp.value()));
210 auto& resp = std::get<make_block_response>(maybe_resp.value());
211
212 auto blk_pkt = make_shared_buffer(resp.m_blk);
213
214 m_atomizer_network.broadcast(blk_pkt);
215
216 m_logger->info("Block h:",
217 resp.m_blk.m_height,
218 ", nTXs:",
219 resp.m_blk.m_transactions.size(),
220 ", log idx:",
221 m_raft_node.last_log_idx(),
222 ", notifications:",
223 m_raft_node.tx_notify_count());
224
225 if(!resp.m_errs.empty()) {
226 auto buf = make_shared_buffer(resp.m_errs);
227 m_watchtower_network.broadcast(buf);
228 }
229 }
230
231 void controller::err_return_handler(raft::result_type& r,
232 nuraft::ptr<std::exception>& err) {
233 if(err) {
234 m_logger->warn("Exception handling log entry:", err->what());
235 return;
236 }
237
238 const auto res = r.get();
239 if(res) {
240 auto maybe_resp = from_buffer<state_machine::response>(*res);
241 assert(maybe_resp.has_value());
242 assert(std::holds_alternative<errors>(maybe_resp.value()));
243 auto& resp = std::get<errors>(maybe_resp.value());
244 m_watchtower_network.broadcast(resp);
245 }
246 }
247
248 auto controller::raft_callback(nuraft::cb_func::Type type,
249 nuraft::cb_func::Param* /* param */)
250 -> nuraft::cb_func::ReturnCode {
251 if(type == nuraft::cb_func::Type::BecomeFollower) {
252 // We became a follower so shutdown the client network handler and
253 // stop listening.
254 m_atomizer_network.close();
255 if(m_atomizer_server.joinable()) {
256 m_atomizer_server.join();
257 }
258 m_logger->debug("Became follower, stopped listening");
259 } else if(type == nuraft::cb_func::Type::BecomeLeader) {
260 // We became the leader. Ensure the previous handler thread is
261 // stopped and network shut down.
262 m_atomizer_network.close();
263 if(m_atomizer_server.joinable()) {
264 m_atomizer_server.join();
265 }
266 // Reset the client network so we can use it again.
267 m_atomizer_network.reset();
268 // Start listening on our client endpoint and start the handler
269 // thread.
270 auto as = m_atomizer_network.start_server(
271 m_opts.m_atomizer_endpoints[m_atomizer_id],
272 [&](auto&& pkt) {
273 return server_handler(std::forward<decltype(pkt)>(pkt));
274 });
275
276 if(!as.has_value()) {
277 m_logger->fatal("Failed to establish atomizer server.");
278 }
279 m_atomizer_server = std::move(as.value());
280 m_logger->debug("Became leader, started listening");
281 }
282 return nuraft::cb_func::ReturnCode::Ok;
283 }
284
285 void controller::notification_consumer() {
286 while(m_running) {
287 auto notif = tx_notify_request();
288 auto popped = m_notification_queue.pop(notif);
289 if(!popped) {
290 break;
291 }
292 m_raft_node.tx_notify(std::move(notif));
293 }
294 }
295}
void tx_notify(tx_notify_request &&notif)
Add the given transaction notification to the set of pending notifications.
auto send_complete_txs(const raft::callback_type &result_fn) -> bool
Replicate a transaction notification command in the state machine containing the current set of compl...
auto make_request(const state_machine::request &r, const raft::callback_type &result_fn) -> bool
Serialize and replicate the given request in the atomizer raft cluster.
auto tx_notify_count() -> uint64_t
Return the number of transaction notifications handled by the state machine.
auto init() -> bool
Initializes the controller.
void close()
Shuts down the network listener and all existing peer connections.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
void stop()
Shut down the NuRaft instance.
Definition node.cpp:139
auto last_log_idx() const -> uint64_t
Returns the last replicated log index.
Definition node.cpp:131
auto is_leader() const -> bool
Indicates whether this node is the current raft leader.
Definition node.cpp:83
nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > result_type
A NuRaft state machine execution result.
Definition node.hpp:18
overloaded(Ts...) -> overloaded< Ts... >
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
Definition config.hpp:132
size_t m_target_block_interval
Target block creation interval in the atomizer in milliseconds.
Definition config.hpp:169
Received message type.