12#include <leveldb/write_batch.h>
26 std::shared_ptr<logging::log> log,
28 : m_archiver_id(archiver_id),
29 m_opts(std::move(opts)),
30 m_logger(std::move(log)),
31 m_max_samples(max_samples) {}
34 m_atomizer_network.
close();
35 m_archiver_network.
close();
39 if(m_atomizer_handler_thread.joinable()) {
40 m_atomizer_handler_thread.join();
43 if(m_archiver_server.joinable()) {
44 m_archiver_server.join();
52 if(!init_best_block()) {
55 if(!init_sample_collection()) {
58 if(!init_atomizer_connection()) {
61 if(!init_archiver_server()) {
69 opt.create_if_missing =
true;
70 opt.paranoid_checks =
true;
71 opt.compression = leveldb::kNoCompression;
73 leveldb::DB* db_ptr{};
75 = leveldb::DB::Open(opt,
76 m_opts.m_archiver_db_dirs[m_archiver_id],
79 m_logger->error(res.ToString());
87 std::string bestblock_val;
89 = m_db->Get(m_read_options, m_bestblock_key, &bestblock_val);
90 if(blk_res.IsNotFound()) {
91 bestblock_val = std::to_string(0);
93 = m_db->Put(m_write_options, m_bestblock_key, bestblock_val);
98 m_best_height =
static_cast<uint64_t
>(std::stoul(bestblock_val));
103 m_tp_sample_file.open(
"tp_samples.txt");
104 if(!m_tp_sample_file.good()) {
107 m_last_block_time = std::chrono::high_resolution_clock::now();
108 m_sample_collection_active =
true;
113 m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints,
false);
114 if(!m_atomizer_network.connected_to_one()) {
115 m_logger->warn(
"Failed to connect to any atomizers.");
118 m_atomizer_handler_thread
119 = m_atomizer_network.start_handler([&](
auto&& pkt) {
120 return atomizer_handler(std::forward<
decltype(pkt)>(pkt));
127 auto as = m_archiver_network.start_server(
128 m_opts.m_archiver_endpoints[m_archiver_id],
130 return server_handler(std::forward<decltype(pkt)>(pkt));
133 if(!as.has_value()) {
134 m_logger->error(
"Failed to establish shard server.");
138 m_archiver_server = std::move(as.value());
148 -> std::optional<cbdc::buffer> {
150 if(!req.has_value()) {
151 m_logger->error(
"Invalid request packet");
154 auto blk = get_block(req.value());
159 -> std::optional<cbdc::buffer> {
161 if(!blk.has_value()) {
162 m_logger->error(
"Invalid request packet");
165 if((m_max_samples != 0) && (m_samples >= m_max_samples)) {
169 digest_block(blk.value());
174 return m_best_height;
178 if(m_best_height == 0) {
181 std::string bestblock_val;
183 = m_db->Get(m_read_options, m_bestblock_key, &bestblock_val);
186 =
static_cast<uint64_t
>(std::stoul(bestblock_val));
191 bool digest_next{
false};
194 m_logger->warn(
"Not processing duplicate block h:",
199 if(blk.
m_height != m_best_height + 1) {
201 auto it = m_deferred.find(blk.
m_height - 1);
202 if(it == m_deferred.end()) {
206 m_deferred.emplace(blk.
m_height, blk);
210 leveldb::WriteBatch batch;
212 m_logger->trace(
"Digesting block ", blk.
m_height,
"... ");
215 leveldb::Slice blk_slice(blk_bytes.c_str(), blk_bytes.size());
217 const auto height_str = std::to_string(blk.
m_height);
219 batch.Put(height_str, blk_slice);
220 batch.Put(m_bestblock_key, height_str);
223 const auto res = m_db->Write(m_write_options, &batch);
226 m_logger->trace(
"Digested block ", blk.
m_height);
227 if(m_sample_collection_active) {
228 const auto old_block_time = m_last_block_time;
229 m_last_block_time = std::chrono::high_resolution_clock::now();
230 const auto s_since_last_block = std::chrono::duration<double>(
231 m_last_block_time - old_block_time);
232 const auto tx_throughput
234 / s_since_last_block.count();
236 m_tp_sample_file << tx_throughput << std::endl;
242 request_prune(m_best_height);
244 auto it = m_deferred.find(blk.
m_height + 1);
245 if(it != m_deferred.end()) {
246 next_blk = it->second;
262 -> std::optional<cbdc::atomizer::block> {
263 m_logger->trace(__func__,
"(", height,
")");
264 std::string height_str = std::to_string(height);
267 static constexpr const auto expected_entry_sz = 256 * 100000;
268 blk_str.reserve(expected_entry_sz);
269 auto s = m_db->Get(m_read_options, height_str, &blk_str);
272 m_logger->warn(
"block", height,
"not found");
273 m_logger->trace(
"end", __func__);
279 buf.append(blk_str.data(), blk_str.size());
281 assert(blk.has_value());
282 m_logger->trace(
"found block", height,
"-", blk.value().m_height);
286 void controller::request_block(uint64_t height) {
287 m_logger->trace(
"Requesting block", height);
291 m_logger->error(
"Failed to request block", height);
295 void controller::request_prune(uint64_t height) {
296 m_logger->trace(
"Requesting prune h <", height);
297 auto req = atomizer::prune_request{height};
300 m_logger->error(
"Failed to request prune", height);
auto get_block(uint64_t height) -> std::optional< cbdc::atomizer::block >
Queries the archiver database for the block at the specified height.
auto init() -> bool
Initializes the controller with all its dependencies.
auto running() const -> bool
Returns true if this archiver is receiving blocks from the atomizer.
auto init_sample_collection() -> bool
Initializes the sample collection.
auto atomizer_handler(cbdc::network::message_t &&pkt) -> std::optional< cbdc::buffer >
Receives a serialized block from the atomizer and digests it.
void digest_block(const cbdc::atomizer::block &blk)
Adds a block to the archiver database.
auto init_leveldb() -> bool
Initializes the LevelDB database.
auto best_block_height() const -> uint64_t
Returns the archiver's best block height.
auto init_archiver_server() -> bool
Initializes the archiver server.
auto server_handler(cbdc::network::message_t &&pkt) -> std::optional< cbdc::buffer >
Receives a request for an archived block and returns the block.
auto init_best_block() -> bool
Initializes the best block value.
auto init_atomizer_connection() -> bool
Initializes the connection to the atomizer.
void close()
Shuts down the network listener and all existing peer connections.
auto send_to_one(const std::shared_ptr< buffer > &data) -> bool
Send the provided data to an online peer managed by this network.
std::variant< tx_notify_request, prune_request, get_block_request > request
Atomizer RPC request.
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
@ buffer
A singular RLP value (byte array)
auto make_shared_buffer(const T &obj) -> std::shared_ptr< cbdc::buffer >
Serialize object into std::shared_ptr<cbdc::buffer> using a cbdc::buffer_serializer.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Wrapper for leveldb::WriteOptions to provide a constructor to set base class member "sync".
leveldbWriteOptions(bool do_sync)
Batch of compact transactions settled by the atomizer.
std::vector< transaction::compact_tx > m_transactions
Compact transactions settled by the atomizer in this block.
uint64_t m_height
Index of this block in the overall contiguous sequence of blocks from the first block starting at height zero.
Retrieve cached block request.
Project-wide configuration options.