16 std::shared_ptr<logging::log> logger)
17 : m_shard_id(shard_id),
18 m_opts(std::move(opts)),
19 m_logger(std::move(logger)),
20 m_shard(m_opts.m_shard_ranges[shard_id]),
21 m_archiver_client(m_opts.m_archiver_endpoints[0], m_logger) {}
24 m_shard_network.
close();
25 m_atomizer_network.
close();
27 if(m_shard_server.joinable()) {
28 m_shard_server.join();
30 if(m_atomizer_client.joinable()) {
31 m_atomizer_client.join();
34 m_request_queue.clear();
35 for(
auto& t : m_handler_threads) {
44 = m_shard.open_db(m_opts.m_shard_db_dirs[m_shard_id])) {
45 m_logger->error(
"Failed to open shard DB for shard",
52 if(!m_archiver_client.init()) {
53 m_logger->warn(
"Failed to connect to archiver");
56 if(!m_watchtower_network.cluster_connect(
57 m_opts.m_watchtower_internal_endpoints)) {
58 m_logger->warn(
"Failed to connect to watchtowers.");
61 m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints,
false);
62 if(!m_atomizer_network.connected_to_one()) {
63 m_logger->warn(
"Failed to connect to any atomizers");
66 m_atomizer_client = m_atomizer_network.start_handler([&](
auto&& pkt) {
67 return atomizer_handler(std::forward<
decltype(pkt)>(pkt));
70 constexpr auto max_wait = 3;
71 for(
size_t i = 0; i < max_wait && m_shard.best_block_height() < 1;
73 m_logger->info(
"Waiting to sync with atomizer");
74 constexpr auto wait_time = std::chrono::seconds(1);
75 std::this_thread::sleep_for(wait_time);
78 if(m_shard.best_block_height() < 1) {
80 "Shard still not syncronized with atomizer, starting anyway");
83 auto ss = m_shard_network.start_server(
84 m_opts.m_shard_endpoints[m_shard_id],
86 return server_handler(std::forward<decltype(pkt)>(pkt));
90 m_logger->error(
"Failed to establish shard server.");
94 m_shard_server = std::move(ss.value());
96 auto n_threads = std::thread::hardware_concurrency();
97 for(
size_t i = 0; i < n_threads; i++) {
98 m_handler_threads.emplace_back([&]() {
107 -> std::optional<cbdc::buffer> {
108 m_request_queue.push(pkt);
113 -> std::optional<cbdc::buffer> {
115 if(!maybe_blk.has_value()) {
116 m_logger->error(
"Invalid block packet");
120 auto& blk = maybe_blk.value();
122 m_logger->info(
"Digesting block", blk.m_height,
"...");
126 while(!m_shard.digest_block(blk)) {
127 m_logger->warn(
"Block",
129 "not contiguous with previous block",
130 m_shard.best_block_height());
132 if(blk.m_height <= m_shard.best_block_height()) {
137 for(uint64_t i = m_shard.best_block_height() + 1; i < blk.m_height;
139 const auto past_blk = m_archiver_client.get_block(i);
141 m_shard.digest_block(past_blk.value());
143 m_logger->info(
"Waiting for archiver sync");
144 const auto wait_time = std::chrono::milliseconds(10);
145 std::this_thread::sleep_for(wait_time);
152 m_logger->info(
"Digested block", blk.m_height);
// Consumer loop for queued request packets: repeatedly pops a message from
// m_request_queue and processes it as a transaction until pop() returns
// false.
// NOTE(review): this listing omits several original lines (the embedded
// numbers skip), so the deserialization step, the validity check, and the
// definitions of `res` and `res_handler` are not visible here.
156 void controller::request_consumer() {
157 auto pkt = network::message_t();
158 while(m_request_queue.pop(pkt)) {
// `maybe_tx` is produced on a line not shown; presumably the packet
// deserialized into a compact transaction -- TODO confirm.
160 if(!maybe_tx.has_value()) {
161 m_logger->error(
"Invalid transaction packet");
165 auto& tx = maybe_tx.value();
167 m_logger->info(
"Digesting transaction",
to_string(tx.m_id),
"...");
// The condition guarding this warning is not visible in the listing.
173 m_logger->warn(
"Received invalid compact transaction",
// Visitor branch taking an atomizer::tx_notify_request: logs the digest and
// the attestation/input counts; a failure-to-transmit message for the
// atomizer appears further down.
181 [&](
const atomizer::tx_notify_request& msg) {
182 m_logger->info(
"Digested transaction",
185 m_logger->debug(
"Sending",
186 msg.m_attestations.size(),
188 msg.m_tx.m_inputs.size(),
193 "Failed to transmit tx to atomizer. ID:",
// Error path: logs the error and wraps `err` in a single-element vector of
// watchtower::tx_error. What is done with `data` afterwards is not shown.
198 m_logger->info(
"error for Tx:",
202 auto data = std::vector<cbdc::watchtower::tx_error>{err};
// Dispatches `res` to the matching branch of `res_handler`.
206 std::visit(res_handler, res);
void close()
Shuts down the network listener and all existing peer connections.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
void broadcast(const std::shared_ptr< buffer > &data)
Sends the provided data to all added peers.
auto init() -> bool
Initializes the controller.
auto digest_transaction(transaction::compact_tx tx) -> std::variant< atomizer::tx_notify_request, watchtower::tx_error >
Checks the validity of a provided transaction's inputs, and returns either a transaction notification to forward to the atomizer or a transaction error.
Wrapper for transaction errors.
std::variant< tx_notify_request, prune_request, get_block_request > request
Atomizer RPC request.
auto check_attestations(const transaction::compact_tx &tx, const std::unordered_set< pubkey_t, hashing::null > &pubkeys, size_t threshold) -> bool
Validates the sentinel attestations attached to a compact transaction.
overloaded(Ts...) -> overloaded< Ts... >
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
auto to_string(const hash_t &val) -> std::string
Converts a hash to a hexadecimal string.
Project-wide configuration options.
size_t m_attestation_threshold
Number of sentinel attestations needed for a compact transaction.
std::unordered_set< pubkey_t, hashing::null > m_sentinel_public_keys
Public keys for sentinels.