Skip to content

Commit

Permalink
Optimize aggregation (#464)
Browse files Browse the repository at this point in the history
* Documents some (recently introduced) class methods in Daru::Core::GroupBy as private

* Makes initialization of @groups and @df lazy in Daru::Core::GroupBy

* Simplifies aggregation code in DataFrame

* Avoids intermediary dataframe creation in Daru::Core::GroupBy#aggregate

* Reduces the number of calls to DataFrame#get_sub_dataframe during aggregation

(instead of doing iterations over columns, we iterate over groups)

* Document and consolidate use of internal method to access rows in Daru::DataFrame

* Cleans code in Daru::Index

* Cleans code in Daru::Core::GroupBy

* Makes loading of plotting libraries lazy

* Cleans some code (cosmetic)
  • Loading branch information
paisible-wanderer authored and v0dro committed Dec 4, 2018
1 parent 5e962f3 commit 984ff72
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 96 deletions.
15 changes: 12 additions & 3 deletions lib/daru/category.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ def plotting_library= lib
end
end

# this method is overwritten: see Daru::Category#plotting_library=
def plot(*args, **options, &b)
init_plotting_library

plot(*args, **options, &b)
end

alias_method :rename, :name=

# Returns an enumerator that enumerates on categorical data
Expand Down Expand Up @@ -748,6 +755,11 @@ def positions(*values)

private

# Will lazily load the plotting library being used
def init_plotting_library
self.plotting_library = Daru.plotting_library
end

def validate_categories input_categories
raise ArgumentError, 'Input categories and speculated categories mismatch' unless
(categories - input_categories).empty?
Expand All @@ -768,9 +780,6 @@ def initialize_core_attributes data
# To link every instance to its category,
# it stores integer for every instance representing its category
@array = map_cat_int.values_at(*data)

# Include plotting functionality
self.plotting_library = Daru.plotting_library
end

def category_from_position position
Expand Down
71 changes: 40 additions & 31 deletions lib/daru/core/group_by.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Daru
module Core
class GroupBy
class << self
# @private
def get_positions_group_map_on(indexes_with_positions, sort: false)
group_map = {}

Expand All @@ -17,6 +18,7 @@ def get_positions_group_map_on(indexes_with_positions, sort: false)
group_map
end

# @private
def get_positions_group_for_aggregation(multi_index, level=-1)
raise unless multi_index.is_a?(Daru::MultiIndex)

Expand All @@ -26,16 +28,19 @@ def get_positions_group_for_aggregation(multi_index, level=-1)
get_positions_group_map_on(new_index.each_with_index)
end

# @private
def get_positions_group_map_for_df(df, group_by_keys, sort: true)
indexes_with_positions = df[*group_by_keys].to_df.each_row.map(&:to_a).each_with_index

get_positions_group_map_on(indexes_with_positions, sort: sort)
end

# @private
def group_map_from_positions_to_indexes(positions_group_map, index)
positions_group_map.map { |k, positions| [k, positions.map { |pos| index.at(pos) }] }.to_h
end

# @private
def df_from_group_map(df, group_map, remaining_vectors, from_position: true)
return nil if group_map == {}

Expand All @@ -52,7 +57,17 @@ def df_from_group_map(df, group_map, remaining_vectors, from_position: true)
end
end

attr_reader :groups, :df
# lazy accessor/attr_reader for the attribute groups
def groups
@groups ||= GroupBy.group_map_from_positions_to_indexes(@groups_by_pos, @context.index)
end
alias :groups_by_idx :groups

# lazy accessor/attr_reader for the attribute df
def df
@df ||= GroupBy.df_from_group_map(@context, @groups_by_pos, @non_group_vectors)
end
alias :grouped_df :df

# Iterate over each group created by group_by. A DataFrame is yielded in
# block.
Expand All @@ -75,31 +90,26 @@ def each_group
end

def initialize context, names
@group_vectors = names
@non_group_vectors = context.vectors.to_a - names
@context = context

@context = context # TODO: maybe rename in @original_df or @grouped_db

# FIXME: It feels like we don't want to sort here. Ruby's #group_by
# never sorts:
#
# ['test', 'me', 'please'].group_by(&:size)
# # => {4=>["test"], 2=>["me"], 6=>["please"]}
#
# - zverok, 2016-09-12
positions_groups = GroupBy.get_positions_group_map_for_df(@context, names, sort: true)

@groups = GroupBy.group_map_from_positions_to_indexes(positions_groups, @context.index)
@df = GroupBy.df_from_group_map(@context, positions_groups, @non_group_vectors)
@groups_by_pos = GroupBy.get_positions_group_map_for_df(@context, @group_vectors, sort: true)
end

# Get a Daru::Vector of the size of each group.
def size
index =
if multi_indexed_grouping?
Daru::MultiIndex.from_tuples @groups.keys
else
Daru::Index.new @groups.keys.flatten
end
index = get_grouped_index

values = @groups.values.map(&:size)
values = @groups_by_pos.values.map(&:size)
Daru::Vector.new(values, index: index, name: :size)
end

Expand Down Expand Up @@ -246,7 +256,7 @@ def min
# # a b c d
# # 5 bar two 6 66
def get_group group
indexes = @groups[group]
indexes = groups_by_idx[group]
elements = @context.each_vector.map(&:to_a)
transpose = elements.transpose
rows = indexes.each.map { |idx| transpose[idx] }
Expand All @@ -273,7 +283,7 @@ def get_group group
# # a ACE
# # b BDF
def reduce(init=nil)
result_hash = @groups.each_with_object({}) do |(group, indices), h|
result_hash = groups_by_idx.each_with_object({}) do |(group, indices), h|
group_indices = indices.map { |v| @context.index.to_a[v] }

grouped_result = init
Expand All @@ -284,18 +294,13 @@ def reduce(init=nil)
h[group] = grouped_result
end

index =
if multi_indexed_grouping?
Daru::MultiIndex.from_tuples result_hash.keys
else
Daru::Index.new result_hash.keys.flatten
end
index = get_grouped_index(result_hash.keys)

Daru::Vector.new(result_hash.values, index: index)
end

def inspect
@df.inspect
grouped_df.inspect
end

# Function to use for aggregating the data.
Expand Down Expand Up @@ -335,7 +340,9 @@ def inspect
# Ram Hyderabad,Mumbai
#
def aggregate(options={})
@df.aggregate(options)
new_index = get_grouped_index

@context.aggregate(options) { [@groups_by_pos.values, new_index] }
end

private
Expand All @@ -344,7 +351,7 @@ def select_groups_from method, quantity
selection = @context
rows, indexes = [], []

@groups.each_value do |index|
groups_by_idx.each_value do |index|
index.send(method, quantity).each do |idx|
rows << selection.row[idx].to_a
indexes << idx
Expand All @@ -360,29 +367,31 @@ def apply_method method_type, method
method_type == :numeric && @context[ngvec].type == :numeric
end

rows = @groups.map do |_group, indexes|
rows = groups_by_idx.map do |_group, indexes|
order.map do |ngvector|
slice = @context[ngvector][*indexes]
slice.is_a?(Daru::Vector) ? slice.send(method) : slice
end
end

index = apply_method_index
index = get_grouped_index
order = Daru::Index.new(order)
Daru::DataFrame.new(rows.transpose, index: index, order: order)
end

def apply_method_index
def get_grouped_index(index_tuples=nil)
index_tuples = @groups_by_pos.keys if index_tuples.nil?

if multi_indexed_grouping?
Daru::MultiIndex.from_tuples(@groups.keys)
Daru::MultiIndex.from_tuples(index_tuples)
else
Daru::Index.new(@groups.keys.flatten)
Daru::Index.new(index_tuples.flatten)
end
end

def multi_indexed_grouping?
return false unless @groups.keys[0]
@groups.keys[0].size > 1
return false unless @groups_by_pos.keys[0]
@groups_by_pos.keys[0].size > 1
end
end
end
Expand Down
Loading

0 comments on commit 984ff72

Please sign in to comment.