OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
uhs/atomizer/archiver/controller.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "controller.hpp"
7
11
12#include <leveldb/write_batch.h>
13#include <utility>
14
15namespace cbdc::archiver {
16
18 // Set base class member:
19 sync = do_sync;
20 }
21
22 const leveldbWriteOptions controller::m_write_options{true};
23
24 controller::controller(uint32_t archiver_id,
26 std::shared_ptr<logging::log> log,
27 size_t max_samples)
28 : m_archiver_id(archiver_id),
29 m_opts(std::move(opts)),
30 m_logger(std::move(log)),
31 m_max_samples(max_samples) {}
32
34 m_atomizer_network.close();
35 m_archiver_network.close();
36
37 m_running = false;
38
39 if(m_atomizer_handler_thread.joinable()) {
40 m_atomizer_handler_thread.join();
41 }
42
43 if(m_archiver_server.joinable()) {
44 m_archiver_server.join();
45 }
46 }
47
48 auto controller::init() -> bool {
49 if(!init_leveldb()) {
50 return false;
51 }
52 if(!init_best_block()) {
53 return false;
54 }
55 if(!init_sample_collection()) {
56 return false;
57 }
58 if(!init_atomizer_connection()) {
59 return false;
60 }
61 if(!init_archiver_server()) {
62 return false;
63 }
64 return true;
65 }
66
67 auto controller::init_leveldb() -> bool {
68 leveldb::Options opt;
69 opt.create_if_missing = true;
70 opt.paranoid_checks = true;
71 opt.compression = leveldb::kNoCompression;
72
73 leveldb::DB* db_ptr{};
74 const auto res
75 = leveldb::DB::Open(opt,
76 m_opts.m_archiver_db_dirs[m_archiver_id],
77 &db_ptr);
78 if(!res.ok()) {
79 m_logger->error(res.ToString());
80 return false;
81 }
82 m_db.reset(db_ptr);
83 return true;
84 }
85
87 std::string bestblock_val;
88 const auto blk_res
89 = m_db->Get(m_read_options, m_bestblock_key, &bestblock_val);
90 if(blk_res.IsNotFound()) {
91 bestblock_val = std::to_string(0);
92 const auto wr_res
93 = m_db->Put(m_write_options, m_bestblock_key, bestblock_val);
94 if(!wr_res.ok()) {
95 return false;
96 }
97 }
98 m_best_height = static_cast<uint64_t>(std::stoul(bestblock_val));
99 return true;
100 }
101
103 m_tp_sample_file.open("tp_samples.txt");
104 if(!m_tp_sample_file.good()) {
105 return false;
106 }
107 m_last_block_time = std::chrono::high_resolution_clock::now();
108 m_sample_collection_active = true;
109 return true;
110 }
111
113 m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints, false);
114 if(!m_atomizer_network.connected_to_one()) {
115 m_logger->warn("Failed to connect to any atomizers.");
116 }
117
118 m_atomizer_handler_thread
119 = m_atomizer_network.start_handler([&](auto&& pkt) {
120 return atomizer_handler(std::forward<decltype(pkt)>(pkt));
121 });
122
123 return true;
124 }
125
127 auto as = m_archiver_network.start_server(
128 m_opts.m_archiver_endpoints[m_archiver_id],
129 [&](auto&& pkt) {
130 return server_handler(std::forward<decltype(pkt)>(pkt));
131 });
132
133 if(!as.has_value()) {
134 m_logger->error("Failed to establish shard server.");
135 return false;
136 }
137
138 m_archiver_server = std::move(as.value());
139
140 return true;
141 }
142
143 auto controller::running() const -> bool {
144 return m_running;
145 }
146
148 -> std::optional<cbdc::buffer> {
149 auto req = from_buffer<request>(*pkt.m_pkt);
150 if(!req.has_value()) {
151 m_logger->error("Invalid request packet");
152 return std::nullopt;
153 }
154 auto blk = get_block(req.value());
155 return cbdc::make_buffer(blk);
156 }
157
159 -> std::optional<cbdc::buffer> {
160 auto blk = from_buffer<atomizer::block>(*pkt.m_pkt);
161 if(!blk.has_value()) {
162 m_logger->error("Invalid request packet");
163 return std::nullopt;
164 }
165 if((m_max_samples != 0) && (m_samples >= m_max_samples)) {
166 m_running = false;
167 return std::nullopt;
168 }
169 digest_block(blk.value());
170 return std::nullopt;
171 }
172
173 auto controller::best_block_height() const -> uint64_t {
174 return m_best_height;
175 }
176
178 if(m_best_height == 0) {
179 // This is the first call to digest_block. Check if there is
180 // already a best height value in the database and set it if so.
181 std::string bestblock_val;
182 const auto blk_res
183 = m_db->Get(m_read_options, m_bestblock_key, &bestblock_val);
184 if(blk_res.ok()) {
185 m_best_height
186 = static_cast<uint64_t>(std::stoul(bestblock_val));
187 }
188 }
189
190 cbdc::atomizer::block next_blk;
191 bool digest_next{false};
192 {
193 if(blk.m_height <= m_best_height) {
194 m_logger->warn("Not processing duplicate block h:",
195 blk.m_height);
196 return;
197 }
198
199 if(blk.m_height != m_best_height + 1) {
200 // Not contiguous, check prev block isn't deferred already
201 auto it = m_deferred.find(blk.m_height - 1);
202 if(it == m_deferred.end()) {
203 // Request previous block from atomizer cluster
204 request_block(blk.m_height - 1);
205 }
206 m_deferred.emplace(blk.m_height, blk);
207 return;
208 }
209
210 leveldb::WriteBatch batch;
211
212 m_logger->trace("Digesting block ", blk.m_height, "... ");
213
214 auto blk_bytes = make_buffer(blk);
215 leveldb::Slice blk_slice(blk_bytes.c_str(), blk_bytes.size());
216
217 const auto height_str = std::to_string(blk.m_height);
218
219 batch.Put(height_str, blk_slice);
220 batch.Put(m_bestblock_key, height_str);
221 m_best_height++;
222
223 const auto res = m_db->Write(m_write_options, &batch);
224 assert(res.ok());
225
226 m_logger->trace("Digested block ", blk.m_height);
227 if(m_sample_collection_active) {
228 const auto old_block_time = m_last_block_time;
229 m_last_block_time = std::chrono::high_resolution_clock::now();
230 const auto s_since_last_block = std::chrono::duration<double>(
231 m_last_block_time - old_block_time);
232 const auto tx_throughput
233 = static_cast<double>(blk.m_transactions.size())
234 / s_since_last_block.count();
235
236 m_tp_sample_file << tx_throughput << std::endl;
237 m_samples++;
238 }
239
240 // Tell the atomizer cluster to prune all blocks <
241 // m_best_height
242 request_prune(m_best_height);
243
244 auto it = m_deferred.find(blk.m_height + 1);
245 if(it != m_deferred.end()) {
246 next_blk = it->second;
247 digest_next = true;
248 }
249
250 m_deferred.erase(blk.m_height);
251 }
252
253 if(digest_next) {
254 // TODO: this can recurse back to genesis. In a long-running system
255 // we'll want an alternative method of building a new archiver node
256 // and limit the depth of recursion here.
257 digest_block(next_blk);
258 }
259 }
260
261 auto controller::get_block(uint64_t height)
262 -> std::optional<cbdc::atomizer::block> {
263 m_logger->trace(__func__, "(", height, ")");
264 std::string height_str = std::to_string(height);
265 std::string blk_str;
266 // Assume blocks with 100k 256-byte transactions.
267 static constexpr const auto expected_entry_sz = 256 * 100000;
268 blk_str.reserve(expected_entry_sz);
269 auto s = m_db->Get(m_read_options, height_str, &blk_str);
270
271 if(!s.ok()) {
272 m_logger->warn("block", height, "not found");
273 m_logger->trace("end", __func__);
274
275 return std::nullopt;
276 }
277
278 auto buf = cbdc::buffer();
279 buf.append(blk_str.data(), blk_str.size());
280 auto blk = from_buffer<atomizer::block>(buf);
281 assert(blk.has_value());
282 m_logger->trace("found block", height, "-", blk.value().m_height);
283 return blk.value();
284 }
285
286 void controller::request_block(uint64_t height) {
287 m_logger->trace("Requesting block", height);
288 auto req = atomizer::get_block_request{height};
289 auto pkt = make_shared_buffer(atomizer::request{req});
290 if(!m_atomizer_network.send_to_one(pkt)) {
291 m_logger->error("Failed to request block", height);
292 }
293 }
294
295 void controller::request_prune(uint64_t height) {
296 m_logger->trace("Requesting prune h <", height);
297 auto req = atomizer::prune_request{height};
298 auto pkt = make_shared_buffer(atomizer::request{req});
299 if(!m_atomizer_network.send_to_one(pkt)) {
300 m_logger->error("Failed to request prune", height);
301 }
302 }
303}
auto get_block(uint64_t height) -> std::optional< cbdc::atomizer::block >
Queries the archiver database for the block at the specified height.
auto init() -> bool
Initializes the controller with all its dependencies.
auto running() const -> bool
Returns true if this archiver is receiving blocks from the atomizer.
auto init_sample_collection() -> bool
Initializes the sample collection.
auto atomizer_handler(cbdc::network::message_t &&pkt) -> std::optional< cbdc::buffer >
Receives a serialized block from the atomizer and digests it.
void digest_block(const cbdc::atomizer::block &blk)
Adds a block to the archiver database.
auto init_leveldb() -> bool
Initializes the LevelDB database.
auto best_block_height() const -> uint64_t
Returns the archiver's best block height.
auto init_archiver_server() -> bool
Initializes the archiver server.
auto server_handler(cbdc::network::message_t &&pkt) -> std::optional< cbdc::buffer >
Receives a request for an archived block and returns the block.
auto init_best_block() -> bool
Initializes the best block value.
auto init_atomizer_connection() -> bool
Initializes the connection to the atomizer.
void close()
Shuts down the network listener and all existing peer connections.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
std::variant< tx_notify_request, prune_request, get_block_request > request
Atomizer RPC request.
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
@ buffer
A singular RLP value (byte array)
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Wrapper for leveldb::WriteOptions to provide a constructor to set base class member "sync".
Batch of compact transactions settled by the atomizer.
Definition block.hpp:19
std::vector< transaction::compact_tx > m_transactions
Compact transactions settled by the atomizer in this block.
Definition block.hpp:26
uint64_t m_height
Index of this block in the overall contiguous sequence of blocks from the first block starting at hei...
Definition block.hpp:24
Project-wide configuration options.
Definition config.hpp:132
Received message type.