diff --git a/src/create-model.jl b/src/create-model.jl index d02f39af..ccc7c7dd 100644 --- a/src/create-model.jl +++ b/src/create-model.jl @@ -72,6 +72,7 @@ function create_model( ## Add expressions to dataframes # TODO: What will improve this? Variables (#884)?, Constraints? @timeit to "add_expressions_to_constraints!" add_expressions_to_constraints!( + connection, variables, constraints, model, diff --git a/src/io.jl b/src/io.jl index 8e5189e4..486511ba 100644 --- a/src/io.jl +++ b/src/io.jl @@ -32,7 +32,7 @@ function create_internal_structures(connection) "profiles_timeframe" ] for table in tables_allowed_to_be_missing - _check_if_table_exist(connection, table) + _create_empty_unless_exists(connection, table) end # Get the years struct ordered by year @@ -305,18 +305,14 @@ function get_schema(tablename) end end -function _check_if_table_exist(connection, table_name) +function _create_empty_unless_exists(connection, table_name) schema = get_schema(table_name) - existence_query = DBInterface.execute( - connection, - "SELECT table_name FROM information_schema.tables WHERE table_name = '$table_name'", - ) - if length(collect(existence_query)) == 0 + if !_check_if_table_exists(connection, table_name) columns_in_table = join(("$col $col_type" for (col, col_type) in schema), ",") - create_table_query = - DuckDB.query(connection, "CREATE TABLE $table_name ($columns_in_table)") + DuckDB.query(connection, "CREATE TABLE $table_name ($columns_in_table)") end + return end diff --git a/src/model-preparation.jl b/src/model-preparation.jl index add5c4cd..2df0b2d5 100644 --- a/src/model-preparation.jl +++ b/src/model-preparation.jl @@ -4,11 +4,10 @@ export create_sets, prepare_profiles_structure """ add_expression_terms_rep_period_constraints!(df_cons, df_flows, - workspace, - representative_periods, - graph; + workspace; use_highest_resolution = true, multiply_by_duration = true, + add_min_outgoing_flow_duration = false, ) Computes the incoming and outgoing expressions per row of df_cons for the constraints @@ -19,24 +18,43 @@ This function is only used internally in the model. This strategy is based on the replies in this discourse thread: - https://discourse.julialang.org/t/help-improving-the-speed-of-a-dataframes-operation/107615/23 + +# Implementation + +This expression computation uses a workspace to store all variables defined for +each timestep. +The idea of this algorithm is to append all variables defined at time +`timestep` in `workspace[timestep]` and then aggregate then for the constraint +time block. + +The algorithm works like this: + +1. Loop over each group of (asset, year, rep_period) +1.1. Loop over each variable in the group: (var_idx, var_time_block_start, var_time_block_end) +1.1.1. Loop over each timestep in var_time_block_start:var_time_block_end +1.1.1.1. Compute the coefficient of the variable based on the rep_period + resolution and the variable efficiency +1.1.1.2. Store (var_idx, coefficient) in workspace[timestep] +1.2. Loop over each constraint in the group: (cons_idx, cons_time_block_start, cons_time_block_end) +1.2.1. Aggregate all variables in workspace[timestep] for timestep in the time + block to create a list of variable indices and their coefficients [(var_idx1, coef1), ...] +1.2.2. Compute the expression using the variable container, the indices and coefficients + +Notes: +- On step 1.2.1, the aggregation can be either by uniqueness of not, i.e., if + the variable happens in more that one `workspace[timestep]`, should we add up + the coefficients or not. This is defined by the keyword +`multiply_by_duration` """ function add_expression_terms_rep_period_constraints!( + connection, cons::TulipaConstraint, - flow::TulipaVariable, - workspace, - representative_periods, - graph; + flow::TulipaVariable; use_highest_resolution = true, multiply_by_duration = true, add_min_outgoing_flow_duration = false, ) - # Aggregating function: If the duration should NOT be taken into account, we have to compute unique appearances of the flows. - # Otherwise, just use the sum - agg = multiply_by_duration ? v -> sum(v) : v -> sum(unique(v)) - - grouped_cons = DataFrames.groupby(cons.indices, [:year, :rep_period, :asset]) - - # grouped_cons' asset will be matched with either to or from, depending on whether + # cons' asset will be matched with flow's to or from, depending on whether # we are filling incoming or outgoing flows cases = [ (expr_key = :incoming, asset_match = :to, selected_assets = ["hub", "consumer"]), @@ -47,6 +65,34 @@ function add_expression_terms_rep_period_constraints!( ), ] num_rows = size(cons.indices, 1) + # TODO: Move this new workspace definition out of this function if and when it's used by other functions + Tmax = only( + row[1] for + row in DuckDB.query(connection, "SELECT MAX(num_timesteps) FROM rep_periods_data") + )::Int32 + + workspace = [Dict{Int,Float64}() for _ in 1:Tmax] + + # The SQL strategy to improve looping over the groups and then the + # constraints and variables, is to create grouped tables beforehand and join them + # The grouped constraint table is created below + grouped_cons_table_name = "t_grouped_$(cons.table_name)" + if !_check_if_table_exists(connection, grouped_cons_table_name) + DuckDB.query( + connection, + "CREATE TEMP TABLE $grouped_cons_table_name AS + SELECT + cons.asset, + cons.year, + cons.rep_period, + ARRAY_AGG(cons.index ORDER BY cons.index) AS index, + ARRAY_AGG(cons.time_block_start ORDER BY cons.index) AS time_block_start, + ARRAY_AGG(cons.time_block_end ORDER BY cons.index) AS time_block_end, + FROM $(cons.table_name) AS cons + GROUP BY cons.asset, cons.year, cons.rep_period + ", + ) + end for case in cases attach_expression!(cons, case.expr_key, Vector{JuMP.AffExpr}(undef, num_rows)) @@ -64,60 +110,134 @@ function add_expression_terms_rep_period_constraints!( # TulipaConstraint created just for this purpose attach_coefficient!(cons, :min_outgoing_flow_duration, ones(num_rows)) end - grouped_flows = DataFrames.groupby(flow.indices, [:year, :rep_period, case.asset_match]) - for ((year, rep_period, asset), sub_df) in pairs(grouped_cons) - if !haskey(grouped_flows, (year, rep_period, asset)) - continue - end - resolution = - multiply_by_duration ? representative_periods[year][rep_period].resolution : 1.0 - for i in eachindex(workspace) - workspace[i] = JuMP.AffExpr(0.0) - end + + # The grouped variable table is created below for each case (from=asset, to=asset) + grouped_var_table_name = "t_grouped_$(flow.table_name)_match_on_$(case.asset_match)" + if !_check_if_table_exists(connection, grouped_var_table_name) + DuckDB.query( + connection, + "CREATE TEMP TABLE $grouped_var_table_name AS + SELECT + var.$(case.asset_match) AS asset, + var.year, + var.rep_period, + ARRAY_AGG(var.index ORDER BY var.index) AS index, + ARRAY_AGG(var.time_block_start ORDER BY var.index) AS time_block_start, + ARRAY_AGG(var.time_block_end ORDER BY var.index) AS time_block_end, + ARRAY_AGG(var.efficiency ORDER BY var.index) AS efficiency, + FROM $(flow.table_name) AS var + GROUP BY var.$(case.asset_match), var.year, var.rep_period + ", + ) + end + + resolution_query = multiply_by_duration ? "rep_periods_data.resolution" : "1.0::FLOAT8" + + # Start of the algorithm + # 1. Loop over each group of (asset, year, rep_period) + for group_row in DuckDB.query( + connection, + "SELECT + cons.asset, + cons.year, + cons.rep_period, + cons.index AS cons_idx, + cons.time_block_start AS cons_time_block_start, + cons.time_block_end AS cons_time_block_end, + var.index AS var_idx, + var.time_block_start AS var_time_block_start, + var.time_block_end AS var_time_block_end, + var.efficiency, + asset.type AS type, + $resolution_query AS resolution, + FROM $grouped_cons_table_name AS cons + LEFT JOIN $grouped_var_table_name AS var + ON cons.asset = var.asset + AND cons.year = var.year + AND cons.rep_period = var.rep_period + LEFT JOIN asset + ON cons.asset = asset.asset + LEFT JOIN rep_periods_data + ON cons.rep_period = rep_periods_data.rep_period + AND cons.year = rep_periods_data.year + WHERE + len(var.index) > 0 + ", + ) + resolution = group_row.resolution::Float64 + empty!.(workspace) outgoing_flow_durations = typemax(Int64) #LARGE_NUMBER to start finding the minimum outgoing flow duration - # Store the corresponding flow in the workspace - for row in eachrow(grouped_flows[(year, rep_period, asset)]) - asset = row[case.asset_match] - for t in row.time_block_start:row.time_block_end + + # Step 1.1. Loop over each variable in the group + for (var_idx, time_block_start, time_block_end, efficiency) in zip( + group_row.var_idx::Vector{Union{Missing,Int64}}, + group_row.var_time_block_start::Vector{Union{Missing,Int32}}, + group_row.var_time_block_end::Vector{Union{Missing,Int32}}, + group_row.efficiency::Vector{Union{Missing,Float64}}, + ) + time_block = time_block_start:time_block_end + # Step 1.1.1. + for timestep in time_block + # Step 1.1.1.1. # Set the efficiency to 1 for inflows and outflows of hub and consumer assets, and outflows for producer assets # And when you want the highest resolution (which is asset type-agnostic) efficiency_coefficient = - if graph[asset].type in case.selected_assets || use_highest_resolution + if group_row.type::String in case.selected_assets || use_highest_resolution 1.0 else if case.expr_key == :incoming - row.efficiency + efficiency::Float64 else # Divide by efficiency for outgoing flows - 1.0 / row.efficiency + 1.0 / efficiency::Float64 end end - JuMP.add_to_expression!( - workspace[t], - flow.container[row.index], - resolution * efficiency_coefficient, - ) - if conditions_to_add_min_outgoing_flow_duration - outgoing_flow_durations = min( - outgoing_flow_durations, - row.time_block_end - row.time_block_start + 1, - ) - end + # Step 1.1.1.2. + workspace[timestep][var_idx] = resolution * efficiency_coefficient + end + if conditions_to_add_min_outgoing_flow_duration + outgoing_flow_durations = + min(outgoing_flow_durations, (time_block_end - time_block_start + 1)::Int64) end end - # Sum the corresponding flows from the workspace - for row in eachrow(sub_df) - # TODO: This is a hack to handle constraint tables that still have timesteps_block - # In particular, storage_level_rep_period - cons.expressions[case.expr_key][row.index] = - agg(@view workspace[row.time_block_start:row.time_block_end]) + + # Step 1.2. Loop over each constraint + for (cons_idx, time_block_start, time_block_end) in zip( + group_row.cons_idx::Vector{Union{Missing,Int64}}, + group_row.cons_time_block_start::Vector{Union{Missing,Int32}}, + group_row.cons_time_block_end::Vector{Union{Missing,Int32}}, + ) + time_block = time_block_start:time_block_end + workspace_agg = Dict{Int,Float64}() + # Step 1.2.1. + for timestep in time_block + for (var_idx, var_coefficient) in workspace[timestep] + if !haskey(workspace_agg, var_idx) + # First time a variable is encountered it adds to the aggregation + workspace_agg[var_idx] = var_coefficient + elseif multiply_by_duration + # In this case, accumulates more of the variable, + # i.e., which effectively multiplies the variable + # by its duration in the time block + workspace_agg[var_idx] += var_coefficient + end + end + end + if length(workspace_agg) > 0 + # Step 1.2.2. + cons.expressions[case.expr_key][cons_idx] = sum( + duration * flow.container[var_idx] for (var_idx, duration) in workspace_agg + ) + end if conditions_to_add_min_outgoing_flow_duration - cons.coefficients[:min_outgoing_flow_duration][row.index] = + cons.coefficients[:min_outgoing_flow_duration][cons_idx] = outgoing_flow_durations end end end end + + return end """ @@ -328,6 +448,7 @@ function add_expression_terms_over_clustered_year_constraints!( end function add_expressions_to_constraints!( + connection, variables, constraints, model, @@ -339,38 +460,30 @@ function add_expressions_to_constraints!( # Unpack variables # Creating the incoming and outgoing flow expressions @timeit to "add_expression_terms_rep_period_constraints!" add_expression_terms_rep_period_constraints!( + connection, constraints[:balance_conversion], - variables[:flow], - expression_workspace, - representative_periods, - graph; + variables[:flow]; use_highest_resolution = false, multiply_by_duration = true, ) @timeit to "add_expression_terms_rep_period_constraints!" add_expression_terms_rep_period_constraints!( + connection, constraints[:balance_storage_rep_period], - variables[:flow], - expression_workspace, - representative_periods, - graph; + variables[:flow]; use_highest_resolution = false, multiply_by_duration = true, ) @timeit to "add_expression_terms_rep_period_constraints!" add_expression_terms_rep_period_constraints!( + connection, constraints[:balance_consumer], - variables[:flow], - expression_workspace, - representative_periods, - graph; + variables[:flow]; use_highest_resolution = true, multiply_by_duration = false, ) @timeit to "add_expression_terms_rep_period_constraints!" add_expression_terms_rep_period_constraints!( + connection, constraints[:balance_hub], - variables[:flow], - expression_workspace, - representative_periods, - graph; + variables[:flow]; use_highest_resolution = true, multiply_by_duration = false, ) @@ -384,11 +497,9 @@ function add_expressions_to_constraints!( :capacity_outgoing_investable_storage_with_binary, ) @timeit to "add_expression_terms_rep_period_constraints! for $table_name" add_expression_terms_rep_period_constraints!( + connection, constraints[table_name], - variables[:flow], - expression_workspace, - representative_periods, - graph; + variables[:flow]; use_highest_resolution = true, multiply_by_duration = false, ) @@ -408,11 +519,9 @@ function add_expressions_to_constraints!( :max_output_flow_with_basic_unit_commitment, ) @timeit to "add_expression_terms_rep_period_constraints!" add_expression_terms_rep_period_constraints!( + connection, constraints[table_name], - variables[:flow], - expression_workspace, - representative_periods, - graph; + variables[:flow]; use_highest_resolution = true, multiply_by_duration = false, add_min_outgoing_flow_duration = true, diff --git a/src/utils.jl b/src/utils.jl index 94a16af6..dff97bf5 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -1,5 +1,13 @@ # Auxiliary functions to create the model +function _check_if_table_exists(connection, table_name) + existence_query = DBInterface.execute( + connection, + "SELECT table_name FROM information_schema.tables WHERE table_name = '$table_name'", + ) + return length(collect(existence_query)) > 0 +end + # FIXME: Ugly hack applied """ is_active(graph, a, y)