12 std::vector<network::endpoint_t> raft_endpoints,
13 const std::string& node_type,
15 nuraft::ptr<nuraft::state_machine> sm,
16 size_t asio_thread_pool_size,
17 std::shared_ptr<logging::log> logger,
18 nuraft::cb_func::func_type raft_cb)
19 : m_node_id(static_cast<uint32_t>(node_id)),
21 m_port(raft_endpoints[m_node_id].second),
24 static_cast<uint32_t>(m_node_id + 1),
25 node_type +
"_raft_log_" + std::
to_string(m_node_id),
26 node_type +
"_raft_config_" + std::
to_string(m_node_id) +
".dat",
27 node_type +
"_raft_state_" + std::
to_string(m_node_id) +
".dat",
28 std::move(raft_endpoints))),
30 m_log(std::move(logger)) {
31 m_asio_opt.thread_pool_size_ = asio_thread_pool_size;
32 m_init_opts.raft_callback_ = std::move(raft_cb);
34 m_init_opts.skip_initial_election_timeout_ =
true;
38 auto node::init(
const nuraft::raft_params& raft_params) ->
bool {
40 auto params = raft_params;
42 params.return_method_ = nuraft::raft_params::blocking;
44 params.return_method_ = nuraft::raft_params::async_handler;
46 params.auto_forwarding_ =
false;
48 m_raft_instance = m_launcher.init(m_sm,
56 if(!m_raft_instance) {
57 m_log->error(
"Failed to initialize raft launcher");
61 m_log->info(
"Waiting for raft initialization");
62 static constexpr auto wait_time = std::chrono::milliseconds(100);
63 static constexpr auto timeout = std::chrono::seconds(30);
65 auto start_time = std::chrono::steady_clock::now();
67 while(!m_raft_instance->is_initialized()) {
68 std::this_thread::sleep_for(wait_time);
69 auto elapsed_time = std::chrono::steady_clock::now() - start_time;
71 if(elapsed_time > timeout) {
72 m_log->error(
"Raft initialization timed out");
73 m_raft_instance->shutdown();
74 m_raft_instance.reset();
78 m_log->info(
"Raft initialization complete");
84 return m_raft_instance->is_leader();
89 auto ret = m_raft_instance->append_entries({std::move(new_log)});
90 if(!ret->get_accepted()) {
95 ret->when_ready(result_fn);
102 -> std::optional<nuraft::ptr<nuraft::buffer>> {
103 auto ret = m_raft_instance->append_entries({new_log});
104 if(!ret->get_accepted()) {
107 auto result_code = nuraft::cmd_result_code::RESULT_NOT_EXIST_YET;
108 auto blocking_promise = std::promise<void>();
109 auto blocking_future = blocking_promise.get_future();
110 ret->when_ready([&result_code,
112 nuraft::ptr<std::exception>& err) {
114 result_code = nuraft::cmd_result_code::FAILED;
116 result_code = r.get_result_code();
118 blocking_promise.set_value();
120 blocking_future.wait();
121 if(result_code != nuraft::cmd_result_code::OK) {
132 return m_sm->last_commit_index();
140 m_launcher.shutdown();
nuraft::logger implementation using logging::log.
void stop()
Shut down the NuRaft instance.
auto replicate(nuraft::ptr< nuraft::buffer > new_log, const callback_type &result_fn) const -> bool
Replicates the given log entry in the cluster.
auto get_sm() const -> nuraft::state_machine *
Returns a pointer to the state machine replicated by this raft node.
auto init(const nuraft::raft_params &raft_params) -> bool
Initializes the NuRaft instance with the given state machine and raft parameters.
auto last_log_idx() const -> uint64_t
Returns the last replicated log index.
auto replicate_sync(const nuraft::ptr< nuraft::buffer > &new_log) const -> std::optional< nuraft::ptr< nuraft::buffer > >
Replicates the provided log entry and returns the results from the state machine if the replication was successful.
auto is_leader() const -> bool
Indicates whether this node is the current raft leader.
Implementation of nuraft::state_mgr using a file.
nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > result_type
A NuRaft state machine execution result.
std::function< void(result_type &r, nuraft::ptr< std::exception > &err)> callback_type
Function type for raft state machine execution result callbacks.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.