OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/atomizer/watchtower/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
8#include "status_update.hpp"
11
12#include <utility>
13
15 uint32_t watchtower_id,
17 const std::shared_ptr<logging::log>& log)
18 : m_watchtower_id(watchtower_id),
19 m_opts(std::move(opts)),
20 m_logger(log),
21 m_watchtower(m_opts.m_watchtower_block_cache_size,
22 m_opts.m_watchtower_error_cache_size),
23 m_archiver_client(m_opts.m_archiver_endpoints[0], log) {}
24
26 m_internal_network.close();
27 m_external_network.close();
28 m_atomizer_network.close();
29
30 if(m_internal_server.joinable()) {
31 m_internal_server.join();
32 }
33
34 if(m_external_server.joinable()) {
35 m_external_server.join();
36 }
37
38 if(m_atomizer_thread.joinable()) {
39 m_atomizer_thread.join();
40 }
41}
42
44 auto internal = m_internal_network.start_server(
45 m_opts.m_watchtower_internal_endpoints[m_watchtower_id],
46 [&](auto&& pkt) {
47 return internal_server_handler(std::forward<decltype(pkt)>(pkt));
48 });
49
50 if(!internal.has_value()) {
51 m_logger->error("Failed to establish watchtower internal server.");
52 return false;
53 }
54
55 m_internal_server = std::move(internal.value());
56
57 auto external = m_external_network.start_server(
58 m_opts.m_watchtower_client_endpoints[m_watchtower_id],
59 [&](auto&& pkt) {
60 return external_server_handler(std::forward<decltype(pkt)>(pkt));
61 });
62
63 if(!external.has_value()) {
64 m_logger->error("Failed to establish watchtower external server.");
65 return false;
66 }
67
68 m_external_server = std::move(external.value());
69
70 static constexpr auto retry_delay = std::chrono::seconds(1);
71 m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints, false);
72 while(!m_atomizer_network.connected_to_one()) {
73 // Since atomizers require a watchtower and the archiver requires an
74 // atomizer, this has to be allowed to fail. The network will reconnect
75 // when an atomizer comes online.
76 m_logger->warn("Failed to connect to any atomizers, waiting...");
77 std::this_thread::sleep_for(retry_delay);
78 }
79
80 while(!m_archiver_client.init()) {
81 m_logger->warn("Failed to connect to archiver, retrying...");
82 std::this_thread::sleep_for(retry_delay);
83 }
84
85 m_atomizer_thread = m_atomizer_network.start_handler([&](auto&& pkt) {
86 return atomizer_handler(std::forward<decltype(pkt)>(pkt));
87 });
88
89 m_logger->info("Connected to atomizers.");
90
91 return true;
92}
93
94auto cbdc::watchtower::controller::atomizer_handler(
95 cbdc::network::message_t&& pkt) -> std::optional<cbdc::buffer> {
96 auto maybe_blk = from_buffer<atomizer::block>(*pkt.m_pkt);
97 if(!maybe_blk.has_value()) {
98 m_logger->error("Invalid block packet");
99 return std::nullopt;
100 }
101 auto& blk = maybe_blk.value();
102 m_logger->debug("Received block",
103 blk.m_height,
104 "with",
105 blk.m_transactions.size(),
106 "transactions.");
107 if(blk.m_height != (m_last_blk_height + 1)) {
108 m_logger->warn("Block not contiguous. Last block:", m_last_blk_height);
109 while(blk.m_height != (m_last_blk_height + 1)) {
110 auto missed_blk
111 = m_archiver_client.get_block(m_last_blk_height + 1);
112 if(!missed_blk) {
113 m_logger->warn("Waiting for archiver sync");
114 static constexpr auto archiver_wait_time
115 = std::chrono::milliseconds(100);
116 std::this_thread::sleep_for(archiver_wait_time);
117 continue;
118 }
119
120 m_last_blk_height = (*missed_blk).m_height;
121 m_watchtower.add_block(std::move(*missed_blk));
122 }
123 }
124 m_last_blk_height = blk.m_height;
125 m_watchtower.add_block(std::move(blk));
126 return std::nullopt;
127}
128
129auto cbdc::watchtower::controller::internal_server_handler(
130 cbdc::network::message_t&& pkt) -> std::optional<cbdc::buffer> {
131 auto maybe_errs
133 if(!maybe_errs.has_value()) {
134 m_logger->error("Invalid internal request packet");
135 return std::nullopt;
136 }
137 m_watchtower.add_errors(std::move(maybe_errs.value()));
138 return std::nullopt;
139}
140
141auto cbdc::watchtower::controller::external_server_handler(
142 cbdc::network::message_t&& pkt) -> std::optional<cbdc::buffer> {
143 auto deser = cbdc::buffer_serializer(*pkt.m_pkt);
144 auto req = request(deser);
145 auto res_handler = overloaded{
147 -> cbdc::buffer {
148 auto res = m_watchtower.handle_status_update_request(su_req);
149 m_logger->info("Received status_update_request with",
150 su_req.uhs_ids().size(),
151 "UHS IDs");
152 return make_buffer(*res);
153 },
155 -> cbdc::buffer {
156 auto res = m_watchtower.handle_best_block_height_request(bbh_req);
157 m_logger->info("Received request_best_block_height from peer",
158 pkt.m_peer_id);
159 return make_buffer(*res);
160 }};
161 auto msg = std::visit(res_handler, req.payload());
162 return msg;
163}
164
166 return m_last_blk_height;
167}
Serializer implementation for buffer.
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
auto init() -> bool
Initializes the controller.
Network request to interact with the Watchtower's status update service.
exec_request request
Agent RPC request type.
overloaded(Ts...) -> overloaded< Ts... >
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Project-wide configuration options.
Definition config.hpp:132
Received message type.
Request the watchtower's known best block height.