diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index 7528c81006..71c89f7a86 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -395,18 +395,18 @@ struct SegmentFunctionTask : BaseTask { struct MemSegmentProcessingTask : BaseTask { std::vector> clauses_; - Composite entity_ids_; + std::vector entity_ids_; explicit MemSegmentProcessingTask( std::vector> clauses, - Composite&& entity_ids) : + std::vector&& entity_ids) : clauses_(std::move(clauses)), entity_ids_(std::move(entity_ids)) { } ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask) - Composite operator()() { + std::vector operator()() { std::ranges::reverse_view reversed_clauses{clauses_}; for (const auto& clause: reversed_clauses) { entity_ids_ = clause->process(std::move(entity_ids_)); diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 4b079c0df5..3f6a2ebd18 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -12,7 +12,6 @@ #include #include -#include #include #include @@ -79,42 +78,27 @@ std::vector> structure_all_together(std::vector gather_entities(std::shared_ptr component_manager, - Composite&& entity_ids, - bool include_atom_keys, - bool include_bucket, - bool include_initial_expected_get_calls) { - return entity_ids.transform([&component_manager, include_atom_keys, include_bucket, include_initial_expected_get_calls] - (const EntityIds& entity_ids) -> ProcessingUnit { - ProcessingUnit res; - res.set_segments(component_manager->get>(entity_ids)); - res.set_row_ranges(component_manager->get>(entity_ids)); - res.set_col_ranges(component_manager->get>(entity_ids)); - - if (include_atom_keys) { - res.set_atom_keys(component_manager->get>(entity_ids)); - } - if (include_bucket) { - std::vector buckets{component_manager->get(entity_ids)}; - // Each entity_id has a bucket, but they must all be the same within one processing unit - if (buckets.size() > 0) { - internal::check( - std::adjacent_find(buckets.begin(), buckets.end(), std::not_equal_to<>() ) == buckets.end(), - "Partitioning error: segments to be processed together must be in the same bucket" - ); - res.set_bucket(buckets.at(0)); - } - } - if (include_initial_expected_get_calls) { - std::vector segment_initial_expected_get_calls; - segment_initial_expected_get_calls.reserve(entity_ids.size()); - for (auto entity_id: entity_ids) { - segment_initial_expected_get_calls.emplace_back(component_manager->get_initial_expected_get_calls>(entity_id)); - } - res.set_segment_initial_expected_get_calls(std::move(segment_initial_expected_get_calls)); +ProcessingUnit gather_entities(std::shared_ptr component_manager, + std::vector&& entity_ids, + bool include_atom_keys, + bool include_initial_expected_get_calls) { + ProcessingUnit res; + res.set_segments(component_manager->get>(entity_ids)); + res.set_row_ranges(component_manager->get>(entity_ids)); + res.set_col_ranges(component_manager->get>(entity_ids)); + + if (include_atom_keys) { + res.set_atom_keys(component_manager->get>(entity_ids)); + } + if (include_initial_expected_get_calls) { + std::vector segment_initial_expected_get_calls; + segment_initial_expected_get_calls.reserve(entity_ids.size()); + for (auto entity_id: entity_ids) { + segment_initial_expected_get_calls.emplace_back(component_manager->get_initial_expected_get_calls>(entity_id)); } - return res; - }); + res.set_segment_initial_expected_get_calls(std::move(segment_initial_expected_get_calls)); + } + return res; } /* @@ -124,10 +108,10 @@ Composite gather_entities(std::shared_ptr comp * Elements that share an index in the optional vectors of a ProcessingUnit correspond to the same entity, and so are * pushed into the component manager with the same ID. */ -EntityIds push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc) { - std::optional res; +std::vector push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc) { + std::optional> res; if (proc.segments_.has_value()) { - res = std::make_optional(component_manager->add(std::move(*proc.segments_))); + res = std::make_optional>(component_manager->add(std::move(*proc.segments_))); } if (proc.row_ranges_.has_value()) { res = component_manager->add(std::move(*proc.row_ranges_), res); @@ -145,10 +129,17 @@ EntityIds push_entities(std::shared_ptr component_manager, Pro return *res; } -std::vector> single_partition(std::vector> &&comps) { - std::vector> v; - v.push_back(merge_composites_shallow(std::move(comps))); - return v; +std::vector flatten_entities(std::vector>&& entity_ids_vec) { + size_t res_size = std::accumulate(entity_ids_vec.cbegin(), + entity_ids_vec.cend(), + size_t(0), + [](size_t acc, const std::vector& vec) { return acc + vec.size(); }); + std::vector res; + res.reserve(res_size); + for (const auto& entity_ids: entity_ids_vec) { + res.insert(res.end(), entity_ids.begin(), entity_ids.end()); + } + return res; } class GroupingMap { @@ -220,37 +211,36 @@ struct SegmentWrapper { } }; -Composite PassthroughClause::process(Composite&& entity_ids) const { +std::vector PassthroughClause::process(std::vector&& entity_ids) const { return std::move(entity_ids); } -Composite FilterClause::process( - Composite&& entity_ids - ) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite output; - procs.broadcast([&output, this](auto&& proc) { - proc.set_expression_context(expression_context_); - auto variant_data = proc.get(expression_context_->root_node_name_); - util::variant_match(variant_data, - [&proc, &output, this](util::BitSet& bitset) { - if (bitset.count() > 0) { - proc.apply_filter(std::move(bitset), optimisation_); - output.push_back(push_entities(component_manager_, std::move(proc))); - } else { - log::version().debug("Filter returned empty result"); - } - }, - [](EmptyResult) { - log::version().debug("Filter returned empty result"); - }, - [&output, &proc, this](FullResult) { - output.push_back(push_entities(component_manager_, std::move(proc))); - }, - [](const auto &) { - util::raise_rte("Expected bitset from filter clause"); - }); - }); +std::vector FilterClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + proc.set_expression_context(expression_context_); + auto variant_data = proc.get(expression_context_->root_node_name_); + std::vector output; + util::variant_match(variant_data, + [&proc, &output, this](util::BitSet &bitset) { + if (bitset.count() > 0) { + proc.apply_filter(std::move(bitset), optimisation_); + output = push_entities(component_manager_, std::move(proc)); + } else { + log::version().debug("Filter returned empty result"); + } + }, + [](EmptyResult) { + log::version().debug("Filter returned empty result"); + }, + [&output, &proc, this](FullResult) { + output = push_entities(component_manager_, std::move(proc)); + }, + [](const auto &) { + util::raise_rte("Expected bitset from filter clause"); + }); return output; } @@ -258,32 +248,33 @@ std::string FilterClause::to_string() const { return expression_context_ ? fmt::format("WHERE {}", expression_context_->root_node_name_.value) : ""; } -Composite ProjectClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite output; - procs.broadcast([&output, this](auto&& proc) { - proc.set_expression_context(expression_context_); - auto variant_data = proc.get(expression_context_->root_node_name_); - util::variant_match(variant_data, - [&proc, &output, this](ColumnWithStrings &col) { - - const auto data_type = col.column_->type().data_type(); - const std::string_view name = output_column_; - - proc.segments_->back()->add_column(scalar_field(data_type, name), col.column_); - ++proc.col_ranges_->back()->second; - output.push_back(push_entities(component_manager_, std::move(proc))); - }, - [&proc, &output, this](const EmptyResult&) { - if(expression_context_->dynamic_schema_) - output.push_back(push_entities(component_manager_, std::move(proc))); - else - util::raise_rte("Cannot project from empty column with static schema"); - }, - [](const auto &) { - util::raise_rte("Expected column from projection clause"); - }); - }); +std::vector ProjectClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + proc.set_expression_context(expression_context_); + auto variant_data = proc.get(expression_context_->root_node_name_); + std::vector output; + util::variant_match(variant_data, + [&proc, &output, this](ColumnWithStrings &col) { + + const auto data_type = col.column_->type().data_type(); + const std::string_view name = output_column_; + + proc.segments_->back()->add_column(scalar_field(data_type, name), col.column_); + ++proc.col_ranges_->back()->second; + output = push_entities(component_manager_, std::move(proc)); + }, + [&proc, &output, this](const EmptyResult &) { + if (expression_context_->dynamic_schema_) + output = push_entities(component_manager_, std::move(proc)); + else + util::raise_rte("Cannot project from empty column with static schema"); + }, + [](const auto &) { + util::raise_rte("Expected column from projection clause"); + }); return output; } @@ -324,11 +315,15 @@ AggregationClause::AggregationClause(const std::string& grouping_column, str_.append("}"); } -Composite AggregationClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)).as_range(); +std::vector AggregationClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto row_slices = split_by_row_slice(std::move(proc)); // Sort procs following row range descending order, as we are going to iterate through them backwards - std::sort(std::begin(procs), std::end(procs), + std::sort(std::begin(row_slices), std::end(row_slices), [](const auto& left, const auto& right) { return left.row_ranges_->at(0)->start() >= right.row_ranges_->at(0)->start(); }); @@ -342,15 +337,15 @@ Composite AggregationClause::process(Composite&& entity_id } // Work out the common type between the processing units for the columns being aggregated - for (auto& proc: procs) { + for (auto& row_slice: row_slices) { for (auto agg_data: folly::enumerate(aggregators_data)) { // Check that segments row ranges are the same internal::check( - std::all_of(proc.row_ranges_->begin(), proc.row_ranges_->end(), [&] (const auto& row_range) {return row_range->start() == proc.row_ranges_->at(0)->start();}), + std::all_of(row_slice.row_ranges_->begin(), row_slice.row_ranges_->end(), [&] (const auto& row_range) {return row_range->start() == row_slice.row_ranges_->at(0)->start();}), "Expected all data segments in one processing unit to have the same row ranges"); auto input_column_name = aggregators_.at(agg_data.index).get_input_column_name(); - auto input_column = proc.get(input_column_name); + auto input_column = row_slice.get(input_column_name); if (std::holds_alternative(input_column)) { agg_data->add_data_type(std::get(input_column).column_->type().data_type()); } @@ -364,14 +359,14 @@ Composite AggregationClause::process(Composite&& entity_id GroupingMap grouping_map; // Iterating backwards as we are going to erase from this vector as we go along // This is to spread out deallocation of the input segments - for (auto it = procs.rbegin(); it != procs.rend(); ++it) { - auto& proc = *it; - auto partitioning_column = proc.get(ColumnName(grouping_column_)); + for (auto it = row_slices.rbegin(); it != row_slices.rend(); ++it) { + auto& row_slice = *it; + auto partitioning_column = row_slice.get(ColumnName(grouping_column_)); if (std::holds_alternative(partitioning_column)) { ColumnWithStrings col = std::get(partitioning_column); details::visit_type( col.column_->type().data_type(), - [&proc_ = proc, &grouping_map, &next_group_id, &aggregators_data, &string_pool, &col, + [&row_slice, &grouping_map, &next_group_id, &aggregators_data, &string_pool, &col, &num_unique, &grouping_data_type, this](auto data_type_tag) { using col_type_info = ScalarTypeInfo; grouping_data_type = col_type_info::data_type; @@ -457,7 +452,7 @@ Composite AggregationClause::process(Composite&& entity_id for (auto agg_data: folly::enumerate(aggregators_data)) { auto input_column_name = aggregators_.at( agg_data.index).get_input_column_name(); - auto input_column = proc_.get(input_column_name); + auto input_column = row_slice.get(input_column_name); std::optional opt_input_column; if (std::holds_alternative(input_column)) { auto column_with_strings = std::get(input_column); @@ -472,7 +467,7 @@ Composite AggregationClause::process(Composite&& entity_id } else { util::raise_rte("Expected single column from expression"); } - procs.erase(std::next(it).base()); + row_slices.erase(std::next(it).base()); } SegmentInMemory seg; auto index_col = std::make_shared(make_scalar_type(grouping_data_type), grouping_map.size(), true, false); @@ -506,7 +501,7 @@ Composite AggregationClause::process(Composite&& entity_id seg.set_string_pool(string_pool); seg.set_row_id(num_unique - 1); - return Composite(push_entities(component_manager_, ProcessingUnit(std::move(seg)))); + return push_entities(component_manager_, ProcessingUnit(std::move(seg))); } [[nodiscard]] std::string AggregationClause::to_string() const { @@ -662,10 +657,12 @@ void ResampleClause::advance_boundary_past_value(const std::vec } template -Composite ResampleClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids), false, false, true).as_range(); - internal::check(procs.size() == 1, "Expected a single ProcessingUnit on entry to ResampleClause::process"); - auto row_slices = split_by_row_slice(std::move(procs[0])); +std::vector ResampleClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids), false, true); + auto row_slices = split_by_row_slice(std::move(proc)); // If the expected get calls for the segments in the first row slice are 2, the first bucket overlapping this row // slice is being computed by the call to process dealing with the row slices above these. Otherwise, this call // should do it @@ -721,7 +718,7 @@ Composite ResampleClause::process(Compositetype().data_type(), aggregator.get_output_column_name().value), aggregated_column); } seg.set_row_data(output_index_column->row_count() - 1); - return Composite(push_entities(component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range)))); + return push_entities(component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range))); } template @@ -822,75 +819,78 @@ std::shared_ptr ResampleClause::generate_output_index_c template struct ResampleClause; template struct ResampleClause; -[[nodiscard]] Composite RemoveColumnPartitioningClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite output; - procs.broadcast([&output, this](ProcessingUnit &proc) { - size_t min_start_row = std::numeric_limits::max(); - size_t max_end_row = 0; - size_t min_start_col = std::numeric_limits::max(); - size_t max_end_col = 0; - std::optional output_seg; - for (auto&& [idx, segment]: folly::enumerate(proc.segments_.value())) { - min_start_row = std::min(min_start_row, proc.row_ranges_->at(idx)->start()); - max_end_row = std::max(max_end_row, proc.row_ranges_->at(idx)->end()); - min_start_col = std::min(min_start_col, proc.col_ranges_->at(idx)->start()); - max_end_col = std::max(max_end_col, proc.col_ranges_->at(idx)->end()); - if (output_seg.has_value()) { - merge_string_columns(*segment, output_seg->string_pool_ptr(), false); - output_seg->concatenate(std::move(*segment), true); - } else { - output_seg = std::make_optional(std::move(*segment)); - } - } +[[nodiscard]] std::vector RemoveColumnPartitioningClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + size_t min_start_row = std::numeric_limits::max(); + size_t max_end_row = 0; + size_t min_start_col = std::numeric_limits::max(); + size_t max_end_col = 0; + std::optional output_seg; + for (auto&& [idx, segment]: folly::enumerate(proc.segments_.value())) { + min_start_row = std::min(min_start_row, proc.row_ranges_->at(idx)->start()); + max_end_row = std::max(max_end_row, proc.row_ranges_->at(idx)->end()); + min_start_col = std::min(min_start_col, proc.col_ranges_->at(idx)->start()); + max_end_col = std::max(max_end_col, proc.col_ranges_->at(idx)->end()); if (output_seg.has_value()) { - output.push_back(push_entities(component_manager_, ProcessingUnit(std::move(*output_seg), - RowRange{min_start_row, max_end_row}, - ColRange{min_start_col, max_end_col}))); + merge_string_columns(*segment, output_seg->string_pool_ptr(), false); + output_seg->concatenate(std::move(*segment), true); + } else { + output_seg = std::make_optional(std::move(*segment)); } - }); + } + std::vector output; + if (output_seg.has_value()) { + output = push_entities(component_manager_, + ProcessingUnit(std::move(*output_seg), + RowRange{min_start_row, max_end_row}, + ColRange{min_start_col, max_end_col})); + } return output; } -Composite SplitClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite ret; - procs.broadcast([this, &ret](auto &&p) { - auto proc = std::forward(p); - for (auto&& [idx, seg]: folly::enumerate(proc.segments_.value())) { - auto split_segs = seg->split(rows_); - size_t start_row = proc.row_ranges_->at(idx)->start(); - size_t end_row = 0; - for (auto&& split_seg : split_segs) { - end_row = start_row + split_seg.row_count(); - ret.push_back(push_entities(component_manager_, ProcessingUnit(std::move(split_seg), - RowRange(start_row, end_row), - std::move(*proc.col_ranges_->at(idx))))); - start_row = end_row; - } +std::vector SplitClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + std::vector ret; + for (auto&& [idx, seg]: folly::enumerate(proc.segments_.value())) { + auto split_segs = seg->split(rows_); + size_t start_row = proc.row_ranges_->at(idx)->start(); + size_t end_row = 0; + for (auto&& split_seg : split_segs) { + end_row = start_row + split_seg.row_count(); + auto new_entity_ids = push_entities(component_manager_, + ProcessingUnit(std::move(split_seg), + RowRange(start_row, end_row), + std::move(*proc.col_ranges_->at(idx)))); + ret.insert(ret.end(), new_entity_ids.begin(), new_entity_ids.end()); + start_row = end_row; } - }); + } return ret; } -Composite SortClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite output; - procs.broadcast([&output, this](auto&& proc) { - for (auto& seg: proc.segments_.value()) { - // This modifies the segment in place, which goes against the ECS principle of all entities being immutable - // Only used by SortMerge right now and so this is fine, although it would not generalise well - seg->sort(column_); - } - output.push_back(push_entities(component_manager_, std::move(proc))); - }); - return output; +std::vector SortClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + for (auto& seg: proc.segments_.value()) { + // This modifies the segment in place, which goes against the ECS principle of all entities being immutable + // Only used by SortMerge right now and so this is fine, although it would not generalise well + seg->sort(column_); + } + return push_entities(component_manager_, std::move(proc)); } template void merge_impl( std::shared_ptr component_manager, - Composite &ret, + std::vector>& ret, QueueType &input_streams, bool add_symbol_column, StreamId stream_id, @@ -903,7 +903,7 @@ void merge_impl( SegmentationPolicy segmentation_policy{static_cast(num_segment_rows)}; auto func = [&component_manager, &ret, &row_range, &col_range](auto &&segment) { - ret.push_back(push_entities(component_manager, ProcessingUnit{std::forward(segment), row_range, col_range})); + ret.emplace_back(push_entities(component_manager, ProcessingUnit{std::forward(segment), row_range, col_range})); }; using AggregatorType = stream::Aggregator; @@ -925,19 +925,18 @@ void merge_impl( // MergeClause receives a list of DataFrames as input and merge them into a single one where all // the rows are sorted by time stamp -Composite MergeClause::process(Composite&& entity_ids) const { +std::vector MergeClause::process(std::vector&& entity_ids) const { return std::move(entity_ids); } -std::optional>> MergeClause::repartition( - std::vector> &&comps) const { +std::optional>> MergeClause::repartition(std::vector>&& entity_ids_vec) const { // TODO this is a hack because we don't currently have a way to // specify any particular input shape unless a clause is the // first one and can use structure_for_processing. Ideally // merging should be parallel like resampling - auto entity_ids = merge_composites(std::move(comps)); - auto procs = gather_entities(component_manager_, std::move(entity_ids)); + auto entity_ids = flatten_entities(std::move(entity_ids_vec)); + auto proc = gather_entities(component_manager_, std::move(entity_ids)); auto compare = [](const std::unique_ptr &left, @@ -954,24 +953,22 @@ std::optional>> MergeClause::repartition( size_t max_end_row = 0; size_t min_start_col = std::numeric_limits::max(); size_t max_end_col = 0; - procs.broadcast([&input_streams, &min_start_row, &max_end_row, &min_start_col, &max_end_col](auto&& proc) { - for (auto&& [idx, segment]: folly::enumerate(proc.segments_.value())) { - size_t start_row = proc.row_ranges_->at(idx)->start(); - size_t end_row = proc.row_ranges_->at(idx)->end(); - min_start_row = std::min(start_row, min_start_row); - max_end_row = std::max(end_row, max_end_row); - - size_t start_col = proc.col_ranges_->at(idx)->start(); - size_t end_col = proc.col_ranges_->at(idx)->end(); - min_start_col = std::min(start_col, min_start_col); - max_end_col = std::max(end_col, max_end_col); - - input_streams.push(std::make_unique(std::move(*segment))); - } - }); + for (auto&& [idx, segment]: folly::enumerate(proc.segments_.value())) { + size_t start_row = proc.row_ranges_->at(idx)->start(); + size_t end_row = proc.row_ranges_->at(idx)->end(); + min_start_row = std::min(start_row, min_start_row); + max_end_row = std::max(end_row, max_end_row); + + size_t start_col = proc.col_ranges_->at(idx)->start(); + size_t end_col = proc.col_ranges_->at(idx)->end(); + min_start_col = std::min(start_col, min_start_col); + max_end_col = std::max(end_col, max_end_col); + + input_streams.push(std::make_unique(std::move(*segment))); + } const RowRange row_range{min_start_row, max_end_row}; const ColRange col_range{min_start_col, max_end_col}; - Composite ret; + std::vector> ret; std::visit( [this, &ret, &input_streams, &comp=compare, stream_id=stream_id_, &row_range, &col_range](auto idx, auto density) { merge_impl(component_manager_, @@ -984,15 +981,15 @@ std::optional>> MergeClause::repartition( idx, stream_descriptor_); }, index_, density_policy_); - - std::vector> output; - output.emplace_back(std::move(ret)); - return std::make_optional(std::move(output)); + return ret; } -Composite ColumnStatsGenerationClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids), true); +std::vector ColumnStatsGenerationClause::process(std::vector&& entity_ids) const { + internal::check( + !entity_ids.empty(), + "ColumnStatsGenerationClause::process does not make sense with no processing units"); + auto proc = gather_entities(component_manager_, std::move(entity_ids), true); std::vector aggregators_data; internal::check( static_cast(column_stats_aggregators_), @@ -1004,29 +1001,23 @@ Composite ColumnStatsGenerationClause::process(Composite&& ankerl::unordered_dense::set start_indexes; ankerl::unordered_dense::set end_indexes; - internal::check( - !procs.empty(), - "ColumnStatsGenerationClause::process does not make sense with no processing units"); - procs.broadcast( - [&start_indexes, &end_indexes, &aggregators_data, this](auto &proc) { - for (const auto& key: proc.atom_keys_.value()) { - start_indexes.insert(key->start_index()); - end_indexes.insert(key->end_index()); - } - for (auto agg_data : folly::enumerate(aggregators_data)) { - auto input_column_name = column_stats_aggregators_->at(agg_data.index).get_input_column_name(); - auto input_column = proc.get(input_column_name); - if (std::holds_alternative(input_column)) { - auto input_column_with_strings = std::get(input_column); - agg_data->aggregate(input_column_with_strings); - } else { - if (!processing_config_.dynamic_schema_) - internal::raise( - "Unable to resolve column denoted by aggregation operator: '{}'", - input_column_name); - } - } - }); + for (const auto& key: proc.atom_keys_.value()) { + start_indexes.insert(key->start_index()); + end_indexes.insert(key->end_index()); + } + for (auto agg_data : folly::enumerate(aggregators_data)) { + auto input_column_name = column_stats_aggregators_->at(agg_data.index).get_input_column_name(); + auto input_column = proc.get(input_column_name); + if (std::holds_alternative(input_column)) { + auto input_column_with_strings = std::get(input_column); + agg_data->aggregate(input_column_with_strings); + } else { + if (!processing_config_.dynamic_schema_) + internal::raise( + "Unable to resolve column denoted by aggregation operator: '{}'", + input_column_name); + } + } internal::check( start_indexes.size() == 1 && end_indexes.size() == 1, @@ -1052,7 +1043,7 @@ Composite ColumnStatsGenerationClause::process(Composite&& seg.concatenate(agg_data->finalize(column_stats_aggregators_->at(agg_data.index).get_output_column_names())); } seg.set_row_id(0); - return Composite(push_entities(component_manager_, ProcessingUnit(std::move(seg)))); + return push_entities(component_manager_, ProcessingUnit(std::move(seg))); } std::vector> RowRangeClause::structure_for_processing( @@ -1064,33 +1055,33 @@ std::vector> RowRangeClause::structure_for_processing( return structure_by_row_slice(ranges_and_keys, start_from); } -Composite RowRangeClause::process(Composite &&entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite output; - procs.broadcast([&output, this](ProcessingUnit &proc) { - for (auto&& [idx, row_range]: folly::enumerate(proc.row_ranges_.value())) { - if ((start_ > row_range->start() && start_ < row_range->end()) || - (end_ > row_range->start() && end_ < row_range->end())) { - // Zero-indexed within this slice - size_t start_row{0}; - size_t end_row{row_range->diff()}; - if (start_ > row_range->start() && start_ < row_range->end()) { - start_row = start_ - row_range->start(); - } - if (end_ > row_range->start() && end_ < row_range->end()) { - end_row = end_ - (row_range->start()); - } - auto truncated_segment = proc.segments_->at(idx)->truncate(start_row, end_row, false); - auto num_rows = truncated_segment.is_null() ? 0 : truncated_segment.row_count(); - proc.row_ranges_->at(idx) = std::make_shared(proc.row_ranges_->at(idx)->first, proc.row_ranges_->at(idx)->first + num_rows); - auto num_cols = truncated_segment.is_null() ? 0 : truncated_segment.descriptor().field_count() - truncated_segment.descriptor().index().field_count(); - proc.col_ranges_->at(idx) = std::make_shared(proc.col_ranges_->at(idx)->first, proc.col_ranges_->at(idx)->first + num_cols); - proc.segments_->at(idx) = std::make_shared(std::move(truncated_segment)); - } // else all rows in this segment are required, do nothing - } - output.push_back(push_entities(component_manager_, std::move(proc))); - }); - return output; +std::vector RowRangeClause::process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + std::vector output; + for (auto&& [idx, row_range]: folly::enumerate(proc.row_ranges_.value())) { + if ((start_ > row_range->start() && start_ < row_range->end()) || + (end_ > row_range->start() && end_ < row_range->end())) { + // Zero-indexed within this slice + size_t start_row{0}; + size_t end_row{row_range->diff()}; + if (start_ > row_range->start() && start_ < row_range->end()) { + start_row = start_ - row_range->start(); + } + if (end_ > row_range->start() && end_ < row_range->end()) { + end_row = end_ - (row_range->start()); + } + auto truncated_segment = proc.segments_->at(idx)->truncate(start_row, end_row, false); + auto num_rows = truncated_segment.is_null() ? 0 : truncated_segment.row_count(); + proc.row_ranges_->at(idx) = std::make_shared(proc.row_ranges_->at(idx)->first, proc.row_ranges_->at(idx)->first + num_rows); + auto num_cols = truncated_segment.is_null() ? 0 : truncated_segment.descriptor().field_count() - truncated_segment.descriptor().index().field_count(); + proc.col_ranges_->at(idx) = std::make_shared(proc.col_ranges_->at(idx)->first, proc.col_ranges_->at(idx)->first + num_cols); + proc.segments_->at(idx) = std::make_shared(std::move(truncated_segment)); + } // else all rows in this segment are required, do nothing + } + return push_entities(component_manager_, std::move(proc)); } void RowRangeClause::set_processing_config(const ProcessingConfig& processing_config) { @@ -1151,27 +1142,27 @@ std::vector> DateRangeClause::structure_for_processing( return structure_by_row_slice(ranges_and_keys, start_from); } -Composite DateRangeClause::process(Composite &&entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids), true); - Composite output; - procs.broadcast([&output, this](ProcessingUnit &proc) { - // We are only interested in the index, which is in every SegmentInMemory in proc.segments_, so just use the first - auto row_range = proc.row_ranges_->at(0); - auto [start_index, end_index] = proc.atom_keys_->at(0)->time_range(); - if ((start_ > start_index && start_ < end_index) || (end_ >= start_index && end_ < end_index)) { - size_t start_row{0}; - size_t end_row{row_range->diff()}; - if (start_ > start_index && start_ < end_index) { - start_row = proc.segments_->at(0)->column_ptr(0)->search_sorted(start_); - } - if (end_ >= start_index && end_ < end_index) { - end_row = proc.segments_->at(0)->column_ptr(0)->search_sorted(end_, true); - } - proc.truncate(start_row, end_row); - } // else all rows in the processing unit are required, do nothing - output.push_back(push_entities(component_manager_, std::move(proc))); - }); - return output; +std::vector DateRangeClause::process(std::vector &&entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids), true); + std::vector output; + // We are only interested in the index, which is in every SegmentInMemory in proc.segments_, so just use the first + auto row_range = proc.row_ranges_->at(0); + auto [start_index, end_index] = proc.atom_keys_->at(0)->time_range(); + if ((start_ > start_index && start_ < end_index) || (end_ >= start_index && end_ < end_index)) { + size_t start_row{0}; + size_t end_row{row_range->diff()}; + if (start_ > start_index && start_ < end_index) { + start_row = proc.segments_->at(0)->column_ptr(0)->search_sorted(start_); + } + if (end_ >= start_index && end_ < end_index) { + end_row = proc.segments_->at(0)->column_ptr(0)->search_sorted(end_, true); + } + proc.truncate(start_row, end_row); + } // else all rows in the processing unit are required, do nothing + return push_entities(component_manager_, std::move(proc)); } std::string DateRangeClause::to_string() const { diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 503ae1fec2..0d495e4d99 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -76,14 +75,14 @@ struct IClause { return std::move(folly::poly_call<0>(*this, ranges_and_keys, start_from)); } - [[nodiscard]] Composite - process(Composite&& entity_ids) const { + [[nodiscard]] std::vector + process(std::vector&& entity_ids) const { return std::move(folly::poly_call<1>(*this, std::move(entity_ids))); } - [[nodiscard]] std::optional>> repartition( - std::vector>&& entity_ids) const { - return folly::poly_call<2>(*this, std::move(entity_ids)); + [[nodiscard]] std::optional>> repartition( + std::vector>&& entity_ids_vec) const { + return folly::poly_call<2>(*this, std::move(entity_ids_vec)); } [[nodiscard]] const ClauseInfo& clause_info() const { return folly::poly_call<3>(*this); }; @@ -115,13 +114,14 @@ std::vector> structure_by_column_slice(std::vector> structure_all_together(std::vector& ranges_and_keys); -Composite gather_entities(std::shared_ptr component_manager, - Composite&& entity_ids, - bool include_atom_keys = false, - bool include_bucket = false, - bool include_initial_expected_get_calls = false); +ProcessingUnit gather_entities(std::shared_ptr component_manager, + std::vector&& entity_ids, + bool include_atom_keys = false, + bool include_initial_expected_get_calls = false); -EntityIds push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc); +std::vector push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc); + +std::vector flatten_entities(std::vector>&& entity_ids_vec); struct PassthroughClause { ClauseInfo clause_info_; @@ -134,12 +134,9 @@ struct PassthroughClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process( - Composite&& entity_ids - ) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -176,12 +173,9 @@ struct FilterClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process( - Composite&& entity_ids - ) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector> &&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector> &&) const { return std::nullopt; } @@ -229,11 +223,9 @@ struct ProjectClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite - process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -276,57 +268,50 @@ struct PartitionClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids)); - Composite output; - procs.broadcast([&output, this](auto &proc) { - Composite partitioned_proc = partition_processing_segment(proc, - ColumnName(grouping_column_), - processing_config_.dynamic_schema_); - partitioned_proc.broadcast([&output, this](auto& p) { - output.push_back(push_entities(component_manager_, std::move(p))); - }); - }); + [[nodiscard]] std::vector process(std::vector&& entity_ids) const { + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities(component_manager_, std::move(entity_ids)); + std::vector partitioned_procs = partition_processing_segment( + proc, + ColumnName(grouping_column_), + processing_config_.dynamic_schema_); + std::vector output; + for (auto &&partitioned_proc: partitioned_procs) { + std::vector proc_entity_ids = push_entities(component_manager_, std::move(partitioned_proc)); + output.insert(output.end(), proc_entity_ids.begin(), proc_entity_ids.end()); + } return output; } - [[nodiscard]] std::optional>> repartition( - std::vector>&& entity_ids) const { - std::vector> input_procs; - input_procs.reserve(entity_ids.size()); - for (auto&& comp: entity_ids) { - input_procs.emplace_back(gather_entities(component_manager_, std::move(comp), false, true)); - } - std::unordered_map> partition_map; + [[nodiscard]] std::optional>> repartition(std::vector>&& entity_ids_vec) const { schema::check( - std::any_of(input_procs.begin(), input_procs.end(), [](const Composite& proc) { - return !proc.empty(); + std::any_of(entity_ids_vec.cbegin(), entity_ids_vec.cend(), [](const std::vector& entity_ids) { + return !entity_ids.empty(); }), "Grouping column {} does not exist or is empty", grouping_column_ ); - - for (auto &comp : input_procs) { - comp.broadcast([&partition_map](auto &proc) { - internal::check(proc.bucket_.has_value(), - "PartitionClause::repartition failed, all processing units must have a bucket ID"); - - if (partition_map.find(*proc.bucket_) == partition_map.end()) { - partition_map[*proc.bucket_] = Composite(std::move(proc)); - } else { - partition_map[*proc.bucket_].push_back(std::move(proc)); - } - }); + // Some could be empty, so actual number may be lower + auto max_num_buckets = ConfigsMap::instance()->get_int("Partition.NumBuckets", + async::TaskScheduler::instance()->cpu_thread_count()); + max_num_buckets = std::min(max_num_buckets, static_cast(std::numeric_limits::max())); + // Preallocate results with expected sizes, erase later if any are empty + std::vector> res(max_num_buckets); + // With an even distribution, expect each element of res to have entity_ids_vec.size() elements + for (auto& res_element: res) { + res_element.reserve(entity_ids_vec.size()); } - - std::vector> ret; - ret.reserve(partition_map.size()); - for (auto&& [_, comp] : partition_map) { - ret.emplace_back(comp.transform([this](auto&& proc) { - return push_entities(component_manager_, std::move(proc)); - })); + // Experimentation shows flattening the entities into a single vector and a single call to + // component_manager_->get is faster than not flattening and making multiple calls + auto entity_ids = flatten_entities(std::move(entity_ids_vec)); + std::vector buckets{component_manager_->get(entity_ids)}; + for (auto [idx, entity_id]: folly::enumerate(entity_ids)) { + res[buckets[idx]].emplace_back(entity_id); } - - return ret; + // Get rid of any empty buckets + std::erase_if(res, [](const std::vector& entity_ids) { return entity_ids.empty(); }); + return res; } [[nodiscard]] const ClauseInfo& clause_info() const { @@ -384,11 +369,9 @@ struct AggregationClause { ); } - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&& - ) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -440,11 +423,9 @@ struct ResampleClause { std::vector& ranges_and_keys, size_t start_from); - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&& - ) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -501,10 +482,9 @@ struct RemoveColumnPartitioningClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -535,10 +515,9 @@ struct SplitClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -568,10 +547,9 @@ struct SortClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -614,10 +592,9 @@ struct MergeClause { return structure_all_together(ranges_and_keys); } - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - std::vector>&& entity_ids) const; + [[nodiscard]] std::optional>> repartition(std::vector>&& entity_ids_vec) const; [[nodiscard]] const ClauseInfo& clause_info() const { return clause_info_; @@ -655,10 +632,9 @@ struct ColumnStatsGenerationClause { return structure_by_row_slice(ranges_and_keys, start_from); } - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -721,10 +697,9 @@ struct RowRangeClause { std::vector& ranges_and_keys, ARCTICDB_UNUSED size_t start_from); - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } @@ -762,10 +737,9 @@ struct DateRangeClause { std::vector& ranges_and_keys, size_t start_from); - [[nodiscard]] Composite process(Composite&& entity_ids) const; + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] std::optional>> repartition( - ARCTICDB_UNUSED std::vector>&&) const { + [[nodiscard]] std::optional>> repartition(ARCTICDB_UNUSED std::vector>&&) const { return std::nullopt; } diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index 8689b51795..6b7d6e22c5 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -8,7 +8,6 @@ #pragma once #include -#include #include #include @@ -16,8 +15,7 @@ namespace arcticdb { using namespace pipelines; -using EntityId = uint64_t; -using EntityIds = std::vector; +using EntityId = size_t; using bucket_id = uint8_t; class ComponentManager { @@ -28,8 +26,8 @@ class ComponentManager { void set_next_entity_id(EntityId id); template - EntityIds add(std::vector&& components, const std::optional& ids=std::nullopt) { - EntityIds insertion_ids; + std::vector add(std::vector&& components, const std::optional>& ids=std::nullopt) { + std::vector insertion_ids; if (ids.has_value()) { insertion_ids = *ids; } else { @@ -79,7 +77,7 @@ class ComponentManager { } template - std::vector get(const EntityIds& ids) { + std::vector get(const std::vector& ids) { if constexpr(std::is_same_v>) { return segment_map_.get(ids); } else if constexpr(std::is_same_v>) { @@ -115,12 +113,12 @@ class ComponentManager { public: explicit ComponentMap(std::string&& entity_type, bool track_expected_gets): entity_type_(std::move(entity_type)), - opt_expected_get_calls_map_(track_expected_gets ? std::make_optional>() : std::nullopt), - opt_expected_get_calls_initial_map_(track_expected_gets ? std::make_optional>() : std::nullopt){ + opt_expected_get_calls_map_(track_expected_gets ? std::make_optional>() : std::nullopt), + opt_expected_get_calls_initial_map_(track_expected_gets ? std::make_optional>() : std::nullopt){ }; ARCTICDB_NO_MOVE_OR_COPY(ComponentMap) - void add(const EntityIds& ids, + void add(const std::vector& ids, std::vector&& entities) { std::lock_guard lock(mtx_); for (auto [idx, id]: folly::enumerate(ids)) { @@ -158,7 +156,7 @@ class ComponentManager { entity_type_, id); } } - std::vector get(const EntityIds& ids) { + std::vector get(const std::vector& ids) { std::vector res; res.reserve(ids.size()); std::lock_guard lock(mtx_); @@ -200,11 +198,11 @@ class ComponentManager { private: // Just used for logging/exception messages std::string entity_type_; - std::unordered_map map_; + ankerl::unordered_dense::map map_; // If not nullopt, tracks the number of calls to get for each entity id, and erases from maps when it has been // called this many times - std::optional> opt_expected_get_calls_map_; - std::optional> opt_expected_get_calls_initial_map_; + std::optional> opt_expected_get_calls_map_; + std::optional> opt_expected_get_calls_initial_map_; std::mutex mtx_; }; diff --git a/cpp/arcticdb/processing/processing_unit.cpp b/cpp/arcticdb/processing/processing_unit.cpp index 2e9e410523..a384827f6b 100644 --- a/cpp/arcticdb/processing/processing_unit.cpp +++ b/cpp/arcticdb/processing/processing_unit.cpp @@ -109,29 +109,42 @@ std::vector split_by_row_slice(ProcessingUnit&& proc) { internal::check(input.col_ranges_.has_value(), "split_by_row_slice needs ColRanges"); auto include_expected_get_calls = input.segment_initial_expected_get_calls_.has_value(); - std::map output_map; - for (auto [idx, row_range_ptr]: folly::enumerate(*input.row_ranges_)) { - if (auto it = output_map.find(*row_range_ptr); it != output_map.end()) { - it->second.segments_->emplace_back(input.segments_->at(idx)); - it->second.row_ranges_->emplace_back(input.row_ranges_->at(idx)); - it->second.col_ranges_->emplace_back(input.col_ranges_->at(idx)); + std::vector output; + // Some clauses (e.g. AggregationClause) are lossy about row-ranges. We can assume that if all of the input column + // ranges start with zero, that every segment belongs to a different logical row-slice + if (std::all_of(input.col_ranges_->begin(), input.col_ranges_->end(), [](const auto& col_range) { return col_range->start() == 0; })) { + output.reserve(input.segments_->size()); + for (size_t idx = 0; idx < input.segments_->size(); ++idx) { + ProcessingUnit proc_tmp(std::move(*input.segments_->at(idx)), std::move(*input.row_ranges_->at(idx)), std::move(*input.col_ranges_->at(idx))); if (include_expected_get_calls) { - it->second.segment_initial_expected_get_calls_->emplace_back(input.segment_initial_expected_get_calls_->at(idx)); + proc_tmp.set_segment_initial_expected_get_calls({input.segment_initial_expected_get_calls_->at(idx)}); } - } else { - auto [inserted_it, _] = output_map.emplace(*row_range_ptr, ProcessingUnit{}); - inserted_it->second.segments_.emplace(1, input.segments_->at(idx)); - inserted_it->second.row_ranges_.emplace(1, input.row_ranges_->at(idx)); - inserted_it->second.col_ranges_.emplace(1, input.col_ranges_->at(idx)); - if (include_expected_get_calls) { - inserted_it->second.segment_initial_expected_get_calls_.emplace(1, input.segment_initial_expected_get_calls_->at(idx)); + output.emplace_back(std::move(proc_tmp)); + } + } else { + std::map output_map; + for (auto [idx, row_range_ptr]: folly::enumerate(*input.row_ranges_)) { + if (auto it = output_map.find(*row_range_ptr); it != output_map.end()) { + it->second.segments_->emplace_back(input.segments_->at(idx)); + it->second.row_ranges_->emplace_back(input.row_ranges_->at(idx)); + it->second.col_ranges_->emplace_back(input.col_ranges_->at(idx)); + if (include_expected_get_calls) { + it->second.segment_initial_expected_get_calls_->emplace_back(input.segment_initial_expected_get_calls_->at(idx)); + } + } else { + auto [inserted_it, _] = output_map.emplace(*row_range_ptr, ProcessingUnit{}); + inserted_it->second.segments_.emplace(1, input.segments_->at(idx)); + inserted_it->second.row_ranges_.emplace(1, input.row_ranges_->at(idx)); + inserted_it->second.col_ranges_.emplace(1, input.col_ranges_->at(idx)); + if (include_expected_get_calls) { + inserted_it->second.segment_initial_expected_get_calls_.emplace(1, input.segment_initial_expected_get_calls_->at(idx)); + } } } - } - std::vector output; - output.reserve(output_map.size()); - for (auto&& [_, processing_unit]: output_map) { - output.emplace_back(std::move(processing_unit)); + output.reserve(output_map.size()); + for (auto &&[_, processing_unit]: output_map) { + output.emplace_back(std::move(processing_unit)); + } } internal::check(!output.empty(), "Unexpected empty output in split_by_row_slice"); diff --git a/cpp/arcticdb/processing/processing_unit.hpp b/cpp/arcticdb/processing/processing_unit.hpp index d97b651127..606e0e65c4 100644 --- a/cpp/arcticdb/processing/processing_unit.hpp +++ b/cpp/arcticdb/processing/processing_unit.hpp @@ -110,19 +110,14 @@ namespace arcticdb { std::vector split_by_row_slice(ProcessingUnit&& proc); - inline std::vector collect_segments(Composite&& p) { - auto procs = std::move(p); + inline std::vector collect_segments(ProcessingUnit&& proc) { std::vector output; - - procs.broadcast([&output] (auto&& p) { - auto proc = std::forward(p); - internal::check(proc.segments_.has_value() && proc.row_ranges_.has_value() && proc.col_ranges_.has_value(), - "collect_segments requires all of segments, row_ranges, and col_ranges to be present"); - for (auto&& [idx, segment]: folly::enumerate(*proc.segments_)) { - pipelines::FrameSlice frame_slice(*proc.col_ranges_->at(idx), *proc.row_ranges_->at(idx)); - output.emplace_back(std::move(*segment), std::move(frame_slice)); - } - }); + internal::check(proc.segments_.has_value() && proc.row_ranges_.has_value() && proc.col_ranges_.has_value(), + "collect_segments requires all of segments, row_ranges, and col_ranges to be present"); + for (auto&& [idx, segment]: folly::enumerate(*proc.segments_)) { + pipelines::FrameSlice frame_slice(*proc.col_ranges_->at(idx), *proc.row_ranges_->at(idx)); + output.emplace_back(std::move(*segment), std::move(frame_slice)); + } return output; } @@ -169,12 +164,12 @@ namespace arcticdb { } template - Composite partition_processing_segment( + std::vector partition_processing_segment( ProcessingUnit& input, const ColumnName& grouping_column_name, bool dynamic_schema) { - Composite output; + std::vector output; auto get_result = input.get(ColumnName(grouping_column_name)); if (std::holds_alternative(get_result)) { auto partitioning_column = std::get(get_result); @@ -216,7 +211,7 @@ namespace arcticdb { for (auto&& [idx, proc]: folly::enumerate(procs)) { if (bucket_counts.at(idx) > 0) { proc.bucket_ = idx; - output.push_back(std::move(proc)); + output.emplace_back(std::move(proc)); } } } diff --git a/cpp/arcticdb/processing/test/benchmark_clause.cpp b/cpp/arcticdb/processing/test/benchmark_clause.cpp index 170b9daf88..bf5c0a061e 100644 --- a/cpp/arcticdb/processing/test/benchmark_clause.cpp +++ b/cpp/arcticdb/processing/test/benchmark_clause.cpp @@ -40,10 +40,10 @@ void time_merge_on_segments(const std::vector &segments, benchm // Pauses the timing while setting up the merge clause to only time the merging itself state.PauseTiming(); auto component_manager = std::make_shared(); - Composite entity_ids; + std::vector entity_ids; for (auto& segment : segments){ auto proc_unit = ProcessingUnit{segment.clone()}; - entity_ids.push_back(push_entities(component_manager, std::move(proc_unit))); + entity_ids.push_back(push_entities(component_manager, std::move(proc_unit))[0]); } auto stream_id = StreamId("Merge"); diff --git a/cpp/arcticdb/processing/test/test_clause.cpp b/cpp/arcticdb/processing/test/test_clause.cpp index ab46c9f934..8b5fdf9bdd 100644 --- a/cpp/arcticdb/processing/test/test_clause.cpp +++ b/cpp/arcticdb/processing/test/test_clause.cpp @@ -52,10 +52,10 @@ TEST(Clause, PartitionEmptyColumn) { partition.set_component_manager(component_manager); auto proc_unit = ProcessingUnit{generate_groupby_testing_empty_segment(100, 10)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); - auto partitioned = gather_entities(component_manager, partition.process(std::move(entity_ids))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto processed = partition.process(std::move(entity_ids)); - ASSERT_TRUE(partitioned.empty()); + ASSERT_TRUE(processed.empty()); } TEST(Clause, AggregationEmptyColumn) { @@ -73,12 +73,11 @@ TEST(Clause, AggregationEmptyColumn) { size_t num_rows{100}; size_t unique_grouping_values{10}; auto proc_unit = ProcessingUnit{generate_groupby_testing_segment(num_rows, unique_grouping_values)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(1, aggregated.size()); - ASSERT_TRUE(aggregated[0].segments_.has_value()); - auto segments = aggregated[0].segments_.value(); + auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + ASSERT_TRUE(aggregated.segments_.has_value()); + auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); auto segment = segments[0]; @@ -148,12 +147,11 @@ TEST(Clause, AggregationColumn) size_t num_rows{100}; size_t unique_grouping_values{10}; auto proc_unit = ProcessingUnit{generate_groupby_testing_segment(num_rows, unique_grouping_values)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(1, aggregated.size()); - ASSERT_TRUE(aggregated[0].segments_.has_value()); - auto segments = aggregated[0].segments_.value(); + auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + ASSERT_TRUE(aggregated.segments_.has_value()); + auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); using aggregation_test::check_column; @@ -180,12 +178,11 @@ TEST(Clause, AggregationSparseColumn) size_t num_rows{100}; size_t unique_grouping_values{10}; auto proc_unit = ProcessingUnit{generate_groupby_testing_sparse_segment(num_rows, unique_grouping_values)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(1, aggregated.size()); - ASSERT_TRUE(aggregated[0].segments_.has_value()); - auto segments = aggregated[0].segments_.value(); + auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + ASSERT_TRUE(aggregated.segments_.has_value()); + auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); using aggregation_test::check_column; @@ -245,12 +242,11 @@ TEST(Clause, AggregationSparseGroupby) { // 1 more group because of missing values size_t unique_groups{unique_grouping_values + 1}; auto proc_unit = ProcessingUnit{generate_sparse_groupby_testing_segment(num_rows, unique_grouping_values)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(1, aggregated.size()); - ASSERT_TRUE(aggregated[0].segments_.has_value()); - auto segments = aggregated[0].segments_.value(); + auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + ASSERT_TRUE(aggregated.segments_.has_value()); + auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); using aggregation_test::check_column; @@ -304,13 +300,12 @@ TEST(Clause, Passthrough) { auto seg = get_standard_timeseries_segment("passthrough"); auto copied = seg.clone(); auto proc_unit = ProcessingUnit{std::move(seg)};; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto ret = gather_entities(component_manager, passthrough.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(ret.size(), 1); - ASSERT_TRUE(ret[0].segments_.has_value()); - ASSERT_EQ(ret[0].segments_->size(), 1); - ASSERT_EQ(*ret[0].segments_->at(0), copied); + auto ret = gather_entities(component_manager, passthrough.process(std::move(entity_ids))); + ASSERT_TRUE(ret.segments_.has_value()); + ASSERT_EQ(ret.segments_->size(), 1); + ASSERT_EQ(*ret.segments_->at(0), copied); } TEST(Clause, Sort) { @@ -326,12 +321,11 @@ TEST(Clause, Sort) { std::mt19937 urng(rng()); std::shuffle(seg.begin(), seg.end(), urng); auto proc_unit = ProcessingUnit{std::move(seg)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto res = gather_entities(component_manager, sort_clause.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(res.size(), 1); - ASSERT_TRUE(res[0].segments_.has_value()); - ASSERT_EQ(*res[0].segments_->at(0), copied); + auto res = gather_entities(component_manager, sort_clause.process(std::move(entity_ids))); + ASSERT_TRUE(res.segments_.has_value()); + ASSERT_EQ(*res.segments_->at(0), copied); } TEST(Clause, Split) { @@ -345,10 +339,11 @@ TEST(Clause, Split) { auto seg = get_standard_timeseries_segment(symbol, 100); auto copied = seg.clone(); auto proc_unit = ProcessingUnit{std::move(seg)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto res = gather_entities(component_manager, split_clause.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(res.size(), 10); + auto res = gather_entities(component_manager, split_clause.process(std::move(entity_ids))); + ASSERT_TRUE(res.segments_.has_value()); + ASSERT_EQ(res.segments_->size(), 10); FieldCollection desc; const auto& fields = copied.descriptor().fields(); @@ -358,12 +353,9 @@ TEST(Clause, Split) { desc.add_field(field->ref()); } SegmentSinkWrapper seg_wrapper(symbol, TimeseriesIndex::default_index(), std::move(desc)); - for (auto& item: res) { - ASSERT_TRUE(item.segments_.has_value()); - ASSERT_EQ(item.segments_->size(), 1); - auto segment = *item.segments_->at(0); - pipelines::FrameSlice slice(segment); - seg_wrapper.aggregator_.add_segment(std::move(segment), slice, false); + for (auto segment: res.segments_.value()) { + pipelines::FrameSlice slice(*segment); + seg_wrapper.aggregator_.add_segment(std::move(*segment), slice, false); } seg_wrapper.aggregator_.commit(); @@ -385,7 +377,6 @@ TEST(Clause, Merge) { MergeClause merge_clause{TimeseriesIndex{"time"}, SparseColumnPolicy{}, stream_id, descriptor}; merge_clause.set_component_manager(component_manager); - Composite entity_ids; auto seg = get_standard_timeseries_segment(std::get(stream_id), num_rows); std::vector segs; @@ -411,17 +402,18 @@ TEST(Clause, Merge) { current.end_row(); } + std::vector entity_ids; for(auto x = 0u; x < num_segs; ++x) { auto proc_unit = ProcessingUnit{std::move(segs[x])}; - entity_ids.push_back(push_entities(component_manager, std::move(proc_unit))); + entity_ids.push_back(push_entities(component_manager, std::move(proc_unit))[0]); } - Composite processed_ids = merge_clause.process(std::move(entity_ids)); - std::vector> vec; + std::vector processed_ids = merge_clause.process(std::move(entity_ids)); + std::vector> vec; vec.emplace_back(std::move(processed_ids)); auto repartitioned = merge_clause.repartition(std::move(vec)); - auto res = gather_entities(component_manager, std::move(repartitioned->at(0))).as_range(); - ASSERT_EQ(res.size(), 1u); - const auto& received = res[0]; - ASSERT_EQ(*received.segments_->at(0), seg); + auto res = gather_entities(component_manager, std::move(repartitioned->at(0))); + ASSERT_TRUE(res.segments_.has_value()); + ASSERT_EQ(res.segments_->size(), 1u); + ASSERT_EQ(*res.segments_->at(0), seg); } diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index 18f1c81e9a..787e9181f4 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -242,12 +242,11 @@ TEST(Resample, ProcessOneSegment) { seg.set_row_id(num_rows - 1); auto proc_unit = ProcessingUnit{std::move(seg)}; - auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + auto entity_ids = push_entities(component_manager, std::move(proc_unit)); - auto resampled = gather_entities(component_manager, resample.process(std::move(entity_ids))).as_range(); - ASSERT_EQ(1, resampled.size()); - ASSERT_TRUE(resampled[0].segments_.has_value()); - auto segments = resampled[0].segments_.value(); + auto resampled = gather_entities(component_manager, resample.process(std::move(entity_ids))); + ASSERT_TRUE(resampled.segments_.has_value()); + auto segments = resampled.segments_.value(); ASSERT_EQ(1, segments.size()); auto resampled_seg = *segments[0]; @@ -345,12 +344,12 @@ TEST(Resample, ProcessMultipleSegments) { component_manager->add(col_range_2, id_2); - auto ids_0 = Composite(EntityIds{id_0, id_1}); - auto ids_1 = Composite(EntityIds{id_1}); - auto ids_2 = Composite(EntityIds{id_2}); + std::vector ids_0{id_0, id_1}; + std::vector ids_1{id_1}; + std::vector ids_2{id_2}; - auto resampled_0 = gather_entities(component_manager, resample.process(std::move(ids_0))).as_range(); - auto resampled_seg_0 = *resampled_0[0].segments_.value()[0]; + auto resampled_0 = gather_entities(component_manager, resample.process(std::move(ids_0))); + auto resampled_seg_0 = *resampled_0.segments_.value()[0]; auto& resampled_index_column_0 = resampled_seg_0.column(0); auto& resampled_sum_column_0 = resampled_seg_0.column(1); ASSERT_EQ(-5, resampled_index_column_0.scalar_at(0)); @@ -358,8 +357,8 @@ TEST(Resample, ProcessMultipleSegments) { ASSERT_EQ(0, resampled_sum_column_0.scalar_at(0)); ASSERT_EQ(30, resampled_sum_column_0.scalar_at(1)); - auto resampled_1 = gather_entities(component_manager, resample.process(std::move(ids_1))).as_range(); - auto resampled_seg_1 = *resampled_1[0].segments_.value()[0]; + auto resampled_1 = gather_entities(component_manager, resample.process(std::move(ids_1))); + auto resampled_seg_1 = *resampled_1.segments_.value()[0]; auto& resampled_index_column_1 = resampled_seg_1.column(0); auto& resampled_sum_column_1 = resampled_seg_1.column(1); ASSERT_EQ(25, resampled_index_column_1.scalar_at(0)); @@ -367,8 +366,8 @@ TEST(Resample, ProcessMultipleSegments) { ASSERT_EQ(30, resampled_sum_column_1.scalar_at(0)); ASSERT_EQ(40, resampled_sum_column_1.scalar_at(1)); - auto resampled_2 = gather_entities(component_manager, resample.process(std::move(ids_2))).as_range(); - auto resampled_seg_2 = *resampled_2[0].segments_.value()[0]; + auto resampled_2 = gather_entities(component_manager, resample.process(std::move(ids_2))); + auto resampled_seg_2 = *resampled_2.segments_.value()[0]; auto& resampled_index_column_2 = resampled_seg_2.column(0); auto& resampled_sum_column_2 = resampled_seg_2.column(1); ASSERT_EQ(46, resampled_index_column_2.scalar_at(0)); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 6d98976c10..9eff9d77c3 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -416,10 +415,10 @@ FrameAndDescriptor read_multi_key( return {res.frame_, multi_key_desc, keys, std::shared_ptr{}}; } -Composite process_clauses( +std::vector process_clauses( std::shared_ptr component_manager, std::vector>&& segment_and_slice_futures, - const std::vector>& processing_unit_indexes, + std::vector>&& processing_unit_indexes, std::vector> clauses ) { // pass by copy deliberately as we don't want to modify read_query std::vector> segment_and_slice_future_splitters; segment_and_slice_future_splitters.reserve(segment_and_slice_futures.size()); @@ -440,91 +439,79 @@ Composite process_clauses( internal::check( std::all_of(segment_proc_unit_counts.begin(), segment_proc_unit_counts.end(), [](const size_t& val) { return val != 0; }), "All segments should be needed by at least one ProcessingUnit"); - // Convert vector of vectors into vector of 1-element composites - // At this stage, each Composite contains a single list of entity IDs, which may refer to a row-slice, a column-slice, a - // general rectangular slice, or some more exotic collection of segments based on the clause's processing - // parallelisation. - std::vector> vec_comp_entity_ids; - vec_comp_entity_ids.reserve(processing_unit_indexes.size()); - for (const auto& list: processing_unit_indexes) { - EntityIds entity_ids; - for (auto idx: list) { - entity_ids.emplace_back(idx); - } - vec_comp_entity_ids.emplace_back(Composite(std::move(entity_ids))); - } + // Give this a more descriptive name as we modify it between clauses + std::vector> entity_ids_vec{std::move(processing_unit_indexes)}; // Used to make sure each entity is only added into the component manager once std::vector entity_added_mtx(segment_and_slice_futures.size()); std::vector entity_added(segment_and_slice_futures.size(), false); - std::vector>> futures; + std::vector>> futures; bool first_clause{true}; // Reverse the order of clauses and iterate through them backwards so that the erase is efficient std::reverse(clauses.begin(), clauses.end()); while (!clauses.empty()) { - for (auto&& comp_entity_ids: vec_comp_entity_ids) { + for (auto&& entity_ids: entity_ids_vec) { if (first_clause) { - internal::check(comp_entity_ids.is_single(), - "Expected Composite of size 1 on entry to process_clauses"); std::vector> local_futs; - for (auto id: std::get(comp_entity_ids[0])) { + local_futs.reserve(entity_ids.size()); + for (auto id: entity_ids) { local_futs.emplace_back(segment_and_slice_future_splitters[id].getFuture()); } futures.emplace_back( folly::collect(local_futs) .via(&async::cpu_executor()) .thenValue([component_manager, - &segment_proc_unit_counts, - &entity_added_mtx, - &entity_added, - &clauses, - comp_entity_ids = std::move(comp_entity_ids)](std::vector&& segment_and_slices) mutable { - auto entity_ids = std::get(comp_entity_ids[0]); + &segment_proc_unit_counts, + &entity_added_mtx, + &entity_added, + &clauses, + entity_ids = std::move(entity_ids)](std::vector&& segment_and_slices) mutable { for (auto&& [idx, segment_and_slice]: folly::enumerate(segment_and_slices)) { - std::lock_guard lock(entity_added_mtx[entity_ids[idx]]); - if (!entity_added[entity_ids[idx]]) { + auto entity_id = entity_ids[idx]; + std::lock_guard lock(entity_added_mtx[entity_id]); + if (!entity_added[entity_id]) { component_manager->add( std::make_shared(std::move(segment_and_slice.segment_in_memory_)), - entity_ids[idx], segment_proc_unit_counts[entity_ids[idx]]); + entity_id, segment_proc_unit_counts[entity_id]); component_manager->add( std::make_shared(std::move(segment_and_slice.ranges_and_key_.row_range_)), - entity_ids[idx]); + entity_id); component_manager->add( std::make_shared(std::move(segment_and_slice.ranges_and_key_.col_range_)), - entity_ids[idx]); + entity_id); component_manager->add( std::make_shared(std::move(segment_and_slice.ranges_and_key_.key_)), - entity_ids[idx]); - entity_added[entity_ids[idx]] = true; + entity_id); + entity_added[entity_id] = true; } } - return async::submit_cpu_task(async::MemSegmentProcessingTask(clauses, std::move(comp_entity_ids))); + return async::MemSegmentProcessingTask(clauses, std::move(entity_ids))(); })); } else { futures.emplace_back( async::submit_cpu_task( async::MemSegmentProcessingTask(clauses, - std::move(comp_entity_ids)) + std::move(entity_ids)) ) ); } } first_clause = false; - vec_comp_entity_ids = folly::collect(futures).get(); + entity_ids_vec = folly::collect(futures).get(); futures.clear(); while (clauses.size() > 0 && !clauses.back()->clause_info().requires_repartition_) { clauses.erase(clauses.end() - 1); } if (clauses.size() > 0 && clauses.back()->clause_info().requires_repartition_) { - vec_comp_entity_ids = clauses.back()->repartition(std::move(vec_comp_entity_ids)).value(); + entity_ids_vec = clauses.back()->repartition(std::move(entity_ids_vec)).value(); clauses.erase(clauses.end() - 1); } } - return merge_composites(std::move(vec_comp_entity_ids)); + return flatten_entities(std::move(entity_ids_vec)); } void set_output_descriptors( - const Composite& comp_processing_units, + const ProcessingUnit& proc, const std::vector>& clauses, const std::shared_ptr& pipeline_context) { std::optional index_column; @@ -539,26 +526,22 @@ void set_output_descriptors( } } std::optional new_stream_descriptor; - comp_processing_units.broadcast([&new_stream_descriptor](const auto& proc) { - if (!new_stream_descriptor.has_value()) { - if (proc.segments_.has_value() && proc.segments_->size() > 0) { - new_stream_descriptor = std::make_optional(); - new_stream_descriptor->set_index(proc.segments_->at(0)->descriptor().index()); - for (size_t idx = 0; idx < new_stream_descriptor->index().field_count(); idx++) { - new_stream_descriptor->add_field(proc.segments_->at(0)->descriptor().field(idx)); - } - } + if (proc.segments_.has_value() && proc.segments_->size() > 0) { + new_stream_descriptor = std::make_optional(); + new_stream_descriptor->set_index(proc.segments_->at(0)->descriptor().index()); + for (size_t idx = 0; idx < new_stream_descriptor->index().field_count(); idx++) { + new_stream_descriptor->add_field(proc.segments_->at(0)->descriptor().field(idx)); } - if (new_stream_descriptor.has_value() && proc.segments_.has_value()) { - std::vector> fields; - for (const auto& segment: *proc.segments_) { - fields.push_back(segment->descriptor().fields_ptr()); - } - new_stream_descriptor = merge_descriptors(*new_stream_descriptor, - fields, - std::vector{}); + } + if (new_stream_descriptor.has_value() && proc.segments_.has_value()) { + std::vector> fields; + for (const auto& segment: *proc.segments_) { + fields.push_back(segment->descriptor().fields_ptr()); } - }); + new_stream_descriptor = merge_descriptors(*new_stream_descriptor, + fields, + std::vector{}); + } if (new_stream_descriptor.has_value()) { // Finding and erasing fields from the FieldCollection contained in StreamDescriptor is O(n) in number of fields // So maintain map from field names to types in the new_stream_descriptor to make these operations O(1) @@ -627,12 +610,12 @@ std::vector generate_ranges_and_keys(const std::vector. Slices contained within a single - * Composite are processed within a single thread. + * processing unit collected into a single ProcessingUnit. Slices contained within a single ProcessingUnit are processed + * within a single thread. * - * The processing of a Composite is scheduled via the Async Store. Within a single thread, the + * The processing of a ProcessingUnit is scheduled via the Async Store. Within a single thread, the * segments will be retrieved from storage and decompressed before being passed to a MemSegmentProcessingTask which - * will process all clauses up until a reducing clause. + * will process all clauses up until a clause that requires a repartition. */ std::vector read_and_process( const std::shared_ptr& store, @@ -662,16 +645,16 @@ std::vector read_and_process( auto processed_entity_ids = process_clauses(component_manager, std::move(segment_and_slice_futures), - processing_unit_indexes, + std::move(processing_unit_indexes), read_query.clauses_); - auto comp_processing_units = gather_entities(component_manager, std::move(processed_entity_ids)); + auto proc = gather_entities(component_manager, std::move(processed_entity_ids)); if (std::any_of(read_query.clauses_.begin(), read_query.clauses_.end(), [](const std::shared_ptr& clause) { return clause->clause_info().modifies_output_descriptor_; })) { - set_output_descriptors(comp_processing_units, read_query.clauses_, pipeline_context); + set_output_descriptors(proc, read_query.clauses_, pipeline_context); } - return collect_segments(std::move(comp_processing_units)); + return collect_segments(std::move(proc)); } SegmentInMemory read_direct(const std::shared_ptr& store, diff --git a/python/tests/unit/arcticdb/version_store/test_aggregation.py b/python/tests/unit/arcticdb/version_store/test_aggregation.py index 10fcbf7be1..06dc8b3809 100644 --- a/python/tests/unit/arcticdb/version_store/test_aggregation.py +++ b/python/tests/unit/arcticdb/version_store/test_aggregation.py @@ -290,7 +290,6 @@ def test_first_aggregation(local_object_version_store): df = pd.DataFrame({"get_first": [100.0, 2.7, 5.8, np.nan]}, index=["group_1", "group_2", "group_3", "group_4"]) df.index.rename("grouping_column", inplace=True) - res.data.sort_index(inplace=True) assert_frame_equal(res.data, df) @@ -364,7 +363,6 @@ def test_last_aggregation(local_object_version_store): df = pd.DataFrame({"get_last": [3.45, 2.7, 5.8, np.nan, 6.9]}, index=["group_1", "group_2", "group_3", "group_4", "group_5"]) df.index.rename("grouping_column", inplace=True) - res.data.sort_index(inplace=True) assert_frame_equal(res.data, df) @@ -403,7 +401,6 @@ def test_sum_aggregation(local_object_version_store): df = pd.DataFrame({"to_sum": [4, 4]}, index=["group_1", "group_2"]) df.index.rename("grouping_column", inplace=True) - res.data.sort_index(inplace=True) assert_frame_equal(res.data, df) @@ -446,7 +443,6 @@ def test_mean_aggregation_float(local_object_version_store): df = pd.DataFrame({"to_mean": [(1.1 + 1.4 + 2.5) / 3, 2.2]}, index=["group_1", "group_2"]) df.index.rename("grouping_column", inplace=True) - res.data.sort_index(inplace=True) assert_frame_equal(res.data, df) diff --git a/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py b/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py index db096beea7..993dd3061c 100644 --- a/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py +++ b/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py @@ -378,6 +378,7 @@ def test_sum_aggregation_dynamic(s3_version_store_dynamic_schema_v2): q = q.groupby("grouping_column").agg({"to_sum": "sum"}) received = lib.read(symbol, query_builder=q).data + received.sort_index(inplace=True) expected = expected.groupby("grouping_column").agg({"to_sum": "sum"}) assert_equal_value(received, expected) @@ -396,6 +397,7 @@ def test_sum_aggregation_with_range_index_dynamic(lmdb_version_store_dynamic_sch q = q.groupby("grouping_column").agg({"to_sum": "sum"}) received = lib.read(symbol, query_builder=q).data + received.sort_index(inplace=True) expected = expected.groupby("grouping_column").agg({"to_sum": "sum"}) assert_equal_value(received, expected) diff --git a/python/tests/unit/arcticdb/version_store/test_query_builder_sparse.py b/python/tests/unit/arcticdb/version_store/test_query_builder_sparse.py index 079c691b2c..30b0401a05 100644 --- a/python/tests/unit/arcticdb/version_store/test_query_builder_sparse.py +++ b/python/tests/unit/arcticdb/version_store/test_query_builder_sparse.py @@ -130,6 +130,7 @@ def test_groupby(self, lmdb_version_store): q = q.groupby("sparse1").agg(aggs) received = lmdb_version_store.read(self.sym, query_builder=q).data received = received.reindex(columns=sorted(received.columns)) + received.sort_index(inplace=True) assert_frame_equal(expected, received, check_dtype=False)