OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
atomizer.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "atomizer.hpp"
7
12
13namespace cbdc::atomizer {
15 -> std::pair<block, std::vector<cbdc::watchtower::tx_error>> {
16 block blk;
17
18 blk.m_transactions.swap(m_complete_txs);
19
20 m_best_height++;
21
22 std::vector<cbdc::watchtower::tx_error> errs;
23 for(auto&& tx : m_txs[m_spent_cache_depth]) {
24 errs.push_back(cbdc::watchtower::tx_error{
25 tx.first.m_id,
27 }
28
29 for(size_t i = m_spent_cache_depth; i > 0; i--) {
30 m_spent[i] = std::move(m_spent[i - 1]);
31 m_txs[i] = std::move(m_txs[i - 1]);
32 }
33
34 m_spent[0].clear();
35 m_txs[0].clear();
36 static constexpr auto initial_spent_cache_size = 500000;
37 m_spent[0].reserve(initial_spent_cache_size);
38
39 blk.m_height = m_best_height;
40
41 return {blk, errs};
42 }
43
44 auto atomizer::insert(const uint64_t block_height,
46 std::unordered_set<uint32_t> attestations)
47 -> std::optional<cbdc::watchtower::tx_error> {
48 const auto height_offset = get_notification_offset(block_height);
49
50 auto offset_err = check_notification_offset(height_offset, tx);
51 if(offset_err) {
52 return offset_err;
53 }
54
55 // Search the incomplete transactions vector for this notification's
56 // block height offset. Note, we might be able to defer this insertion
57 // until after we've checked if the transaction is complete.
58 auto it = m_txs[height_offset].find(tx);
59 if(it == m_txs[height_offset].end()) {
60 // If we did not already receive a notification of this transaction
61 // for its height offset, insert the transaction and its
62 // attestations into the pending vector.
63 it = m_txs[height_offset]
64 .insert({std::move(tx), std::move(attestations)})
65 .first;
66 } else {
67 // Otherwise merge the new set of attestations with the existing
68 // set.
69 it->second.insert(attestations.begin(), attestations.end());
70 }
71
72 std::unordered_set<uint32_t> total_attestations;
73 size_t oldest_attestation{0};
74 std::map<size_t, decltype(m_txs)::value_type::const_iterator> tx_its;
75
76 // Iterate over each height offset in the incomplete transactions
77 // vector to accumulate the sets of attestations received for any
78 // offset in our cache.
79 for(size_t offset = 0; offset <= m_spent_cache_depth; offset++) {
80 const auto& tx_map = m_txs[offset];
81
82 // Check if we received a notification of this TX for the given
83 // height offset.
84 const auto tx_it = tx_map.find(it->first);
85 if(tx_it != tx_map.end()) {
86 // Merge the attestations from this offset with the full set of
87 // attestations so far.
88 total_attestations.insert(tx_it->second.begin(),
89 tx_it->second.end());
90
91 // Keep track of the oldest height offset that we're using
92 // an attestation from.
93 oldest_attestation = offset;
94
95 // Store the iterator to the transaction in the incomplete TXs
96 // vector for the given height offset so we can quickly access
97 // it later.
98 tx_its.emplace(offset, tx_it);
99 }
100 }
101
102 const auto& txit = it->first;
103
104 auto cache_check_range = oldest_attestation;
105
106 // Check whether this transaction now has attestations for each of its
107 // inputs.
108 if(total_attestations.size() == txit.m_inputs.size()) {
109 auto err_set = check_stxo_cache(txit, cache_check_range);
110 if(err_set) {
111 return err_set;
112 }
113
114 add_tx_to_stxo_cache(txit);
115
116 // For each of the incomplete transaction notifications we
117 // recovered while accumulating attestations, either extract the TX
118 // from the oldest notification and move it to the complete TXs
119 // vector, or erase the TX notification.
120 for(const auto& pending_offset : tx_its) {
121 if(pending_offset.first == oldest_attestation) {
122 auto tx_ext = m_txs[pending_offset.first].extract(
123 pending_offset.second);
124 m_complete_txs.push_back(std::move(tx_ext.key()));
125 } else {
126 m_txs[pending_offset.first].erase(pending_offset.second);
127 }
128 }
129 }
130
131 return std::nullopt;
132 }
133
134 auto atomizer::insert_complete(uint64_t oldest_attestation,
136 -> std::optional<cbdc::watchtower::tx_error> {
137 const auto height_offset = get_notification_offset(oldest_attestation);
138
139 auto offset_err = check_notification_offset(height_offset, tx);
140 if(offset_err) {
141 return offset_err;
142 }
143
144 auto cache_check_range = height_offset;
145
146 auto err_set = check_stxo_cache(tx, cache_check_range);
147 if(err_set) {
148 return err_set;
149 }
150
151 add_tx_to_stxo_cache(tx);
152
153 m_complete_txs.push_back(std::move(tx));
154
155 return std::nullopt;
156 }
157
158 auto atomizer::pending_transactions() const -> size_t {
159 return m_complete_txs.size();
160 }
161
162 auto atomizer::height() const -> uint64_t {
163 return m_best_height;
164 }
165
166 atomizer::atomizer(const uint64_t best_height,
167 const size_t stxo_cache_depth)
168 : m_best_height(best_height),
169 m_spent_cache_depth(stxo_cache_depth) {
170 m_txs.resize(stxo_cache_depth + 1);
171 m_spent.resize(stxo_cache_depth + 1);
172 }
173
175 auto buf = cbdc::buffer();
176 auto ser = cbdc::buffer_serializer(buf);
177
178 ser << static_cast<uint64_t>(m_spent_cache_depth) << m_best_height
179 << m_complete_txs << m_spent << m_txs;
180
181 return buf;
182 }
183
185 m_complete_txs.clear();
186
187 m_spent.clear();
188
189 m_txs.clear();
190
191 buf >> m_spent_cache_depth >> m_best_height >> m_complete_txs
192 >> m_spent >> m_txs;
193 }
194
195 auto atomizer::operator==(const atomizer& other) const -> bool {
196 return m_txs == other.m_txs && m_complete_txs == other.m_complete_txs
197 && m_spent == other.m_spent && m_best_height == other.m_best_height
198 && m_spent_cache_depth == other.m_spent_cache_depth;
199 }
200
201 auto atomizer::get_notification_offset(uint64_t block_height) const
202 -> uint64_t {
203 // Calculate the offset from the current block height when the shard
204 // attested to this transaction.
205 return m_best_height - block_height;
206 }
207
208 auto atomizer::check_notification_offset(uint64_t height_offset,
209 const transaction::compact_tx& tx)
210 const -> std::optional<cbdc::watchtower::tx_error> {
211 // Check whether this TX notification is recent enough that we can
212 // safely process it by checking our spent UTXO caches.
213 if(height_offset > m_spent_cache_depth && !tx.m_inputs.empty()) {
215 tx.m_id,
217 }
218 return std::nullopt;
219 }
220
221 auto atomizer::check_stxo_cache(const transaction::compact_tx& tx,
222 uint64_t cache_check_range) const
223 -> std::optional<cbdc::watchtower::tx_error> {
224 // For each height offset in our STXO cache up to the offset of the
225 // oldest attestation we're using, check that the inputs have not
226 // already been spent.
227 auto err_set = std::unordered_set<hash_t, hashing::null>{};
228 for(size_t offset = 0; offset <= cache_check_range; offset++) {
229 for(const auto& inp : tx.m_inputs) {
230 if(m_spent[offset].find(inp) != m_spent[offset].end()) {
231 err_set.insert(inp);
232 }
233 }
234 }
235
236 if(!err_set.empty()) {
238 tx.m_id,
239 cbdc::watchtower::tx_error_inputs_spent{std::move(err_set)}};
240 }
241
242 return std::nullopt;
243 }
244
245 void atomizer::add_tx_to_stxo_cache(const transaction::compact_tx& tx) {
246 // None of the inputs have previously been spent during block heights
247 // we used attestations from, so spend all the TX inputs in the current
248 // block height (offset 0).
249 m_spent[0].insert(tx.m_inputs.begin(), tx.m_inputs.end());
250 }
251}
Atomizer implementation.
Definition atomizer.hpp:32
auto operator==(const atomizer &other) const -> bool
Definition atomizer.cpp:195
auto pending_transactions() const -> size_t
Returns the number of complete transactions waiting to be included in the next block.
Definition atomizer.cpp:158
auto insert(uint64_t block_height, transaction::compact_tx tx, std::unordered_set< uint32_t > attestations) -> std::optional< watchtower::tx_error >
Attempts to add the specified shard attestations for a specified transaction at or later than the spe...
Definition atomizer.cpp:44
auto height() const -> uint64_t
Returns the height of the most recent block.
Definition atomizer.cpp:162
auto serialize() -> buffer
Serializes the internal state of the atomizer into a buffer.
Definition atomizer.cpp:174
void deserialize(serializer &buf)
Replaces the state of this atomizer instance with the provided serialized state data.
Definition atomizer.cpp:184
auto make_block() -> std::pair< cbdc::atomizer::block, std::vector< watchtower::tx_error > >
Adds the current set of complete transactions to a new block and returns it for storage and transmiss...
Definition atomizer.cpp:14
auto insert_complete(uint64_t oldest_attestation, transaction::compact_tx &&tx) -> std::optional< watchtower::tx_error >
Attempts to add the given compact transaction to the list of complete transactions pending for inclus...
Definition atomizer.cpp:134
Serializer implementation for buffer.
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
Interface for serializing objects into and out of raw bytes representations.
Indicates that the given transaction contains one or more inputs that have already been spent in othe...
Wrapper for transaction errors.
Tools for reading options from a configuration file and building application-specific parameter sets ...
@ buffer
A singular RLP value (byte array)
Batch of compact transactions settled by the atomizer.
Definition block.hpp:19
std::vector< transaction::compact_tx > m_transactions
Compact transactions settled by the atomizer in this block.
Definition block.hpp:26
uint64_t m_height
Index of this block in the overall contiguous sequence of blocks from the first block starting at hei...
Definition block.hpp:24
A condensed, hash-only transaction representation.
Indicates that the atomizer did not receive enough attestations for a particular transaction from sha...
Indicates that a shard did not attest to this transaction recently enough for the atomizer to check i...