6#ifndef OPENCBDC_TX_SRC_ATOMIZER_STATE_MACHINE_H_
7#define OPENCBDC_TX_SRC_ATOMIZER_STATE_MACHINE_H_
12#include <libnuraft/nuraft.hxx>
13#include <shared_mutex>
27 state_machine(
size_t stxo_cache_depth, std::string snapshot_dir);
37 = std::variant<make_block_response, get_block_response, errors>;
45 [[nodiscard]]
auto commit(nuraft::ulong log_idx, nuraft::buffer& data)
46 -> nuraft::ptr<nuraft::buffer>
override;
51 nuraft::ulong log_idx,
52 nuraft::ptr<nuraft::cluster_config>& )
override;
69 nuraft::ptr<nuraft::buffer>& data_out,
70 bool& is_last_obj) ->
int override;
83 nuraft::ulong& obj_id,
86 bool is_last_obj)
override;
98 -> nuraft::ptr<nuraft::snapshot>
override;
110 nuraft::async_result<bool>::handler_type& when_done)
override;
119 = std::unordered_map<uint64_t, cbdc::atomizer::block>;
127 nuraft::ptr<nuraft::snapshot>
m_snp{};
129 std::shared_ptr<blockstore_t>
m_blocks{};
133 [[nodiscard]]
auto get_snapshot_path(uint64_t idx)
const
136 [[nodiscard]]
auto get_tmp_path() const -> std::
string;
138 [[nodiscard]] auto read_snapshot(uint64_t idx)
139 -> std::optional<snapshot>;
/// Name used for the temporary file (presumably the file that
/// get_tmp_path() refers to; confirm in the definition).
static constexpr auto m_tmp_file = "tmp";
/// Atomic log index counter, starting at zero (presumably the value
/// reported by last_commit_index(); confirm in the definition).
std::atomic<uint64_t> m_last_committed_idx{0};
145 std::shared_ptr<cbdc::atomizer::atomizer> m_atomizer;
146 std::shared_ptr<blockstore_t> m_blocks;
/// Atomic counter, starting at zero (presumably the number of transaction
/// notifications processed, as reported by tx_notify_count(); confirm).
std::atomic<uint64_t> m_tx_notify_count{0};
/// Snapshot directory string (presumably set from the constructor's
/// snapshot_dir argument; confirm in the definition).
std::string m_snapshot_dir;
/// STXO cache depth, zero until assigned (presumably set from the
/// constructor's stxo_cache_depth argument; confirm in the definition).
size_t m_stxo_cache_depth{};
/// Shared (reader/writer) mutex.
/// NOTE(review): the name suggests it guards snapshot access; confirm.
std::shared_mutex m_snp_mut;
Raft state machine for managing a replicated atomizer.
auto commit(nuraft::ulong log_idx, nuraft::buffer &data) -> nuraft::ptr< nuraft::buffer > override
Executes the committed raft log entry at the given index and returns the state machine execution r...
std::variant< make_block_response, get_block_response, errors > response
Atomizer state machine response.
std::unordered_map< uint64_t, cbdc::atomizer::block > blockstore_t
Maps block heights to blocks.
auto tx_notify_count() -> uint64_t
Returns the total number of transaction notifications which the state machine has processed.
state_machine(size_t stxo_cache_depth, std::string snapshot_dir)
Constructor.
auto last_commit_index() -> nuraft::ulong override
Returns the index of the most recently committed log entry.
void create_snapshot(nuraft::snapshot &s, nuraft::async_result< bool >::handler_type &when_done) override
Creates a snapshot with the given metadata.
std::variant< aggregate_tx_notify_request, make_block_request, get_block_request, prune_request > request
Atomizer state machine request.
auto apply_snapshot(nuraft::snapshot &s) -> bool override
Replaces the state of the state machine with the state stored in the snapshot referenced by the given...
auto read_logical_snp_obj(nuraft::snapshot &s, void *&user_snp_ctx, nuraft::ulong obj_id, nuraft::ptr< nuraft::buffer > &data_out, bool &is_last_obj) -> int override
Read the portion of the state machine snapshot associated with the given metadata and object ID into ...
void commit_config(nuraft::ulong log_idx, nuraft::ptr< nuraft::cluster_config > &) override
Handler for the raft cluster configuration changes.
void save_logical_snp_obj(nuraft::snapshot &s, nuraft::ulong &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj) override
Saves the portion of the state machine snapshot associated with the given metadata and object ID into...
auto last_snapshot() -> nuraft::ptr< nuraft::snapshot > override
Returns the most recent snapshot metadata.
Batch of aggregate transaction notifications.
Retrieve cached block request.
Placeholder struct for a make block state machine request.
Prune blocks request for RPC and state machine.
Represents a snapshot of the state machine with associated metadata.
std::shared_ptr< blockstore_t > m_blocks
Pointer to the state of the block cache.
std::shared_ptr< cbdc::atomizer::atomizer > m_atomizer
Pointer to the atomizer instance.
nuraft::ptr< nuraft::snapshot > m_snp
Pointer to the nuraft snapshot metadata.