17 std::vector<network::endpoint_t> raft_endpoints,
18 size_t stxo_cache_depth,
19 std::shared_ptr<logging::log> logger,
21 nuraft::cb_func::func_type raft_callback)
22 : node(static_cast<int>(atomizer_id),
23 std::move(raft_endpoints),
28 "atomizer_snps_" + std::
to_string(atomizer_id)),
31 std::move(raft_callback)),
32 m_log(std::move(logger)),
33 m_opts(std::move(opts)) {}
38 assert(cls !=
nullptr);
48 return replicate(new_log, result_fn);
52 return get_sm()->tx_notify_count();
60 m_log->warn(
"Received invalid compact transaction",
65 auto maybe_tx = [&]() -> std::optional<
decltype(m_txs)::node_type> {
66 std::unique_lock l(m_txs_mut);
67 auto it = m_txs.find(notif.m_tx);
68 if(it != m_txs.end()) {
69 for(
auto n : notif.m_attestations) {
70 auto p = std::make_pair(n, notif.m_block_height);
71 auto n_it = it->second.find(p);
72 if((n_it != it->second.end()
73 && n_it->second < notif.m_block_height)
74 || n_it == it->second.end()) {
75 it->second.insert(std::move(p));
79 auto attestations = attestation_set();
80 attestations.reserve(notif.m_attestations.size());
81 for(
auto n : notif.m_attestations) {
83 std::make_pair(n, notif.m_block_height));
86 .insert(std::make_pair(std::move(notif.m_tx),
87 std::move(attestations)))
93 if(it->second.size() != it->first.m_inputs.size()) {
97 auto tx = m_txs.extract(it);
101 if(!maybe_tx.has_value()) {
105 auto& tx = maybe_tx.value();
107 agg.m_tx = std::move(tx.key());
109 for(
const auto& att : tx.mapped()) {
110 if(oldest == 0 || att.second < oldest) {
114 agg.m_oldest_attestation = oldest;
117 std::lock_guard<std::mutex> l(m_complete_mut);
118 m_complete_txs.push_back(std::move(agg));
126 std::lock_guard<std::mutex> l(m_complete_mut);
127 std::swap(atns.m_agg_txs, m_complete_txs);
129 if(atns.m_agg_txs.empty()) {
132 return make_request(atns, result_fn);
// Hash functor for attestations.
// Only the first element of the pair (pair.first) is fed to std::hash; the
// second element does not contribute to the hash value, so two attestations
// sharing the same first element always hash identically.
135 auto atomizer_raft::attestation_hash::operator()(
136 const atomizer_raft::attestation& pair)
const ->
size_t {
137 return std::hash<
decltype(pair.first)>()(pair.first);
// Equality functor for attestations.
// Two attestations compare equal when their first elements match; the
// second element of each pair is ignored by this comparison.
140 auto atomizer_raft::attestation_cmp::operator()(
141 const atomizer_raft::attestation& a,
142 const atomizer_raft::attestation& b)
const ->
bool {
143 return a.first == b.first;
void tx_notify(tx_notify_request &&notif)
Add the given transaction notification to the set of pending notifications.
auto send_complete_txs(const raft::callback_type &result_fn) -> bool
Replicate a transaction notification command in the state machine containing the current set of complete transactions.
auto get_sm() -> state_machine *
Return a pointer to the state machine replicated by this raft node.
atomizer_raft(uint32_t atomizer_id, std::vector< network::endpoint_t > raft_endpoints, size_t stxo_cache_depth, std::shared_ptr< logging::log > logger, config::options opts, nuraft::cb_func::func_type raft_callback)
Constructor.
auto make_request(const state_machine::request &r, const raft::callback_type &result_fn) -> bool
Serialize and replicate the given request in the atomizer raft cluster.
auto tx_notify_count() -> uint64_t
Return the number of transaction notifications handled by the state machine.
Raft state machine for managing a replicated atomizer.
std::variant< aggregate_tx_notify_request, make_block_request, get_block_request, prune_request > request
Atomizer state machine request.
std::function< void(result_type &r, nuraft::ptr< std::exception > &err)> callback_type
Function type for raft state machine execution result callbacks.
auto check_attestations(const transaction::compact_tx &tx, const std::unordered_set< pubkey_t, hashing::null > &pubkeys, size_t threshold) -> bool
Validates the sentinel attestations attached to a compact transaction.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Transaction notification message with a full set of input attestations.
Batch of aggregate transaction notifications.
Transaction notification message.
Project-wide configuration options.
size_t m_attestation_threshold
Number of sentinel attestations needed for a compact transaction.
std::unordered_set< pubkey_t, hashing::null > m_sentinel_public_keys
Public keys for sentinels.