OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/twophase/coordinator/state_machine.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "state_machine.hpp"
7
8#include "controller.hpp"
9#include "format.hpp"
12
13namespace cbdc::coordinator {
14 auto state_machine::commit(uint64_t log_idx, nuraft::buffer& data)
15 -> nuraft::ptr<nuraft::buffer> {
16 assert(log_idx == m_last_committed_idx + 1);
17 m_last_committed_idx = log_idx;
19 auto deser = cbdc::nuraft_serializer(data);
20 // Deserialize the header from the state machine command
21 deser >> comm;
22 switch(comm.m_comm) {
23 case command::prepare: {
24 // Put the dtx in the prepare phase and associated data in the
25 // relevant map
26 auto res = m_state.m_prepare_txs.emplace(
27 comm.m_dtx_id.value(),
28 nuraft::buffer::copy(data));
29 if(!res.second) {
30 // dtx IDs are supposed to be unique so if the dtx is
31 // already present in the prepare map, that would indicate
32 // a bug elsewhere in the codebase. Crash to protect the
33 // system.
34 m_logger->fatal("Duplicate prepare for dtx",
35 to_string(comm.m_dtx_id.value()));
36 }
37 break;
38 }
39 case command::commit: {
40 // Remove the dtx from the prepare map
41 auto res = m_state.m_prepare_txs.erase(comm.m_dtx_id.value());
42 if(res == 0U) {
43 // To be in the commit phase the dtx should have been in
44 // the prepare phase. If it's not, that's a bug and we
45 // crash to protect the system.
46 m_logger->fatal("Prepare not found for commit dtx",
47 to_string(comm.m_dtx_id.value()));
48 }
49 // Put the dtx in the commit map with associated data
50 m_state.m_commit_txs.emplace(comm.m_dtx_id.value(),
51 nuraft::buffer::copy(data));
52
53 break;
54 }
55 case command::discard: {
56 // Remove the dtx from the commit map
57 auto res = m_state.m_commit_txs.erase(comm.m_dtx_id.value());
58 if(res == 0U) {
59 // If the dtx wasn't in the commit map a bug has occurred
60 // and we crash
61 m_logger->fatal("Commit not found for discard dtx",
62 to_string(comm.m_dtx_id.value()));
63 }
64 // Add the dtx to the discard set
65 m_state.m_discard_txs.emplace(comm.m_dtx_id.value());
66 break;
67 }
68 case command::done: {
69 // Remove the dtx from the discard map
70 auto res = m_state.m_discard_txs.erase(comm.m_dtx_id.value());
71 if(res == 0U) {
72 // The dtx must have been discarded to be considered done.
73 // If it wasn't in the map, crash.
74 m_logger->fatal("Discard not found for done dtx",
75 to_string(comm.m_dtx_id.value()));
76 }
77 break;
78 }
79 case command::get: {
80 // Retrieve and serialize the current coordinator state to send
81 // back to the requester
82 auto ret
83 = nuraft::buffer::alloc(cbdc::serialized_size(m_state));
84 auto ser = cbdc::nuraft_serializer(*ret);
85 ser << m_state;
86 // Sanity check to ensure ret_sz is correct
87 assert(ser.end_of_buffer());
88 return ret;
89 }
90 }
91 return nullptr;
92 }
93
95 const nuraft::ulong log_idx,
96 nuraft::ptr<nuraft::cluster_config>& /*new_conf*/) {
97 assert(log_idx == m_last_committed_idx + 1);
98 m_last_committed_idx = log_idx;
99 }
100
101 auto state_machine::apply_snapshot(nuraft::snapshot& /* s */) -> bool {
102 // TODO: implement snapshots (cf. #768)
103 return false;
104 }
105
106 auto state_machine::last_snapshot() -> nuraft::ptr<nuraft::snapshot> {
107 return nullptr;
108 }
109
111 return m_last_committed_idx;
112 }
113
115 nuraft::snapshot& /* s */,
116 nuraft::async_result<bool>::handler_type& when_done) {
117 bool res{false};
118 auto snp = nuraft::ptr<std::exception>();
119 when_done(res, snp);
120 }
121
122 state_machine::state_machine(std::shared_ptr<logging::log> logger)
123 : m_logger(std::move(logger)) {}
124}
auto apply_snapshot(nuraft::snapshot &) -> bool override
Not implemented for coordinators.
void commit_config(nuraft::ulong log_idx, nuraft::ptr< nuraft::cluster_config > &) override
Handler for the raft cluster configuration changes.
@ commit
Moves a dtx from prepare to commit.
state_machine(std::shared_ptr< logging::log > logger)
Constructor.
auto last_snapshot() -> nuraft::ptr< nuraft::snapshot > override
Not implemented for coordinators.
auto last_commit_index() -> uint64_t override
Returns the index of the last-committed command.
void create_snapshot(nuraft::snapshot &, nuraft::async_result< bool >::handler_type &when_done) override
Not implemented for coordinators.
Implements serializer for nuraft::buffer.
auto serialized_size(const T &obj) -> size_t
Calculates the serialized size in bytes of the given object when serialized using serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.