29 using send_fn_t = std::function<void(
const std::shared_ptr<buffer>&,
39 size_t coordinator_id,
41 std::shared_ptr<logging::log> logger);
65 unordered_map<hash_t, prepare_tx, hashing::const_sip_hash<hash_t>>;
73 = std::pair<std::vector<bool>, std::vector<std::vector<uint64_t>>>;
78 unordered_map<hash_t, commit_tx, hashing::const_sip_hash<hash_t>>;
82 = std::unordered_set<hash_t, hashing::const_sip_hash<hash_t>>;
/// Optional payload that, when present, holds either a prepare_tx or a
/// commit_tx. Value-initialized, so it starts out empty (std::nullopt).
/// NOTE(review): the enclosing type is not visible in this excerpt;
/// confirm which commands are expected to leave this empty.
102 std::optional<std::variant<prepare_tx, commit_tx>>
m_data{};
/// Numeric ID associated with this coordinator.
/// NOTE(review): presumably initialized from the constructor's
/// coordinator_id argument - confirm in the implementation.
140 size_t m_coordinator_id;
/// Shared handle to the logging::log instance used by this object.
142 std::shared_ptr<logging::log> m_logger;
/// NuRaft-managed pointer to the state_machine instance.
144 nuraft::ptr<state_machine> m_state_machine;
/// Shared handle to the raft::node.
/// NOTE(review): presumably the node that replicates m_state_machine -
/// confirm.
145 std::shared_ptr<raft::node> m_raft_serv;
/// NuRaft parameter set, value-initialized here.
/// NOTE(review): where the fields are populated is not visible in this
/// excerpt.
146 nuraft::raft_params m_raft_params{};
/// Atomic flag, initially false.
/// NOTE(review): presumably signals whether the worker threads should keep
/// running - confirm against the thread functions.
147 std::atomic_bool m_running{
false};
/// Shared handles to locking shard interfaces.
/// NOTE(review): presumably one entry per shard cluster, index-aligned
/// with m_shard_endpoints and m_shard_ranges - confirm.
148 std::vector<std::shared_ptr<cbdc::locking_shard::interface>> m_shards;
/// A list of network endpoints for each entry of the outer vector.
/// NOTE(review): the outer index presumably identifies the shard cluster
/// and the inner list its nodes - confirm.
149 std::vector<std::vector<network::endpoint_t>> m_shard_endpoints;
/// Shard range descriptors (cbdc::config::shard_range_t).
150 std::vector<cbdc::config::shard_range_t> m_shard_ranges;
/// Random source constructed from config::random_source.
151 random_source m_rnd{config::random_source};
/// Mutex for batch-related state.
/// NOTE(review): presumably guards m_current_batch - confirm in the
/// implementation.
152 std::mutex m_batch_mut;
/// Condition variable for batch-related notifications.
/// NOTE(review): presumably waited on with m_batch_mut - confirm.
153 std::condition_variable m_batch_cv;
/// Shared handle to a distributed_tx.
/// NOTE(review): by its name, the batch currently being assembled -
/// confirm.
154 std::shared_ptr<distributed_tx> m_current_batch;
155 std::shared_ptr<std::unordered_map<
hash_t,
156 std::pair<callback_type, size_t>,
157 hashing::const_sip_hash<hash_t>>>
/// Reader/writer mutex.
/// NOTE(review): presumably protects m_shards - confirm.
160 std::shared_mutex m_shards_mut;
/// Thread handle.
/// NOTE(review): presumably runs batch_executor_func() - confirm.
161 std::thread m_batch_exec_thread;
/// Uniquely-owned RPC server instance.
162 std::unique_ptr<rpc::server> m_rpc_server;
164 std::vector<std::pair<std::shared_ptr<std::thread>, std::atomic_bool>>
/// Reader/writer mutex.
/// NOTE(review): presumably guards the executor thread list declared just
/// above (its name is cut off in this excerpt) - confirm.
166 std::shared_mutex m_exec_mut;
/// Thread handle.
/// NOTE(review): presumably runs start_stop_func() - confirm.
168 std::thread m_start_thread;
/// Plain (non-atomic) flag, initially false.
/// NOTE(review): presumably requests a start and is read/written under
/// m_start_mut - confirm, since it has no synchronization of its own.
169 bool m_start_flag{
false};
/// Plain (non-atomic) flag, initially false.
/// NOTE(review): presumably requests a stop and is read/written under
/// m_start_mut - confirm, since it has no synchronization of its own.
170 bool m_stop_flag{
false};
/// Condition variable.
/// NOTE(review): presumably signalled when m_start_flag or m_stop_flag
/// changes - confirm.
171 std::condition_variable m_start_cv;
/// Mutex.
/// NOTE(review): presumably paired with m_start_cv - confirm.
172 std::mutex m_start_mut;
/// Takes no arguments and returns nothing.
/// NOTE(review): presumably the entry point of m_start_thread, reacting to
/// m_start_flag / m_stop_flag - confirm in the implementation.
175 void start_stop_func();
/// Recovery routine.
/// \return a bool. NOTE(review): presumably true on success - confirm
///         against the implementation.
181 auto recovery_func() -> bool;
/// Takes no arguments and returns nothing.
/// NOTE(review): presumably the entry point of m_batch_exec_thread -
/// confirm in the implementation.
183 void batch_executor_func();
/// Handler with the shape of a NuRaft event callback.
/// \param type the NuRaft callback event type.
/// \param param pointer to the NuRaft callback parameters for the event.
/// \return a NuRaft callback return code.
/// NOTE(review): which event types are handled, and whether param may be
/// null, is not visible in this excerpt.
185 auto raft_callback(nuraft::cb_func::Type type,
186 nuraft::cb_func::Param* param)
187 -> nuraft::cb_func::ReturnCode;
189 auto prepare_cb(
const hash_t& dtx_id,
190 const std::vector<transaction::compact_tx>& txs)
192 auto commit_cb(
const hash_t& dtx_id,
193 const std::vector<bool>& complete_txs,
194 const std::vector<std::vector<uint64_t>>& tx_idxs)
/// Callback for the discard step of a distributed transaction.
/// \param dtx_id hash identifying the distributed transaction.
/// \return a bool. NOTE(review): presumably true on success - confirm.
196 auto discard_cb(
const hash_t& dtx_id) -> bool;
/// Callback for the done step of a distributed transaction.
/// \param dtx_id hash identifying the distributed transaction.
/// \return a bool. NOTE(review): presumably true on success - confirm.
197 auto done_cb(
const hash_t& dtx_id) -> bool;
/// \param c distributed transaction, taken by mutable reference.
/// NOTE(review): by its name, installs callbacks on c - presumably
/// prepare_cb, commit_cb, discard_cb and done_cb; confirm.
199 void batch_set_cbs(distributed_tx& c);
/// The result is marked [[nodiscard]], so callers must not ignore it.
/// \param c the state machine command.
/// \return an optional NuRaft buffer pointer.
/// NOTE(review): presumably replicates c through the raft node and
///         returns std::nullopt on failure - confirm; also check whether
///         the contained pointer can itself be null.
201 [[nodiscard]]
auto replicate_sm_command(
const sm_command& c)
202 -> std::optional<nuraft::ptr<nuraft::buffer>>;
/// Takes no arguments and returns nothing.
/// NOTE(review): presumably populates m_shards from m_shard_endpoints -
/// confirm. No failure can be reported through the signature.
204 void connect_shards();
/// \param f callable taking a single size_t argument; accepted as an
///          rvalue reference, so callers pass a temporary or std::move it.
/// NOTE(review): presumably runs f on an executor thread, with the size_t
/// being that thread's index - confirm in the implementation.
206 void schedule_exec(std::function<
void(
size_t)>&& f);