Skip to content

Commit

Permalink
Refactor 1722: Remove Composite from processing pipeline
Browse files Browse the repository at this point in the history
(cherry picked from commit ec65b8e)
  • Loading branch information
alexowens90 authored and G-D-Petrov committed Aug 13, 2024
1 parent 28feeb3 commit 1cdc263
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 554 deletions.
6 changes: 3 additions & 3 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,18 @@ struct SegmentFunctionTask : BaseTask {

struct MemSegmentProcessingTask : BaseTask {
std::vector<std::shared_ptr<Clause>> clauses_;
Composite<EntityIds> entity_ids_;
std::vector<EntityId> entity_ids_;

explicit MemSegmentProcessingTask(
std::vector<std::shared_ptr<Clause>> clauses,
Composite<EntityIds>&& entity_ids) :
std::vector<EntityId>&& entity_ids) :
clauses_(std::move(clauses)),
entity_ids_(std::move(entity_ids)) {
}

ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask)

Composite<EntityIds> operator()() {
std::vector<EntityId> operator()() {
std::ranges::reverse_view reversed_clauses{clauses_};
for (const auto& clause: reversed_clauses) {
entity_ids_ = clause->process(std::move(entity_ids_));
Expand Down
531 changes: 261 additions & 270 deletions cpp/arcticdb/processing/clause.cpp

Large diffs are not rendered by default.

168 changes: 71 additions & 97 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <arcticdb/util/movable_priority_queue.hpp>
#include <arcticdb/stream/merge.hpp>
#include <arcticdb/pipeline/index_utils.hpp>
#include <arcticdb/util/composite.hpp>

#include <folly/Poly.h>
#include <folly/futures/Future.h>
Expand Down Expand Up @@ -76,14 +75,14 @@ struct IClause {
return std::move(folly::poly_call<0>(*this, ranges_and_keys, start_from));
}

[[nodiscard]] Composite<EntityIds>
process(Composite<EntityIds>&& entity_ids) const {
// Forwards the entity IDs to the wrapped clause through member slot 1 of the poly interface
// and hands back whatever that call returns.
// The concrete clauses declare process() as returning std::vector<EntityId> by value, so the
// call expression is already a temporary: returning it directly lets the compiler elide the
// copy, whereas wrapping it in std::move would force an extra move construction.
[[nodiscard]] std::vector<EntityId>
process(std::vector<EntityId>&& entity_ids) const {
    return folly::poly_call<1>(*this, std::move(entity_ids));
}

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
std::vector<Composite<EntityIds>>&& entity_ids) const {
return folly::poly_call<2>(*this, std::move(entity_ids));
// Forwards the groups of entity IDs to the wrapped clause through member slot 2 of the poly
// interface and returns that call's result unchanged.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>>
repartition(std::vector<std::vector<EntityId>>&& entity_ids_vec) const {
    return folly::poly_call<2>(*this, std::move(entity_ids_vec));
}

[[nodiscard]] const ClauseInfo& clause_info() const { return folly::poly_call<3>(*this); };
Expand Down Expand Up @@ -115,13 +114,14 @@ std::vector<std::vector<size_t>> structure_by_column_slice(std::vector<RangesAnd

std::vector<std::vector<size_t>> structure_all_together(std::vector<RangesAndKey>& ranges_and_keys);

Composite<ProcessingUnit> gather_entities(std::shared_ptr<ComponentManager> component_manager,
Composite<EntityIds>&& entity_ids,
bool include_atom_keys = false,
bool include_bucket = false,
bool include_initial_expected_get_calls = false);
ProcessingUnit gather_entities(std::shared_ptr<ComponentManager> component_manager,
std::vector<EntityId>&& entity_ids,
bool include_atom_keys = false,
bool include_initial_expected_get_calls = false);

EntityIds push_entities(std::shared_ptr<ComponentManager> component_manager, ProcessingUnit&& proc);
std::vector<EntityId> push_entities(std::shared_ptr<ComponentManager> component_manager, ProcessingUnit&& proc);

std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& entity_ids_vec);

struct PassthroughClause {
ClauseInfo clause_info_;
Expand All @@ -134,12 +134,9 @@ struct PassthroughClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(
Composite<EntityIds>&& entity_ids
) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// PassthroughClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -176,12 +173,9 @@ struct FilterClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(
Composite<EntityIds>&& entity_ids
) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>> &&) const {
// FilterClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -229,11 +223,9 @@ struct ProjectClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds>
process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// ProjectClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -276,57 +268,50 @@ struct PartitionClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const {
auto procs = gather_entities(component_manager_, std::move(entity_ids));
Composite<EntityIds> output;
procs.broadcast([&output, this](auto &proc) {
Composite<ProcessingUnit> partitioned_proc = partition_processing_segment<GrouperType, BucketizerType>(proc,
ColumnName(grouping_column_),
processing_config_.dynamic_schema_);
partitioned_proc.broadcast([&output, this](auto& p) {
output.push_back(push_entities(component_manager_, std::move(p)));
});
});
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const {
if (entity_ids.empty()) {
return {};
}
auto proc = gather_entities(component_manager_, std::move(entity_ids));
std::vector<ProcessingUnit> partitioned_procs = partition_processing_segment<GrouperType, BucketizerType>(
proc,
ColumnName(grouping_column_),
processing_config_.dynamic_schema_);
std::vector<EntityId> output;
for (auto &&partitioned_proc: partitioned_procs) {
std::vector<EntityId> proc_entity_ids = push_entities(component_manager_, std::move(partitioned_proc));
output.insert(output.end(), proc_entity_ids.begin(), proc_entity_ids.end());
}
return output;
}

// NOTE(review): this span is a rendered diff hunk of PartitionClause::repartition with the +/- markers
// lost. The Composite<...>-based lines (first signature, input_procs / partition_map handling,
// `return ret;`) belong to the implementation this commit removes; the std::vector<EntityId>-based lines
// are its replacement. The comments added below describe the vector-based lines only.
[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
std::vector<Composite<EntityIds>>&& entity_ids) const {
std::vector<Composite<ProcessingUnit>> input_procs;
input_procs.reserve(entity_ids.size());
for (auto&& comp: entity_ids) {
input_procs.emplace_back(gather_entities(component_manager_, std::move(comp), false, true));
}
std::unordered_map<size_t, Composite<ProcessingUnit>> partition_map;
// Regroups the incoming entity IDs by their bucket_id component and returns one vector of entity IDs
// per non-empty bucket.
// The check below requires at least one non-empty input vector (schema::check presumably raises
// E_COLUMN_DOESNT_EXIST with the formatted message otherwise — confirm against its definition).
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(std::vector<std::vector<EntityId>>&& entity_ids_vec) const {
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
std::any_of(input_procs.begin(), input_procs.end(), [](const Composite<ProcessingUnit>& proc) {
return !proc.empty();
std::any_of(entity_ids_vec.cbegin(), entity_ids_vec.cend(), [](const std::vector<EntityId>& entity_ids) {
return !entity_ids.empty();
}),
"Grouping column {} does not exist or is empty", grouping_column_
);

for (auto &comp : input_procs) {
comp.broadcast([&partition_map](auto &proc) {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(proc.bucket_.has_value(),
"PartitionClause::repartition failed, all processing units must have a bucket ID");

if (partition_map.find(*proc.bucket_) == partition_map.end()) {
partition_map[*proc.bucket_] = Composite<ProcessingUnit>(std::move(proc));
} else {
partition_map[*proc.bucket_].push_back(std::move(proc));
}
});
// Some could be empty, so actual number may be lower
// Bucket count comes from the "Partition.NumBuckets" config entry, with the CPU thread count passed as
// the second argument to get_int (presumably the default when the entry is unset — confirm).
auto max_num_buckets = ConfigsMap::instance()->get_int("Partition.NumBuckets",
async::TaskScheduler::instance()->cpu_thread_count());
// Cap the bucket count at the largest value a bucket_id can represent
max_num_buckets = std::min(max_num_buckets, static_cast<int64_t>(std::numeric_limits<bucket_id>::max()));
// NOTE(review): a non-positive configured value would reach the vector size constructor below unchanged —
// confirm the config is validated elsewhere.
// Preallocate results with expected sizes, erase later if any are empty
std::vector<std::vector<EntityId>> res(max_num_buckets);
// With an even distribution, expect each element of res to have entity_ids_vec.size() elements
for (auto& res_element: res) {
res_element.reserve(entity_ids_vec.size());
}

std::vector<Composite<EntityIds>> ret;
ret.reserve(partition_map.size());
for (auto&& [_, comp] : partition_map) {
ret.emplace_back(comp.transform([this](auto&& proc) {
return push_entities(component_manager_, std::move(proc));
}));
// Experimentation shows flattening the entities into a single vector and a single call to
// component_manager_->get is faster than not flattening and making multiple calls
auto entity_ids = flatten_entities(std::move(entity_ids_vec));
// buckets is indexed in parallel with entity_ids in the loop below (assumes get<bucket_id> returns
// one value per requested entity, in request order — confirm).
std::vector<bucket_id> buckets{component_manager_->get<bucket_id>(entity_ids)};
for (auto [idx, entity_id]: folly::enumerate(entity_ids)) {
// NOTE(review): unchecked index into res; relies on every stored bucket_id being < max_num_buckets
res[buckets[idx]].emplace_back(entity_id);
}

return ret;
// Get rid of any empty buckets
std::erase_if(res, [](const std::vector<EntityId>& entity_ids) { return entity_ids.empty(); });
return res;
}

[[nodiscard]] const ClauseInfo& clause_info() const {
Expand Down Expand Up @@ -384,11 +369,9 @@ struct AggregationClause {
);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&
) const {
// AggregationClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -440,11 +423,9 @@ struct ResampleClause {
std::vector<RangesAndKey>& ranges_and_keys,
size_t start_from);

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&
) const {
// ResampleClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -501,10 +482,9 @@ struct RemoveColumnPartitioningClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// RemoveColumnPartitioningClause: the supplied groups of entity IDs are ignored and std::nullopt is
// always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -535,10 +515,9 @@ struct SplitClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// SplitClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -568,10 +547,9 @@ struct SortClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// SortClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -614,10 +592,9 @@ struct MergeClause {
return structure_all_together(ranges_and_keys);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
std::vector<Composite<EntityIds>>&& entity_ids) const;
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(std::vector<std::vector<EntityId>>&& entity_ids_vec) const;

[[nodiscard]] const ClauseInfo& clause_info() const {
return clause_info_;
Expand Down Expand Up @@ -655,10 +632,9 @@ struct ColumnStatsGenerationClause {
return structure_by_row_slice(ranges_and_keys, start_from);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// ColumnStatsGenerationClause: the supplied groups of entity IDs are ignored and std::nullopt is
// always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -721,10 +697,9 @@ struct RowRangeClause {
std::vector<RangesAndKey>& ranges_and_keys,
ARCTICDB_UNUSED size_t start_from);

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// RowRangeClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down Expand Up @@ -762,10 +737,9 @@ struct DateRangeClause {
std::vector<RangesAndKey>& ranges_and_keys,
size_t start_from);

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<Composite<EntityIds>>> repartition(
ARCTICDB_UNUSED std::vector<Composite<EntityIds>>&&) const {
// DateRangeClause: the supplied groups of entity IDs are ignored and std::nullopt is always returned.
[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(
        ARCTICDB_UNUSED std::vector<std::vector<EntityId>>&& /*entity_ids_vec*/) const {
    return std::nullopt;
}

Expand Down
Loading

0 comments on commit 1cdc263

Please sign in to comment.