13 std::vector<std::shared_ptr<locking_shard::interface>> shards,
14 std::shared_ptr<logging::log> logger)
16 m_shards(std::move(shards)),
17 m_logger(std::move(logger)) {
18 m_txs.resize(m_shards.size());
19 m_tx_idxs.resize(m_shards.size());
20 assert(!m_shards.empty());
23 auto distributed_tx::prepare() -> std::optional<std::vector<bool>> {
25 auto res = m_prepare_cb(m_dtx_id, m_full_txs);
27 m_state = dtx_state::failed;
31 auto futures = std::vector<
32 std::pair<std::future<std::optional<std::vector<bool>>>,
34 for(
size_t i{0}; i < m_shards.size(); i++) {
35 if(m_tx_idxs[i].empty()) {
38 const auto& shard = m_shards[i];
39 auto f = std::async(std::launch::async,
44 futures.emplace_back(std::move(f), i);
46 auto ret = std::vector<bool>(m_full_txs.size(),
true);
47 for(
auto& f : futures) {
48 auto res = f.first.get();
50 m_state = dtx_state::failed;
53 if(res->size() != m_tx_idxs[f.second].size()) {
55 "Shard prepare response has not enough statuses",
58 m_tx_idxs[f.second].size(),
62 for(
size_t i{0}; i < res->size(); i++) {
64 ret[m_tx_idxs[f.second][i]] =
false;
68 m_state = dtx_state::commit;
75 auto res = m_commit_cb(m_dtx_id, complete_txs, m_tx_idxs);
77 m_state = dtx_state::failed;
81 auto futures = std::vector<std::future<bool>>();
82 for(
size_t i{0}; i < m_shards.size(); i++) {
83 if(m_tx_idxs[i].empty()) {
86 const auto& shard = m_shards[i];
87 auto shard_complete_txs = std::vector<bool>(m_tx_idxs[i].size());
88 for(
size_t j{0}; j < shard_complete_txs.size(); j++) {
89 shard_complete_txs[j] = complete_txs[m_tx_idxs[i][j]];
91 auto f = std::async(std::launch::async,
94 std::move(shard_complete_txs),
96 futures.emplace_back(std::move(f));
98 for(
auto& f : futures) {
101 m_state = dtx_state::failed;
105 m_state = dtx_state::discard;
111 if(m_state == dtx_state::prepare || m_state == dtx_state::start) {
112 m_logger->info(
"Preparing", dtxid_str);
113 auto res = prepare();
117 m_complete_txs = std::move(*res);
118 if(m_complete_txs.size() != m_full_txs.size()) {
119 m_logger->fatal(
"Prepare has incorrect number of statuses",
124 m_complete_txs.size());
126 m_logger->info(
"Prepared", dtxid_str);
128 if(m_state == dtx_state::commit) {
129 m_logger->info(
"Committing", dtxid_str);
130 auto res = commit(m_complete_txs);
134 m_logger->info(
"Committed", dtxid_str);
136 if(m_state == dtx_state::discard) {
137 m_logger->info(
"Discarding", dtxid_str);
138 auto res = discard();
142 m_logger->info(
"Discarded", dtxid_str);
144 return m_complete_txs;
148 for(
size_t i{0}; i < m_shards.size(); i++) {
149 const auto& shard = m_shards[i];
153 if(shard->hash_in_shard_range(tx.m_id)) {
156 for(
const auto& inp : tx.m_inputs) {
157 if(shard->hash_in_shard_range(inp)) {
163 for(
const auto& out : tx.m_uhs_outputs) {
164 if(shard->hash_in_shard_range(out)) {
172 m_txs[i].emplace_back(std::move(stx));
173 m_tx_idxs[i].emplace_back(m_full_txs.size());
176 m_full_txs.emplace_back(tx);
177 return m_full_txs.size() - 1;
180 auto distributed_tx::discard() ->
bool {
182 auto res = m_discard_cb(m_dtx_id);
184 m_state = dtx_state::failed;
188 auto futures = std::vector<std::future<bool>>();
189 for(
size_t i{0}; i < m_shards.size(); i++) {
190 if(m_tx_idxs[i].empty()) {
193 const auto& shard = m_shards[i];
194 auto f = std::async(std::launch::async,
198 futures.emplace_back(std::move(f));
200 for(
auto& f : futures) {
203 m_state = dtx_state::failed;
208 auto res = m_done_cb(m_dtx_id);
210 m_state = dtx_state::failed;
214 m_state = dtx_state::done;
239 const std::vector<transaction::compact_tx>& txs) {
241 for(
const auto& tx : txs) {
247 const std::vector<bool>& complete_txs,
248 const std::vector<std::vector<uint64_t>>& tx_idxs) {
251 m_complete_txs = complete_txs;
259 return std::max(m_full_txs.size(), m_complete_txs.size());
std::function< bool(const hash_t &)> discard_cb_t
void set_commit_cb(const commit_cb_t &cb)
Registers a callback to be called before starting the commit phase of the dtx.
auto size() const -> size_t
Returns the number of transactions in the dtx.
auto get_id() const -> hash_t
Returns the dtx ID associated with this coordinator instance.
void recover_commit(const std::vector< bool > &complete_txs, const std::vector< std::vector< uint64_t > > &tx_idxs)
Sets the state of the dtx to commit and sets the state from the end of the prepare phase so that execute() will continue from the commit phase.
auto execute() -> std::optional< std::vector< bool > >
Executes the dtx batch to completion or failure, either from start, or an intermediate state if one of the recover functions was used.
void set_discard_cb(const discard_cb_t &cb)
Registers a callback to be called before the discard phase of the dtx is started.
dtx_state::prepare
dtx is calling prepare on shards
dtx_state::discard
dtx is calling discard on shards
dtx_state::commit
dtx is calling commit on shards
void recover_discard()
Sets the state of the dtx to discard so that execute() will start from the discard phase.
auto add_tx(const transaction::compact_tx &tx) -> size_t
Adds a TX to the batch managed by this coordinator and dtx ID.
void recover_prepare(const std::vector< transaction::compact_tx > &txs)
Sets the state of the dtx to prepare and re-adds all the txs included in the batch.
std::function< bool(const hash_t &, const std::vector< transaction::compact_tx > &)> prepare_cb_t
std::function< bool(const hash_t &, const std::vector< bool > &, const std::vector< std::vector< uint64_t > > &)> commit_cb_t
auto get_state() const -> dtx_state
Returns the current state of the dtx.
distributed_tx(const hash_t &dtx_id, std::vector< std::shared_ptr< locking_shard::interface > > shards, std::shared_ptr< logging::log > logger)
Constructs a new transaction coordinator instance.
void set_prepare_cb(const prepare_cb_t &cb)
Registers a callback to be called before starting the prepare phase of the dtx.
std::function< bool(const hash_t &)> done_cb_t
void set_done_cb(const done_cb_t &cb)
Registers a callback to be called before the done phase of the dtx is started.
virtual auto lock_outputs(std::vector< tx > &&txs, const hash_t &dtx_id) -> std::optional< std::vector< bool > >=0
Attempts to lock the input hashes for the given vector of transactions.
virtual auto apply_outputs(std::vector< bool > &&complete_txs, const hash_t &dtx_id) -> bool=0
Completes a previous lock operation by deleting input hashes and creating output hashes,...
virtual auto discard_dtx(const hash_t &dtx_id) -> bool=0
Discards any cached information about a given distributed transaction.
std::array< unsigned char, cbdc::hash_size > hash_t
SHA256 hash container.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Transaction type processed by locking shards.
A condensed, hash-only transaction representation.