10#include <leveldb/write_batch.h> 
   11#include <libnuraft/buffer_serializer.hxx> 
   16                                 const leveldb::ReadOptions& opt) -> uint64_t {
 
   17        auto it = std::unique_ptr<leveldb::Iterator>(db->NewIterator(opt));
 
   31        const auto last_log_key_slice = it->key();
 
   34        assert(last_log_key_slice.size() == 
sizeof(ret));
 
   35        std::memcpy(&ret, last_log_key_slice.data(), 
sizeof(ret));
 
 
   41        m_write_opt.sync = 
false;
 
   44        opt.create_if_missing = 
true;
 
   45        opt.comparator = &m_cmp;
 
   47        leveldb::DB* db_ptr{};
 
   48        const auto res = leveldb::DB::Open(opt, db_dir, &db_ptr);
 
   54            std::lock_guard<std::mutex> l(m_db_mut);
 
 
   67        std::lock_guard<std::mutex> l(m_db_mut);
 
 
   72        std::lock_guard<std::mutex> l(m_db_mut);
 
 
   77        -> nuraft::ptr<nuraft::log_entry> {
 
   78        auto buf = nuraft::buffer::alloc(slice.size());
 
   79        std::memcpy(buf->data_begin(), slice.data(), buf->size());
 
   80        auto entry = nuraft::log_entry::deserialize(*buf);
 
 
   88            std::lock_guard<std::mutex> l(m_db_mut);
 
   89            auto it = std::unique_ptr<leveldb::Iterator>(
 
   90                m_db->NewIterator(m_read_opt));
 
   96                    = nuraft::cs_new<nuraft::log_entry>(0, 
nullptr);
 
  100            const auto last_entry_slice = it->value();
 
 
  107    using data_slice = std::pair<leveldb::Slice, std::vector<char>>;
 
  110        std::vector<char> key_arr(
sizeof(key));
 
  111        std::memcpy(key_arr.data(), &key, key_arr.size());
 
  112        auto slice = leveldb::Slice(key_arr.data(), key_arr.size());
 
  113        return std::make_pair(slice, std::move(key_arr));
 
 
  117        const auto buf = entry->serialize();
 
  118        std::vector<char> value_buf(buf->size());
 
  119        std::memcpy(value_buf.data(), buf->data_begin(), value_buf.size());
 
  120        auto slice = leveldb::Slice(value_buf.data(), value_buf.size());
 
  121        return std::make_pair(slice, std::move(value_buf));
 
 
  128            std::lock_guard<std::mutex> l(m_db_mut);
 
  130            const auto status = m_db->Put(m_write_opt, key.first, value.first);
 
  134            return m_next_idx - 1;
 
 
  139                             nuraft::ptr<nuraft::log_entry>& entry) {
 
  142        leveldb::WriteBatch batch;
 
  143        batch.Put(key.first, value.first);
 
  146            std::lock_guard<std::mutex> l(m_db_mut);
 
  148            std::vector<data_slice::second_type> data_slices(m_next_idx
 
  150            for(uint64_t i = index + 1; i < m_next_idx; i++) {
 
  152                batch.Delete(del_key.first);
 
  153                data_slices[i - index] = std::move(del_key.second);
 
  156            const auto status = m_db->Write(m_write_opt, &batch);
 
  159            m_next_idx = index + 1;
 
 
  166        auto ret = nuraft::cs_new<log_entries_t::element_type>(end - start);
 
  169            std::lock_guard<std::mutex> l(m_db_mut);
 
  170            auto it = std::unique_ptr<leveldb::Iterator>(
 
  171                m_db->NewIterator(m_read_opt));
 
  173            it->Seek(first_key.first);
 
  175            for(
size_t i{0}; i < ret->size(); [&]() {
 
  180                const auto val_slice = it->value();
 
  183                (*ret)[i] = std::move(entry);
 
 
  191        -> nuraft::ptr<nuraft::log_entry> {
 
  196            std::lock_guard<std::mutex> l(m_db_mut);
 
  197            const auto status = m_db->Get(m_read_opt, key.first, &val);
 
  199                assert(status.IsNotFound());
 
  201                    = nuraft::cs_new<nuraft::log_entry>(0, 
nullptr);
 
  206        const auto val_slice = leveldb::Slice(val.data(), val.size());
 
 
  212        const auto entry = entry_at(index);
 
  213        return entry->get_term();
 
 
  217        -> nuraft::ptr<nuraft::buffer> {
 
  220            = log_entries(index, index + 
static_cast<uint64_t
>(cnt));
 
  222        std::vector<nuraft::ptr<nuraft::buffer>> bufs(
 
  223            static_cast<size_t>(cnt));
 
  227        for(
const auto& entry : *entries) {
 
  228            auto buf = entry->serialize();
 
  229            total_len += buf->size();
 
  230            bufs[i] = std::move(buf);
 
  234        auto ret = nuraft::buffer::alloc(
 
  235            sizeof(uint64_t) + 
static_cast<size_t>(cnt) * 
sizeof(uint64_t)
 
  237        nuraft::buffer_serializer bs(ret);
 
  239        bs.put_u64(
static_cast<uint64_t
>(cnt));
 
  241        for(
const auto& buf : bufs) {
 
  242            bs.put_u64(buf->size());
 
  243            bs.put_raw(buf->data_begin(), buf->size());
 
 
  250        nuraft::buffer_serializer bs(
pack);
 
  252        const auto cnt = bs.get_u64();
 
  254        std::vector<nuraft::ptr<nuraft::log_entry>> entries(cnt);
 
  256        for(
size_t i{0}; i < cnt; i++) {
 
  257            const auto len = bs.get_u64();
 
  258            auto buf = nuraft::buffer::alloc(len);
 
  260            auto entry = nuraft::log_entry::deserialize(*buf);
 
  262            entries[i] = std::move(entry);
 
  265        std::vector<data_slice::second_type> data_slices(entries.size() * 2);
 
  266        leveldb::WriteBatch batch;
 
  267        for(
size_t i{0}; i < entries.size(); i++) {
 
  270            batch.Put(key.first, val.first);
 
  274            data_slices[i * 2] = std::move(key.second);
 
  275            data_slices[(i * 2) + 1] = std::move(val.second);
 
  279            std::lock_guard<std::mutex> l(m_db_mut);
 
  280            const auto status = m_db->Write(m_write_opt, &batch);
 
 
  291        leveldb::WriteBatch batch;
 
  294            std::lock_guard<std::mutex> l(m_db_mut);
 
  296            const auto n_elems = last_log_index - m_start_idx + 1;
 
  297            std::vector<data_slice::second_type> data_slices(n_elems);
 
  298            for(uint64_t i{m_start_idx}; i <= last_log_index; i++) {
 
  300                batch.Delete(key.first);
 
  301                data_slices[i - m_start_idx] = std::move(key.second);
 
  304            const auto status = m_db->Write(m_write_opt, &batch);
 
  307            m_start_idx = last_log_index + 1;
 
  308            m_next_idx = std::max(m_next_idx, m_start_idx);
 
 
  319        leveldb::WriteOptions sync_opts;
 
  320        sync_opts.sync = 
true;
 
  322        leveldb::WriteBatch batch;
 
  326        std::array<char, 
sizeof(uint64_t)> dummy_key_data{};
 
  327        leveldb::Slice dummy_key_slice(dummy_key_data.data(),
 
  328                                       dummy_key_data.size());
 
  329        batch.Put(dummy_key_slice, dummy_key_slice);
 
  330        batch.Delete(dummy_key_slice);
 
  333            std::lock_guard<std::mutex> l(m_db_mut);
 
  334            const auto status = m_db->Write(sync_opts, &batch);
 
 
auto next_slot() const -> uint64_t override
Return the log index of the next empty log entry.
auto term_at(uint64_t index) -> uint64_t override
Return the log term associated with the log entry at the given index.
auto load(const std::string &db_dir) -> bool
Load the log store from the given LevelDB database directory.
auto pack(uint64_t index, int32_t cnt) -> nuraft::ptr< nuraft::buffer > override
Serialize the given number of log entries from the given index.
auto flush() -> bool override
Flush any buffered writes to disk.
auto start_index() const -> uint64_t override
Return the first log index stored by the log store.
void apply_pack(uint64_t index, nuraft::buffer &pack) override
Deserialize the given log entries and write them starting at the given log index.
void write_at(uint64_t index, nuraft::ptr< nuraft::log_entry > &entry) override
Write a log entry at the given index, deleting any entries stored after that index so the written entry becomes the last one in the log.
auto compact(uint64_t last_log_index) -> bool override
Delete log entries from the start of the log up to and including the given log index.
auto last_entry() const -> nuraft::ptr< nuraft::log_entry > override
Return the last log entry in the log store.
auto append(nuraft::ptr< nuraft::log_entry > &entry) -> uint64_t override
Append the given log entry to the end of the log.
nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > log_entries_t
List of log entries.
auto entry_at(uint64_t index) -> nuraft::ptr< nuraft::log_entry > override
Return the log entry at the given index.
auto log_entries(uint64_t start, uint64_t end) -> log_entries_t override
Return the log entries in the half-open range of indices [start, end), i.e. end - start entries beginning at start.
auto get_value_slice(nuraft::ptr< nuraft::log_entry > &entry) -> data_slice
auto get_first_or_last_index(leveldb::DB *db, const leveldb::ReadOptions &opt) -> uint64_t
auto log_entry_from_slice(const leveldb::Slice &slice) -> nuraft::ptr< nuraft::log_entry >
auto get_key_slice(uint64_t key) -> data_slice
std::pair< leveldb::Slice, std::vector< char > > data_slice