10#include <leveldb/write_batch.h>
11#include <libnuraft/buffer_serializer.hxx>
16 const leveldb::ReadOptions& opt) -> uint64_t {
17 auto it = std::unique_ptr<leveldb::Iterator>(db->NewIterator(opt));
31 const auto last_log_key_slice = it->key();
34 assert(last_log_key_slice.size() ==
sizeof(ret));
35 std::memcpy(&ret, last_log_key_slice.data(),
sizeof(ret));
41 m_write_opt.sync =
false;
44 opt.create_if_missing =
true;
45 opt.comparator = &m_cmp;
47 leveldb::DB* db_ptr{};
48 const auto res = leveldb::DB::Open(opt, db_dir, &db_ptr);
54 std::lock_guard<std::mutex> l(m_db_mut);
67 std::lock_guard<std::mutex> l(m_db_mut);
72 std::lock_guard<std::mutex> l(m_db_mut);
77 -> nuraft::ptr<nuraft::log_entry> {
78 auto buf = nuraft::buffer::alloc(slice.size());
79 std::memcpy(buf->data_begin(), slice.data(), buf->size());
80 auto entry = nuraft::log_entry::deserialize(*buf);
88 std::lock_guard<std::mutex> l(m_db_mut);
89 auto it = std::unique_ptr<leveldb::Iterator>(
90 m_db->NewIterator(m_read_opt));
96 = nuraft::cs_new<nuraft::log_entry>(0,
nullptr);
100 const auto last_entry_slice = it->value();
107 using data_slice = std::pair<leveldb::Slice, std::vector<char>>;
110 std::vector<char> key_arr(
sizeof(key));
111 std::memcpy(key_arr.data(), &key, key_arr.size());
112 auto slice = leveldb::Slice(key_arr.data(), key_arr.size());
113 return std::make_pair(slice, std::move(key_arr));
117 const auto buf = entry->serialize();
118 std::vector<char> value_buf(buf->size());
119 std::memcpy(value_buf.data(), buf->data_begin(), value_buf.size());
120 auto slice = leveldb::Slice(value_buf.data(), value_buf.size());
121 return std::make_pair(slice, std::move(value_buf));
128 std::lock_guard<std::mutex> l(m_db_mut);
130 const auto status = m_db->Put(m_write_opt, key.first, value.first);
134 return m_next_idx - 1;
139 nuraft::ptr<nuraft::log_entry>& entry) {
142 leveldb::WriteBatch batch;
143 batch.Put(key.first, value.first);
146 std::lock_guard<std::mutex> l(m_db_mut);
148 std::vector<data_slice::second_type> data_slices(m_next_idx
150 for(uint64_t i = index + 1; i < m_next_idx; i++) {
152 batch.Delete(del_key.first);
153 data_slices[i - index] = std::move(del_key.second);
156 const auto status = m_db->Write(m_write_opt, &batch);
159 m_next_idx = index + 1;
166 auto ret = nuraft::cs_new<log_entries_t::element_type>(end - start);
169 std::lock_guard<std::mutex> l(m_db_mut);
170 auto it = std::unique_ptr<leveldb::Iterator>(
171 m_db->NewIterator(m_read_opt));
173 it->Seek(first_key.first);
175 for(
size_t i{0}; i < ret->size(); [&]() {
180 const auto val_slice = it->value();
183 (*ret)[i] = std::move(entry);
191 -> nuraft::ptr<nuraft::log_entry> {
196 std::lock_guard<std::mutex> l(m_db_mut);
197 const auto status = m_db->Get(m_read_opt, key.first, &val);
199 assert(status.IsNotFound());
201 = nuraft::cs_new<nuraft::log_entry>(0,
nullptr);
206 const auto val_slice = leveldb::Slice(val.data(), val.size());
212 const auto entry = entry_at(index);
213 return entry->get_term();
217 -> nuraft::ptr<nuraft::buffer> {
220 = log_entries(index, index +
static_cast<uint64_t
>(cnt));
222 std::vector<nuraft::ptr<nuraft::buffer>> bufs(
223 static_cast<size_t>(cnt));
227 for(
const auto& entry : *entries) {
228 auto buf = entry->serialize();
229 total_len += buf->size();
230 bufs[i] = std::move(buf);
234 auto ret = nuraft::buffer::alloc(
235 sizeof(uint64_t) +
static_cast<size_t>(cnt) *
sizeof(uint64_t)
237 nuraft::buffer_serializer bs(ret);
239 bs.put_u64(
static_cast<uint64_t
>(cnt));
241 for(
const auto& buf : bufs) {
242 bs.put_u64(buf->size());
243 bs.put_raw(buf->data_begin(), buf->size());
250 nuraft::buffer_serializer bs(
pack);
252 const auto cnt = bs.get_u64();
254 std::vector<nuraft::ptr<nuraft::log_entry>> entries(cnt);
256 for(
size_t i{0}; i < cnt; i++) {
257 const auto len = bs.get_u64();
258 auto buf = nuraft::buffer::alloc(len);
260 auto entry = nuraft::log_entry::deserialize(*buf);
262 entries[i] = std::move(entry);
265 std::vector<data_slice::second_type> data_slices(entries.size() * 2);
266 leveldb::WriteBatch batch;
267 for(
size_t i{0}; i < entries.size(); i++) {
270 batch.Put(key.first, val.first);
274 data_slices[i * 2] = std::move(key.second);
275 data_slices[(i * 2) + 1] = std::move(val.second);
279 std::lock_guard<std::mutex> l(m_db_mut);
280 const auto status = m_db->Write(m_write_opt, &batch);
291 leveldb::WriteBatch batch;
294 std::lock_guard<std::mutex> l(m_db_mut);
296 const auto n_elems = last_log_index - m_start_idx + 1;
297 std::vector<data_slice::second_type> data_slices(n_elems);
298 for(uint64_t i{m_start_idx}; i <= last_log_index; i++) {
300 batch.Delete(key.first);
301 data_slices[i - m_start_idx] = std::move(key.second);
304 const auto status = m_db->Write(m_write_opt, &batch);
307 m_start_idx = last_log_index + 1;
308 m_next_idx = std::max(m_next_idx, m_start_idx);
319 leveldb::WriteOptions sync_opts;
320 sync_opts.sync =
true;
322 leveldb::WriteBatch batch;
326 std::array<char,
sizeof(uint64_t)> dummy_key_data{};
327 leveldb::Slice dummy_key_slice(dummy_key_data.data(),
328 dummy_key_data.size());
329 batch.Put(dummy_key_slice, dummy_key_slice);
330 batch.Delete(dummy_key_slice);
333 std::lock_guard<std::mutex> l(m_db_mut);
334 const auto status = m_db->Write(sync_opts, &batch);
auto next_slot() const -> uint64_t override
Return the log index of the next empty log entry.
auto term_at(uint64_t index) -> uint64_t override
Return the log term associated with the log entry at the given index.
auto load(const std::string &db_dir) -> bool
Load the log store from the given LevelDB database directory.
auto pack(uint64_t index, int32_t cnt) -> nuraft::ptr< nuraft::buffer > override
Serialize the given number of log entries from the given index.
auto flush() -> bool override
Flush any buffered writes to disk.
auto start_index() const -> uint64_t override
Return the first log index stored by the log store.
void apply_pack(uint64_t index, nuraft::buffer &pack) override
Deserialize the given log entries and write them starting at the given log index.
void write_at(uint64_t index, nuraft::ptr< nuraft::log_entry > &entry) override
Write a log entry at the given index.
auto compact(uint64_t last_log_index) -> bool override
Delete log entries from the start of the log up to the given log index.
auto last_entry() const -> nuraft::ptr< nuraft::log_entry > override
Return the last log entry in the log store.
auto append(nuraft::ptr< nuraft::log_entry > &entry) -> uint64_t override
Append the given log entry to the end of the log.
nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > log_entries_t
List of log entries.
auto entry_at(uint64_t index) -> nuraft::ptr< nuraft::log_entry > override
Return the log entry at the given index.
auto log_entries(uint64_t start, uint64_t end) -> log_entries_t override
Return the log entries in the given range of indices.
auto get_value_slice(nuraft::ptr< nuraft::log_entry > &entry) -> data_slice
auto get_first_or_last_index(leveldb::DB *db, const leveldb::ReadOptions &opt) -> uint64_t
auto log_entry_from_slice(const leveldb::Slice &slice) -> nuraft::ptr< nuraft::log_entry >
auto get_key_slice(uint64_t key) -> data_slice
std::pair< leveldb::Slice, std::vector< char > > data_slice