Skip to content

Commit

Permalink
Update diego sync to not fetch every bit of every object, only comple…
Browse files Browse the repository at this point in the history
…tely fetch objects that need syncing (#3503)
  • Loading branch information
Benjamin Fuller authored Nov 6, 2023
1 parent e92ef7b commit cb6e7c6
Showing 1 changed file with 66 additions and 27 deletions.
93 changes: 66 additions & 27 deletions lib/cloud_controller/diego/processes_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,24 @@ def sync
@bump_freshness = true
diego_lrps = bbs_apps_client.fetch_scheduling_infos.index_by { |d| d.desired_lrp_key.process_guid }
logger.info('fetched-scheduling-infos')

batched_processes do |processes|
to_desire = []
to_update = {}
batched_processes_for_sync do |processes|
processes.each do |process|
process_guid = ProcessGuid.from_process(process)
diego_lrp = diego_lrps.delete(process_guid)

if diego_lrp.nil?
workpool.submit(process) do |p|
logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.desire_app(p)
logger.info('desire-lrp', process_guid: p.guid)
end
to_desire.append(process.id)
elsif process.updated_at.to_f.to_s != diego_lrp.annotation
workpool.submit(process, diego_lrp) do |p, l|
logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.update_app(p, l)
logger.info('update-lrp', process_guid: p.guid)
end
to_update[process.id] = diego_lrp
end
end
end

diego_lrps.each_key do |process_guid_to_delete|
workpool.submit(process_guid_to_delete) do |guid|
logger.info('deleting-lrp', process_guid: guid)
bbs_apps_client.stop_app(guid)
logger.info('delete-lrp', process_guid: guid)
end
end
update_lrps(to_update)
desire_lrps(to_desire)
delete_lrps(diego_lrps)

workpool.drain

Expand Down Expand Up @@ -95,31 +84,69 @@ def process_workpool_exceptions(exceptions)
@statsd_updater.update_synced_invalid_lrps(invalid_lrps)
end

def update_lrps(to_update)
batched_processes(to_update.keys) do |processes|
processes.each do |process|
workpool.submit(process, to_update[process.id]) do |p, l|
logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.update_app(p, l)
logger.info('update-lrp', process_guid: p.guid)
end
end
end
end

def desire_lrps(to_desire)
batched_processes(to_desire) do |processes|
processes.each do |process|
workpool.submit(process) do |p|
logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.desire_app(p)
logger.info('desire-lrp', process_guid: p.guid)
end
end
end
end

def delete_lrps(to_delete)
to_delete.each_key do |process_guid_to_delete|
workpool.submit(process_guid_to_delete) do |guid|
logger.info('deleting-lrp', process_guid: guid)
bbs_apps_client.stop_app(guid)
logger.info('delete-lrp', process_guid: guid)
end
end
end

def formatted_backtrace_from_error(error)
error.backtrace.present? ? error.backtrace.join("\n") + "\n..." : ''
end

def batched_processes
def batched_processes_for_sync
last_id = 0

loop do
processes = processes(last_id).all
processes = processes_for_sync(last_id).all
yield processes
return if processes.count < BATCH_SIZE

last_id = processes.last.id
end
end

def processes(last_id)
def batched_processes(ids)
ids.each_slice(BATCH_SIZE) do |id_chunk|
processes = processes(id_chunk).all
yield processes
end
end

def processes(ids)
processes = ProcessModel.
diego.
runnable.
where(Sequel.lit("#{ProcessModel.table_name}.id > ?", last_id)).
order("#{ProcessModel.table_name}__id".to_sym).
eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data }).
limit(BATCH_SIZE)

where(Sequel.lit("#{ProcessModel.table_name}.id IN ?", ids)).
eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data })
if FeatureFlag.enabled?(:diego_docker)
processes.select_all(ProcessModel.table_name)
else
Expand All @@ -128,6 +155,18 @@ def processes(last_id)
end
end

def processes_for_sync(last_id)
processes = ProcessModel.
diego.
runnable.
where(Sequel.lit("#{ProcessModel.table_name}.id > ?", last_id)).
order("#{ProcessModel.table_name}__id".to_sym).
limit(BATCH_SIZE)

processes.select("#{ProcessModel.table_name}__id".to_sym, "#{ProcessModel.table_name}__guid".to_sym, "#{ProcessModel.table_name}__version".to_sym,
"#{ProcessModel.table_name}__updated_at".to_sym)
end

def bbs_apps_client
CloudController::DependencyLocator.instance.bbs_apps_client
end
Expand Down

0 comments on commit cb6e7c6

Please sign in to comment.