From 07cdcc39ea31be2b8651e70130e66ecb6b8a5474 Mon Sep 17 00:00:00 2001
From: msm <msm@cert.pl>
Date: Fri, 4 Oct 2024 10:28:25 +0200
Subject: [PATCH] opt8: keep runs in compressed form

---
 libursa/OnDiskDataset.cpp  |   2 +-
 libursa/OnDiskIndex.cpp    |  10 ++--
 libursa/OnDiskIndex.h      |   5 +-
 libursa/Query.cpp          |   5 +-
 libursa/QueryOptimizer.cpp |   2 +-
 libursa/QueryResult.cpp    |  22 ++++----
 libursa/QueryResult.h      |  15 +++---
 libursa/SortedRun.cpp      | 105 +++++++++++++++++++++++++++++++++----
 libursa/SortedRun.h        | 101 +++++++++++++++++++++++++----------
 libursa/Utils.h            |   6 +++
 10 files changed, 201 insertions(+), 72 deletions(-)

diff --git a/libursa/OnDiskDataset.cpp b/libursa/OnDiskDataset.cpp
index 9e9b05b..20c1d52 100644
--- a/libursa/OnDiskDataset.cpp
+++ b/libursa/OnDiskDataset.cpp
@@ -108,7 +108,7 @@ void OnDiskDataset::execute(const Query &query, ResultWriter *out,
         files_index->for_each_filename(
             [&out](const std::string &fname) { out->push_back(fname); });
     } else {
-        for (const auto &fid : result.vector()) {
+        for (const auto &fid : result.vector().decompressed()) {
             out->push_back(get_file_name(fid));
         }
     }
diff --git a/libursa/OnDiskIndex.cpp b/libursa/OnDiskIndex.cpp
index da83dd4..f151ba6 100644
--- a/libursa/OnDiskIndex.cpp
+++ b/libursa/OnDiskIndex.cpp
@@ -74,8 +74,7 @@ std::pair<uint64_t, uint64_t> OnDiskIndex::get_run_offsets(
     return std::make_pair(ptrs[0], ptrs[1]);
 }
 
-std::vector<FileId> OnDiskIndex::get_run(uint64_t ptr,
-                                         uint64_t next_ptr) const {
+SortedRun OnDiskIndex::get_run(uint64_t ptr, uint64_t next_ptr) const {
     uint64_t run_length = next_ptr - ptr;
 
     if (ptr > next_ptr || next_ptr > index_size) {
@@ -86,12 +85,11 @@ std::vector<FileId> OnDiskIndex::get_run(uint64_t ptr,
 
     std::vector<uint8_t> run_bytes(run_length);
     ndxfile.pread(run_bytes.data(), run_length, ptr);
-    return read_compressed_run(run_bytes.data(),
-                               run_bytes.data() + run_bytes.size());
+    return SortedRun(std::move(run_bytes));
 }
 
-std::vector<FileId> OnDiskIndex::query_primitive(TriGram trigram,
-                                                 QueryCounter *counter) const {
+SortedRun OnDiskIndex::query_primitive(TriGram trigram,
+                                       QueryCounter *counter) const {
     auto op = QueryOperation(counter);
     std::pair<uint64_t, uint64_t> offsets = get_run_offsets(trigram);
     return get_run(offsets.first, offsets.second);
diff --git a/libursa/OnDiskIndex.h b/libursa/OnDiskIndex.h
index e73152f..fe5bf5b 100644
--- a/libursa/OnDiskIndex.h
+++ b/libursa/OnDiskIndex.h
@@ -21,9 +21,8 @@ class OnDiskIndex {
     IndexType ntype;
 
     static constexpr uint32_t VERSION = 6;
-    std::vector<FileId> get_run(uint64_t ptr, uint64_t next_ptr) const;
-    std::vector<FileId> query_primitive(TriGram trigram,
-                                        QueryCounter *counter) const;
+    SortedRun get_run(uint64_t ptr, uint64_t next_ptr) const;
+    SortedRun query_primitive(TriGram trigram, QueryCounter *counter) const;
     std::pair<uint64_t, uint64_t> get_run_offsets(TriGram trigram) const;
 
     static void on_disk_merge_core(const std::vector<IndexMergeHelper> &indexes,
diff --git a/libursa/Query.cpp b/libursa/Query.cpp
index 29eef83..a1dea77 100644
--- a/libursa/Query.cpp
+++ b/libursa/Query.cpp
@@ -226,7 +226,6 @@ void Query::prefetch(int from_index, int howmany, bool only_last,
             if (only_last && (i + 1 != howmany)) {
                 continue;
             }
-            spdlog::debug("prefetching {}", ndx);
             prefetcher(queries[ndx].ngram);
         }
     }
@@ -260,7 +259,7 @@ QueryResult Query::run(const QueryPrimitive &primitive,
     // Case: or. Short circuits when result is already everything.
     if (type == QueryType::OR) {
         auto result = QueryResult::empty();
-        for (const auto &query : queries) {
+        for (auto &query : queries) {
             result.do_or(query.run(primitive, prefetcher, counters),
                          &counters->ors());
             if (result.is_everything()) {
@@ -276,7 +275,7 @@ QueryResult Query::run(const QueryPrimitive &primitive,
     // There is some logic duplication here and in QueryResult::do_min_of_real.
     if (type == QueryType::MIN_OF) {
         std::vector<QueryResult> results;
-        std::vector<const QueryResult *> results_ptrs;
+        std::vector<QueryResult *> results_ptrs;
         results.reserve(queries.size());
         results_ptrs.reserve(queries.size());
         int cutoff = count;
diff --git a/libursa/QueryOptimizer.cpp b/libursa/QueryOptimizer.cpp
index 5a849e0..7aa65c3 100644
--- a/libursa/QueryOptimizer.cpp
+++ b/libursa/QueryOptimizer.cpp
@@ -32,7 +32,7 @@ Query simplify_subqueries(Query &&q) {
     return std::move(Query(q.get_type(), std::move(newqueries)));
 }
 
-// This optimization simplifies trivial (one operant) operations:
+// This optimization simplifies trivial (one operand) operations:
 // AND(x) --> x
 // OR(x)  --> x
 Query flatten_trivial_operations(Query &&q, bool *changed) {
diff --git a/libursa/QueryResult.cpp b/libursa/QueryResult.cpp
index ff7ea0d..02fbbee 100644
--- a/libursa/QueryResult.cpp
+++ b/libursa/QueryResult.cpp
@@ -2,31 +2,31 @@
 
 #include <algorithm>
 
-void QueryResult::do_or(const QueryResult &other, QueryCounter *counter) {
+void QueryResult::do_or(QueryResult &&other, QueryCounter *counter) {
     auto op = QueryOperation(counter);
     if (this->is_everything() || other.is_everything()) {
         has_everything = true;
-        results = SortedRun();
+        results = std::move(SortedRun());
     } else {
         results.do_or(other.results);
     }
 }
 
-void QueryResult::do_and(const QueryResult &other, QueryCounter *counter) {
+void QueryResult::do_and(QueryResult &&other, QueryCounter *counter) {
     auto op = QueryOperation(counter);
     if (other.is_everything()) {
     } else if (this->is_everything()) {
-        results = other.results;
+        results = std::move(other.results);
         has_everything = other.has_everything;
     } else {
         results.do_and(other.results);
     }
 }
 
-QueryResult QueryResult::do_min_of_real(
-    int cutoff, const std::vector<const QueryResult *> &sources) {
-    std::vector<const SortedRun *> nontrivial_sources;
-    for (const auto *source : sources) {
+QueryResult QueryResult::do_min_of_real(int cutoff,
+                                        std::vector<QueryResult *> &sources) {
+    std::vector<SortedRun *> nontrivial_sources;
+    for (QueryResult *source : sources) {
         if (source->is_everything()) {
             cutoff -= 1;
         } else if (!source->is_empty()) {
@@ -66,9 +66,9 @@ QueryResult QueryResult::do_min_of_real(
     return QueryResult(SortedRun::pick_common(cutoff, nontrivial_sources));
 }
 
-QueryResult QueryResult::do_min_of(
-    int cutoff, const std::vector<const QueryResult *> &sources,
-    QueryCounter *counter) {
+QueryResult QueryResult::do_min_of(int cutoff,
+                                   std::vector<QueryResult *> &sources,
+                                   QueryCounter *counter) {
     // TODO: sources can be mutable here, to save us some copies later.
     QueryOperation op(counter);
     QueryResult out{do_min_of_real(cutoff, sources)};
diff --git a/libursa/QueryResult.h b/libursa/QueryResult.h
index e370b2a..94080dd 100644
--- a/libursa/QueryResult.h
+++ b/libursa/QueryResult.h
@@ -15,8 +15,8 @@ class QueryResult {
 
     QueryResult() : results{}, has_everything{true} {}
 
-    static QueryResult do_min_of_real(
-        int cutoff, const std::vector<const QueryResult *> &sources);
+    static QueryResult do_min_of_real(int cutoff,
+                                      std::vector<QueryResult *> &sources);
 
    public:
     QueryResult(QueryResult &&other) = default;
@@ -28,12 +28,12 @@ class QueryResult {
 
     static QueryResult everything() { return QueryResult(); }
 
-    void do_or(const QueryResult &other, QueryCounter *counter);
-    void do_and(const QueryResult &other, QueryCounter *counter);
+    void do_or(QueryResult &&other, QueryCounter *counter);
+    void do_and(QueryResult &&other, QueryCounter *counter);
 
-    static QueryResult do_min_of(
-        int cutoff, const std::vector<const QueryResult *> &sources,
-        QueryCounter *counter);
+    static QueryResult do_min_of(int cutoff,
+                                 std::vector<QueryResult *> &sources,
+                                 QueryCounter *counter);
 
     // If true, means that QueryResults represents special "uninitialized"
     // value, "set of all FileIds in DataSet".
@@ -44,4 +44,5 @@ class QueryResult {
     bool is_empty() const { return !has_everything && results.empty(); }
 
     const SortedRun &vector() const { return results; }
+    SortedRun &vector() { return results; }
 };
diff --git a/libursa/SortedRun.cpp b/libursa/SortedRun.cpp
index bd90027..e269aaf 100644
--- a/libursa/SortedRun.cpp
+++ b/libursa/SortedRun.cpp
@@ -1,23 +1,101 @@
 #include "SortedRun.h"
 
 #include <algorithm>
+#include <stdexcept>
 
-void SortedRun::do_or(const SortedRun &other) {
+#include "Utils.h"
+#include "spdlog/spdlog.h"
+
+uint32_t RunIterator::current() const {
+    uint64_t acc = 0;
+    uint32_t shift = 0;
+    for (uint8_t *it = pos_;; it++) {
+        uint32_t next = *it;
+        acc += (next & 0x7FU) << shift;
+        shift += 7U;
+        if ((next & 0x80U) == 0) {
+            return curr_ + acc + 1;
+        }
+    }
+}
+
+uint8_t *RunIterator::nextpos() {
+    for (uint8_t *it = pos_;; it++) {
+        if ((*it & 0x80) == 0) {
+            return it + 1;
+        }
+    }
+}
+
+void SortedRun::validate_compression(bool expected) {
+    if (!empty() && is_compressed() != expected) {
+        throw std::runtime_error("Run was in invalid compression state");
+    }
+}
+
+std::vector<uint32_t>::iterator SortedRun::begin() {
+    validate_compression(false);
+    return sequence_.begin();
+}
+
+std::vector<uint32_t>::iterator SortedRun::end() {
+    validate_compression(false);
+    return sequence_.end();
+}
+
+RunIterator SortedRun::comp_begin() {
+    validate_compression(true);
+    return RunIterator(run_.data());
+}
+
+RunIterator SortedRun::comp_end() {
+    validate_compression(true);
+    return RunIterator(run_.data() + run_.size());
+}
+
+void SortedRun::do_or(SortedRun &other) {
+    // In almost every case this is already decompressed.
+    decompress();
     std::vector<FileId> new_results;
-    std::set_union(other.begin(), other.end(), sequence_.begin(),
-                   sequence_.end(), std::back_inserter(new_results));
+    if (other.is_compressed()) {
+        // Unlikely case, in most cases both runs are already decompressed.
+        std::set_union(other.comp_begin(), other.comp_end(), begin(), end(),
+                       std::back_inserter(new_results));
+    } else {
+        std::set_union(other.begin(), other.end(), begin(), end(),
+                       std::back_inserter(new_results));
+    }
     std::swap(new_results, sequence_);
 }
 
-void SortedRun::do_and(const SortedRun &other) {
-    auto new_end =
-        std::set_intersection(other.begin(), other.end(), sequence_.begin(),
-                              sequence_.end(), sequence_.begin());
+void SortedRun::do_and(SortedRun &other) {
+    // Benchmarking shows that handling a situation where this->is_compressed()
+    // makes the code *slower*. I assume that's because of memory efficiency.
+    decompress();
+    std::vector<uint32_t>::iterator new_end;
+    if (other.is_compressed()) {
+        new_end = std::set_intersection(other.comp_begin(), other.comp_end(),
+                                        begin(), end(), begin());
+    } else {
+        new_end = std::set_intersection(other.begin(), other.end(), begin(),
+                                        end(), begin());
+    }
     sequence_.erase(new_end, sequence_.end());
 }
 
-SortedRun SortedRun::pick_common(
-    int cutoff, const std::vector<const SortedRun *> &sources) {
+void SortedRun::decompress() {
+    if (run_.empty()) {
+        // Already decompressed
+        return;
+    }
+
+    sequence_ = read_compressed_run(run_.data(), run_.data() + run_.size());
+    std::vector<uint8_t> empty;
+    run_.swap(empty);
+}
+
+SortedRun SortedRun::pick_common(int cutoff,
+                                 std::vector<SortedRun *> &sources) {
     // returns all FileIds which appear at least `cutoff` times among provided
     // `sources`
     using FileIdRange = std::pair<std::vector<FileId>::const_iterator,
@@ -27,9 +105,9 @@ SortedRun SortedRun::pick_common(
     heads.reserve(sources.size());
 
     for (auto source : sources) {
+        source->decompress();
         if (!source->empty()) {
-            heads.emplace_back(
-                std::make_pair(source->cbegin(), source->cend()));
+            heads.emplace_back(std::make_pair(source->begin(), source->end()));
         }
     }
 
@@ -70,3 +148,8 @@ SortedRun SortedRun::pick_common(
 
     return SortedRun(std::move(result));
 }
+
+const std::vector<uint32_t> &SortedRun::decompressed() {
+    decompress();
+    return sequence_;
+}
diff --git a/libursa/SortedRun.h b/libursa/SortedRun.h
index 6b886a4..07bc34c 100644
--- a/libursa/SortedRun.h
+++ b/libursa/SortedRun.h
@@ -1,45 +1,88 @@
 #include "Core.h"
+#include "spdlog/spdlog.h"
 
-class SortedRun {
-    std::vector<uint32_t> sequence_;
+// Forward iterator that decodes a compressed run on the fly. A "run" is a
+// sorted list of FileIDs; the compressed format is described in
+// "ondiskformat.md" ("Index" section). NOTE(review): the constructor and
+// operator++ call current(), which reads *pos_ even at the end position.
+class RunIterator : public std::iterator<std::forward_iterator_tag, uint32_t> {
+    typedef RunIterator iterator;
+    uint8_t *pos_;
+    int32_t curr_;
 
-   public:
-    SortedRun() : sequence_{} {}
-    SortedRun(std::vector<uint32_t> &&sequence)
-        : sequence_(std::move(sequence)) {}
+    uint32_t current() const;
+    uint8_t *nextpos();
 
-    bool empty() const { return sequence_.empty(); }
+   public:
+    RunIterator(uint8_t *run) : pos_(run), curr_(-1) { curr_ = current(); }
+    ~RunIterator() {}
 
-    bool operator==(const SortedRun &other) const {
-        return sequence_ == other.sequence_;
+    RunIterator &operator++() {
+        pos_ = nextpos();
+        curr_ = current();
+        return *this;
     }
 
-    void do_or(const SortedRun &other);
-    void do_and(const SortedRun &other);
+    uint32_t operator*() const { return curr_; }
+    bool operator!=(const iterator &rhs) const { return pos_ != rhs.pos_; }
+};
 
-    static SortedRun pick_common(int cutoff,
-                                 const std::vector<const SortedRun *> &sources);
+// This class represents a "run" - a sorted list of FileIDs. This can be
+// a list of files matching a given ngram, for example. "Sorted" here is
+// redundant - there are no unsorted runs.
+// There are two possible representations, "compressed" and "uncompressed".
+// In the "compressed" representation we store raw data in vector<uint8_t>
+// and parse it on the go (see the file format documentation). The
+// "uncompressed" representation is a raw (sorted) list of FileIDs. Compressed
+// representation is useful to avoid heavy allocations for every file read.
+class SortedRun {
+    // At most one of the following vectors is nonempty at any time.
+    std::vector<uint32_t> sequence_;  // Uncompressed representation
+    std::vector<uint8_t> run_;  // Compressed representation
 
-    // When you really need to clone the run - TODO remove.
-    SortedRun clone() const { return *this; }
+    // Assert that the run is compressed/uncompressed, throw otherwise.
+    void validate_compression(bool expected);
 
-    std::vector<uint32_t>::const_iterator begin() const {
-        return sequence_.begin();
-    }
+    // Returns true if this run is compressed and non-empty.
+    bool is_compressed() const { return !run_.empty(); }
 
-    std::vector<uint32_t>::const_iterator cbegin() const {
-        return sequence_.begin();
-    }
+    // Force decompression of the compressed representation (noop otherwise).
+    void decompress();
 
-    std::vector<uint32_t>::iterator begin() { return sequence_.begin(); }
+    // Iterate over the decompressed representation (throws if compressed)
+    std::vector<uint32_t>::iterator begin();
+    std::vector<uint32_t>::iterator end();
 
-    std::vector<uint32_t>::const_iterator end() const {
-        return sequence_.end();
-    }
+    // Iterate over the compressed representation (throws if decompressed)
+    RunIterator comp_begin();
+    RunIterator comp_end();
 
-    std::vector<uint32_t>::const_iterator cend() const {
-        return sequence_.end();
-    }
+    SortedRun(const SortedRun &other) = default;
+
+   public:
+    SortedRun() : sequence_{}, run_{} {}
+
+    // Create a new run (with a decompressed representation)
+    explicit SortedRun(std::vector<uint32_t> &&sequence)
+        : sequence_{std::move(sequence)} {}
+
+    // Create a new run (with a compressed representation)
+    explicit SortedRun(std::vector<uint8_t> &&run) : run_{std::move(run)} {}
+
+    // Move constructor: steals both vectors instead of deep-copying them.
+    SortedRun(SortedRun &&) = default;
+    SortedRun &operator=(SortedRun &&) = default;
+
+    bool empty() const { return sequence_.empty() && run_.empty(); }
+
+    void do_or(SortedRun &other);
+    void do_and(SortedRun &other);
+
+    static SortedRun pick_common(int cutoff, std::vector<SortedRun *> &sources);
+
+    // When you really need to clone the run - TODO remove.
+    SortedRun clone() const { return *this; }
 
-    std::vector<uint32_t>::iterator end() { return sequence_.end(); }
+    // Returns a reference to a vector with FileIds of the current run.
+    const std::vector<uint32_t> &decompressed();
 };
diff --git a/libursa/Utils.h b/libursa/Utils.h
index 8dfe619..baf56a9 100644
--- a/libursa/Utils.h
+++ b/libursa/Utils.h
@@ -119,9 +119,15 @@ class PosixRunWriter {
     }
 };
 
+// "Run" here means a sorted list of FileIDs.
+// A "compressed" run format is described in the documentation
+// "ondiskformat.md", in the "Index" section. It's a variable length
+// encoding that stores data more efficiently. Uncompressed format is a vector.
+// Functions here convert from one representation to another.
 uint64_t compress_run(const std::vector<FileId> &run, std::ostream &out);
 std::vector<FileId> read_compressed_run(const uint8_t *start,
                                         const uint8_t *end);
+
 std::string get_index_type_name(IndexType type);
 std::optional<IndexType> index_type_from_string(const std::string &type);