OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
node.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "node.hpp"
7
8#include <future>
9
10namespace cbdc::raft {
11 node::node(int node_id,
12 std::vector<network::endpoint_t> raft_endpoints,
13 const std::string& node_type,
14 bool blocking,
15 nuraft::ptr<nuraft::state_machine> sm,
16 size_t asio_thread_pool_size,
17 std::shared_ptr<logging::log> logger,
18 nuraft::cb_func::func_type raft_cb)
19 : m_node_id(static_cast<uint32_t>(node_id)),
20 m_blocking(blocking),
21 m_port(raft_endpoints[m_node_id].second),
22 m_raft_logger(nuraft::cs_new<console_logger>(logger)),
23 m_smgr(nuraft::cs_new<state_manager>(
24 static_cast<uint32_t>(m_node_id + 1),
25 node_type + "_raft_log_" + std::to_string(m_node_id),
26 node_type + "_raft_config_" + std::to_string(m_node_id) + ".dat",
27 node_type + "_raft_state_" + std::to_string(m_node_id) + ".dat",
28 std::move(raft_endpoints))),
29 m_sm(std::move(sm)),
30 m_log(std::move(logger)) {
31 m_asio_opt.thread_pool_size_ = asio_thread_pool_size;
32 m_init_opts.raft_callback_ = std::move(raft_cb);
33 if(m_node_id != 0) {
34 m_init_opts.skip_initial_election_timeout_ = true;
35 }
36 }
37
38 auto node::init(const nuraft::raft_params& raft_params) -> bool {
39 // Set these explicitly as they depend on the structure of this class
40 auto params = raft_params;
41 if(m_blocking) {
42 params.return_method_ = nuraft::raft_params::blocking;
43 } else {
44 params.return_method_ = nuraft::raft_params::async_handler;
45 }
46 params.auto_forwarding_ = false;
47
48 m_raft_instance = m_launcher.init(m_sm,
49 m_smgr,
50 m_raft_logger,
51 m_port,
52 m_asio_opt,
53 params,
54 m_init_opts);
55
56 if(!m_raft_instance) {
57 m_log->error("Failed to initialize raft launcher");
58 return false;
59 }
60
61 m_log->info("Waiting for raft initialization");
62 static constexpr auto wait_time = std::chrono::milliseconds(100);
63 static constexpr auto timeout = std::chrono::seconds(30);
64
65 auto start_time = std::chrono::steady_clock::now();
66
67 while(!m_raft_instance->is_initialized()) {
68 std::this_thread::sleep_for(wait_time);
69 auto elapsed_time = std::chrono::steady_clock::now() - start_time;
70
71 if(elapsed_time > timeout) {
72 m_log->error("Raft initialization timed out");
73 m_raft_instance->shutdown();
74 m_raft_instance.reset();
75 return false;
76 }
77 }
78 m_log->info("Raft initialization complete");
79
80 return true;
81 }
82
83 auto node::is_leader() const -> bool {
84 return m_raft_instance->is_leader();
85 }
86
87 auto node::replicate(nuraft::ptr<nuraft::buffer> new_log,
88 const callback_type& result_fn) const -> bool {
89 auto ret = m_raft_instance->append_entries({std::move(new_log)});
90 if(!ret->get_accepted()) {
91 return false;
92 }
93
94 if(result_fn) {
95 ret->when_ready(result_fn);
96 }
97
98 return true;
99 }
100
101 auto node::replicate_sync(const nuraft::ptr<nuraft::buffer>& new_log) const
102 -> std::optional<nuraft::ptr<nuraft::buffer>> {
103 auto ret = m_raft_instance->append_entries({new_log});
104 if(!ret->get_accepted()) {
105 return std::nullopt;
106 }
107 auto result_code = nuraft::cmd_result_code::RESULT_NOT_EXIST_YET;
108 auto blocking_promise = std::promise<void>();
109 auto blocking_future = blocking_promise.get_future();
110 ret->when_ready([&result_code,
111 &blocking_promise](raft::result_type& r,
112 nuraft::ptr<std::exception>& err) {
113 if(err) {
114 result_code = nuraft::cmd_result_code::FAILED;
115 } else {
116 result_code = r.get_result_code();
117 }
118 blocking_promise.set_value();
119 });
120 blocking_future.wait();
121 if(result_code != nuraft::cmd_result_code::OK) {
122 return std::nullopt;
123 }
124 return ret->get();
125 }
126
128 stop();
129 }
130
131 auto node::last_log_idx() const -> uint64_t {
132 return m_sm->last_commit_index();
133 }
134
135 auto node::get_sm() const -> nuraft::state_machine* {
136 return m_sm.get();
137 }
138
139 void node::stop() {
140 m_launcher.shutdown();
141 }
142}
nuraft::logger implementation using logging::log.
void stop()
Shut down the NuRaft instance.
Definition node.cpp:139
auto replicate(nuraft::ptr< nuraft::buffer > new_log, const callback_type &result_fn) const -> bool
Replicates the given log entry in the cluster.
Definition node.cpp:87
auto get_sm() const -> nuraft::state_machine *
Returns a pointer to the state machine replicated by this raft node.
Definition node.cpp:135
auto init(const nuraft::raft_params &raft_params) -> bool
Initializes the NuRaft instance with the given state machine and raft parameters.
Definition node.cpp:38
auto last_log_idx() const -> uint64_t
Returns the last replicated log index.
Definition node.cpp:131
auto replicate_sync(const nuraft::ptr< nuraft::buffer > &new_log) const -> std::optional< nuraft::ptr< nuraft::buffer > >
Replicates the provided log entry and returns the results from the state machine if the replication was successful.
Definition node.cpp:101
auto is_leader() const -> bool
Indicates whether this node is the current raft leader.
Definition node.cpp:83
Implementation of nuraft::state_mgr using a file.
nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > result_type
A NuRaft state machine execution result.
Definition node.hpp:18
std::function< void(result_type &r, nuraft::ptr< std::exception > &err)> callback_type
Function type for raft state machine execution result callbacks.
Definition node.hpp:21
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.