OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
log_store.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "log_store.hpp"
7
8#include <array>
9#include <cstring>
10#include <leveldb/write_batch.h>
11#include <libnuraft/buffer_serializer.hxx>
12
13namespace cbdc::raft {
14 template<bool First>
15 auto get_first_or_last_index(leveldb::DB* db,
16 const leveldb::ReadOptions& opt) -> uint64_t {
17 auto it = std::unique_ptr<leveldb::Iterator>(db->NewIterator(opt));
18
19 if constexpr(First) {
20 it->SeekToFirst();
21 if(!it->Valid()) {
22 return 1;
23 }
24 } else {
25 it->SeekToLast();
26 if(!it->Valid()) {
27 return 0;
28 }
29 }
30
31 const auto last_log_key_slice = it->key();
32
33 uint64_t ret{};
34 assert(last_log_key_slice.size() == sizeof(ret));
35 std::memcpy(&ret, last_log_key_slice.data(), sizeof(ret));
36
37 return ret;
38 }
39
40 auto log_store::load(const std::string& db_dir) -> bool {
41 m_write_opt.sync = false;
42
43 leveldb::Options opt;
44 opt.create_if_missing = true;
45 opt.comparator = &m_cmp;
46
47 leveldb::DB* db_ptr{};
48 const auto res = leveldb::DB::Open(opt, db_dir, &db_ptr);
49 if(!res.ok()) {
50 return false;
51 }
52
53 {
54 std::lock_guard<std::mutex> l(m_db_mut);
55 m_db.reset(db_ptr);
56
57 m_next_idx
58 = get_first_or_last_index<false>(m_db.get(), m_read_opt) + 1;
59 m_start_idx
60 = get_first_or_last_index<true>(m_db.get(), m_read_opt);
61 }
62
63 return true;
64 }
65
66 auto log_store::next_slot() const -> uint64_t {
67 std::lock_guard<std::mutex> l(m_db_mut);
68 return m_next_idx;
69 }
70
71 auto log_store::start_index() const -> uint64_t {
72 std::lock_guard<std::mutex> l(m_db_mut);
73 return m_start_idx;
74 }
75
76 auto log_entry_from_slice(const leveldb::Slice& slice)
77 -> nuraft::ptr<nuraft::log_entry> {
78 auto buf = nuraft::buffer::alloc(slice.size());
79 std::memcpy(buf->data_begin(), slice.data(), buf->size());
80 auto entry = nuraft::log_entry::deserialize(*buf);
81 assert(entry);
82 return entry;
83 }
84
85 auto log_store::last_entry() const -> nuraft::ptr<nuraft::log_entry> {
86 nuraft::ptr<nuraft::log_entry> last_entry;
87 {
88 std::lock_guard<std::mutex> l(m_db_mut);
89 auto it = std::unique_ptr<leveldb::Iterator>(
90 m_db->NewIterator(m_read_opt));
91
92 it->SeekToLast();
93
94 if(!it->Valid()) {
95 auto null_entry
96 = nuraft::cs_new<nuraft::log_entry>(0, nullptr);
97 return null_entry;
98 }
99
100 const auto last_entry_slice = it->value();
101 last_entry = log_entry_from_slice(last_entry_slice);
102 }
103
104 return last_entry;
105 }
106
107 using data_slice = std::pair<leveldb::Slice, std::vector<char>>;
108
109 auto get_key_slice(uint64_t key) -> data_slice {
110 std::vector<char> key_arr(sizeof(key));
111 std::memcpy(key_arr.data(), &key, key_arr.size());
112 auto slice = leveldb::Slice(key_arr.data(), key_arr.size());
113 return std::make_pair(slice, std::move(key_arr));
114 }
115
116 auto get_value_slice(nuraft::ptr<nuraft::log_entry>& entry) -> data_slice {
117 const auto buf = entry->serialize();
118 std::vector<char> value_buf(buf->size());
119 std::memcpy(value_buf.data(), buf->data_begin(), value_buf.size());
120 auto slice = leveldb::Slice(value_buf.data(), value_buf.size());
121 return std::make_pair(slice, std::move(value_buf));
122 }
123
124 auto log_store::append(nuraft::ptr<nuraft::log_entry>& entry) -> uint64_t {
125 const auto value = get_value_slice(entry);
126
127 {
128 std::lock_guard<std::mutex> l(m_db_mut);
129 const auto key = get_key_slice(m_next_idx);
130 const auto status = m_db->Put(m_write_opt, key.first, value.first);
131 assert(status.ok());
132
133 m_next_idx++;
134 return m_next_idx - 1;
135 }
136 }
137
138 void log_store::write_at(uint64_t index,
139 nuraft::ptr<nuraft::log_entry>& entry) {
140 const auto key = get_key_slice(index);
141 const auto value = get_value_slice(entry);
142 leveldb::WriteBatch batch;
143 batch.Put(key.first, value.first);
144
145 {
146 std::lock_guard<std::mutex> l(m_db_mut);
147
148 std::vector<data_slice::second_type> data_slices(m_next_idx
149 - index);
150 for(uint64_t i = index + 1; i < m_next_idx; i++) {
151 auto del_key = get_key_slice(i);
152 batch.Delete(del_key.first);
153 data_slices[i - index] = std::move(del_key.second);
154 }
155
156 const auto status = m_db->Write(m_write_opt, &batch);
157 assert(status.ok());
158
159 m_next_idx = index + 1;
160 }
161 }
162
163 auto log_store::log_entries(uint64_t start, uint64_t end)
164 -> log_entries_t {
165 const auto first_key = get_key_slice(start);
166 auto ret = nuraft::cs_new<log_entries_t::element_type>(end - start);
167
168 {
169 std::lock_guard<std::mutex> l(m_db_mut);
170 auto it = std::unique_ptr<leveldb::Iterator>(
171 m_db->NewIterator(m_read_opt));
172
173 it->Seek(first_key.first);
174
175 for(size_t i{0}; i < ret->size(); [&]() {
176 it->Next();
177 i++;
178 }()) {
179 assert(it->Valid());
180 const auto val_slice = it->value();
181 auto entry = log_entry_from_slice(val_slice);
182 assert(entry);
183 (*ret)[i] = std::move(entry);
184 }
185 }
186
187 return ret;
188 }
189
190 auto log_store::entry_at(uint64_t index)
191 -> nuraft::ptr<nuraft::log_entry> {
192 const auto key = get_key_slice(index);
193 std::string val;
194
195 {
196 std::lock_guard<std::mutex> l(m_db_mut);
197 const auto status = m_db->Get(m_read_opt, key.first, &val);
198 if(!status.ok()) {
199 assert(status.IsNotFound());
200 auto null_entry
201 = nuraft::cs_new<nuraft::log_entry>(0, nullptr);
202 return null_entry;
203 }
204 }
205
206 const auto val_slice = leveldb::Slice(val.data(), val.size());
207 auto ret = log_entry_from_slice(val_slice);
208 return ret;
209 }
210
211 auto log_store::term_at(uint64_t index) -> uint64_t {
212 const auto entry = entry_at(index);
213 return entry->get_term();
214 }
215
216 auto log_store::pack(uint64_t index, int32_t cnt)
217 -> nuraft::ptr<nuraft::buffer> {
218 assert(cnt >= 0);
219 const auto entries
220 = log_entries(index, index + static_cast<uint64_t>(cnt));
221
222 std::vector<nuraft::ptr<nuraft::buffer>> bufs(
223 static_cast<size_t>(cnt));
224
225 size_t i{0};
226 size_t total_len{0};
227 for(const auto& entry : *entries) {
228 auto buf = entry->serialize();
229 total_len += buf->size();
230 bufs[i] = std::move(buf);
231 i++;
232 }
233
234 auto ret = nuraft::buffer::alloc(
235 sizeof(uint64_t) + static_cast<size_t>(cnt) * sizeof(uint64_t)
236 + total_len);
237 nuraft::buffer_serializer bs(ret);
238
239 bs.put_u64(static_cast<uint64_t>(cnt));
240
241 for(const auto& buf : bufs) {
242 bs.put_u64(buf->size());
243 bs.put_raw(buf->data_begin(), buf->size());
244 }
245
246 return ret;
247 }
248
249 void log_store::apply_pack(uint64_t index, nuraft::buffer& pack) {
250 nuraft::buffer_serializer bs(pack);
251
252 const auto cnt = bs.get_u64();
253
254 std::vector<nuraft::ptr<nuraft::log_entry>> entries(cnt);
255
256 for(size_t i{0}; i < cnt; i++) {
257 const auto len = bs.get_u64();
258 auto buf = nuraft::buffer::alloc(len);
259 bs.get_buffer(buf);
260 auto entry = nuraft::log_entry::deserialize(*buf);
261 assert(entry);
262 entries[i] = std::move(entry);
263 }
264
265 std::vector<data_slice::second_type> data_slices(entries.size() * 2);
266 leveldb::WriteBatch batch;
267 for(size_t i{0}; i < entries.size(); i++) {
268 auto key = get_key_slice(index + i);
269 auto val = get_value_slice(entries[i]);
270 batch.Put(key.first, val.first);
271
272 // Store the backing data or it will get deleted when we go out of
273 // scope
274 data_slices[i * 2] = std::move(key.second);
275 data_slices[(i * 2) + 1] = std::move(val.second);
276 }
277
278 {
279 std::lock_guard<std::mutex> l(m_db_mut);
280 const auto status = m_db->Write(m_write_opt, &batch);
281 assert(status.ok());
282
283 m_start_idx
284 = get_first_or_last_index<true>(m_db.get(), m_read_opt);
285 m_next_idx
286 = get_first_or_last_index<false>(m_db.get(), m_read_opt) + 1;
287 }
288 }
289
290 auto log_store::compact(uint64_t last_log_index) -> bool {
291 leveldb::WriteBatch batch;
292
293 {
294 std::lock_guard<std::mutex> l(m_db_mut);
295
296 const auto n_elems = last_log_index - m_start_idx + 1;
297 std::vector<data_slice::second_type> data_slices(n_elems);
298 for(uint64_t i{m_start_idx}; i <= last_log_index; i++) {
299 auto key = get_key_slice(i);
300 batch.Delete(key.first);
301 data_slices[i - m_start_idx] = std::move(key.second);
302 }
303
304 const auto status = m_db->Write(m_write_opt, &batch);
305 assert(status.ok());
306
307 m_start_idx = last_log_index + 1;
308 m_next_idx = std::max(m_next_idx, m_start_idx);
309 }
310
311 return true;
312 }
313
314 auto log_store::flush() -> bool {
315 // LevelDB does not provide a way to issue a single "flush" call. As a
316 // workaround make a dummy write with no lasting effects where sync =
317 // true.
318
319 leveldb::WriteOptions sync_opts;
320 sync_opts.sync = true;
321
322 leveldb::WriteBatch batch;
323
324 // Log entry 0 is always empty so we're not overwriting anything
325 // important
326 std::array<char, sizeof(uint64_t)> dummy_key_data{};
327 leveldb::Slice dummy_key_slice(dummy_key_data.data(),
328 dummy_key_data.size());
329 batch.Put(dummy_key_slice, dummy_key_slice);
330 batch.Delete(dummy_key_slice);
331
332 {
333 std::lock_guard<std::mutex> l(m_db_mut);
334 const auto status = m_db->Write(sync_opts, &batch);
335 return status.ok();
336 }
337 }
338}
auto next_slot() const -> uint64_t override
Return the log index of the next empty log entry.
Definition log_store.cpp:66
auto term_at(uint64_t index) -> uint64_t override
Return the log term associated with the log entry at the given index.
auto load(const std::string &db_dir) -> bool
Load the log store from the given LevelDB database directory.
Definition log_store.cpp:40
auto pack(uint64_t index, int32_t cnt) -> nuraft::ptr< nuraft::buffer > override
Serialize the given number of log entries from the given index.
auto flush() -> bool override
Flush any buffered writes to disk.
auto start_index() const -> uint64_t override
Return the first log index stored by the log store.
Definition log_store.cpp:71
void apply_pack(uint64_t index, nuraft::buffer &pack) override
Deserialize the given log entries and write them starting at the given log index.
void write_at(uint64_t index, nuraft::ptr< nuraft::log_entry > &entry) override
Write a log entry at the given index.
auto compact(uint64_t last_log_index) -> bool override
Delete log entries from the start of the log up to the given log index.
auto last_entry() const -> nuraft::ptr< nuraft::log_entry > override
Return the last log entry in the log store.
Definition log_store.cpp:85
auto append(nuraft::ptr< nuraft::log_entry > &entry) -> uint64_t override
Append the given log entry to the end of the log.
nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > log_entries_t
List of log entries.
Definition log_store.hpp:60
auto entry_at(uint64_t index) -> nuraft::ptr< nuraft::log_entry > override
Return the log entry at the given index.
auto log_entries(uint64_t start, uint64_t end) -> log_entries_t override
Return the log entries in the given range of indices.
auto get_value_slice(nuraft::ptr< nuraft::log_entry > &entry) -> data_slice
auto get_first_or_last_index(leveldb::DB *db, const leveldb::ReadOptions &opt) -> uint64_t
Definition log_store.cpp:15
auto log_entry_from_slice(const leveldb::Slice &slice) -> nuraft::ptr< nuraft::log_entry >
Definition log_store.cpp:76
auto get_key_slice(uint64_t key) -> data_slice
std::pair< leveldb::Slice, std::vector< char > > data_slice