OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
parsec/runtime_locking_shard/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
8#include "format.hpp"
9#include "state_machine.hpp"
10#include "util/rpc/format.hpp"
13
15 controller::controller(size_t component_id,
16 size_t node_id,
17 network::endpoint_t server_endpoint,
18 std::vector<network::endpoint_t> raft_endpoints,
19 std::shared_ptr<logging::log> logger)
20 : m_logger(std::move(logger)),
21 m_state_machine(nuraft::cs_new<state_machine>()),
22 m_raft_serv(std::make_shared<raft::node>(
23 static_cast<int>(node_id),
24 raft_endpoints,
25 "runtime_locking_shard" + std::to_string(component_id),
26 false,
27 m_state_machine,
28 0,
29 m_logger,
30 [&](auto&& res, auto&& err) {
31 return raft_callback(std::forward<decltype(res)>(res),
32 std::forward<decltype(err)>(err));
33 })),
34 m_raft_client(
35 std::make_shared<replicated_shard_client>(m_raft_serv)),
36 m_raft_endpoints(std::move(raft_endpoints)),
37 m_server_endpoint(std::move(server_endpoint)) {}
38
39 auto controller::init() -> bool {
40 auto params = nuraft::raft_params();
41 params.snapshot_distance_ = 0; // TODO: implement snapshots
42 params.max_append_size_ = config::defaults::raft_max_batch;
43 params.election_timeout_lower_bound_
44 = config::defaults::election_timeout_lower_bound;
45 params.election_timeout_upper_bound_
46 = config::defaults::election_timeout_upper_bound;
47 params.heart_beat_interval_ = config::defaults::heartbeat;
48
49 if(!m_raft_serv->init(params)) {
50 m_logger->error("Failed to initialize raft server");
51 return false;
52 }
53
54 return true;
55 }
56
57 auto controller::raft_callback(nuraft::cb_func::Type type,
58 nuraft::cb_func::Param* /* param */)
59 -> nuraft::cb_func::ReturnCode {
60 if(type == nuraft::cb_func::Type::BecomeFollower) {
61 m_logger->warn("Became follower, stopping listener");
62 m_server.reset();
63 return nuraft::cb_func::ReturnCode::Ok;
64 }
65 if(type == nuraft::cb_func::Type::BecomeLeader) {
66 m_logger->warn("Became leader, starting listener");
67 // Recover shard state from raft state machine
68 do_recovery();
69 }
70 return nuraft::cb_func::ReturnCode::Ok;
71 }
72
73 void controller::do_recovery() {
74 // Request tickets from state machine
75 auto success = m_raft_client->get_tickets(
77 handle_get_tickets(std::move(res));
78 });
79 if(!success) {
80 m_logger->error("Failed to request tickets from state machine");
81 }
82 }
83
84 void controller::handle_get_tickets(
86 if(!std::holds_alternative<replicated_shard_interface::tickets_type>(
87 res)) {
88 m_logger->error("Error requesting tickets from state machine");
89 return;
90 }
91
92 m_shard = std::make_unique<impl>(m_logger);
93
94 auto&& tickets
95 = std::get<replicated_shard_interface::tickets_type>(res);
96 auto state = m_state_machine->get_shard()->get_state();
97 auto success = m_shard->recover(state, tickets);
98 if(!success) {
99 m_logger->error("Error during shard recovery");
100 return;
101 }
102
103 auto rpc_server = std::make_unique<cbdc::rpc::tcp_server<
105 m_server_endpoint);
106 if(!rpc_server->init()) {
107 m_logger->fatal("Failed to start RPC server");
108 }
109
110 m_server = std::make_unique<decltype(m_server)::element_type>(
111 m_logger,
112 m_shard,
113 m_raft_client,
114 std::move(rpc_server));
115
116 m_logger->info("Recovered shard and started RPC server");
117 }
118}
std::variant< tickets_type, error_code > get_tickets_return_type
Return type from a get tickets operation.
NuRaft state machine implementation for a runtime locking shard.
Generic asynchronous RPC server.
Implements an RPC server over a TCP socket.
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].
Definition socket.hpp:19
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.