OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/twophase/locking_shard/state_machine.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "state_machine.hpp"
7
8#include "format.hpp"
11
12#include <unistd.h>
13
14namespace cbdc::locking_shard {
16 const std::pair<uint8_t, uint8_t>& output_range,
17 std::shared_ptr<logging::log> logger,
18 size_t completed_txs_cache_size,
19 const std::string& preseed_file,
20 config::options opts)
21 : m_output_range(output_range),
22 m_logger(std::move(logger)) {
24 return process_request(std::move(req));
25 });
26 m_shard = std::make_unique<locking_shard>(output_range,
27 m_logger,
28 completed_txs_cache_size,
29 preseed_file,
30 std::move(opts));
31 }
32
33 auto state_machine::commit(uint64_t log_idx, nuraft::buffer& data)
34 -> nuraft::ptr<nuraft::buffer> {
35 assert(log_idx == m_last_committed_idx + 1);
36 m_last_committed_idx = log_idx;
37
38 auto resp = blocking_call(data);
39 if(!resp.has_value()) {
40 // TODO: This would only happen if there was a deserialization
41 // error with the request. Maybe we should abort here as such an
42 // event would imply a bug in the coordinator.
43 return nullptr;
44 }
45
46 return resp.value();
47 }
48
50 const nuraft::ulong log_idx,
51 nuraft::ptr<nuraft::cluster_config>& /*new_conf*/) {
52 assert(log_idx == m_last_committed_idx + 1);
53 m_last_committed_idx = log_idx;
54 }
55
56 auto state_machine::apply_snapshot(nuraft::snapshot& /* s */) -> bool {
57 return false;
58 }
59
60 auto state_machine::last_snapshot() -> nuraft::ptr<nuraft::snapshot> {
61 return nullptr;
62 }
63
65 return m_last_committed_idx;
66 }
67
69 nuraft::snapshot& /* s */,
70 nuraft::async_result<bool>::handler_type& when_done) {
71 nuraft::ptr<std::exception> except(nullptr);
72 bool ret = false;
73 when_done(ret, except);
74 }
75
77 -> std::shared_ptr<cbdc::locking_shard::locking_shard> {
78 return m_shard;
79 }
80
81 auto state_machine::process_request(cbdc::locking_shard::rpc::request req)
83 auto dtxid_str = to_string(req.m_dtx_id);
84 return std::visit(
85 overloaded{[&](rpc::lock_params&& params)
87 m_logger->info("Processing lock",
88 dtxid_str,
89 "with",
90 params.size(),
91 "txs");
92 auto res = m_shard->lock_outputs(std::move(params),
93 req.m_dtx_id);
94 assert(res.has_value());
95 m_logger->info("Done lock", dtxid_str);
96 return res.value();
97 },
98 [&](rpc::apply_params&& params)
100 m_logger->info("Processing apply", dtxid_str);
101 [[maybe_unused]] auto res
102 = m_shard->apply_outputs(std::move(params),
103 req.m_dtx_id);
104 assert(res);
105 m_logger->info("Done apply", dtxid_str);
106 return rpc::apply_response();
107 },
108 [&](rpc::discard_params&& /* params */)
110 m_logger->info("Processing discard", dtxid_str);
111 [[maybe_unused]] auto res
112 = m_shard->discard_dtx(req.m_dtx_id);
113 assert(res);
114 m_logger->info("Done discard", dtxid_str);
115 return rpc::discard_response();
116 }},
117 std::move(req.m_params));
118 }
119}
void create_snapshot(nuraft::snapshot &, nuraft::async_result< bool >::handler_type &) override
Not implemented for locking shard.
void commit_config(nuraft::ulong log_idx, nuraft::ptr< nuraft::cluster_config > &) override
Handler for the raft cluster configuration changes.
auto last_snapshot() -> nuraft::ptr< nuraft::snapshot > override
Not implemented for locking shard.
state_machine(const std::pair< uint8_t, uint8_t > &output_range, std::shared_ptr< logging::log > logger, size_t completed_txs_cache_size, const std::string &preseed_file, config::options opts)
Constructor.
auto get_shard_instance() -> std::shared_ptr< cbdc::locking_shard::locking_shard >
Returns a pointer to the locking shard instance managed by this state machine.
auto last_commit_index() -> uint64_t override
Returns the most recently committed log entry index.
auto apply_snapshot(nuraft::snapshot &) -> bool override
Not implemented for locking shard.
auto commit(uint64_t log_idx, nuraft::buffer &data) -> nuraft::ptr< nuraft::buffer > override
Commit the given raft log entry at the given log index, and return the result.
std::vector< tx > lock_params
Transactions whose outputs the locking shard should lock.
std::variant< lock_response, apply_response, discard_response > response
Response to a locking shard request.
std::vector< bool > apply_params
Vector of bools.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
Definition config.hpp:132
Variant handler template.