OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/twophase/coordinator/controller.hpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#ifndef OPENCBDC_TX_SRC_COORDINATOR_CONTROLLER_H_
7#define OPENCBDC_TX_SRC_COORDINATOR_CONTROLLER_H_
8
9#include "distributed_tx.hpp"
10#include "interface.hpp"
11#include "server.hpp"
12#include "state_machine.hpp"
17#include "util/raft/node.hpp"
18
19#include <secp256k1.h>
20
21namespace cbdc::coordinator {
27 class controller : public interface {
28 public:
29 using send_fn_t = std::function<void(const std::shared_ptr<buffer>&,
31
38 controller(size_t node_id,
39 size_t coordinator_id,
41 std::shared_ptr<logging::log> logger);
42
43 controller() = delete;
44 controller(const controller&) = delete;
45 auto operator=(const controller&) -> controller& = delete;
47 auto operator=(controller&&) -> controller& = delete;
48
49 ~controller() override;
50
53 auto init() -> bool;
54
56 void quit();
57
60 using prepare_tx = std::vector<transaction::compact_tx>;
61
64 using prepare_txs = std::
65 unordered_map<hash_t, prepare_tx, hashing::const_sip_hash<hash_t>>;
66
73 = std::pair<std::vector<bool>, std::vector<std::vector<uint64_t>>>;
74
77 using commit_txs = std::
78 unordered_map<hash_t, commit_tx, hashing::const_sip_hash<hash_t>>;
79
81 using discard_txs
82 = std::unordered_set<hash_t, hashing::const_sip_hash<hash_t>>;
88
91 std::optional<hash_t> m_dtx_id{};
92
93 auto operator==(const sm_command_header& rhs) const -> bool;
94 };
102 std::optional<std::variant<prepare_tx, commit_tx>> m_data{};
103 };
104
119
120 auto operator==(const coordinator_state& rhs) const -> bool;
121 };
122
135 callback_type result_callback)
136 -> bool override;
137
138 private:
139 size_t m_node_id;
140 size_t m_coordinator_id;
142 std::shared_ptr<logging::log> m_logger;
143
144 nuraft::ptr<state_machine> m_state_machine;
145 std::shared_ptr<raft::node> m_raft_serv;
146 nuraft::raft_params m_raft_params{};
147 std::atomic_bool m_running{false};
148 std::vector<std::shared_ptr<cbdc::locking_shard::interface>> m_shards;
149 std::vector<std::vector<network::endpoint_t>> m_shard_endpoints;
150 std::vector<cbdc::config::shard_range_t> m_shard_ranges;
151 random_source m_rnd{config::random_source};
152 std::mutex m_batch_mut;
153 std::condition_variable m_batch_cv;
154 std::shared_ptr<distributed_tx> m_current_batch;
155 std::shared_ptr<std::unordered_map<hash_t,
156 std::pair<callback_type, size_t>,
157 hashing::const_sip_hash<hash_t>>>
158 m_current_txs;
159 size_t m_batch_size;
160 std::shared_mutex m_shards_mut;
161 std::thread m_batch_exec_thread;
162 std::unique_ptr<rpc::server> m_rpc_server;
163 network::endpoint_t m_handler_endpoint;
164 std::vector<std::pair<std::shared_ptr<std::thread>, std::atomic_bool>>
165 m_exec_threads;
166 std::shared_mutex m_exec_mut;
167
168 std::thread m_start_thread;
169 bool m_start_flag{false};
170 bool m_stop_flag{false};
171 std::condition_variable m_start_cv;
172 std::mutex m_start_mut;
173 bool m_quit{false};
174
175 void start_stop_func();
176
177 void start();
178
179 void stop();
180
181 auto recovery_func() -> bool;
182
183 void batch_executor_func();
184
185 auto raft_callback(nuraft::cb_func::Type type,
186 nuraft::cb_func::Param* param)
187 -> nuraft::cb_func::ReturnCode;
188
189 auto prepare_cb(const hash_t& dtx_id,
190 const std::vector<transaction::compact_tx>& txs)
191 -> bool;
192 auto commit_cb(const hash_t& dtx_id,
193 const std::vector<bool>& complete_txs,
194 const std::vector<std::vector<uint64_t>>& tx_idxs)
195 -> bool;
196 auto discard_cb(const hash_t& dtx_id) -> bool;
197 auto done_cb(const hash_t& dtx_id) -> bool;
198
199 void batch_set_cbs(distributed_tx& c);
200
201 [[nodiscard]] auto replicate_sm_command(const sm_command& c)
202 -> std::optional<nuraft::ptr<nuraft::buffer>>;
203
204 void connect_shards();
205
206 void schedule_exec(std::function<void(size_t)>&& f);
207
208 void join_execs();
209 };
210}
211
212#endif // OPENCBDC_TX_SRC_COORDINATOR_CONTROLLER_H_
std::pair< std::vector< bool >, std::vector< std::vector< uint64_t > > > commit_tx
Aggregated responses and metadata from the prepare phase.
controller(controller &&)=delete
void quit()
Terminates the replicated coordinator instance.
std:: unordered_map< hash_t, prepare_tx, hashing::const_sip_hash< hash_t > > prepare_txs
Map from distributed transaction IDs in the prepare phase to the associated compact transactions.
std::unordered_set< hash_t, hashing::const_sip_hash< hash_t > > discard_txs
Set of distributed transaction IDs in the discard phase.
auto init() -> bool
Starts the replicated coordinator and associated raft server.
auto operator=(const controller &) -> controller &=delete
controller(const controller &)=delete
std:: unordered_map< hash_t, commit_tx, hashing::const_sip_hash< hash_t > > commit_txs
Map from distributed transaction IDs in the commit phase to the associated responses and metadata fro...
std::vector< transaction::compact_tx > prepare_tx
List of compact transactions associated with a distributed transaction in the prepare phase.
auto execute_transaction(transaction::compact_tx tx, callback_type result_callback) -> bool override
Coordinates a transaction among locking shards.
auto operator=(controller &&) -> controller &=delete
std::function< void(const std::shared_ptr< buffer > &, cbdc::network::peer_id_t)> send_fn_t
std::function< void(std::optional< bool >)> callback_type
Signature of callback function for a transaction execution result.
command
Types of command the state machine can process.
size_t peer_id_t
Peer IDs within a connection_manager.
std::pair< ip_address, port_number_t > endpoint_t
[host name, port number].
Definition socket.hpp:19
std::array< unsigned char, cbdc::hash_size > hash_t
SHA256 hash container.
Pseudorandom number generator (PRNG) for generating random data from a given entropy source.
Project-wide configuration options.
Definition config.hpp:132
Current state of distributed transactions managed by a coordinator.
auto operator==(const coordinator_state &rhs) const -> bool
std::optional< hash_t > m_dtx_id
The ID of the distributed transaction the command applies to, if applicable.
auto operator==(const sm_command_header &rhs) const -> bool
A full command for the state machine to process.
std::optional< std::variant< prepare_tx, commit_tx > > m_data
Associated transactions to prepare or commit, if applicable.
A condensed, hash-only transaction representation.