OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/atomizer/shard/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
10
11#include <utility>
12
13namespace cbdc::shard {
14 controller::controller(uint32_t shard_id,
15 config::options opts,
16 std::shared_ptr<logging::log> logger)
17 : m_shard_id(shard_id),
18 m_opts(std::move(opts)),
19 m_logger(std::move(logger)),
20 m_shard(m_opts.m_shard_ranges[shard_id]),
21 m_archiver_client(m_opts.m_archiver_endpoints[0], m_logger) {}
22
24 m_shard_network.close();
25 m_atomizer_network.close();
26
27 if(m_shard_server.joinable()) {
28 m_shard_server.join();
29 }
30 if(m_atomizer_client.joinable()) {
31 m_atomizer_client.join();
32 }
33
34 m_request_queue.clear();
35 for(auto& t : m_handler_threads) {
36 if(t.joinable()) {
37 t.join();
38 }
39 }
40 }
41
42 auto controller::init() -> bool {
43 if(auto err_msg
44 = m_shard.open_db(m_opts.m_shard_db_dirs[m_shard_id])) {
45 m_logger->error("Failed to open shard DB for shard",
46 m_shard_id,
47 ". Got error:",
48 *err_msg);
49 return false;
50 }
51
52 if(!m_archiver_client.init()) {
53 m_logger->warn("Failed to connect to archiver");
54 }
55
56 if(!m_watchtower_network.cluster_connect(
57 m_opts.m_watchtower_internal_endpoints)) {
58 m_logger->warn("Failed to connect to watchtowers.");
59 }
60
61 m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints, false);
62 if(!m_atomizer_network.connected_to_one()) {
63 m_logger->warn("Failed to connect to any atomizers");
64 }
65
66 m_atomizer_client = m_atomizer_network.start_handler([&](auto&& pkt) {
67 return atomizer_handler(std::forward<decltype(pkt)>(pkt));
68 });
69
70 constexpr auto max_wait = 3;
71 for(size_t i = 0; i < max_wait && m_shard.best_block_height() < 1;
72 i++) {
73 m_logger->info("Waiting to sync with atomizer");
74 constexpr auto wait_time = std::chrono::seconds(1);
75 std::this_thread::sleep_for(wait_time);
76 }
77
78 if(m_shard.best_block_height() < 1) {
79 m_logger->warn(
80 "Shard still not syncronized with atomizer, starting anyway");
81 }
82
83 auto ss = m_shard_network.start_server(
84 m_opts.m_shard_endpoints[m_shard_id],
85 [&](auto&& pkt) {
86 return server_handler(std::forward<decltype(pkt)>(pkt));
87 });
88
89 if(!ss.has_value()) {
90 m_logger->error("Failed to establish shard server.");
91 return false;
92 }
93
94 m_shard_server = std::move(ss.value());
95
96 auto n_threads = std::thread::hardware_concurrency();
97 for(size_t i = 0; i < n_threads; i++) {
98 m_handler_threads.emplace_back([&]() {
99 request_consumer();
100 });
101 }
102
103 return true;
104 }
105
106 auto controller::server_handler(cbdc::network::message_t&& pkt)
107 -> std::optional<cbdc::buffer> {
108 m_request_queue.push(pkt);
109 return std::nullopt;
110 }
111
112 auto controller::atomizer_handler(cbdc::network::message_t&& pkt)
113 -> std::optional<cbdc::buffer> {
114 auto maybe_blk = from_buffer<atomizer::block>(*pkt.m_pkt);
115 if(!maybe_blk.has_value()) {
116 m_logger->error("Invalid block packet");
117 return std::nullopt;
118 }
119
120 auto& blk = maybe_blk.value();
121
122 m_logger->info("Digesting block", blk.m_height, "...");
123
124 // If the block is not contiguous, catch up by requesting
125 // blocks from the archiver.
126 while(!m_shard.digest_block(blk)) {
127 m_logger->warn("Block",
128 blk.m_height,
129 "not contiguous with previous block",
130 m_shard.best_block_height());
131
132 if(blk.m_height <= m_shard.best_block_height()) {
133 break;
134 }
135
136 // Attempt to catch up to the latest block
137 for(uint64_t i = m_shard.best_block_height() + 1; i < blk.m_height;
138 i++) {
139 const auto past_blk = m_archiver_client.get_block(i);
140 if(past_blk) {
141 m_shard.digest_block(past_blk.value());
142 } else {
143 m_logger->info("Waiting for archiver sync");
144 const auto wait_time = std::chrono::milliseconds(10);
145 std::this_thread::sleep_for(wait_time);
146 i--;
147 continue;
148 }
149 }
150 }
151
152 m_logger->info("Digested block", blk.m_height);
153 return std::nullopt;
154 }
155
156 void controller::request_consumer() {
157 auto pkt = network::message_t();
158 while(m_request_queue.pop(pkt)) {
159 auto maybe_tx = from_buffer<transaction::compact_tx>(*pkt.m_pkt);
160 if(!maybe_tx.has_value()) {
161 m_logger->error("Invalid transaction packet");
162 continue;
163 }
164
165 auto& tx = maybe_tx.value();
166
167 m_logger->info("Digesting transaction", to_string(tx.m_id), "...");
168
170 tx,
172 m_opts.m_attestation_threshold)) {
173 m_logger->warn("Received invalid compact transaction",
174 to_string(tx.m_id));
175 continue;
176 }
177
178 auto res = m_shard.digest_transaction(std::move(tx));
179
180 auto res_handler = overloaded{
181 [&](const atomizer::tx_notify_request& msg) {
182 m_logger->info("Digested transaction",
183 to_string(msg.m_tx.m_id));
184
185 m_logger->debug("Sending",
186 msg.m_attestations.size(),
187 "/",
188 msg.m_tx.m_inputs.size(),
189 "attestations...");
190 if(!m_atomizer_network.send_to_one(
191 atomizer::request{msg})) {
192 m_logger->error(
193 "Failed to transmit tx to atomizer. ID:",
194 to_string(msg.m_tx.m_id));
195 }
196 },
197 [&](const cbdc::watchtower::tx_error& err) {
198 m_logger->info("error for Tx:",
199 to_string(err.tx_id()),
200 err.to_string());
201 // TODO: batch errors into a single RPC
202 auto data = std::vector<cbdc::watchtower::tx_error>{err};
203 auto buf = make_shared_buffer(data);
204 m_watchtower_network.broadcast(buf);
205 }};
206 std::visit(res_handler, res);
207 }
208 }
209}
void close()
Shuts down the network listener and all existing peer connections.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
auto init() -> bool
Initializes the controller.
auto digest_transaction(transaction::compact_tx tx) -> std::variant< atomizer::tx_notify_request, watchtower::tx_error >
Checks the validity of a provided transaction's inputs, and returns a transaction notification to for...
Definition shard.cpp:105
Wrapper for transaction errors.
std::variant< tx_notify_request, prune_request, get_block_request > request
Atomizer RPC request.
auto check_attestations(const transaction::compact_tx &tx, const std::unordered_set< pubkey_t, hashing::null > &pubkeys, size_t threshold) -> bool
Validates the sentinel attestations attached to a compact transaction.
overloaded(Ts...) -> overloaded< Ts... >
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
Definition config.hpp:132
size_t m_attestation_threshold
Number of sentinel attestations needed for a compact transaction.
Definition config.hpp:260
std::unordered_set< pubkey_t, hashing::null > m_sentinel_public_keys
Public keys for sentinels.
Definition config.hpp:257
Received message type.