From e2e26003cba40c6463524a06f82d18868f16963a Mon Sep 17 00:00:00 2001 From: Philip Laine Date: Wed, 14 Aug 2024 21:27:32 +0200 Subject: [PATCH] refactor: remove unnecessary retry logic from data injection (#2867) Signed-off-by: Philip Laine --- src/pkg/cluster/data.go | 181 +++++++++++++++++++--------------------- 1 file changed, 85 insertions(+), 96 deletions(-) diff --git a/src/pkg/cluster/data.go b/src/pkg/cluster/data.go index bd4b51aea5..68148003a7 100644 --- a/src/pkg/cluster/data.go +++ b/src/pkg/cluster/data.go @@ -60,109 +60,100 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, data v1alpha1.ZarfDat return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err) } - // TODO: Refactor to use retry. - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - message.Debugf("Attempting to inject data into %s", data.Target) - source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path)) - if helpers.InvalidPath(source) { - // The path is likely invalid because of how we compose OCI components, add an index suffix to the filename - source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path)) - if helpers.InvalidPath(source) { - return fmt.Errorf("could not find the data injection source path %s", source) - } - } - - target := podLookup{ - Namespace: data.Target.Namespace, - Selector: data.Target.Selector, - Container: data.Target.Container, - } - - // Wait until the pod we are injecting data into becomes available - pods, err := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer) - if err != nil { - return err - } - if len(pods) < 1 { - continue - } + message.Debugf("Attempting to inject data into %s", data.Target) + + source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path)) + if helpers.InvalidPath(source) { + // The path is likely invalid because of how we compose OCI components, add an index suffix to the filename + source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path)) + if helpers.InvalidPath(source) { + return fmt.Errorf("could not find the data injection source path %s", source) + } + } - // Inject into all the pods - for _, pod := range pods { - // Try to use the embedded kubectl if we can - zarfCommand, err := utils.GetFinalExecutableCommand() - kubectlBinPath := "kubectl" - if err != nil { - message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err) - } else { - kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand) - } - kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container) + // Wait until the pod we are injecting data into becomes available + target := podLookup{ + Namespace: data.Target.Namespace, + Selector: data.Target.Selector, + Container: data.Target.Container, + } + waitCtx, waitCancel := context.WithTimeout(ctx, 90*time.Second) + defer waitCancel() + pods, err := waitForPodsAndContainers(waitCtx, c.Clientset, target, podFilterByInitContainer) + if err != nil { + return err + } - // Note that each command flag is separated to provide the widest cross-platform tar support - tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag) - untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path) + // Inject into all the pods + for _, pod := range pods { + // Try to use the embedded kubectl if we can + zarfCommand, err := utils.GetFinalExecutableCommand() + kubectlBinPath := "kubectl" + if err != nil { + message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err) + } else { + kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand) + } + kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container) - // Must create the target directory before trying to change to it for untar - mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path) - if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil { - return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err) - } + // Note that each command flag is separated to provide the widest cross-platform tar support + tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag) + untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path) - cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s", - tarCmd, - source, - kubectlCmd, - untarCmd, - ) + // Must create the target directory before trying to change to it for untar + mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path) + if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil { + return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err) + } - // Do the actual data injection - if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { - return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err) - } + cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s", + tarCmd, + source, + kubectlCmd, + untarCmd, + ) - // Leave a marker in the target container for pods to track the sync action - cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s", - tarCmd, - componentPath.DataInjections, - config.GetDataInjectionMarker(), - kubectlCmd, - untarCmd, - ) - - if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { - return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err) - } - } + // Do the actual data injection + if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { + return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err) + } - // Do not look for a specific container after injection in case they are running an init container - podOnlyTarget := podLookup{ - Namespace: data.Target.Namespace, - Selector: data.Target.Selector, - } + // Leave a marker in the target container for pods to track the sync action + cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s", + tarCmd, + componentPath.DataInjections, + config.GetDataInjectionMarker(), + kubectlCmd, + untarCmd, + ) + + if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { + return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err) + } + } - // Block one final time to make sure at least one pod has come up and injected the data - // Using only the pod as the final selector because we don't know what the container name will be - // Still using the init container filter to make sure we have the right running pod - _, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer) - if err != nil { - return err - } + // Do not look for a specific container after injection in case they are running an init container + podOnlyTarget := podLookup{ + Namespace: data.Target.Namespace, + Selector: data.Target.Selector, + } - // Cleanup now to reduce disk pressure - err = os.RemoveAll(source) - if err != nil { - return err - } + // Block one final time to make sure at least one pod has come up and injected the data + // Using only the pod as the final selector because we don't know what the container name will be + // Still using the init container filter to make sure we have the right running pod + _, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer) + if err != nil { + return err + } - // Return to stop the loop - return nil - } + // Cleanup now to reduce disk pressure + err = os.RemoveAll(source) + if err != nil { + return err } + + // Return to stop the loop + return nil } // podLookup is a struct for specifying a pod to target for data injection or lookups. @@ -180,13 +171,11 @@ type podFilter func(pod corev1.Pod) bool // If the timeout is reached, an empty list will be returned. // TODO: Test, refactor and/or remove. func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) { - waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second) - defer cancel() readyPods, err := retry.DoWithData(func() ([]corev1.Pod, error) { listOpts := metav1.ListOptions{ LabelSelector: target.Selector, } - podList, err := clientset.CoreV1().Pods(target.Namespace).List(waitCtx, listOpts) + podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts) if err != nil { return nil, err } @@ -241,7 +230,7 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac return nil, fmt.Errorf("no ready pods found") } return readyPods, nil - }, retry.Context(waitCtx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second)) + }, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second)) if err != nil { return nil, err }