19 size_t coordinator_id,
21 std::shared_ptr<logging::log> logger)
23 m_coordinator_id(coordinator_id),
24 m_opts(std::move(opts)),
25 m_logger(std::move(logger)),
27 m_shard_endpoints(m_opts.m_locking_shard_endpoints),
28 m_shard_ranges(m_opts.m_shard_ranges),
29 m_batch_size(m_opts.m_batch_size),
30 m_exec_threads(m_opts.m_coordinator_max_threads) {
31 m_raft_params.election_timeout_lower_bound_
33 m_raft_params.election_timeout_upper_bound_
35 m_raft_params.heart_beat_interval_
37 m_raft_params.snapshot_distance_ = 0;
38 m_raft_params.max_append_size_
// Fragment: the enclosing function's signature is not part of this excerpt
// (presumably controller::init() -- confirm against the full file).
// Visible steps, in order: stream an error message about a null logger,
// range-check m_coordinator_id and m_node_id against the configured
// endpoint vectors, construct the raft node, start m_start_thread, and
// return the result of m_raft_serv->init(m_raft_params).
49 <<
"[ERROR] The logger pointer in coordinator::controller "
50 <<
"is null." << std::endl;
// NOTE(review): assuming size() returns an unsigned type (as std::vector
// does), `size() - 1` wraps to the maximum value when the container is
// empty, so this check cannot fire for an empty endpoint list. Comparing
// with `>= size()` would avoid the wrap. The same pattern recurs in the
// three checks below.
54 if(m_coordinator_id > (m_opts.m_coordinator_endpoints.size() - 1)) {
55 m_logger->error(
"The coordinator ID is out of range "
56 "of the m_coordinator_endpoints vector.");
// The node ID is checked against every inner vector, not only the one
// selected by m_coordinator_id.
60 for(
const auto& vec : m_opts.m_coordinator_endpoints) {
61 if(m_node_id > (vec.size() - 1)) {
62 m_logger->error(
"The node ID is out of range "
63 "of the m_coordinator_endpoints vector.");
// Endpoint for this node within this coordinator cluster (the assignment
// target is on a line not shown here).
69 = m_opts.m_coordinator_endpoints[m_coordinator_id][m_node_id];
// Same two range checks, repeated for the raft endpoint configuration.
72 > (m_opts.m_coordinator_raft_endpoints.size() - 1)) {
73 m_logger->error(
"The coordinator ID is out of range "
74 "of the m_coordinator_raft_endpoints vector.");
78 for(
const auto& vec : m_opts.m_coordinator_raft_endpoints) {
79 if(m_node_id > (vec.size() - 1)) {
80 m_logger->error(
"The node ID is out of range "
81 "of the m_coordinator_raft_endpoints vector.");
// Build the raft node from the node ID, this cluster's raft endpoints and
// a name of the form "coordinator<ID>". Further constructor arguments are
// on lines not shown here.
86 m_raft_serv = std::make_shared<raft::node>(
87 static_cast<int>(m_node_id),
88 m_opts.m_coordinator_raft_endpoints[m_coordinator_id],
89 "coordinator" + std::to_string(m_coordinator_id),
// Callback argument: forwards both parameters unchanged to raft_callback().
// Inside a member function, [&] makes `this` available to the lambda.
94 [&](
auto&& res,
auto&& err) {
95 return raft_callback(std::forward<decltype(res)>(res),
96 std::forward<decltype(err)>(err));
// Background thread stored in m_start_thread; its body is not visible in
// this excerpt.
102 m_start_thread = std::thread([&] {
// The function's result is whatever the raft node's init() reports.
109 return m_raft_serv->init(m_raft_params);
// Reacts to raft role-change notifications.
//
// type: the kind of raft event; BecomeLeader and BecomeFollower are
//       handled in the visible code.
// The second parameter is unnamed and unused.
// Returns nuraft::cb_func::ReturnCode::Ok (the only return visible here).
//
// Both branches log, take m_start_mut, and notify m_start_cv, which
// start_stop_func() waits on. Several lines inside the locked regions are
// not part of this excerpt.
112 auto controller::raft_callback(nuraft::cb_func::Type type,
113 nuraft::cb_func::Param* )
114 -> nuraft::cb_func::ReturnCode {
115 if(type == nuraft::cb_func::BecomeLeader) {
123 m_logger->warn(
"Became leader, starting coordinator");
// The statements executed while this lock is held are not shown here.
125 std::unique_lock<std::mutex> l(m_start_mut);
129 m_start_cv.notify_one();
130 m_logger->warn(
"Done with become leader handler");
131 }
else if(type == nuraft::cb_func::BecomeFollower) {
134 m_logger->warn(
"Became follower, stopping coordinator");
// Clear the start request under the lock before waking the waiter.
136 std::unique_lock<std::mutex> l(m_start_mut);
137 m_start_flag =
false;
140 m_start_cv.notify_one();
141 m_logger->warn(
"Done with become follower handler");
143 return nuraft::cb_func::ReturnCode::Ok;
// Callback for the prepare phase of a distributed transaction.
// dtx_id: ID of the distributed transaction.
// txs: compact transactions in the batch.
// Returns true when replicate_sm_command() yields a value for `comm`.
// `comm` is built on lines not shown in this excerpt (presumably from
// dtx_id and txs -- confirm against the full file).
147 controller::prepare_cb(
const hash_t& dtx_id,
148 const std::vector<transaction::compact_tx>& txs)
154 return replicate_sm_command(comm).has_value();
// Callback for the commit phase of a distributed transaction.
// dtx_id: ID of the distributed transaction.
// complete_txs: per-transaction flags (exact meaning not visible here).
// tx_idxs: nested vectors of transaction indexes (meaning not visible here).
// Returns true when replicate_sm_command() yields a value for `comm`.
158 controller::commit_cb(
const hash_t& dtx_id,
159 const std::vector<bool>& complete_txs,
160 const std::vector<std::vector<uint64_t>>& tx_idxs)
// Tail of the `comm` initializer: both vectors are copied into a pair that
// is carried by the command.
166 std::make_pair(complete_txs, tx_idxs)};
167 return replicate_sm_command(comm).has_value();
// Callback for the discard phase of a distributed transaction.
// dtx_id: ID of the distributed transaction.
// Returns true when replicate_sm_command() yields a value for `comm`;
// `comm` is constructed on lines not shown in this excerpt.
170 auto controller::discard_cb(
const hash_t& dtx_id) ->
bool {
173 return replicate_sm_command(comm).has_value();
// Callback invoked when a distributed transaction is done.
// dtx_id: ID of the distributed transaction.
// Returns true when replicate_sm_command() yields a value for `comm`;
// `comm` is constructed on lines not shown in this excerpt.
176 auto controller::done_cb(
const hash_t& dtx_id) ->
bool {
179 return replicate_sm_command(comm).has_value();
// Stops the coordinator's activity. Visible steps: take m_batch_mut, drop
// the RPC server, wake a waiter on m_batch_cv, visit every shard client,
// join the batch executor thread, then take m_shards_mut exclusively.
// Lines between these steps are not part of this excerpt.
182 void controller::stop() {
187 std::lock_guard<std::mutex> l(m_batch_mut);
190 m_rpc_server.reset();
// Threads blocked on m_batch_cv include !m_running in their wait predicates
// (see batch_executor_func), so a notify lets them re-check it.
191 m_batch_cv.notify_one();
// Shared lock: m_shards is only iterated here. The per-shard action is not
// visible.
198 std::shared_lock<std::shared_mutex> l(m_shards_mut);
199 for(
auto& s : m_shards) {
205 if(m_batch_exec_thread.joinable()) {
206 m_batch_exec_thread.join();
// Exclusive lock on the shard list; what is done under it is not shown
// here.
214 std::unique_lock<std::shared_mutex> l(m_shards_mut);
// Recovers distributed transactions recorded in the replicated state.
// Visible steps:
//   1. Replicate a command (`comm`, built on a hidden line) and read the
//      response buffer.
//   2. Set up a coordinator_state and a nuraft_serializer over the response.
//   3. For every prepare, commit and discard entry in the state, create a
//      distributed_tx, put it into the matching recovery mode, and collect
//      it.
//   4. For every collected dtx, install the callbacks and schedule a task
//      that executes it on an executor thread.
// Returns bool; the return statements are not part of this excerpt.
219 auto controller::recovery_func() ->
bool {
223 m_logger->info(
"Waiting for get SM command response");
224 auto r = replicate_sm_command(comm);
230 m_logger->info(
"Started recovery process");
// `r` is dereferenced here; a has_value() check is presumably on a hidden
// line above -- confirm.
232 const auto& res = *r;
236 m_logger->error(
"Empty response object");
241 auto coordinators = std::vector<std::shared_ptr<distributed_tx>>();
244 auto state = coordinator_state();
245 auto deser = nuraft_serializer(*res);
// Prepare phase entries: prep.first is passed to the distributed_tx
// constructor, prep.second to recover_prepare().
248 for(
const auto& prep : state.m_prepare_txs) {
250 auto coord = std::shared_ptr<distributed_tx>();
// The dtx is constructed while holding a shared lock on m_shards_mut.
252 std::shared_lock<std::shared_mutex> l(m_shards_mut);
253 coord = std::make_shared<distributed_tx>(prep.first,
259 coord->recover_prepare(prep.second);
260 coordinators.emplace_back(std::move(coord));
// Commit phase entries: com.second is a pair whose two halves are passed to
// recover_commit().
263 for(
const auto& com : state.m_commit_txs) {
264 auto coord = std::shared_ptr<distributed_tx>();
266 std::shared_lock<std::shared_mutex> l(m_shards_mut);
267 coord = std::make_shared<distributed_tx>(com.first,
274 coord->recover_commit(com.second.first, com.second.second);
275 coordinators.emplace_back(std::move(coord));
// Discard phase entries: each element is passed directly to the
// distributed_tx constructor.
278 for(
const auto& dis : state.m_discard_txs) {
279 auto coord = std::shared_ptr<distributed_tx>();
281 std::shared_lock<std::shared_mutex> l(m_shards_mut);
282 coord = std::make_shared<distributed_tx>(dis,
287 coord->recover_discard();
288 coordinators.emplace_back(std::move(coord));
// Flag initialised to true; where it is written or read is not visible in
// this excerpt.
293 auto success = std::atomic_bool{
true};
294 for(
auto&& coord : coordinators) {
297 batch_set_cbs(*coord);
298 auto dtx_id_str =
to_string(coord->get_id());
299 m_logger->info(
"Recovering dtx", dtx_id_str);
// The task takes ownership of the dtx pointer and the ID string by move,
// so the element left in `coordinators` is a moved-from shared_ptr.
// Everything else is captured by reference.
302 auto f = [&, c{std::move(coord)}, s{std::move(dtx_id_str)}](
305 auto exec_res = c->execute();
307 m_logger->error(
"Failed to recover dtx", s);
312 m_logger->info(
"Recovered dtx", s);
// Clear this executor slot's flag. Note that a shared (not exclusive) lock
// on m_exec_mut is held for this write; schedule_exec() takes the lock
// exclusively.
316 std::shared_lock<std::shared_mutex> l(m_exec_mut);
317 m_exec_threads[thread_idx].second =
false;
322 schedule_exec(std::move(f));
// Installs callbacks on a distributed transaction so that each one forwards
// to the controller member function of the same name.
// c: the distributed transaction to configure.
// Each lambda perfect-forwards its arguments. They use [&], so they refer
// to this controller through `this`.
// NOTE(review): presumably `c` must not invoke them after the controller is
// destroyed -- confirm the lifetime.
331 void controller::batch_set_cbs(distributed_tx& c) {
// Current state of the dtx; how `s` is used is not visible in this excerpt.
332 auto s = c.get_state();
338 c.set_prepare_cb([&](
auto&& dtx_id,
auto&& txs) {
339 return prepare_cb(std::forward<
decltype(dtx_id)>(dtx_id),
340 std::forward<
decltype(txs)>(txs));
// Three-argument callback (dtx_id, complete_txs, tx_idxs), matching
// commit_cb's parameters. The setter call and the callee are on hidden
// lines.
345 [&](
auto&& dtx_id,
auto&& complete_txs,
auto&& tx_idxs) {
347 std::forward<
decltype(dtx_id)>(dtx_id),
348 std::forward<
decltype(complete_txs)>(complete_txs),
349 std::forward<
decltype(tx_idxs)>(tx_idxs));
353 c.set_discard_cb([&](
auto&& dtx_id) {
354 return discard_cb(std::forward<
decltype(dtx_id)>(dtx_id));
358 c.set_done_cb([&](
auto&& dtx_id) {
359 return done_cb(std::forward<
decltype(dtx_id)>(dtx_id));
// Batch executor: waits for queued transactions, swaps the current batch
// for a fresh one, and schedules the full batch for execution on an
// executor thread. The surrounding loop and exit conditions are on lines
// not part of this excerpt.
364 void controller::batch_executor_func() {
// Sleep until there is at least one queued transaction or m_running is
// false.
369 std::unique_lock<std::mutex> l(m_batch_mut);
370 m_batch_cv.wait(l, [&]() {
371 return !m_current_txs->empty() || !m_running;
380 auto batch = std::shared_ptr<distributed_tx>();
382 = std::shared_ptr<decltype(m_current_txs)::element_type>();
// The replacement batch gets a random hash as its ID and is constructed
// under a shared lock on m_shards_mut.
386 auto new_batch = std::shared_ptr<distributed_tx>();
388 std::shared_lock<std::shared_mutex> l(m_shards_mut);
390 = std::make_shared<distributed_tx>(m_rnd.
random_hash(),
// Under m_batch_mut: take ownership of the filled batch and its tx map,
// then install the empty replacement, its callbacks and a new map.
399 std::lock_guard<std::mutex> l(m_batch_mut);
400 batch = std::move(m_current_batch);
401 txs = std::move(m_current_txs);
402 m_current_batch = std::move(new_batch);
403 batch_set_cbs(*m_current_batch);
404 m_current_txs = std::make_shared<
405 decltype(m_current_txs)::element_type>();
// execute_transaction-style waiters block on the same condition variable
// until the batch has room (see the wait on m_batch_cv near the end of the
// file).
410 m_batch_cv.notify_one();
// Task run on an executor thread; it owns the batch and the tx map by move.
414 auto f = [&, b{std::move(batch)}, t{std::move(txs)}](
417 m_logger->info(
"dtxn start:", dtxid,
"size:", t->size());
418 auto s = std::chrono::high_resolution_clock::now();
420 auto res = b->execute();
// For every queued transaction, unpack its callback and its index within
// the batch. tx_res stays empty when execute() returned no value.
424 for(
const auto& [tx_id, metadata] : *t) {
425 const auto& [cb_func, batch_idx] = metadata;
426 auto tx_res = std::optional<bool>();
427 if(res.has_value()) {
428 tx_res =
static_cast<bool>((*res)[batch_idx]);
437 m_logger->warn(
"dtxn failed:", dtxid);
// Elapsed time as a raw tick count of high_resolution_clock's duration
// type; the tick period is implementation-defined.
439 auto e = std::chrono::high_resolution_clock::now();
440 auto l = (e - s).count();
441 m_logger->info(
"dtxn done:",
// Clear this executor slot's flag. A shared (not exclusive) lock on
// m_exec_mut is held for this write.
450 std::shared_lock<std::shared_mutex> l(m_exec_mut);
451 m_exec_threads[thread_idx].second =
false;
456 schedule_exec(std::move(f));
// Replicates a state machine command through the raft node, synchronously.
// c: the command to replicate.
// Returns whatever m_raft_serv->replicate_sync() returns: an optional
// buffer pointer.
// The allocation of `buf` and the write of `c` into it are on lines not
// shown in this excerpt.
460 auto controller::replicate_sm_command(
const sm_command& c)
461 -> std::optional<nuraft::ptr<nuraft::buffer>> {
463 auto ser = nuraft_serializer(*buf);
// Sanity check that the serializer reached the end of the buffer. assert()
// is compiled out when NDEBUG is defined.
466 assert(ser.end_of_buffer());
468 return m_raft_serv->replicate_sync(buf);
// Creates one locking shard RPC client per entry in m_shard_endpoints and
// appends it to m_shards.
471 void controller::connect_shards() {
475 for(
size_t i{0}; i < m_shard_endpoints.size(); i++) {
476 m_logger->warn(
"Connecting to shard cluster", std::to_string(i));
// Remaining constructor arguments are on lines not shown here.
477 auto s = std::make_shared<locking_shard::rpc::client>(
478 m_shard_endpoints[i],
// Logged on the failure path; the condition guarding it is not visible in
// this excerpt.
482 m_logger->fatal(
"Failed to initialize shard client");
// Exclusive lock: m_shards is being modified.
485 std::unique_lock<std::shared_mutex> l(m_shards_mut);
486 m_shards.emplace_back(std::move(s));
// Runs a task on an executor thread slot.
// f: the task. It is started with the slot index `i` as its size_t
//    argument.
// Repeats a scan over m_exec_threads until found_thread becomes true,
// yielding the CPU between scans. The rule for choosing a slot is on lines
// not shown in this excerpt.
491 void controller::schedule_exec(std::function<
void(
size_t)>&& f) {
493 bool found_thread{
false};
494 while(!found_thread) {
// Exclusive lock while slots are inspected and possibly replaced.
496 std::unique_lock<std::shared_mutex> l(m_exec_mut);
497 for(
size_t i{0}; i < m_exec_threads.size(); i++) {
498 auto& thr = m_exec_threads[i];
// Slot holds a thread object that can still be joined; the action taken is
// not visible (presumably a join before reuse -- confirm).
502 if(thr.first && thr.first->joinable()) {
// New thread for this slot; `f` is moved into it.
512 = std::make_shared<std::thread>(std::move(f), i);
522 std::this_thread::yield();
// Visits every executor thread slot under a shared lock on m_exec_mut and
// selects those holding a joinable thread. The action on a selected slot is
// not part of this excerpt (presumably join(), given the function name --
// confirm).
527 void controller::join_execs() {
528 std::shared_lock<std::shared_mutex> l(m_exec_mut);
529 for(
auto& t : m_exec_threads) {
530 if(t.first && t.first->joinable()) {
// Body of the start/stop handling: blocks on m_start_cv until a start,
// stop or quit request is flagged, then logs the transition it performs.
// The branching between the log messages below, and the calls made between
// them, are on lines not part of this excerpt.
536 void controller::start_stop_func() {
538 bool stopping{
false};
539 bool quitting{
false};
// Wait for m_start_flag, m_quit or m_stop_flag. raft_callback() notifies
// this condition variable on role changes.
542 std::unique_lock<std::mutex> l(m_start_mut);
543 m_start_cv.wait(l, [&]() {
544 return m_start_flag || m_quit || m_stop_flag;
// Exactly one of the start and stop flags is expected to be set here.
// assert() is compiled out when NDEBUG is defined.
555 assert(m_start_flag ^ m_stop_flag);
559 m_start_flag =
false;
565 m_logger->warn(
"Stopping coordinator");
567 m_logger->warn(
"Stopped coordinator");
569 m_logger->warn(
"Quitting");
573 m_logger->warn(
"Stopping coordinator before start");
578 m_logger->warn(
"Starting coordinator");
580 m_logger->warn(
"Started coordinator");
// Brings the coordinator into service. Visible steps: reset the RPC
// server, log the shard connection step, retry recovery while this node is
// leader, create the first batch, start the batch executor thread, and
// create and initialise the RPC server. Lines between these steps are not
// part of this excerpt.
585 void controller::start() {
589 std::lock_guard<std::mutex> l(m_batch_mut);
592 m_logger->warn(
"Resetting sentinel network handler");
594 m_rpc_server.reset();
595 m_logger->warn(
"Connecting to shards");
598 m_logger->warn(
"Became leader, recovering dtxs");
// Recovery is repeated until `recovered` is set or this node is no longer
// the raft leader. Where `recovered` is updated is not visible here.
601 bool recovered{
false};
603 auto res = recovery_func();
605 m_logger->error(
"Failed to recover, likely stopped "
610 }
while(!recovered && m_raft_serv->is_leader());
611 m_logger->info(
"Recovery complete");
// Leadership is re-checked after recovery; the body of this branch is not
// shown.
615 if(!m_raft_serv->is_leader()) {
// First batch: constructed with a random hash as its ID under a shared
// lock on m_shards_mut, then given its callbacks.
620 auto batch = std::shared_ptr<distributed_tx>();
622 std::shared_lock<std::shared_mutex> ll(m_shards_mut);
623 batch = std::make_shared<distributed_tx>(m_rnd.
random_hash(),
628 batch_set_cbs(*batch);
// Publish the batch and a fresh tx map under m_batch_mut.
632 std::lock_guard<std::mutex> ll(m_batch_mut);
633 m_current_batch = std::move(batch);
635 = std::make_shared<decltype(m_current_txs)::element_type>();
639 m_batch_exec_thread = std::thread([&] {
640 batch_executor_func();
// `rpc_server` is created on a line not shown here. It is wrapped into
// m_rpc_server after the init() check.
647 if(!rpc_server->init()) {
648 m_logger->fatal(
"Failed to start RPC server");
651 m_rpc_server = std::make_unique<decltype(m_rpc_server)::element_type>(
653 std::move(rpc_server));
// Fragment: the enclosing function's signature is not part of this excerpt
// (presumably controller::quit() -- confirm against the full file).
// Takes m_start_mut (the state changed under it is on a hidden line), wakes
// the waiter on m_start_cv, and joins m_start_thread if it is joinable.
661 std::unique_lock<std::mutex> l(m_start_mut);
664 m_start_cv.notify_one();
665 if(m_start_thread.joinable()) {
666 m_start_thread.join();
// Fragment: body of an equality operator whose signature is not part of
// this excerpt (presumably sm_command_header::operator== -- confirm).
// Two objects are equal when m_comm and m_dtx_id both match.
672 return std::tie(m_comm, m_dtx_id)
673 == std::tie(rhs.m_comm, rhs.m_dtx_id);
// Fragment: start of an equality operator body whose signature is not part
// of this excerpt (presumably coordinator_state::operator== -- confirm).
// Compares the prepare, commit and discard sets of this object against
// `rhs`. The rest of the right-hand tuple is on lines not shown here.
678 return std::tie(m_prepare_txs, m_commit_txs, m_discard_txs)
679 == std::tie(rhs.m_prepare_txs,
// Fragment: the enclosing function's signature is not part of this excerpt
// (presumably controller::execute_transaction(tx, result_callback) --
// confirm against the full file).
// Visible steps: bail-out check when this node is not the raft leader, a
// validation call using the sentinel public keys and attestation threshold,
// a wait for room in the current batch, a duplicate check, and queuing the
// transaction together with its result callback.
688 if(!m_raft_serv->is_leader()) {
// Trailing arguments of a validation call whose callee is on a hidden line
// (presumably an attestation check -- confirm). Failure is logged below.
694 m_opts.m_sentinel_public_keys,
695 m_opts.m_attestation_threshold)) {
696 m_logger->warn(
"Received invalid compact transaction",
// Block until the current batch holds fewer than m_batch_size transactions
// or m_running is false.
703 std::unique_lock<std::mutex> l(m_batch_mut);
704 m_batch_cv.wait(l, [&]() {
705 return m_current_txs->size() < m_batch_size || !m_running;
// A transaction with this ID is already queued; the handling of that case
// is not visible here.
712 if(m_current_txs->find(tx.m_id) != m_current_txs->end()) {
// add_tx() returns the transaction's index within the batch. It is stored
// next to the callback; batch_executor_func() unpacks the same pair as
// (cb_func, batch_idx).
716 auto idx = m_current_batch->add_tx(tx);
719 m_current_txs->emplace(
721 std::make_pair(std::move(result_callback), idx));
727 m_batch_cv.notify_one();
void quit()
Terminates the replicated coordinator instance.
auto init() -> bool
Starts the replicated coordinator and associated raft server.
auto execute_transaction(transaction::compact_tx tx, callback_type result_callback) -> bool override
Coordinates a transaction among locking shards.
@ prepare
dtx is calling prepare on shards
@ done
dtx has completed fully
@ discard
dtx is calling discard on shards
@ commit
dtx is calling commit on shards
std::function< void(std::optional< bool >)> callback_type
Signature of callback function for a transaction execution result.
Raft state machine for managing a replicated coordinator.
@ prepare
Stores a dtx in the prepare phase.
@ done
Clears the dtx from the coordinator state.
@ discard
Moves a dtx from commit to discard.
@ get
Retrieves all active dtxs.
@ commit
Moves a dtx from prepare to commit.
auto random_hash() -> hash_t
Returns a random 32-byte hash value.
Generic asynchronous RPC server.
Implements an RPC server over a TCP socket.
auto check_attestations(const transaction::compact_tx &tx, const std::unordered_set< pubkey_t, hashing::null > &pubkeys, size_t threshold) -> bool
Validates the sentinel attestations attached to a compact transaction.
std::array< unsigned char, cbdc::hash_size > hash_t
SHA256 hash container.
auto serialized_size(const T &obj) -> size_t
Calculates the serialized size in bytes of the given object when serialized using serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
int32_t m_raft_max_batch
Maximum number of raft log entries to batch into one RPC message.
int32_t m_election_timeout_lower
Raft election timeout lower bound in milliseconds.
int32_t m_election_timeout_upper
Raft election timeout upper bound in milliseconds.
int32_t m_heartbeat
Raft heartbeat timeout in milliseconds.
Current state of distributed transactions managed by a coordinator.
auto operator==(const coordinator_state &rhs) const -> bool
Metadata of a command for the state machine.
auto operator==(const sm_command_header &rhs) const -> bool
A condensed, hash-only transaction representation.