19 std::shared_ptr<logging::log> log)
20 : m_atomizer_id(atomizer_id),
22 m_logger(std::move(log)),
23 m_raft_node(static_cast<uint32_t>(atomizer_id),
24 opts.m_atomizer_raft_endpoints,
25 m_opts.m_stxo_cache_depth,
28 [&](auto&& type, auto&& param) {
30 std::forward<
decltype(type)>(type),
31 std::forward<
decltype(param)>(param));
36 m_atomizer_network.
close();
40 if(m_tx_notify_thread.joinable()) {
41 m_tx_notify_thread.join();
44 if(m_atomizer_server.joinable()) {
45 m_atomizer_server.join();
48 if(m_main_thread.joinable()) {
52 m_notification_queue.clear();
53 for(
auto& t : m_notification_threads) {
61 if(!m_watchtower_network.cluster_connect(
62 m_opts.m_watchtower_internal_endpoints)) {
63 m_logger->warn(
"Failed to connect to watchtowers.");
66 auto raft_params = nuraft::raft_params();
67 raft_params.election_timeout_lower_bound_
68 =
static_cast<int>(m_opts.m_election_timeout_lower);
69 raft_params.election_timeout_upper_bound_
70 =
static_cast<int>(m_opts.m_election_timeout_upper);
71 raft_params.heart_beat_interval_
72 =
static_cast<int>(m_opts.m_heartbeat);
73 raft_params.snapshot_distance_
74 =
static_cast<int>(m_opts.m_snapshot_distance);
75 raft_params.max_append_size_
76 =
static_cast<int>(m_opts.m_raft_max_batch);
78 if(!m_raft_node.init(raft_params)) {
82 m_tx_notify_thread = std::thread{[&] {
86 m_main_thread = std::thread{[&] {
90 auto n_threads = std::thread::hardware_concurrency();
91 for(
size_t i = 0; i < n_threads; i++) {
92 m_notification_threads.emplace_back([&]() {
93 notification_consumer();
97 m_logger->info(
"Atomizer started...");
103 -> std::optional<cbdc::buffer> {
104 if(!m_raft_node.is_leader()) {
109 if(!maybe_req.has_value()) {
110 m_logger->error(
"Invalid request packet");
116 [&](tx_notify_request& notif) {
117 m_logger->trace(
"Received transaction notification",
120 notif.m_block_height);
121 m_notification_queue.push(notif);
123 [&](
const prune_request& p) {
124 m_raft_node.make_request(p,
nullptr);
126 [&](
const get_block_request& g) {
127 auto result_fn = [&, peer_id = pkt.m_peer_id](
129 nuraft::ptr<std::exception>& err) {
131 m_logger->error(
"Exception handling log entry:",
136 const auto res = r.get();
138 m_logger->error(
"Requested block not found.");
144 assert(maybe_resp.has_value());
145 assert(std::holds_alternative<get_block_response>(
146 maybe_resp.value()));
148 = std::get<get_block_response>(maybe_resp.value());
149 m_atomizer_network.send(resp.m_blk, peer_id);
151 m_raft_node.make_request(g, result_fn);
158 void controller::tx_notify_handler() {
161 [&,
this](
auto&& res,
auto&& err) {
162 err_return_handler(std::forward<decltype(res)>(res),
163 std::forward<decltype(err)>(err));
165 static constexpr auto batch_send_delay
166 = std::chrono::milliseconds(20);
167 std::this_thread::sleep_for(batch_send_delay);
172 void controller::main_handler() {
173 auto last_time = std::chrono::high_resolution_clock::now();
179 std::this_thread::sleep_until(next_time);
180 last_time = std::chrono::high_resolution_clock::now();
183 auto req = make_block_request();
185 = m_raft_node.
make_request(req, [&](
auto&& r,
auto&& err) {
187 std::forward<
decltype(r)>(r),
188 std::forward<
decltype(err)>(err));
190 if(!res && m_running) {
191 m_logger->error(
"Failed to make block at time",
192 last_time.time_since_epoch().count());
199 nuraft::ptr<std::exception>& err) {
204 const auto res = r.get();
207 assert(maybe_resp.has_value());
209 std::holds_alternative<make_block_response>(maybe_resp.value()));
210 auto& resp = std::get<make_block_response>(maybe_resp.value());
216 m_logger->info(
"Block h:",
219 resp.m_blk.m_transactions.size(),
225 if(!resp.m_errs.empty()) {
232 nuraft::ptr<std::exception>& err) {
234 m_logger->warn(
"Exception handling log entry:", err->what());
238 const auto res = r.get();
241 assert(maybe_resp.has_value());
242 assert(std::holds_alternative<errors>(maybe_resp.value()));
243 auto& resp = std::get<errors>(maybe_resp.value());
248 auto controller::raft_callback(nuraft::cb_func::Type type,
249 nuraft::cb_func::Param* )
250 -> nuraft::cb_func::ReturnCode {
251 if(type == nuraft::cb_func::Type::BecomeFollower) {
254 m_atomizer_network.close();
255 if(m_atomizer_server.joinable()) {
256 m_atomizer_server.join();
258 m_logger->debug(
"Became follower, stopped listening");
259 }
else if(type == nuraft::cb_func::Type::BecomeLeader) {
262 m_atomizer_network.close();
263 if(m_atomizer_server.joinable()) {
264 m_atomizer_server.join();
267 m_atomizer_network.reset();
270 auto as = m_atomizer_network.start_server(
271 m_opts.m_atomizer_endpoints[m_atomizer_id],
273 return server_handler(std::forward<decltype(pkt)>(pkt));
276 if(!as.has_value()) {
277 m_logger->fatal(
"Failed to establish atomizer server.");
279 m_atomizer_server = std::move(as.value());
280 m_logger->debug(
"Became leader, started listening");
282 return nuraft::cb_func::ReturnCode::Ok;
285 void controller::notification_consumer() {
287 auto notif = tx_notify_request();
288 auto popped = m_notification_queue.pop(notif);
void tx_notify(tx_notify_request &&notif)
Add the given transaction notification to the set of pending notifications.
auto send_complete_txs(const raft::callback_type &result_fn) -> bool
Replicate a transaction notification command in the state machine containing the current set of complete transactions.
auto make_request(const state_machine::request &r, const raft::callback_type &result_fn) -> bool
Serialize and replicate the given request in the atomizer raft cluster.
auto tx_notify_count() -> uint64_t
Return the number of transaction notifications handled by the state machine.
auto init() -> bool
Initializes the controller.
void close()
Shuts down the network listener and all existing peer connections.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
void stop()
Shut down the NuRaft instance.
auto last_log_idx() const -> uint64_t
Returns the last replicated log index.
auto is_leader() const -> bool
Indicates whether this node is the current raft leader.
nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > result_type
A NuRaft state machine execution result.
overloaded(Ts...) -> overloaded< Ts... >
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
size_t m_target_block_interval
Target block creation interval in the atomizer in milliseconds.