OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/atomizer/atomizer/state_machine.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "state_machine.hpp"
7
8#include "atomizer.hpp"
9#include "atomizer_raft.hpp"
10#include "format.hpp"
12#include "util/raft/util.hpp"
17
18#include <filesystem>
19#include <libnuraft/nuraft.hxx>
20#include <utility>
21
22namespace cbdc::atomizer {
// Constructs the atomizer state machine.
//
// stxo_cache_depth: stored in m_stxo_cache_depth and forwarded to the
//                   atomizer constructor.
// snapshot_dir:     directory used for snapshot files; moved into
//                   m_snapshot_dir and created here if needed.
//
// The process exits with EXIT_FAILURE if the snapshot directory cannot be
// created.
//
// NOTE(review): the listing numbers embedded in this extract skip 34 and 36,
// so the declaration of `snp` and the condition guarding the second
// std::exit are not visible here (the braces below do not balance without
// them) - restore those lines from the upstream file before building.
23 state_machine::state_machine(size_t stxo_cache_depth,
24 std::string snapshot_dir)
25 : m_snapshot_dir(std::move(snapshot_dir)),
26 m_stxo_cache_depth(stxo_cache_depth) {
// Start from an empty atomizer and an empty block container.
27 m_atomizer = std::make_shared<atomizer>(0, m_stxo_cache_depth);
28 m_blocks = std::make_shared<decltype(m_blocks)::element_type>();
// Non-throwing overload: failures are reported through err.
29 auto err = std::error_code();
30 std::filesystem::create_directory(m_snapshot_dir, err);
31 if(err) {
32 std::exit(EXIT_FAILURE);
33 }
35 if(snp) {
37 std::exit(EXIT_FAILURE);
38 }
39 }
40 }
41
// Applies the committed raft log entry at log_idx to the state machine.
//
// log_idx: index of the entry; asserted to directly follow
//          m_last_committed_idx, which is then advanced to log_idx.
// data:    serialized `request`, decoded with from_buffer<request>.
//
// The decoded request is dispatched by alternative type through std::visit.
// Returns nullptr when the selected handler produces no response.
//
// NOTE(review): the embedded listing numbers skip 50-51 (start of the
// visitor and the header of the first handler) and 99 (start of the final
// return expression); those lines are missing from this extract.
// NOTE(review): both checks below are asserts, so with NDEBUG a failed
// deserialization is only caught when req.value() throws.
42 auto state_machine::commit(nuraft::ulong log_idx, nuraft::buffer& data)
43 -> nuraft::ptr<nuraft::buffer> {
44 assert(log_idx == m_last_committed_idx + 1);
45 m_last_committed_idx = log_idx;
46 auto req = from_buffer<request>(data);
47 assert(req.has_value());
48
49 auto resp = std::visit(
// First handler (header not visible): feeds every entry of r.m_agg_txs to
// the atomizer, collects any returned errors and counts each entry in
// m_tx_notify_count. Responds with the errors only if there are any.
52 -> std::optional<response> {
53 auto errs = errors();
54 for(auto&& msg : r.m_agg_txs) {
55 auto err = m_atomizer->insert_complete(
56 msg.m_oldest_attestation,
57 std::move(msg.m_tx));
58
59 if(err.has_value()) {
60 errs.push_back(*err);
61 }
62 m_tx_notify_count++;
63 }
64
65 if(!errs.empty()) {
66 return errs;
67 }
68
69 return std::nullopt;
70 },
// make_block_request: asks the atomizer for a block, stores it in m_blocks
// under its height and returns the block together with the errors.
71 [&](const make_block_request& /* r */)
72 -> std::optional<response> {
73 auto [blk, errs] = m_atomizer->make_block();
74 m_blocks->emplace(blk.m_height, blk);
75 return make_block_response{blk, errs};
76 },
// get_block_request: returns the stored block with the requested height,
// or no response when it is not present in m_blocks.
77 [&](const get_block_request& r) -> std::optional<response> {
78 auto it = m_blocks->find(r.m_block_height);
79 if(it != m_blocks->end()) {
80 return get_block_response{it->second};
81 }
82 return std::nullopt;
83 },
// prune_request: erases every stored block whose height is below
// r.m_block_height; never produces a response.
84 [&](const prune_request& r) -> std::optional<response> {
85 for(auto it = m_blocks->begin(); it != m_blocks->end();) {
86 if(it->second.m_height < r.m_block_height) {
// erase() returns the iterator following the removed element.
87 it = m_blocks->erase(it);
88 } else {
89 it++;
90 }
91 }
92 return std::nullopt;
93 },
94 },
95 req.value());
96 if(!resp.has_value()) {
97 return nullptr;
98 }
100 resp.value());
101 }
102
// NOTE(review): the function header (listing line 103) is missing from this
// extract; presumably this is state_machine::commit_config - confirm against
// the upstream file.
//
// The new cluster configuration itself is ignored; the body only asserts
// that log_idx directly follows m_last_committed_idx and then advances
// m_last_committed_idx to log_idx.
104 const nuraft::ulong log_idx,
105 nuraft::ptr<nuraft::cluster_config>& /*new_conf*/) {
106 assert(log_idx == m_last_committed_idx + 1);
107 m_last_committed_idx = log_idx;
108 }
109
// NOTE(review): the line naming this function and declaring its first
// parameter `s` (listing line 111) is missing from this extract; presumably
// this is state_machine::read_logical_snp_obj - confirm against upstream.
//
// Reads the complete snapshot file for s.get_last_log_idx() into a newly
// allocated nuraft buffer and hands it back through data_out.
//
// data_out:    receives the buffer holding the file contents.
// is_last_obj: always set to true; the snapshot is transferred as a single
//              object.
// Returns 0 on success and -1 when the snapshot file cannot be opened.
// Failing to query the file size or to read the file terminates the process
// with EXIT_FAILURE.
110 auto
112 void*& /* user_snp_ctx */,
113 nuraft::ulong /* obj_id */,
114 nuraft::ptr<nuraft::buffer>& data_out,
115 bool& is_last_obj) -> int {
116 auto path = get_snapshot_path(s.get_last_log_idx());
117 {
// Shared lock: this function only reads snapshot files.
118 std::shared_lock<std::shared_mutex> l(m_snp_mut);
119 auto ss = std::ifstream(path, std::ios::in | std::ios::binary);
120 if(!ss.good()) {
121 // Requested snapshot doesn't exist anymore, not fatal
122 return -1;
123 }
124 auto err = std::error_code();
125 auto sz = std::filesystem::file_size(path, err);
126 if(err) {
127 // If we got this far, this should work unless our system is
128 // broken
129 std::exit(EXIT_FAILURE);
130 }
// The file is read into a char vector first and then copied into the
// nuraft buffer.
131 auto buf = nuraft::buffer::alloc(sz);
132 auto read_vec = std::vector<char>(sz);
133 ss.read(read_vec.data(),
134 static_cast<std::streamsize>(buf->size()));
135 if(!ss.good()) {
136 // If we got this far, this should work unless our system is
137 // broken
138 std::exit(EXIT_FAILURE);
139 }
140 std::memcpy(buf->data_begin(), read_vec.data(), sz);
141 data_out = std::move(buf);
142 }
143
144 // TODO: send and receive in chunks
145 is_last_obj = true;
146
147 return 0;
148 }
149
150 void state_machine::save_logical_snp_obj(nuraft::snapshot& s,
151 nuraft::ulong& obj_id,
152 nuraft::buffer& data,
153 bool /* is_first_obj */,
154 bool /* is_last_obj */) {
155 assert(obj_id == 0);
156 auto tmp_path = get_tmp_path();
157 {
158 std::unique_lock<std::shared_mutex> l(m_snp_mut);
159 auto ss = std::ofstream(tmp_path,
160 std::ios::out | std::ios::trunc
161 | std::ios::binary);
162 if(!ss.good()) {
163 // Since we're the exclusive writer, this should work
164 std::exit(EXIT_FAILURE);
165 }
166
167 auto write_vec = std::vector<char>(data.size());
168 std::memcpy(write_vec.data(), data.data_begin(), data.size());
169 ss.write(write_vec.data(),
170 static_cast<std::streamsize>(data.size()));
171 if(!ss.good()) {
172 std::exit(EXIT_FAILURE);
173 }
174
175 ss.flush();
176 ss.close();
177
178 auto path = get_snapshot_path(s.get_last_log_idx());
179 auto err = std::error_code();
180 std::filesystem::rename(tmp_path, path, err);
181 if(err) {
182 std::exit(EXIT_FAILURE);
183 }
184 }
185
186 obj_id++;
187 }
188
189 auto state_machine::apply_snapshot(nuraft::snapshot& s) -> bool {
190 auto snp = read_snapshot(s.get_last_log_idx());
191 if(snp) {
192 m_blocks = snp->m_blocks;
193 m_atomizer = snp->m_atomizer;
194 m_last_committed_idx = s.get_last_log_idx();
195 }
196 return snp.has_value();
197 }
198
199 auto state_machine::last_snapshot() -> nuraft::ptr<nuraft::snapshot> {
200 auto snp = read_snapshot(0);
201 if(!snp) {
202 return nullptr;
203 }
204 return snp->m_snp;
205 }
206
207 auto state_machine::last_commit_index() -> nuraft::ulong {
208 return m_last_committed_idx;
209 }
210
// NOTE(review): the function header (listing line 211) is missing from this
// extract; presumably this is state_machine::create_snapshot - confirm
// against the upstream file.
//
// Serializes the current atomizer, the given snapshot metadata and the
// stored blocks into a file named after s.get_last_log_idx(), then removes
// the temporary file and every snapshot file with a lower index.
//
// s:         snapshot metadata; its last log index is asserted to equal
//            last_commit_index().
// when_done: completion handler; always invoked with `true` and a null
//            exception, because every failure path exits the process with
//            EXIT_FAILURE instead of reporting an error.
212 nuraft::snapshot& s,
213 nuraft::async_result<bool>::handler_type& when_done) {
214 assert(s.get_last_log_idx() == last_commit_index());
215 nuraft::ptr<std::exception> except(nullptr);
216 bool ret = true;
217
// Round-trip the metadata through serialize/deserialize to obtain a
// separate snapshot object for the on-disk record.
218 auto snp_ser = s.serialize();
219 auto snp = snapshot{m_atomizer,
220 nuraft::snapshot::deserialize(*snp_ser),
221 m_blocks};
222
223 auto tmp_path = get_tmp_path();
224 auto path = get_snapshot_path(s.get_last_log_idx());
225 {
// Exclusive lock: snapshot files are written, renamed and deleted below.
226 std::unique_lock<std::shared_mutex> l(m_snp_mut);
227 auto ss = std::ofstream(tmp_path,
228 std::ios::out | std::ios::trunc
229 | std::ios::binary);
230 if(!ss.good()) {
231 // We're the exclusive writer so these file operations should
232 // work
233 std::exit(EXIT_FAILURE);
234 }
235
236 auto ser = cbdc::ostream_serializer(ss);
237 if(!(ser << snp)) {
238 std::exit(EXIT_FAILURE);
239 }
240
241 ss.flush();
242 ss.close();
243
// Write to the temporary file first, then rename it onto the final path.
244 auto err = std::error_code();
245 std::filesystem::rename(tmp_path, path, err);
246 if(err) {
247 std::exit(EXIT_FAILURE);
248 }
249
// Clean-up pass: drop the temporary file and all older snapshots.
// NOTE(review): this directory_iterator overload throws on failure, and
// std::stoull throws for a file name that does not start with a number -
// an unexpected file in the snapshot directory would raise here.
250 for(const auto& p :
251 std::filesystem::directory_iterator(m_snapshot_dir)) {
252 auto name = p.path().filename().generic_string();
253 if(name == m_tmp_file
254 || std::stoull(name) < s.get_last_log_idx()) {
255 std::filesystem::remove(p, err);
256 if(err) {
257 std::exit(EXIT_FAILURE);
258 }
259 }
260 }
261 }
262
263 when_done(ret, except);
264 }
265
// NOTE(review): the function header (listing line 266) is missing from this
// extract; presumably this is state_machine::tx_notify_count - confirm
// against the upstream file.
//
// Returns the current value of the m_tx_notify_count counter.
267 return m_tx_notify_count;
268 }
269
270 auto state_machine::get_snapshot_path(uint64_t idx) const -> std::string {
271 return m_snapshot_dir + "/" + std::to_string(idx);
272 }
273
274 auto state_machine::get_tmp_path() const -> std::string {
275 return m_snapshot_dir + "/" + m_tmp_file;
276 }
277
278 auto state_machine::read_snapshot(uint64_t idx)
279 -> std::optional<snapshot> {
280 std::shared_lock<std::shared_mutex> l(m_snp_mut);
281 auto open_fail_fatal = false;
282 if(idx == 0) {
283 uint64_t max_idx{0};
284 auto err = std::error_code();
285 for(const auto& p :
286 std::filesystem::directory_iterator(m_snapshot_dir, err)) {
287 if(err) {
288 std::exit(EXIT_FAILURE);
289 }
290 auto name = p.path().filename().generic_string();
291 if(name == m_tmp_file) {
292 continue;
293 }
294 auto f_idx = std::stoull(name);
295 if(f_idx > max_idx) {
296 max_idx = f_idx;
297 }
298 }
299
300 if(max_idx == 0) {
301 return std::nullopt;
302 }
303
304 idx = max_idx;
305 open_fail_fatal = true;
306 }
307
308 auto path = get_snapshot_path(idx);
309
310 auto ss = std::ifstream(path, std::ios::in | std::ios::binary);
311 if(!ss.good()) {
312 if(open_fail_fatal) {
313 std::exit(EXIT_FAILURE);
314 }
315 return std::nullopt;
316 }
317 auto err = std::error_code();
318 auto sz = std::filesystem::file_size(path, err);
319 if(err) {
320 std::exit(EXIT_FAILURE);
321 }
322 auto deser = cbdc::istream_serializer(ss);
323 auto new_atm = std::make_shared<atomizer>(0, m_stxo_cache_depth);
324 auto new_blocks = std::make_shared<decltype(m_blocks)::element_type>();
325 auto snp
326 = snapshot{std::move(new_atm), nullptr, std::move(new_blocks)};
327 if(!(deser >> snp)) {
328 std::exit(EXIT_FAILURE);
329 }
330 snp.m_snp->set_size(sz);
331 return snp;
332 }
333}
auto commit(nuraft::ulong log_idx, nuraft::buffer &data) -> nuraft::ptr< nuraft::buffer > override
Executes the committed raft log entry at the given index and returns the state machine execution r...
auto tx_notify_count() -> uint64_t
Returns the total number of transaction notifications which the state machine has processed.
state_machine(size_t stxo_cache_depth, std::string snapshot_dir)
Constructor.
auto last_commit_index() -> nuraft::ulong override
Returns the index of the most recently committed log entry.
void create_snapshot(nuraft::snapshot &s, nuraft::async_result< bool >::handler_type &when_done) override
Creates a snapshot with the given metadata.
auto apply_snapshot(nuraft::snapshot &s) -> bool override
Replaces the state of the state machine with the state stored in the snapshot referenced by the given...
auto read_logical_snp_obj(nuraft::snapshot &s, void *&user_snp_ctx, nuraft::ulong obj_id, nuraft::ptr< nuraft::buffer > &data_out, bool &is_last_obj) -> int override
Read the portion of the state machine snapshot associated with the given metadata and object ID into ...
void commit_config(nuraft::ulong log_idx, nuraft::ptr< nuraft::cluster_config > &) override
Handler for the raft cluster configuration changes.
void save_logical_snp_obj(nuraft::snapshot &s, nuraft::ulong &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj) override
Saves the portion of the state machine snapshot associated with the given metadata and object ID into...
auto last_snapshot() -> nuraft::ptr< nuraft::snapshot > override
Returns the most recent snapshot metadata.
Implementation of serializer for reading from a std::istream.
Implementation of serializer for writing to a std::ostream.
std::vector< watchtower::tx_error > errors
List of watchtower errors returned by the atomizer state machine.
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Batch of aggregate transaction notifications.
Atomizer state machine response from get block request.
Placeholder struct for a make block state machine request.
Response from atomizer state machine to a make block request.
Prune blocks request for RPC and state machine.
Represents a snapshot of the state machine with associated metadata.
Variant handler template.