OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/twophase/sentinel_2pc/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
11
12#include <utility>
13
15 controller::controller(uint32_t sentinel_id,
16 const config::options& opts,
17 std::shared_ptr<logging::log> logger)
18 : m_sentinel_id(sentinel_id),
19 m_opts(opts),
20 m_logger(std::move(logger)),
21 m_coordinator_client(
22 opts.m_coordinator_endpoints[sentinel_id
23 % static_cast<uint32_t>(
24 opts.m_coordinator_endpoints
25 .size())]) {}
26
27 auto controller::init() -> bool {
28 if(m_opts.m_sentinel_endpoints.empty()) {
29 m_logger->error("No sentinel endpoints are defined.");
30 return false;
31 }
32
33 if(m_sentinel_id >= m_opts.m_sentinel_endpoints.size()) {
34 m_logger->error(
35 "The sentinel ID is too large for the number of sentinels.");
36 return false;
37 }
38
39 auto skey = m_opts.m_sentinel_private_keys.find(m_sentinel_id);
40 if(skey == m_opts.m_sentinel_private_keys.end()) {
41 if(m_opts.m_attestation_threshold > 0) {
42 m_logger->error("No private key specified");
43 return false;
44 }
45 } else {
46 m_privkey = skey->second;
47
48 auto pubkey = pubkey_from_privkey(m_privkey, m_secp.get());
49 m_logger->info("Sentinel public key:", cbdc::to_string(pubkey));
50 }
51
52 auto retry_delay = std::chrono::seconds(1);
53 auto retry_threshold = 4;
54 while(!m_coordinator_client.init() && retry_threshold-- > 0) {
55 m_logger->warn("Failed to start coordinator client.");
56
57 std::this_thread::sleep_for(retry_delay);
58 if(retry_threshold > 0) {
59 retry_delay *= 2;
60 m_logger->warn("Retrying...");
61 }
62 }
63
64 for(const auto& ep : m_opts.m_sentinel_endpoints) {
65 if(ep == m_opts.m_sentinel_endpoints[m_sentinel_id]) {
66 continue;
67 }
68 auto client = std::make_unique<sentinel::rpc::client>(
69 std::vector<network::endpoint_t>{ep},
70 m_logger);
71 if(!client->init(false)) {
72 m_logger->warn("Failed to start sentinel client");
73 }
74 m_sentinel_clients.emplace_back(std::move(client));
75 }
76
77 constexpr size_t dist_lower_bound = 0;
78 const size_t dist_upper_bound
79 = m_sentinel_clients.empty() ? 0 : m_sentinel_clients.size() - 1;
80 m_dist = decltype(m_dist)(dist_lower_bound, dist_upper_bound);
81
82 auto rpc_server = std::make_unique<cbdc::rpc::tcp_server<
85 m_opts.m_sentinel_endpoints[m_sentinel_id]);
86 if(!rpc_server->init()) {
87 m_logger->error("Failed to start sentinel RPC server");
88 return false;
89 }
90
91 m_rpc_server = std::make_unique<decltype(m_rpc_server)::element_type>(
92 this,
93 std::move(rpc_server));
94
95 return true;
96 }
97
100 execute_result_callback_type result_callback) -> bool {
101 const auto validation_err = transaction::validation::check_tx(tx);
102 if(validation_err.has_value()) {
103 auto tx_id = transaction::tx_id(tx);
104 m_logger->debug(
105 "Rejected (",
106 transaction::validation::to_string(validation_err.value()),
107 ")",
108 to_string(tx_id));
109 result_callback(cbdc::sentinel::execute_response{
111 validation_err});
112 return true;
113 }
114
115 auto compact_tx = cbdc::transaction::compact_tx(tx);
116
117 if(m_opts.m_attestation_threshold > 0) {
118 auto attestation = compact_tx.sign(m_secp.get(), m_privkey);
119 compact_tx.m_attestations.insert(attestation);
120 }
121
122 gather_attestations(tx, std::move(result_callback), compact_tx, {});
123
124 return true;
125 }
126
127 void
128 controller::result_handler(std::optional<bool> res,
129 const execute_result_callback_type& res_cb) {
130 if(res.has_value()) {
133 std::nullopt};
134 if(!res.value()) {
136 }
137 res_cb(resp);
138 } else {
139 res_cb(std::nullopt);
140 }
141 }
142
145 validate_result_callback_type result_callback) -> bool {
146 const auto validation_err = transaction::validation::check_tx(tx);
147 if(validation_err.has_value()) {
148 result_callback(std::nullopt);
149 return true;
150 }
151 auto compact_tx = cbdc::transaction::compact_tx(tx);
152 auto attestation = compact_tx.sign(m_secp.get(), m_privkey);
153 result_callback(std::move(attestation));
154 return true;
155 }
156
157 void controller::validate_result_handler(
158 validate_result v_res,
159 const transaction::full_tx& tx,
160 execute_result_callback_type result_callback,
162 std::unordered_set<size_t> requested) {
163 if(!v_res.has_value()) {
164 m_logger->error(to_string(ctx.m_id),
165 "invalid according to remote sentinel");
166 result_callback(std::nullopt);
167 return;
168 }
169 ctx.m_attestations.insert(std::move(v_res.value()));
170 gather_attestations(tx,
171 std::move(result_callback),
172 ctx,
173 std::move(requested));
174 }
175
176 void controller::gather_attestations(
177 const transaction::full_tx& tx,
178 execute_result_callback_type result_callback,
179 const transaction::compact_tx& ctx,
180 std::unordered_set<size_t> requested) {
181 if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) {
182 auto success = false;
183 while(!success) {
184 auto sentinel_id = m_dist(m_rand);
185 if(requested.find(sentinel_id) != requested.end()) {
186 continue;
187 }
188 success
189 = m_sentinel_clients[sentinel_id]->validate_transaction(
190 tx,
191 [=, this](validate_result v_res) {
192 auto r = requested;
193 r.insert(sentinel_id);
194 validate_result_handler(v_res,
195 tx,
196 result_callback,
197 ctx,
198 r);
199 });
200 }
201 return;
202 }
203
204 m_logger->debug("Accepted", to_string(ctx.m_id));
205
206 send_compact_tx(ctx, std::move(result_callback));
207 }
208
209 void
210 controller::send_compact_tx(const transaction::compact_tx& ctx,
211 execute_result_callback_type result_callback) {
212 auto cb =
213 [&, res_cb = std::move(result_callback)](std::optional<bool> res) {
214 result_handler(res, res_cb);
215 };
216
217 // TODO: add a "retry" error response to offload sentinels from this
218 // infinite retry responsibility.
219 while(!m_coordinator_client.execute_transaction(ctx, cb)) {
220 // TODO: the network currently doesn't provide a callback for
221 // reconnection events so we have to sleep here to
222 // prevent a needless spin. Instead, add such a callback
223 // or queue to the network to remove this sleep.
224 static constexpr auto retry_delay = std::chrono::milliseconds(100);
225 std::this_thread::sleep_for(retry_delay);
226 };
227 }
228}
External client for sending new transactions to the system.
auto init() -> bool
Initializes the client.
auto execute_transaction(transaction::compact_tx tx, callback_type result_callback) -> bool override
Requests execution of the given transaction using the coordinator cluster.
Generic asynchronous RPC server.
Implements an RPC server over a TCP socket.
std::optional< cbdc::sentinel::validate_response > validate_result
Result of a validation operation.
std::function< void( std::optional< cbdc::sentinel::execute_response >)> execute_result_callback_type
Callback function for transaction execution result.
std::function< void(validate_result)> validate_result_callback_type
Callback function for providing a transaction validation result.
auto execute_transaction(transaction::full_tx tx, execute_result_callback_type result_callback) -> bool override
Statically validates a transaction, submits it the shard coordinator network, and returns the result ...
auto init() -> bool
Initializes the controller.
auto validate_transaction(transaction::full_tx tx, validate_result_callback_type result_callback) -> bool override
Statically validates a transaction and generates a sentinel attestation if the transaction is valid.
std::variant< execute_response, validate_response > response
Sentinel RPC response type.
@ static_invalid
Statically invalid. Must be fixed and resubmitted.
@ state_invalid
Statically valid, but rejected by the shards for trying to spend inputs either that do not exist or t...
@ confirmed
Executed to completion.
std::variant< execute_request, validate_request > request
Sentinel RPC request type.
auto to_string(cbdc::transaction::validation::tx_error_code err) -> std::string
auto check_tx(const cbdc::transaction::full_tx &tx) -> std::optional< tx_error >
Runs static validation checks on the given transaction.
auto tx_id(const full_tx &tx) noexcept -> hash_t
Calculates the unique hash of a full transaction.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
auto pubkey_from_privkey(const privkey_t &privkey, secp256k1_context *ctx) -> pubkey_t
Generates a public key from the specified private key.
Definition keys.cpp:12
Project-wide configuration options.
Definition config.hpp:132
size_t m_attestation_threshold
Number of sentinel attestations needed for a compact transaction.
Definition config.hpp:260
A condensed, hash-only transaction representation.
std::unordered_map< pubkey_t, signature_t, hashing::null > m_attestations
Signatures from sentinels attesting the compact TX is valid.
hash_t m_id
The hash of the full transaction returned by tx_id.
A complete transaction.