diff --git a/lib/cloud_controller/diego/processes_sync.rb b/lib/cloud_controller/diego/processes_sync.rb index 91238ec330d..0a30b7cbd54 100644 --- a/lib/cloud_controller/diego/processes_sync.rb +++ b/lib/cloud_controller/diego/processes_sync.rb @@ -21,35 +21,24 @@ def sync @bump_freshness = true diego_lrps = bbs_apps_client.fetch_scheduling_infos.index_by { |d| d.desired_lrp_key.process_guid } logger.info('fetched-scheduling-infos') - - batched_processes do |processes| + to_desire = [] + to_update = {} + batched_processes_for_sync do |processes| processes.each do |process| process_guid = ProcessGuid.from_process(process) diego_lrp = diego_lrps.delete(process_guid) if diego_lrp.nil? - workpool.submit(process) do |p| - logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid) - bbs_apps_client.desire_app(p) - logger.info('desire-lrp', process_guid: p.guid) - end + to_desire.append(process.id) elsif process.updated_at.to_f.to_s != diego_lrp.annotation - workpool.submit(process, diego_lrp) do |p, l| - logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid) - bbs_apps_client.update_app(p, l) - logger.info('update-lrp', process_guid: p.guid) - end + to_update[process.id] = diego_lrp end end end - diego_lrps.each_key do |process_guid_to_delete| - workpool.submit(process_guid_to_delete) do |guid| - logger.info('deleting-lrp', process_guid: guid) - bbs_apps_client.stop_app(guid) - logger.info('delete-lrp', process_guid: guid) - end - end + update_lrps(to_update) + desire_lrps(to_desire) + delete_lrps(diego_lrps) workpool.drain @@ -95,15 +84,49 @@ def process_workpool_exceptions(exceptions) @statsd_updater.update_synced_invalid_lrps(invalid_lrps) end + def update_lrps(to_update) + batched_processes(to_update.keys) do |processes| + processes.each do |process| + workpool.submit(process, to_update[process.id]) do |p, l| + logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid) + bbs_apps_client.update_app(p, l) + logger.info('update-lrp', process_guid: p.guid) + end + end + end + end + + def desire_lrps(to_desire) + batched_processes(to_desire) do |processes| + processes.each do |process| + workpool.submit(process) do |p| + logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid) + bbs_apps_client.desire_app(p) + logger.info('desire-lrp', process_guid: p.guid) + end + end + end + end + + def delete_lrps(to_delete) + to_delete.each_key do |process_guid_to_delete| + workpool.submit(process_guid_to_delete) do |guid| + logger.info('deleting-lrp', process_guid: guid) + bbs_apps_client.stop_app(guid) + logger.info('delete-lrp', process_guid: guid) + end + end + end + def formatted_backtrace_from_error(error) error.backtrace.present? ? error.backtrace.join("\n") + "\n..." : '' end - def batched_processes + def batched_processes_for_sync last_id = 0 loop do - processes = processes(last_id).all + processes = processes_for_sync(last_id).all yield processes return if processes.count < BATCH_SIZE @@ -111,15 +134,19 @@ def batched_processes end end - def processes(last_id) + def batched_processes(ids) + ids.each_slice(BATCH_SIZE) do |id_chunk| + processes = processes(id_chunk).all + yield processes + end + end + + def processes(ids) processes = ProcessModel. diego. runnable. - where(Sequel.lit("#{ProcessModel.table_name}.id > ?", last_id)). - order("#{ProcessModel.table_name}__id".to_sym). - eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data }). - limit(BATCH_SIZE) - + where(Sequel.lit("#{ProcessModel.table_name}.id IN ?", ids)). + eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data }) if FeatureFlag.enabled?(:diego_docker) processes.select_all(ProcessModel.table_name) else @@ -128,6 +155,18 @@ def processes(last_id) end end + def processes_for_sync(last_id) + processes = ProcessModel. + diego. + runnable. + where(Sequel.lit("#{ProcessModel.table_name}.id > ?", last_id)). + order("#{ProcessModel.table_name}__id".to_sym). + limit(BATCH_SIZE) + + processes.select("#{ProcessModel.table_name}__id".to_sym, "#{ProcessModel.table_name}__guid".to_sym, "#{ProcessModel.table_name}__version".to_sym, + "#{ProcessModel.table_name}__updated_at".to_sym) + end + def bbs_apps_client CloudController::DependencyLocator.instance.bbs_apps_client end