OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
distributed_tx.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "distributed_tx.hpp"
7
8#include <future>
9
10namespace cbdc::coordinator {
12 const hash_t& dtx_id,
13 std::vector<std::shared_ptr<locking_shard::interface>> shards,
14 std::shared_ptr<logging::log> logger)
15 : m_dtx_id(dtx_id),
16 m_shards(std::move(shards)),
17 m_logger(std::move(logger)) {
18 m_txs.resize(m_shards.size());
19 m_tx_idxs.resize(m_shards.size());
20 assert(!m_shards.empty());
21 }
22
23 auto distributed_tx::prepare() -> std::optional<std::vector<bool>> {
24 if(m_prepare_cb) {
25 auto res = m_prepare_cb(m_dtx_id, m_full_txs);
26 if(!res) {
27 m_state = dtx_state::failed;
28 return std::nullopt;
29 }
30 }
31 auto futures = std::vector<
32 std::pair<std::future<std::optional<std::vector<bool>>>,
33 size_t>>();
34 for(size_t i{0}; i < m_shards.size(); i++) {
35 if(m_tx_idxs[i].empty()) {
36 continue;
37 }
38 const auto& shard = m_shards[i];
39 auto f = std::async(std::launch::async,
41 shard,
42 std::move(m_txs[i]),
43 m_dtx_id);
44 futures.emplace_back(std::move(f), i);
45 }
46 auto ret = std::vector<bool>(m_full_txs.size(), true);
47 for(auto& f : futures) {
48 auto res = f.first.get();
49 if(!res) {
50 m_state = dtx_state::failed;
51 return std::nullopt;
52 }
53 if(res->size() != m_tx_idxs[f.second].size()) {
54 m_logger->fatal(
55 "Shard prepare response has not enough statuses",
56 to_string(m_dtx_id),
57 "expected:",
58 m_tx_idxs[f.second].size(),
59 "got:",
60 res->size());
61 }
62 for(size_t i{0}; i < res->size(); i++) {
63 if(!(*res)[i]) {
64 ret[m_tx_idxs[f.second][i]] = false;
65 }
66 }
67 }
68 m_state = dtx_state::commit;
69 return ret;
70 }
71
72 auto distributed_tx::commit(const std::vector<bool>& complete_txs)
73 -> bool {
74 if(m_commit_cb) {
75 auto res = m_commit_cb(m_dtx_id, complete_txs, m_tx_idxs);
76 if(!res) {
77 m_state = dtx_state::failed;
78 return false;
79 }
80 }
81 auto futures = std::vector<std::future<bool>>();
82 for(size_t i{0}; i < m_shards.size(); i++) {
83 if(m_tx_idxs[i].empty()) {
84 continue;
85 }
86 const auto& shard = m_shards[i];
87 auto shard_complete_txs = std::vector<bool>(m_tx_idxs[i].size());
88 for(size_t j{0}; j < shard_complete_txs.size(); j++) {
89 shard_complete_txs[j] = complete_txs[m_tx_idxs[i][j]];
90 }
91 auto f = std::async(std::launch::async,
93 shard,
94 std::move(shard_complete_txs),
95 m_dtx_id);
96 futures.emplace_back(std::move(f));
97 }
98 for(auto& f : futures) {
99 auto res = f.get();
100 if(!res) {
101 m_state = dtx_state::failed;
102 return false;
103 }
104 }
105 m_state = dtx_state::discard;
106 return true;
107 }
108
109 auto distributed_tx::execute() -> std::optional<std::vector<bool>> {
110 auto dtxid_str = to_string(m_dtx_id);
111 if(m_state == dtx_state::prepare || m_state == dtx_state::start) {
112 m_logger->info("Preparing", dtxid_str);
113 auto res = prepare();
114 if(!res) {
115 return std::nullopt;
116 }
117 m_complete_txs = std::move(*res);
118 if(m_complete_txs.size() != m_full_txs.size()) {
119 m_logger->fatal("Prepare has incorrect number of statuses",
120 dtxid_str,
121 "expected:",
122 m_full_txs.size(),
123 "got:",
124 m_complete_txs.size());
125 }
126 m_logger->info("Prepared", dtxid_str);
127 }
128 if(m_state == dtx_state::commit) {
129 m_logger->info("Committing", dtxid_str);
130 auto res = commit(m_complete_txs);
131 if(!res) {
132 return std::nullopt;
133 }
134 m_logger->info("Committed", dtxid_str);
135 }
136 if(m_state == dtx_state::discard) {
137 m_logger->info("Discarding", dtxid_str);
138 auto res = discard();
139 if(!res) {
140 return std::nullopt;
141 }
142 m_logger->info("Discarded", dtxid_str);
143 }
144 return m_complete_txs;
145 }
146
148 for(size_t i{0}; i < m_shards.size(); i++) {
149 const auto& shard = m_shards[i];
150 auto stx = locking_shard::tx();
151 stx.m_tx = tx;
152 bool active{false};
153 if(shard->hash_in_shard_range(tx.m_id)) {
154 active = true;
155 } else {
156 for(const auto& inp : tx.m_inputs) {
157 if(shard->hash_in_shard_range(inp)) {
158 active = true;
159 break;
160 }
161 }
162 if(!active) {
163 for(const auto& out : tx.m_uhs_outputs) {
164 if(shard->hash_in_shard_range(out)) {
165 active = true;
166 break;
167 }
168 }
169 }
170 }
171 if(active) {
172 m_txs[i].emplace_back(std::move(stx));
173 m_tx_idxs[i].emplace_back(m_full_txs.size());
174 }
175 }
176 m_full_txs.emplace_back(tx);
177 return m_full_txs.size() - 1;
178 }
179
180 auto distributed_tx::discard() -> bool {
181 if(m_discard_cb) {
182 auto res = m_discard_cb(m_dtx_id);
183 if(!res) {
184 m_state = dtx_state::failed;
185 return false;
186 }
187 }
188 auto futures = std::vector<std::future<bool>>();
189 for(size_t i{0}; i < m_shards.size(); i++) {
190 if(m_tx_idxs[i].empty()) {
191 continue;
192 }
193 const auto& shard = m_shards[i];
194 auto f = std::async(std::launch::async,
196 shard,
197 m_dtx_id);
198 futures.emplace_back(std::move(f));
199 }
200 for(auto& f : futures) {
201 auto res = f.get();
202 if(!res) {
203 m_state = dtx_state::failed;
204 return false;
205 }
206 }
207 if(m_done_cb) {
208 auto res = m_done_cb(m_dtx_id);
209 if(!res) {
210 m_state = dtx_state::failed;
211 return false;
212 }
213 }
214 m_state = dtx_state::done;
215 return true;
216 }
217
219 return m_dtx_id;
220 }
221
223 m_prepare_cb = cb;
224 }
225
227 m_commit_cb = cb;
228 }
229
231 m_discard_cb = cb;
232 }
233
235 m_done_cb = cb;
236 }
237
239 const std::vector<transaction::compact_tx>& txs) {
240 m_state = dtx_state::prepare;
241 for(const auto& tx : txs) {
242 add_tx(tx);
243 }
244 }
245
247 const std::vector<bool>& complete_txs,
248 const std::vector<std::vector<uint64_t>>& tx_idxs) {
249 m_state = dtx_state::commit;
250 m_tx_idxs = tx_idxs;
251 m_complete_txs = complete_txs;
252 }
253
257
258 auto distributed_tx::size() const -> size_t {
259 return std::max(m_full_txs.size(), m_complete_txs.size());
260 }
261
263 return m_state;
264 }
265}
std::function< bool(const hash_t &)> discard_cb_t
void set_commit_cb(const commit_cb_t &cb)
Registers a callback to be called before starting the commit phase of the dtx.
auto size() const -> size_t
Returns the number of transactions in the dtx.
auto get_id() const -> hash_t
Returns the dtx ID associated with this coordinator instance.
void recover_commit(const std::vector< bool > &complete_txs, const std::vector< std::vector< uint64_t > > &tx_idxs)
Sets the state of the dtx to commit and sets the state from the end of the prepare phase so that exec...
auto execute() -> std::optional< std::vector< bool > >
Executes the dtx batch to completion or failure, either from start, or an intermediate state if one o...
void set_discard_cb(const discard_cb_t &cb)
Registers a callback to be called before the discard phase of the dtx is started.
@ prepare
dtx is calling prepare on shards
@ discard
dtx is calling discard on shards
@ commit
dtx is calling commit on shards
void recover_discard()
Sets the state of the dtx to discard so that execute() will start from the discard phase.
auto add_tx(const transaction::compact_tx &tx) -> size_t
Adds a TX to the batch managed by this coordinator and dtx ID.
void recover_prepare(const std::vector< transaction::compact_tx > &txs)
Sets the state of the dtx to prepare and re-adds all the txs included in the batch.
std::function< bool(const hash_t &, const std::vector< transaction::compact_tx > &)> prepare_cb_t
std::function< bool(const hash_t &, const std::vector< bool > &, const std::vector< std::vector< uint64_t > > &)> commit_cb_t
auto get_state() const -> dtx_state
Returns the current state of the dtx.
distributed_tx(const hash_t &dtx_id, std::vector< std::shared_ptr< locking_shard::interface > > shards, std::shared_ptr< logging::log > logger)
Constructs a new transaction coordinator instance.
void set_prepare_cb(const prepare_cb_t &cb)
Registers a callback to be called before starting the prepare phase of the dtx.
std::function< bool(const hash_t &)> done_cb_t
void set_done_cb(const done_cb_t &cb)
Registers a callback to be called before the done phase of the dtx is started.
virtual auto lock_outputs(std::vector< tx > &&txs, const hash_t &dtx_id) -> std::optional< std::vector< bool > >=0
Attempts to lock the input hashes for the given vector of transactions.
virtual auto apply_outputs(std::vector< bool > &&complete_txs, const hash_t &dtx_id) -> bool=0
Completes a previous lock operation by deleting input hashes and creating output hashes,...
virtual auto discard_dtx(const hash_t &dtx_id) -> bool=0
Discards any cached information about a given distributed transaction.
std::array< unsigned char, cbdc::hash_size > hash_t
SHA256 hash container.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Transaction type processed by locking shards.
A condensed, hash-only transaction representation.