OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
shard.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "shard.hpp"
7
8#include <utility>
9
10namespace cbdc::shard {
12 : m_prefix_range(std::move(prefix_range)) {}
13
14 auto shard::open_db(const std::string& db_dir)
15 -> std::optional<std::string> {
16 leveldb::Options opt;
17 opt.create_if_missing = true;
18
19 leveldb::DB* db_ptr{};
20 const auto res = leveldb::DB::Open(opt, db_dir, &db_ptr);
21
22 if(!res.ok()) {
23 return res.ToString();
24 }
25 this->m_db.reset(db_ptr);
26
27 // Read best block height from database or initialize it to zero
28 std::string bestBlockHeight;
29 const auto bestBlockRes = this->m_db->Get(this->m_read_options,
30 m_best_block_height_key,
31 &bestBlockHeight);
32 if(bestBlockRes.IsNotFound()) {
33 this->m_best_block_height = 0;
34 std::array<char, sizeof(m_best_block_height)> height_arr{};
35 std::memcpy(height_arr.data(),
36 &m_best_block_height,
37 sizeof(m_best_block_height));
38 leveldb::Slice startBestBlockHeight(
39 height_arr.data(),
40 sizeof(this->m_best_block_height));
41 this->m_db->Put(this->m_write_options,
42 m_best_block_height_key,
43 startBestBlockHeight);
44 } else {
45 assert(bestBlockHeight.size()
46 == sizeof(this->m_best_block_height));
47 std::memcpy(&this->m_best_block_height,
48 bestBlockHeight.c_str(),
49 sizeof(this->m_best_block_height));
50 }
51
52 update_snapshot();
53
54 return std::nullopt;
55 }
56
57 auto shard::digest_block(const cbdc::atomizer::block& blk) -> bool {
58 if(blk.m_height != m_best_block_height + 1) {
59 return false;
60 }
61
62 leveldb::WriteBatch batch;
63
64 // Iterate over all confirmed transactions
65 for(const auto& tx : blk.m_transactions) {
66 // Add new outputs
67 for(const auto& out : tx.m_uhs_outputs) {
68 if(is_output_on_shard(out)) {
69 std::array<char, sizeof(out)> out_arr{};
70 std::memcpy(out_arr.data(), out.data(), out.size());
71 leveldb::Slice OutPointKey(out_arr.data(), out.size());
72 batch.Put(OutPointKey, leveldb::Slice());
73 }
74 }
75
76 // Delete spent inputs
77 for(const auto& inp : tx.m_inputs) {
78 if(is_output_on_shard(inp)) {
79 std::array<char, sizeof(inp)> inp_arr{};
80 std::memcpy(inp_arr.data(), inp.data(), inp.size());
81 leveldb::Slice OutPointKey(inp_arr.data(), inp.size());
82 batch.Delete(OutPointKey);
83 }
84 }
85 }
86
87 // Bump the best block height
88 this->m_best_block_height++;
89 std::array<char, sizeof(m_best_block_height)> height_arr{};
90 std::memcpy(height_arr.data(),
91 &m_best_block_height,
92 sizeof(m_best_block_height));
93 leveldb::Slice newBestBlockHeight(height_arr.data(),
94 sizeof(this->m_best_block_height));
95 batch.Put(m_best_block_height_key, newBestBlockHeight);
96
97 // Commit the changes atomically
98 this->m_db->Write(this->m_write_options, &batch);
99
100 update_snapshot();
101
102 return true;
103 }
104
106 -> std::variant<atomizer::tx_notify_request,
108 std::shared_ptr<const leveldb::Snapshot> snp{};
109 uint64_t snp_height{};
110 {
111 std::shared_lock<std::shared_mutex> l(m_snp_mut);
112 snp_height = m_snp_height;
113 snp = m_snp;
114 }
115
116 // Don't process transactions until we've heard from the atomizer
117 if(snp_height == 0) {
119 tx.m_id,
121 }
122
123 if(tx.m_inputs.empty()) {
125 tx.m_id,
127 }
128
129 auto read_options = m_read_options;
130 read_options.snapshot = snp.get();
131
132 // Check TX inputs exist
133 std::unordered_set<uint64_t> attestations;
134 std::vector<hash_t> dne_inputs;
135 for(uint64_t i = 0; i < tx.m_inputs.size(); i++) {
136 const auto& inp = tx.m_inputs[i];
137 // Only check for inputs/outputs relevant to this shard
138 if(!is_output_on_shard(inp)) {
139 continue;
140 }
141
142 std::array<char, sizeof(inp)> inp_arr{};
143 std::memcpy(inp_arr.data(), inp.data(), inp.size());
144 leveldb::Slice OutPointKey(inp_arr.data(), inp.size());
145 std::string op;
146
147 const auto& res = m_db->Get(read_options, OutPointKey, &op);
148 if(res.IsNotFound()) {
149 dne_inputs.push_back(inp);
150 } else {
151 attestations.insert(i);
152 }
153 }
154
155 if(!dne_inputs.empty()) {
157 tx.m_id,
159 }
160
162 msg.m_attestations = std::move(attestations);
163 msg.m_tx = std::move(tx);
164 msg.m_block_height = snp_height;
165
166 return msg;
167 }
168
169 auto shard::best_block_height() const -> uint64_t {
170 return m_best_block_height;
171 }
172
173 auto shard::is_output_on_shard(const hash_t& uhs_hash) const -> bool {
174 return config::hash_in_shard_range(m_prefix_range, uhs_hash);
175 }
176
177 void shard::update_snapshot() {
178 std::unique_lock<std::shared_mutex> l(m_snp_mut);
179 m_snp_height = m_best_block_height;
180 m_snp = std::shared_ptr<const leveldb::Snapshot>(
181 m_db->GetSnapshot(),
182 [&](const leveldb::Snapshot* p) {
183 m_db->ReleaseSnapshot(p);
184 });
185 }
186}
auto open_db(const std::string &db_dir) -> std::optional< std::string >
Creates or restores this shard's UTXO database.
Definition shard.cpp:14
auto best_block_height() const -> uint64_t
Returns the height of the most recently digested block.
Definition shard.cpp:169
auto digest_block(const cbdc::atomizer::block &blk) -> bool
Updates records to reflect changes from a new, contiguous transaction block from the atomizer.
Definition shard.cpp:57
auto digest_transaction(transaction::compact_tx tx) -> std::variant< atomizer::tx_notify_request, watchtower::tx_error >
Checks the validity of a provided transaction's inputs, and returns a transaction notification to for...
Definition shard.cpp:105
shard(config::shard_range_t prefix_range)
Constructor.
Definition shard.cpp:11
Indicates a shard that tried to process a given transaction could not locate one or more of the trans...
Wrapper for transaction errors.
auto hash_in_shard_range(const shard_range_t &range, const hash_t &val) -> bool
Checks whether a hash falls within the given shard range.
Definition config.cpp:736
std::pair< uint8_t, uint8_t > shard_range_t
[start, end] inclusive.
Definition config.hpp:129
std::array< unsigned char, cbdc::hash_size > hash_t
SHA256 hash container.
Shard core functionality.
Batch of compact transactions settled by the atomizer.
Definition block.hpp:19
std::unordered_set< uint64_t > m_attestations
Set of input indexes the shard is attesting are unspent at the given block height.
transaction::compact_tx m_tx
Compact transaction associated with the notification.
uint64_t m_block_height
Block height at which the given input attestations are valid.
A condensed, hash-only transaction representation.
Indicates a shard that tried to process a given transaction was out of sync with the atomizer,...