OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/twophase/locking_shard/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
8#include "format.hpp"
9#include "state_machine.hpp"
10#include "status_client.hpp"
13
14#include <utility>
15
16namespace cbdc::locking_shard {
17 controller::controller(size_t shard_id,
18 size_t node_id,
19 config::options opts,
20 std::shared_ptr<logging::log> logger)
21 : m_opts(std::move(opts)),
22 m_logger(std::move(logger)),
23 m_shard_id(shard_id),
24 m_node_id(node_id),
25 m_preseed_dir(
26 m_opts.m_seed_from != m_opts.m_seed_to
27 ? "2pc_shard_preseed_"
28 + std::to_string(m_opts.m_seed_to - m_opts.m_seed_from)
29 + "_" + std::to_string(m_shard_id)
30 : "") {}
31
32 auto controller::init() -> bool {
33 if(!m_logger) {
34 std::cerr
35 << "[ERROR] The logger pointer in locking_shard::controller"
36 << " is null." << std::endl;
37 return false;
38 }
39
40 auto params = nuraft::raft_params();
41 params.election_timeout_lower_bound_
42 = static_cast<int>(m_opts.m_election_timeout_lower);
43 params.election_timeout_upper_bound_
44 = static_cast<int>(m_opts.m_election_timeout_upper);
45 params.heart_beat_interval_ = static_cast<int>(m_opts.m_heartbeat);
46 params.snapshot_distance_ = 0; // TODO: implement snapshots
47 params.max_append_size_ = static_cast<int>(m_opts.m_raft_max_batch);
48
49 if(m_shard_id > (m_opts.m_shard_ranges.size() - 1)) {
50 m_logger->error(
51 "The shard ID is out of range of the m_shard_ranges vector.");
52 return false;
53 }
54
55 m_state_machine = nuraft::cs_new<state_machine>(
56 m_opts.m_shard_ranges[m_shard_id],
57 m_logger,
58 m_opts.m_shard_completed_txs_cache_size,
59 m_preseed_dir,
60 m_opts);
61
62 m_shard = m_state_machine->get_shard_instance();
63
64 if(m_shard_id > (m_opts.m_locking_shard_raft_endpoints.size() - 1)) {
65 m_logger->error("The shard ID is out of range "
66 "of the m_locking_shard_raft_endpoints vector.");
67 return false;
68 }
69
70 for(const auto& vec : m_opts.m_locking_shard_raft_endpoints) {
71 if(m_node_id > (vec.size() - 1)) {
72 m_logger->error(
73 "The node ID is out of range "
74 "of the m_locking_shard_raft_endpoints vector.");
75 return false;
76 }
77 }
78
79 m_raft_serv = std::make_shared<raft::node>(
80 static_cast<int>(m_node_id),
81 m_opts.m_locking_shard_raft_endpoints[m_shard_id],
82 "shard" + std::to_string(m_shard_id),
83 false,
84 m_state_machine,
85 0,
86 m_logger,
87 [&](auto&& res, auto&& err) {
88 return raft_callback(std::forward<decltype(res)>(res),
89 std::forward<decltype(err)>(err));
90 });
91
92 if(!m_raft_serv->init(params)) {
93 m_logger->error("Failed to initialize raft server");
94 return false;
95 }
96
97 auto status_rpc_server = std::make_unique<
100 m_opts.m_locking_shard_readonly_endpoints[m_shard_id][m_node_id]);
101 if(!status_rpc_server->init()) {
102 m_logger->error("Failed to start status RPC server");
103 return false;
104 }
105
106 m_status_server
107 = std::make_unique<decltype(m_status_server)::element_type>(
108 m_shard,
109 std::move(status_rpc_server));
110
111 return true;
112 }
113
114 auto controller::raft_callback(nuraft::cb_func::Type type,
115 nuraft::cb_func::Param* /* param */)
116 -> nuraft::cb_func::ReturnCode {
117 if(type == nuraft::cb_func::Type::BecomeFollower) {
118 m_logger->warn("Became follower, stopping listener");
119 m_server.reset();
120 return nuraft::cb_func::ReturnCode::Ok;
121 }
122 if(type == nuraft::cb_func::Type::BecomeLeader) {
123 m_logger->warn("Became leader, starting listener");
124 m_server = std::make_unique<decltype(m_server)::element_type>(
125 m_opts.m_locking_shard_endpoints[m_shard_id][m_node_id]);
126 m_server->register_raft_node(m_raft_serv);
127 if(!m_server->init()) {
128 m_logger->fatal("Couldn't start message handler server");
129 }
130 }
131 return nuraft::cb_func::ReturnCode::Ok;
132 }
133}
auto init() -> bool
Initializes the locking shard by reading the pre-seed file if applicable, initializing the raft clust...
Implements an RPC server over a TCP socket.
bool status_response
Status response RPC messages indicating whether the shard contains given UHS or TX ID.
std::variant< uhs_status_request, tx_status_request > status_request
Status request RPC message wrapper, holding either a UHS ID or TX ID query request.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
Definition config.hpp:132