20 std::shared_ptr<logging::log> logger)
21 : m_opts(std::move(opts)),
22 m_logger(std::move(logger)),
26 m_opts.m_seed_from != m_opts.m_seed_to
27 ?
"2pc_shard_preseed_"
28 + std::
to_string(m_opts.m_seed_to - m_opts.m_seed_from)
35 <<
"[ERROR] The logger pointer in locking_shard::controller"
36 <<
" is null." << std::endl;
40 auto params = nuraft::raft_params();
41 params.election_timeout_lower_bound_
42 =
static_cast<int>(m_opts.m_election_timeout_lower);
43 params.election_timeout_upper_bound_
44 =
static_cast<int>(m_opts.m_election_timeout_upper);
45 params.heart_beat_interval_ =
static_cast<int>(m_opts.m_heartbeat);
46 params.snapshot_distance_ = 0;
47 params.max_append_size_ =
static_cast<int>(m_opts.m_raft_max_batch);
49 if(m_shard_id > (m_opts.m_shard_ranges.size() - 1)) {
51 "The shard ID is out of range of the m_shard_ranges vector.");
55 m_state_machine = nuraft::cs_new<state_machine>(
56 m_opts.m_shard_ranges[m_shard_id],
58 m_opts.m_shard_completed_txs_cache_size,
62 m_shard = m_state_machine->get_shard_instance();
64 if(m_shard_id > (m_opts.m_locking_shard_raft_endpoints.size() - 1)) {
65 m_logger->error(
"The shard ID is out of range "
66 "of the m_locking_shard_raft_endpoints vector.");
70 for(
const auto& vec : m_opts.m_locking_shard_raft_endpoints) {
71 if(m_node_id > (vec.size() - 1)) {
73 "The node ID is out of range "
74 "of the m_locking_shard_raft_endpoints vector.");
79 m_raft_serv = std::make_shared<raft::node>(
80 static_cast<int>(m_node_id),
81 m_opts.m_locking_shard_raft_endpoints[m_shard_id],
82 "shard" + std::to_string(m_shard_id),
87 [&](
auto&& res,
auto&& err) {
88 return raft_callback(std::forward<decltype(res)>(res),
89 std::forward<decltype(err)>(err));
92 if(!m_raft_serv->init(params)) {
93 m_logger->error(
"Failed to initialize raft server");
97 auto status_rpc_server = std::make_unique<
100 m_opts.m_locking_shard_readonly_endpoints[m_shard_id][m_node_id]);
101 if(!status_rpc_server->init()) {
102 m_logger->error(
"Failed to start status RPC server");
107 = std::make_unique<decltype(m_status_server)::element_type>(
109 std::move(status_rpc_server));