OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/twophase/coordinator/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
8#include "format.hpp"
14
15#include <utility>
16
17namespace cbdc::coordinator {
18 controller::controller(size_t node_id,
19 size_t coordinator_id,
20 config::options opts,
21 std::shared_ptr<logging::log> logger)
22 : m_node_id(node_id),
23 m_coordinator_id(coordinator_id),
24 m_opts(std::move(opts)),
25 m_logger(std::move(logger)),
26 m_state_machine(nuraft::cs_new<state_machine>(m_logger)),
27 m_shard_endpoints(m_opts.m_locking_shard_endpoints),
28 m_shard_ranges(m_opts.m_shard_ranges),
29 m_batch_size(m_opts.m_batch_size),
30 m_exec_threads(m_opts.m_coordinator_max_threads) {
31 m_raft_params.election_timeout_lower_bound_
32 = static_cast<int>(m_opts.m_election_timeout_lower);
33 m_raft_params.election_timeout_upper_bound_
34 = static_cast<int>(m_opts.m_election_timeout_upper);
35 m_raft_params.heart_beat_interval_
36 = static_cast<int>(m_opts.m_heartbeat);
37 m_raft_params.snapshot_distance_ = 0; // TODO: implement snapshots
38 m_raft_params.max_append_size_
39 = static_cast<int>(m_opts.m_raft_max_batch);
40 }
41
45
46 auto controller::init() -> bool {
47 if(!m_logger) {
48 std::cerr
49 << "[ERROR] The logger pointer in coordinator::controller "
50 << "is null." << std::endl;
51 return false;
52 }
53
54 if(m_coordinator_id > (m_opts.m_coordinator_endpoints.size() - 1)) {
55 m_logger->error("The coordinator ID is out of range "
56 "of the m_coordinator_endpoints vector.");
57 return false;
58 }
59
60 for(const auto& vec : m_opts.m_coordinator_endpoints) {
61 if(m_node_id > (vec.size() - 1)) {
62 m_logger->error("The node ID is out of range "
63 "of the m_coordinator_endpoints vector.");
64 return false;
65 }
66 }
67
68 m_handler_endpoint
69 = m_opts.m_coordinator_endpoints[m_coordinator_id][m_node_id];
70
71 if(m_coordinator_id
72 > (m_opts.m_coordinator_raft_endpoints.size() - 1)) {
73 m_logger->error("The coordinator ID is out of range "
74 "of the m_coordinator_raft_endpoints vector.");
75 return false;
76 }
77
78 for(const auto& vec : m_opts.m_coordinator_raft_endpoints) {
79 if(m_node_id > (vec.size() - 1)) {
80 m_logger->error("The node ID is out of range "
81 "of the m_coordinator_raft_endpoints vector.");
82 return false;
83 }
84 }
85
86 m_raft_serv = std::make_shared<raft::node>(
87 static_cast<int>(m_node_id),
88 m_opts.m_coordinator_raft_endpoints[m_coordinator_id],
89 "coordinator" + std::to_string(m_coordinator_id),
90 true,
91 m_state_machine,
92 0,
93 m_logger,
94 [&](auto&& res, auto&& err) {
95 return raft_callback(std::forward<decltype(res)>(res),
96 std::forward<decltype(err)>(err));
97 });
98
99 // Thread to handle starting and stopping the message handler and dtx
100 // batch processing threads when triggered by the raft callback
101 // becoming leader or follower
102 m_start_thread = std::thread([&] {
103 start_stop_func();
104 });
105
106 // Initialize NuRaft with the state machine we just created. Register
107 // our callback function to notify us when we become a leader or
108 // follower.
109 return m_raft_serv->init(m_raft_params);
110 }
111
112 auto controller::raft_callback(nuraft::cb_func::Type type,
113 nuraft::cb_func::Param* /* param */)
114 -> nuraft::cb_func::ReturnCode {
115 if(type == nuraft::cb_func::BecomeLeader) {
116 // We're now the leader. Inform the start/stop thread that it
117 // should start up the handler threads and initiate dtx recovery.
118 // We do this via flags and a condition variable with the actual
119 // start/stop in a separate thread to not block NuRaft internally.
120 // Since we need to use the state machine to handle recovery we
121 // need to return from this callback before we can start the
122 // process.
123 m_logger->warn("Became leader, starting coordinator");
124 {
125 std::unique_lock<std::mutex> l(m_start_mut);
126 m_start_flag = true;
127 m_stop_flag = false;
128 }
129 m_start_cv.notify_one();
130 m_logger->warn("Done with become leader handler");
131 } else if(type == nuraft::cb_func::BecomeFollower) {
132 // As above, we're now a follower. Inform the start/stop thread to
133 // kill the handler threads.
134 m_logger->warn("Became follower, stopping coordinator");
135 {
136 std::unique_lock<std::mutex> l(m_start_mut);
137 m_start_flag = false;
138 m_stop_flag = true;
139 }
140 m_start_cv.notify_one();
141 m_logger->warn("Done with become follower handler");
142 }
143 return nuraft::cb_func::ReturnCode::Ok;
144 }
145
146 auto
147 controller::prepare_cb(const hash_t& dtx_id,
148 const std::vector<transaction::compact_tx>& txs)
149 -> bool {
150 // Send the prepare status for this dtx ID and the txs contained within
151 // to the coordinator RSM and ensure it replicated (or failed) before
152 // returning.
153 auto comm = sm_command{{state_machine::command::prepare, dtx_id}, txs};
154 return replicate_sm_command(comm).has_value();
155 }
156
157 auto
158 controller::commit_cb(const hash_t& dtx_id,
159 const std::vector<bool>& complete_txs,
160 const std::vector<std::vector<uint64_t>>& tx_idxs)
161 -> bool {
162 // Send the commit status for this dtx ID and the result from prepare,
163 // along with the mapping of which txs are relevant to each shard in
164 // the prepare result to the RSM and check if it replicated.
165 auto comm = sm_command{{state_machine::command::commit, dtx_id},
166 std::make_pair(complete_txs, tx_idxs)};
167 return replicate_sm_command(comm).has_value();
168 }
169
170 auto controller::discard_cb(const hash_t& dtx_id) -> bool {
171 // Send the discard status for this dtx ID and check if it replicated.
172 auto comm = sm_command{{state_machine::command::discard, dtx_id}};
173 return replicate_sm_command(comm).has_value();
174 }
175
176 auto controller::done_cb(const hash_t& dtx_id) -> bool {
177 // Send the done status for this dtx ID and check if it replicated.
178 auto comm = sm_command{{state_machine::command::done, dtx_id}};
179 return replicate_sm_command(comm).has_value();
180 }
181
182 void controller::stop() {
183 // Set the running flag to false, close the network and notify the
184 // batch trigger condition variable so the threads that check them will
185 // end and we can join the threads below.
186 {
187 std::lock_guard<std::mutex> l(m_batch_mut);
188 m_running = false;
189 }
190 m_rpc_server.reset();
191 m_batch_cv.notify_one();
192
193 // Stop each of the locking shard clients to cancel any pending RPCs
194 // and unblock any of the current dtxs so they can mark themselves as
195 // failed and stop executing.
196 {
197 // The lock just protects m_shards which we're not modifying.
198 std::shared_lock<std::shared_mutex> l(m_shards_mut);
199 for(auto& s : m_shards) {
200 s->stop();
201 }
202 }
203
204 // Join the handler and batch execution threads
205 if(m_batch_exec_thread.joinable()) {
206 m_batch_exec_thread.join();
207 }
208
209 // Join any existing dtxs still executing
210 join_execs();
211
212 // Disconnect from the shards
213 {
214 std::unique_lock<std::shared_mutex> l(m_shards_mut);
215 m_shards.clear();
216 }
217 }
218
219 auto controller::recovery_func() -> bool {
220 // Grab any non-completed dtxs from the state machine that were not
221 // done when the previous leader failed.
222 auto comm = sm_command{{state_machine::command::get}};
223 m_logger->info("Waiting for get SM command response");
224 auto r = replicate_sm_command(comm);
225 if(!r) {
226 // We likely stopped being the leader so we couldn't get the dtxs
227 return false;
228 }
229
230 m_logger->info("Started recovery process");
231
232 const auto& res = *r;
233 if(!res) {
234 // I don't think this should happen in practice. It might be wise
235 // to make this fatal.
236 m_logger->error("Empty response object");
237 return false;
238 }
239
240 // List of coordinators/dtxs we're going to recover
241 auto coordinators = std::vector<std::shared_ptr<distributed_tx>>();
242
243 // Deserialize the coordinator state we just retrieved from the RSM
244 auto state = coordinator_state();
245 auto deser = nuraft_serializer(*res);
246 deser >> state;
247
248 for(const auto& prep : state.m_prepare_txs) {
249 // Create a coordinator for the prepare dtx to recover
250 auto coord = std::shared_ptr<distributed_tx>();
251 {
252 std::shared_lock<std::shared_mutex> l(m_shards_mut);
253 coord = std::make_shared<distributed_tx>(prep.first,
254 m_shards,
255 m_logger);
256 }
257 // Tell the coordinator this dtx is in the prepare phase and
258 // provide the list of transactions
259 coord->recover_prepare(prep.second);
260 coordinators.emplace_back(std::move(coord));
261 }
262
263 for(const auto& com : state.m_commit_txs) {
264 auto coord = std::shared_ptr<distributed_tx>();
265 {
266 std::shared_lock<std::shared_mutex> l(m_shards_mut);
267 coord = std::make_shared<distributed_tx>(com.first,
268 m_shards,
269 m_logger);
270 }
271 // Tell the coordinator this dtx is in the commit phase and provide
272 // the flags for which dtxs to complete and the map between shards
273 // and transactions in the batch
274 coord->recover_commit(com.second.first, com.second.second);
275 coordinators.emplace_back(std::move(coord));
276 }
277
278 for(const auto& dis : state.m_discard_txs) {
279 auto coord = std::shared_ptr<distributed_tx>();
280 {
281 std::shared_lock<std::shared_mutex> l(m_shards_mut);
282 coord = std::make_shared<distributed_tx>(dis,
283 m_shards,
284 m_logger);
285 }
286 // Tell the coordinator this dtx is in the discard phase
287 coord->recover_discard();
288 coordinators.emplace_back(std::move(coord));
289 }
290
291 // Flag in case one of the dtxs fails. This would happen if we stopped
292 // being the leader mid-execution.
293 auto success = std::atomic_bool{true};
294 for(auto&& coord : coordinators) {
295 // Register the callbacks for the RSM so we track dtx state during
296 // execution
297 batch_set_cbs(*coord);
298 auto dtx_id_str = to_string(coord->get_id());
299 m_logger->info("Recovering dtx", dtx_id_str);
300 // Create a lambda that handles the execution and cleanup of the
301 // dtx
302 auto f = [&, c{std::move(coord)}, s{std::move(dtx_id_str)}](
303 size_t thread_idx) {
304 // Execute the dtx from its most recent phase
305 auto exec_res = c->execute();
306 if(!exec_res) {
307 m_logger->error("Failed to recover dtx", s);
308 // We probably stopped being the leader so set the success
309 // flag
310 success = false;
311 } else {
312 m_logger->info("Recovered dtx", s);
313 }
314 // Mark the thread we were using as done so it can be re-used
315 {
316 std::shared_lock<std::shared_mutex> l(m_exec_mut);
317 m_exec_threads[thread_idx].second = false;
318 }
319 };
320 // Schedule the lambda on an available executor thread. Blocks
321 // until there's a thread available
322 schedule_exec(std::move(f));
323 }
324
325 // Make sure we recovered fully before returning
326 join_execs();
327
328 return success;
329 }
330
331 void controller::batch_set_cbs(distributed_tx& c) {
332 auto s = c.get_state();
333 // Register all the RSM callbacks so the state machine tracks the state
334 // of each outstanding dtxn. Don't register the callback for the phase
335 // the dtxn is currently in as the state machine already knows about
336 // it, we can skip re-notification.
338 c.set_prepare_cb([&](auto&& dtx_id, auto&& txs) {
339 return prepare_cb(std::forward<decltype(dtx_id)>(dtx_id),
340 std::forward<decltype(txs)>(txs));
341 });
342 }
344 c.set_commit_cb(
345 [&](auto&& dtx_id, auto&& complete_txs, auto&& tx_idxs) {
346 return commit_cb(
347 std::forward<decltype(dtx_id)>(dtx_id),
348 std::forward<decltype(complete_txs)>(complete_txs),
349 std::forward<decltype(tx_idxs)>(tx_idxs));
350 });
351 }
353 c.set_discard_cb([&](auto&& dtx_id) {
354 return discard_cb(std::forward<decltype(dtx_id)>(dtx_id));
355 });
356 }
358 c.set_done_cb([&](auto&& dtx_id) {
359 return done_cb(std::forward<decltype(dtx_id)>(dtx_id));
360 });
361 }
362 }
363
364 void controller::batch_executor_func() {
365 while(m_running) {
366 {
367 // Wait until there are transactions ready to be processed in a
368 // dtx batch
369 std::unique_lock<std::mutex> l(m_batch_mut);
370 m_batch_cv.wait(l, [&]() {
371 return !m_current_txs->empty() || !m_running;
372 });
373 }
374 if(!m_running) {
375 break;
376 }
377
378 // Placeholders where we're going to move the current batch and map
379 // of tx to sentinel so we can send the responses.
380 auto batch = std::shared_ptr<distributed_tx>();
381 auto txs
382 = std::shared_ptr<decltype(m_current_txs)::element_type>();
383
384 // New batch we're going to swap out with the current batch being
385 // built by the handler thread
386 auto new_batch = std::shared_ptr<distributed_tx>();
387 {
388 std::shared_lock<std::shared_mutex> l(m_shards_mut);
389 new_batch
390 = std::make_shared<distributed_tx>(m_rnd.random_hash(),
391 m_shards,
392 m_logger);
393 }
394
395 // Atomically swap the current batch and tx->sentinel map with new
396 // ones so we can run this batch while the handler thread builds a
397 // new one
398 {
399 std::lock_guard<std::mutex> l(m_batch_mut);
400 batch = std::move(m_current_batch);
401 txs = std::move(m_current_txs);
402 m_current_batch = std::move(new_batch);
403 batch_set_cbs(*m_current_batch);
404 m_current_txs = std::make_shared<
405 decltype(m_current_txs)::element_type>();
406 }
407
408 // Notify the handler thread it can re-start adding transactions to
409 // the current batch
410 m_batch_cv.notify_one();
411
412 // Lambda to execute the batch and respond to the sentinel with the
413 // result
414 auto f = [&, b{std::move(batch)}, t{std::move(txs)}](
415 size_t thread_idx) {
416 auto dtxid = to_string(b->get_id());
417 m_logger->info("dtxn start:", dtxid, "size:", t->size());
418 auto s = std::chrono::high_resolution_clock::now();
419 // Execute the batch from the start
420 auto res = b->execute();
421 // For each tx result in the batch create a message with
422 // the txid and the result, and send it to the appropriate
423 // sentinel.
424 for(const auto& [tx_id, metadata] : *t) {
425 const auto& [cb_func, batch_idx] = metadata;
426 auto tx_res = std::optional<bool>();
427 if(res.has_value()) {
428 tx_res = static_cast<bool>((*res)[batch_idx]);
429 }
430 cb_func(tx_res);
431 }
432 if(!res) {
433 // We probably stopped being the leader and we don't know
434 // the result of the txs so we can't respond to the
435 // sentinels. Just warn and clean up. The new leader will
436 // recover the dtx.
437 m_logger->warn("dtxn failed:", dtxid);
438 } else {
439 auto e = std::chrono::high_resolution_clock::now();
440 auto l = (e - s).count();
441 m_logger->info("dtxn done:",
442 dtxid,
443 "t:",
444 l,
445 "size:",
446 res->size());
447 }
448 // Mark our thread as done so it can be re-used
449 {
450 std::shared_lock<std::shared_mutex> l(m_exec_mut);
451 m_exec_threads[thread_idx].second = false;
452 }
453 };
454 // Schedule our executor lambda, block until there's a thread
455 // available
456 schedule_exec(std::move(f));
457 }
458 }
459
460 auto controller::replicate_sm_command(const sm_command& c)
461 -> std::optional<nuraft::ptr<nuraft::buffer>> {
462 auto buf = nuraft::buffer::alloc(serialized_size(c));
463 auto ser = nuraft_serializer(*buf);
464 ser << c;
465 // Sanity check to ensure total_sz was correct
466 assert(ser.end_of_buffer());
467 // Use synchronous mode to block until replication or failure
468 return m_raft_serv->replicate_sync(buf);
469 }
470
471 void controller::connect_shards() {
472 // Make a network::network for each shard cluster and a locking
473 // shard client to manage RPCs. Add the clients to the m_shards map so
474 // the dtxs can use them.
475 for(size_t i{0}; i < m_shard_endpoints.size(); i++) {
476 m_logger->warn("Connecting to shard cluster", std::to_string(i));
477 auto s = std::make_shared<locking_shard::rpc::client>(
478 m_shard_endpoints[i],
479 m_shard_ranges[i],
480 *m_logger);
481 if(!s->init()) {
482 m_logger->fatal("Failed to initialize shard client");
483 }
484 {
485 std::unique_lock<std::shared_mutex> l(m_shards_mut);
486 m_shards.emplace_back(std::move(s));
487 }
488 }
489 }
490
491 void controller::schedule_exec(std::function<void(size_t)>&& f) {
492 // Loop until we successfully scheduled the given lambda on a thread
493 bool found_thread{false};
494 while(!found_thread) {
495 {
496 std::unique_lock<std::shared_mutex> l(m_exec_mut);
497 for(size_t i{0}; i < m_exec_threads.size(); i++) {
498 auto& thr = m_exec_threads[i];
499 // If the thread is marked done we can use it
500 if(!thr.second) {
501 // Make sure the previous thread is joined
502 if(thr.first && thr.first->joinable()) {
503 thr.first->join();
504 }
505 // Mark the thread as in-use
506 thr.second = true;
507 // Start the thread with the given lambda and provide
508 // index of the thread so it can mark itself as done
509 // Not use-after-move because the outer loop exits once
510 // f has been moved due to found_thread.
511 thr.first // NOLINTNEXTLINE(bugprone-use-after-move)
512 = std::make_shared<std::thread>(std::move(f), i);
513 found_thread = true;
514 break;
515 }
516 }
517 }
518 if(!found_thread) {
519 // For now just yield to the scheduler if there were no
520 // complete threads this time. In the future this could be a
521 // condition variable instead.
522 std::this_thread::yield();
523 }
524 }
525 }
526
527 void controller::join_execs() {
528 std::shared_lock<std::shared_mutex> l(m_exec_mut);
529 for(auto& t : m_exec_threads) {
530 if(t.first && t.first->joinable()) {
531 t.first->join();
532 }
533 }
534 }
535
536 void controller::start_stop_func() {
537 while(!m_quit) {
538 bool stopping{false};
539 bool quitting{false};
540 {
541 // Wait until we're stopping, starting or quitting
542 std::unique_lock<std::mutex> l(m_start_mut);
543 m_start_cv.wait(l, [&]() {
544 return m_start_flag || m_quit || m_stop_flag;
545 });
546 // Store our plan of action so we can release the lock on these
547 // flags in case the NuRaft handler needs to set them
548 // differently while we're busy starting/stopping.
549 if(m_quit) {
550 quitting = true;
551 stopping = true;
552 } else {
553 // Sanity check: we should be stopping or starting, not
554 // both
555 assert(m_start_flag ^ m_stop_flag);
556 if(m_stop_flag) {
557 stopping = true;
558 }
559 m_start_flag = false;
560 m_stop_flag = false;
561 }
562 }
563
564 if(stopping) {
565 m_logger->warn("Stopping coordinator");
566 stop();
567 m_logger->warn("Stopped coordinator");
568 if(quitting) {
569 m_logger->warn("Quitting");
570 break;
571 }
572 } else {
573 m_logger->warn("Stopping coordinator before start");
574 // Make sure the coordinator is stopped before starting it
575 // again to satisfy any preconditions and not leave the
576 // coordinator in a partial state
577 stop();
578 m_logger->warn("Starting coordinator");
579 start();
580 m_logger->warn("Started coordinator");
581 }
582 }
583 }
584
585 void controller::start() {
586 // Set the running flag to true so when we start the threads they won't
587 // immediately exit
588 {
589 std::lock_guard<std::mutex> l(m_batch_mut);
590 m_running = true;
591 }
592 m_logger->warn("Resetting sentinel network handler");
593 // Reset the handler network instance so we can re-use it
594 m_rpc_server.reset();
595 m_logger->warn("Connecting to shards");
596 // Connect to the shard clusters
597 connect_shards();
598 m_logger->warn("Became leader, recovering dtxs");
599 // Attempt recovery of existing dtxs until we stop being the leader or
600 // recovery succeeds
601 bool recovered{false};
602 do {
603 auto res = recovery_func();
604 if(!res) {
605 m_logger->error("Failed to recover, likely stopped "
606 "being leader");
607 continue;
608 }
609 recovered = true;
610 } while(!recovered && m_raft_serv->is_leader());
611 m_logger->info("Recovery complete");
612
613 // If we stopped being the leader while attempting to recover we
614 // shouldn't bother starting and handler threads
615 if(!m_raft_serv->is_leader()) {
616 return;
617 }
618
619 // Create a fresh batch to add transactions to
620 auto batch = std::shared_ptr<distributed_tx>();
621 {
622 std::shared_lock<std::shared_mutex> ll(m_shards_mut);
623 batch = std::make_shared<distributed_tx>(m_rnd.random_hash(),
624 m_shards,
625 m_logger);
626 }
627 // Register the RSM callbacks with the batch
628 batch_set_cbs(*batch);
629
630 // Atomically set the current batch and a new tx->sentinel map
631 {
632 std::lock_guard<std::mutex> ll(m_batch_mut);
633 m_current_batch = std::move(batch);
634 m_current_txs
635 = std::make_shared<decltype(m_current_txs)::element_type>();
636 }
637
638 // Start the batch executor thread
639 m_batch_exec_thread = std::thread([&] {
640 batch_executor_func();
641 });
642
643 // Listen on the coordinator endpoint and start handling incoming txs
644 auto rpc_server = std::make_unique<cbdc::rpc::tcp_server<
646 m_handler_endpoint);
647 if(!rpc_server->init()) {
648 m_logger->fatal("Failed to start RPC server");
649 }
650
651 m_rpc_server = std::make_unique<decltype(m_rpc_server)::element_type>(
652 this,
653 std::move(rpc_server));
654 }
655
657 // Notify the start/stop thread that we're quitting. One thread handles
658 // starting and stopping to ensure only one thing is happening at a
659 // time avoiding races on handler threads.
660 {
661 std::unique_lock<std::mutex> l(m_start_mut);
662 m_quit = true;
663 }
664 m_start_cv.notify_one();
665 if(m_start_thread.joinable()) {
666 m_start_thread.join();
667 }
668 }
669
671 const sm_command_header& rhs) const -> bool {
672 return std::tie(m_comm, m_dtx_id)
673 == std::tie(rhs.m_comm, rhs.m_dtx_id);
674 }
675
677 const coordinator_state& rhs) const -> bool {
678 return std::tie(m_prepare_txs, m_commit_txs, m_discard_txs)
679 == std::tie(rhs.m_prepare_txs,
680 rhs.m_commit_txs,
681 rhs.m_discard_txs);
682 }
683
685 callback_type result_callback)
686 -> bool {
687 // If we're not the leader we can't process txs
688 if(!m_raft_serv->is_leader()) {
689 return false;
690 }
691
693 tx,
694 m_opts.m_sentinel_public_keys,
695 m_opts.m_attestation_threshold)) {
696 m_logger->warn("Received invalid compact transaction",
697 to_string(tx.m_id));
698 return false;
699 }
700
701 auto added = [&]() {
702 // Wait until there's space in the current batch
703 std::unique_lock<std::mutex> l(m_batch_mut);
704 m_batch_cv.wait(l, [&]() {
705 return m_current_txs->size() < m_batch_size || !m_running;
706 });
707 if(!m_running) {
708 return false;
709 }
710
711 // Make sure the TX is not already in the current batch
712 if(m_current_txs->find(tx.m_id) != m_current_txs->end()) {
713 return false;
714 }
715 // Add the tx to the current dtx batch and record its index
716 auto idx = m_current_batch->add_tx(tx);
717 // Map the index of the tx to the transaction ID and sentinel
718 // ID
719 m_current_txs->emplace(
720 tx.m_id,
721 std::make_pair(std::move(result_callback), idx));
722 return true;
723 }();
724 if(added) {
725 // If this was a new TX, notify the executor thread there's work to
726 // do
727 m_batch_cv.notify_one();
728 }
729
730 return added;
731 }
732}
void quit()
Terminates the replicated coordinator instance.
auto init() -> bool
Starts the replicated coordinator and associated raft server.
auto execute_transaction(transaction::compact_tx tx, callback_type result_callback) -> bool override
Coordinates a transaction among locking shards.
@ prepare
dtx is calling prepare on shards
@ discard
dtx is calling discard on shards
@ commit
dtx is calling commit on shards
std::function< void(std::optional< bool >)> callback_type
Signature of callback function for a transaction execution result.
Raft state machine for managing a replicated coordinator.
@ prepare
Stores a dtx in the prepare phase.
@ done
Clears the dtx from the coordinator state.
@ discard
Moves a dtx from commit to discard.
@ get
Retrieves all active dtxs.
@ commit
Moves a dtx from prepare to commit.
auto random_hash() -> hash_t
Returns a random 32-byte hash value.
Generic asynchronous RPC server.
Implements an RPC server over a TCP socket.
auto check_attestations(const transaction::compact_tx &tx, const std::unordered_set< pubkey_t, hashing::null > &pubkeys, size_t threshold) -> bool
Validates the sentinel attestations attached to a compact transaction.
std::array< unsigned char, cbdc::hash_size > hash_t
SHA256 hash container.
auto serialized_size(const T &obj) -> size_t
Calculates the serialized size in bytes of the given object when serialized using serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
Definition config.hpp:132
int32_t m_raft_max_batch
Maximum number of raft log entries to batch into one RPC message.
Definition config.hpp:183
int32_t m_election_timeout_lower
Raft election timeout lower bound in milliseconds.
Definition config.hpp:176
int32_t m_election_timeout_upper
Raft election timeout upper bound in milliseconds.
Definition config.hpp:173
int32_t m_heartbeat
Raft heartbeat timeout in milliseconds.
Definition config.hpp:179
Current state of distributed transactions managed by a coordinator.
auto operator==(const coordinator_state &rhs) const -> bool
auto operator==(const sm_command_header &rhs) const -> bool
A condensed, hash-only transaction representation.