19#include <libnuraft/nuraft.hxx>
24 std::string snapshot_dir)
25 : m_snapshot_dir(std::move(snapshot_dir)),
26 m_stxo_cache_depth(stxo_cache_depth) {
27 m_atomizer = std::make_shared<atomizer>(0, m_stxo_cache_depth);
28 m_blocks = std::make_shared<decltype(m_blocks)::element_type>();
29 auto err = std::error_code();
30 std::filesystem::create_directory(m_snapshot_dir, err);
32 std::exit(EXIT_FAILURE);
37 std::exit(EXIT_FAILURE);
43 -> nuraft::ptr<nuraft::buffer> {
44 assert(log_idx == m_last_committed_idx + 1);
45 m_last_committed_idx = log_idx;
47 assert(req.has_value());
49 auto resp = std::visit(
52 -> std::optional<response> {
54 for(
auto&& msg : r.m_agg_txs) {
55 auto err = m_atomizer->insert_complete(
56 msg.m_oldest_attestation,
72 -> std::optional<response> {
73 auto [blk, errs] = m_atomizer->make_block();
74 m_blocks->emplace(blk.m_height, blk);
78 auto it = m_blocks->find(r.m_block_height);
79 if(it != m_blocks->end()) {
85 for(
auto it = m_blocks->begin(); it != m_blocks->end();) {
86 if(it->second.m_height < r.m_block_height) {
87 it = m_blocks->erase(it);
96 if(!resp.has_value()) {
104 const nuraft::ulong log_idx,
105 nuraft::ptr<nuraft::cluster_config>& ) {
106 assert(log_idx == m_last_committed_idx + 1);
107 m_last_committed_idx = log_idx;
114 nuraft::ptr<nuraft::buffer>& data_out,
115 bool& is_last_obj) ->
int {
116 auto path = get_snapshot_path(s.get_last_log_idx());
118 std::shared_lock<std::shared_mutex> l(m_snp_mut);
119 auto ss = std::ifstream(path, std::ios::in | std::ios::binary);
124 auto err = std::error_code();
125 auto sz = std::filesystem::file_size(path, err);
129 std::exit(EXIT_FAILURE);
131 auto buf = nuraft::buffer::alloc(sz);
132 auto read_vec = std::vector<char>(sz);
133 ss.read(read_vec.data(),
134 static_cast<std::streamsize
>(buf->size()));
138 std::exit(EXIT_FAILURE);
140 std::memcpy(buf->data_begin(), read_vec.data(), sz);
141 data_out = std::move(buf);
151 nuraft::ulong& obj_id,
152 nuraft::buffer& data,
156 auto tmp_path = get_tmp_path();
158 std::unique_lock<std::shared_mutex> l(m_snp_mut);
159 auto ss = std::ofstream(tmp_path,
160 std::ios::out | std::ios::trunc
164 std::exit(EXIT_FAILURE);
167 auto write_vec = std::vector<char>(data.size());
168 std::memcpy(write_vec.data(), data.data_begin(), data.size());
169 ss.write(write_vec.data(),
170 static_cast<std::streamsize
>(data.size()));
172 std::exit(EXIT_FAILURE);
178 auto path = get_snapshot_path(s.get_last_log_idx());
179 auto err = std::error_code();
180 std::filesystem::rename(tmp_path, path, err);
182 std::exit(EXIT_FAILURE);
190 auto snp = read_snapshot(s.get_last_log_idx());
192 m_blocks = snp->m_blocks;
193 m_atomizer = snp->m_atomizer;
194 m_last_committed_idx = s.get_last_log_idx();
196 return snp.has_value();
200 auto snp = read_snapshot(0);
208 return m_last_committed_idx;
213 nuraft::async_result<bool>::handler_type& when_done) {
215 nuraft::ptr<std::exception> except(
nullptr);
218 auto snp_ser = s.serialize();
220 nuraft::snapshot::deserialize(*snp_ser),
223 auto tmp_path = get_tmp_path();
224 auto path = get_snapshot_path(s.get_last_log_idx());
226 std::unique_lock<std::shared_mutex> l(m_snp_mut);
227 auto ss = std::ofstream(tmp_path,
228 std::ios::out | std::ios::trunc
233 std::exit(EXIT_FAILURE);
238 std::exit(EXIT_FAILURE);
244 auto err = std::error_code();
245 std::filesystem::rename(tmp_path, path, err);
247 std::exit(EXIT_FAILURE);
251 std::filesystem::directory_iterator(m_snapshot_dir)) {
252 auto name = p.path().filename().generic_string();
253 if(name == m_tmp_file
254 || std::stoull(name) < s.get_last_log_idx()) {
255 std::filesystem::remove(p, err);
257 std::exit(EXIT_FAILURE);
263 when_done(ret, except);
267 return m_tx_notify_count;
270 auto state_machine::get_snapshot_path(uint64_t idx)
const -> std::string {
271 return m_snapshot_dir +
"/" + std::to_string(idx);
274 auto state_machine::get_tmp_path() const -> std::
string {
275 return m_snapshot_dir +
"/" + m_tmp_file;
278 auto state_machine::read_snapshot(uint64_t idx)
279 -> std::optional<snapshot> {
280 std::shared_lock<std::shared_mutex> l(m_snp_mut);
281 auto open_fail_fatal =
false;
284 auto err = std::error_code();
286 std::filesystem::directory_iterator(m_snapshot_dir, err)) {
288 std::exit(EXIT_FAILURE);
290 auto name = p.path().filename().generic_string();
291 if(name == m_tmp_file) {
294 auto f_idx = std::stoull(name);
295 if(f_idx > max_idx) {
305 open_fail_fatal =
true;
308 auto path = get_snapshot_path(idx);
310 auto ss = std::ifstream(path, std::ios::in | std::ios::binary);
312 if(open_fail_fatal) {
313 std::exit(EXIT_FAILURE);
317 auto err = std::error_code();
318 auto sz = std::filesystem::file_size(path, err);
320 std::exit(EXIT_FAILURE);
323 auto new_atm = std::make_shared<atomizer>(0, m_stxo_cache_depth);
324 auto new_blocks = std::make_shared<decltype(m_blocks)::element_type>();
326 = snapshot{std::move(new_atm),
nullptr, std::move(new_blocks)};
327 if(!(deser >> snp)) {
328 std::exit(EXIT_FAILURE);
330 snp.m_snp->set_size(sz);
auto commit(nuraft::ulong log_idx, nuraft::buffer &data) -> nuraft::ptr< nuraft::buffer > override
Executes the committed raft log entry at the given index and returns the state machine execution r...
auto tx_notify_count() -> uint64_t
Returns the total number of transaction notifications which the state machine has processed.
state_machine(size_t stxo_cache_depth, std::string snapshot_dir)
Constructor.
auto last_commit_index() -> nuraft::ulong override
Returns the index of the most recently committed log entry.
void create_snapshot(nuraft::snapshot &s, nuraft::async_result< bool >::handler_type &when_done) override
Creates a snapshot with the given metadata.
auto apply_snapshot(nuraft::snapshot &s) -> bool override
Replaces the state of the state machine with the state stored in the snapshot referenced by the given...
auto read_logical_snp_obj(nuraft::snapshot &s, void *&user_snp_ctx, nuraft::ulong obj_id, nuraft::ptr< nuraft::buffer > &data_out, bool &is_last_obj) -> int override
Read the portion of the state machine snapshot associated with the given metadata and object ID into ...
void commit_config(nuraft::ulong log_idx, nuraft::ptr< nuraft::cluster_config > &) override
Handler for the raft cluster configuration changes.
void save_logical_snp_obj(nuraft::snapshot &s, nuraft::ulong &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj) override
Saves the portion of the state machine snapshot associated with the given metadata and object ID into...
auto last_snapshot() -> nuraft::ptr< nuraft::snapshot > override
Returns the most recent snapshot metadata.
Implementation of serializer for reading from a std::istream.
Implementation of serializer for writing to a std::ostream.
std::vector< watchtower::tx_error > errors
List of watchtower errors returned by the atomizer state machine.
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Batch of aggregate transaction notifications.
Retrieve cached block request.
Atomizer state machine response from get block request.
Placeholder struct for a make block state machine request.
Response from atomizer state machine to a make block request.
Prune blocks request for RPC and state machine.
Represents a snapshot of the state machine with associated metadata.
Variant handler template.