18 std::vector<network::endpoint_t> raft_endpoints,
19 std::shared_ptr<logging::log> logger)
20 : m_logger(std::move(logger)),
22 m_raft_serv(std::make_shared<raft::node>(
23 static_cast<int>(node_id),
25 "runtime_locking_shard" + std::
to_string(component_id),
30 [&](auto&& res, auto&& err) {
31 return raft_callback(std::forward<
decltype(res)>(res),
32 std::forward<
decltype(err)>(err));
35 std::make_shared<replicated_shard_client>(m_raft_serv)),
36 m_raft_endpoints(std::move(raft_endpoints)),
37 m_server_endpoint(std::move(server_endpoint)) {}
40 auto params = nuraft::raft_params();
41 params.snapshot_distance_ = 0;
42 params.max_append_size_ = config::defaults::raft_max_batch;
43 params.election_timeout_lower_bound_
44 = config::defaults::election_timeout_lower_bound;
45 params.election_timeout_upper_bound_
46 = config::defaults::election_timeout_upper_bound;
47 params.heart_beat_interval_ = config::defaults::heartbeat;
49 if(!m_raft_serv->init(params)) {
50 m_logger->error(
"Failed to initialize raft server");
57 auto controller::raft_callback(nuraft::cb_func::Type type,
58 nuraft::cb_func::Param* )
59 -> nuraft::cb_func::ReturnCode {
60 if(type == nuraft::cb_func::Type::BecomeFollower) {
61 m_logger->warn(
"Became follower, stopping listener");
63 return nuraft::cb_func::ReturnCode::Ok;
65 if(type == nuraft::cb_func::Type::BecomeLeader) {
66 m_logger->warn(
"Became leader, starting listener");
70 return nuraft::cb_func::ReturnCode::Ok;
73 void controller::do_recovery() {
75 auto success = m_raft_client->get_tickets(
77 handle_get_tickets(std::move(res));
80 m_logger->error(
"Failed to request tickets from state machine");
84 void controller::handle_get_tickets(
86 if(!std::holds_alternative<replicated_shard_interface::tickets_type>(
88 m_logger->error(
"Error requesting tickets from state machine");
92 m_shard = std::make_unique<impl>(m_logger);
95 = std::get<replicated_shard_interface::tickets_type>(res);
96 auto state = m_state_machine->get_shard()->get_state();
97 auto success = m_shard->recover(state, tickets);
99 m_logger->error(
"Error during shard recovery");
106 if(!rpc_server->init()) {
107 m_logger->fatal(
"Failed to start RPC server");
110 m_server = std::make_unique<decltype(m_server)::element_type>(
114 std::move(rpc_server));
116 m_logger->info(
"Recovered shard and started RPC server");
auto init() -> bool
Initializes the shard.
std::variant< tickets_type, error_code > get_tickets_return_type
Return type from a get tickets operation.
NuRaft state machine implementation for a runtime locking shard.
Generic asynchronous RPC server.
Implements an RPC server over a TCP socket.
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.