diff --git a/adsc/adsc.go b/adsc/adsc.go index 5155bab..a27ded4 100644 --- a/adsc/adsc.go +++ b/adsc/adsc.go @@ -20,9 +20,9 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" structpb "github.com/golang/protobuf/ptypes/struct" "google.golang.org/grpc" + "google.golang.org/protobuf/proto" "istio.io/pkg/log" ) diff --git a/adsc/imports.go b/adsc/imports.go index 0b19d2b..eb0ce4d 100644 --- a/adsc/imports.go +++ b/adsc/imports.go @@ -6,6 +6,7 @@ import ( udpa "github.com/cncf/udpa/go/udpa/type/v1" _ "istio.io/api/envoy/config/filter/http/alpn/v2alpha1" + _ "istio.io/api/envoy/config/filter/network/metadata_exchange" _ "istio.io/istio/pkg/config/xds" ) diff --git a/cmd/cmd.go b/cmd/cmd.go index 867877e..bb50692 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -156,6 +156,7 @@ func init() { startupCmd, xdsLatencyCmd, reproduceCmd, + dumpCmd, ) } diff --git a/cmd/dump.go b/cmd/dump.go new file mode 100644 index 0000000..46862e7 --- /dev/null +++ b/cmd/dump.go @@ -0,0 +1,37 @@ +package cmd + +import ( + "fmt" + + "github.com/howardjohn/pilot-load/pkg/simulation" + "github.com/howardjohn/pilot-load/pkg/simulation/model" + "github.com/spf13/cobra" +) + +var dumpConfig = model.DumpConfig{} + +func init() { + dumpCmd.PersistentFlags().StringVar(&dumpConfig.Pod, "pod", dumpConfig.Pod, "pod to dump from") + dumpCmd.PersistentFlags().StringVar(&dumpConfig.Namespace, "namespace", dumpConfig.Namespace, "namespace to dump from") + dumpCmd.PersistentFlags().StringVar(&dumpConfig.OutputDir, "out", dumpConfig.OutputDir, "output directory") +} + +var dumpCmd = &cobra.Command{ + Use: "dump", + Short: "dump XDS for a pod to file, rewritten to be runnable with only files", + RunE: func(cmd *cobra.Command, _ []string) error { + args, err := GetArgs() + if err != nil { + return err + } + if dumpConfig.Pod == "" { + return fmt.Errorf("--pod required") + } + if dumpConfig.Namespace == "" { + return fmt.Errorf("--namespace required") + } + args.DumpConfig = dumpConfig + logConfig(args.DumpConfig) + return simulation.Dump(args) + }, +} diff --git a/go.mod b/go.mod index 7f4b7ae..64c713e 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/go-cmp v0.5.8 github.com/lthibault/jitterbug v2.0.0+incompatible github.com/spf13/cobra v1.5.0 + golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 google.golang.org/api v0.93.0 google.golang.org/grpc v1.48.0 diff --git a/go.sum b/go.sum index 7ec528c..9f2d036 100644 --- a/go.sum +++ b/go.sum @@ -417,6 +417,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 h1:K3x+yU+fbot38x5bQbU2QqUAVyYLEktdNH2GxZLnM3U= +golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/pkg/simulation/determinism.go b/pkg/simulation/determinism.go index 0980a5b..df93221 100644 --- a/pkg/simulation/determinism.go +++ b/pkg/simulation/determinism.go @@ -4,17 +4,18 @@ import ( "fmt" "sync" - "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" + "github.com/howardjohn/pilot-load/adsc" + "github.com/howardjohn/pilot-load/pkg/simulation/model" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" - "istio.io/pkg/log" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" klabels "k8s.io/apimachinery/pkg/labels" corev1 "k8s.io/client-go/informers/core/v1" - "github.com/howardjohn/pilot-load/adsc" - "github.com/howardjohn/pilot-load/pkg/simulation/model" + "istio.io/pkg/log" ) type DeterministicSimulation struct{} @@ -93,11 +94,11 @@ func (d DeterministicSimulation) checkPod(ctx model.Context, pod *v1.Pod, addres wg.Add(1) go func() { res, err := adsc.Fetch(addr, &adsc.Config{ - Namespace: pod.Namespace, - Workload: pod.Name, - Meta: meta, - IP: ip, - Context: ctx, + Namespace: pod.Namespace, + Workload: pod.Name, + Meta: meta, + IP: ip, + Context: ctx, StoreResponses: true, }) if err != nil { @@ -155,8 +156,8 @@ func compare(base, comp map[string]proto.Message) string { if diff := cmp.Diff(got, want, protocmp.Transform()); diff != "" { return fmt.Sprintf("proto diff: %v", diff) } - gots := marshaler.Text(got) - wants := marshaler.Text(want) + gots := marshaler.Format(got) + wants := marshaler.Format(want) if gots != wants { return fmt.Sprintf("text diff:\n%v\n%v\n", gots, wants) } @@ -164,7 +165,7 @@ func compare(base, comp map[string]proto.Message) string { return "" } -var marshaler = proto.TextMarshaler{ExpandAny: true} +var marshaler = prototext.MarshalOptions{} func (d DeterministicSimulation) Cleanup(ctx model.Context) error { return nil diff --git a/pkg/simulation/dump/dump.go b/pkg/simulation/dump/dump.go new file mode 100644 index 0000000..df8f5bb --- /dev/null +++ b/pkg/simulation/dump/dump.go @@ -0,0 +1,334 @@ +package dump + +import ( + "context" + "fmt" + "os" + "path" + "sort" + "strings" + + cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/howardjohn/pilot-load/adsc" + "github.com/howardjohn/pilot-load/pkg/simulation/model" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "istio.io/istio/pilot/pkg/util/protoconv" + v3 "istio.io/istio/pilot/pkg/xds/v3" + "istio.io/istio/pkg/util/protomarshal" + "istio.io/pkg/log" +) + +type DumpSpec struct { + Pod string + Namespace string + + OutputDir string +} + +type DumpSimulation struct { + Spec DumpSpec + done []chan struct{} +} + +var _ model.Simulation = &DumpSimulation{} + +func NewSimulation(spec DumpSpec) *DumpSimulation { + return &DumpSimulation{Spec: spec} +} + +func (i *DumpSimulation) Run(ctx model.Context) error { + pod, err := ctx.Client.Kubernetes.CoreV1().Pods(i.Spec.Namespace).Get(context.Background(), i.Spec.Pod, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("pod not found: %v", err) + } + done := make(chan struct{}) + i.done = append(i.done, done) + ip := pod.Status.PodIP + + podMeta := map[string]string{} + for _, c := range pod.Spec.Containers { + for _, e := range c.Env { + if strings.HasPrefix(e.Name, "ISTIO_META_") && e.Value != "" { + podMeta[strings.TrimPrefix(e.Name, "ISTIO_META_")] = e.Value + } + } + } + + meta := clone(ctx.Args.Metadata) + meta["ISTIO_VERSION"] = "1.20.0-pilot-load" + meta["LABELS"] = pod.Labels + meta["NAMESPACE"] = pod.Namespace + meta["SERVICE_ACCOUNT"] = pod.Spec.ServiceAccountName + for k, v := range podMeta { + meta[k] = v + } + + resp, err := adsc.Fetch(ctx.Args.PilotAddress, &adsc.Config{ + Namespace: pod.Namespace, + Workload: pod.Name, + Meta: meta, + NodeType: "sidecar", // TODO: support ingress? + IP: ip, + Context: ctx, + GrpcOpts: ctx.Args.Auth.GrpcOptions(pod.Spec.ServiceAccountName, pod.Namespace), + StoreResponses: true, + }) + if err != nil { + return err + } + log.Infof("response received") + return i.write(resp) + return nil + //xsim := xds.Simulation{ + // Labels: pod.Labels, + // Metadata: meta, + // Namespace: pod.Namespace, + // Name: pod.Name, + // IP: ip, + // Cluster: "Kubernetes", + // PodType: "sidecar", // TODO: support ingress? + // GrpcOpts: ctx.Args.Auth.GrpcOptions(pod.Spec.ServiceAccountName, pod.Namespace), + // Delta: ctx.Args.DeltaXDS, + //} + //log.Infof("Starting pod %v/%v (%v)", pod.Name, pod.Namespace, ip) + //go func() { + // if err := xsim.Run(ctx); err != nil { + // log.Errorf("failed running %v: %v", ip, err) + // } + // + // close(done) + //}() + //return nil +} + +func (i *DumpSimulation) write(resp *adsc.Responses) error { + if i.Spec.OutputDir != "" { + _ = os.MkdirAll(i.Spec.OutputDir, 0o777) + _ = os.MkdirAll(i.Spec.OutputDir+"/rds", 0o777) + _ = os.MkdirAll(i.Spec.OutputDir+"/eds", 0o777) + } + writeResponse(clusterResponse(transmute[*cluster.Cluster](resp.Clusters)), i.Spec.OutputDir, "cds.yaml") + writeResponse(listenerResponse(transmute[*listener.Listener](resp.Listeners)), i.Spec.OutputDir, "lds.yaml") + for name, rt := range resp.Routes { + writeResponse(routesResponse([]*route.RouteConfiguration{rt.(*route.RouteConfiguration)}), i.Spec.OutputDir, fmt.Sprintf("rds/%s.yaml", SanitizeName(name))) + } + for name, ep := range resp.Endpoints { + writeResponse(endpointsResponse([]*endpoint.ClusterLoadAssignment{ep.(*endpoint.ClusterLoadAssignment)}), i.Spec.OutputDir, fmt.Sprintf("eds/%s.yaml", SanitizeName(name))) + } + + writeBytes(bootstrap(i.Spec.OutputDir), i.Spec.OutputDir, "config.yaml") + return nil +} + +func (i *DumpSimulation) Cleanup(ctx model.Context) error { + return nil +} + +func clone(m map[string]string) map[string]interface{} { + n := map[string]interface{}{} + for k, v := range m { + n[k] = v + } + return n +} + +func transmute[T proto.Message](resp map[string]proto.Message) []T { + keys := maps.Keys(resp) + sort.Strings(keys) + res := make([]T, 0, len(resp)) + for _, k := range keys { + m := resp[k] + res = append(res, m.(T)) + } + return res +} + +func endpointsResponse(response []*endpoint.ClusterLoadAssignment) *discovery.DiscoveryResponse { + out := &discovery.DiscoveryResponse{ + TypeUrl: v3.EndpointType, + VersionInfo: "0", + } + + for _, c := range response { + cc, _ := anypb.New(c) + out.Resources = append(out.Resources, cc) + } + + return out +} + +func clusterResponse(response []*cluster.Cluster) *discovery.DiscoveryResponse { + out := &discovery.DiscoveryResponse{ + TypeUrl: v3.ClusterType, + VersionInfo: "0", + } + + sanitizeClusterAds(response) + + for _, c := range response { + cc, _ := anypb.New(c) + out.Resources = append(out.Resources, cc) + } + + return out +} + +func listenerResponse(response []*listener.Listener) *discovery.DiscoveryResponse { + out := &discovery.DiscoveryResponse{ + TypeUrl: v3.ListenerType, + VersionInfo: "0", + } + + sanitizeListenerAds(response) + + for _, c := range response { + cc, _ := anypb.New(c) + out.Resources = append(out.Resources, cc) + } + + return out +} + +func routesResponse(response []*route.RouteConfiguration) *discovery.DiscoveryResponse { + out := &discovery.DiscoveryResponse{ + TypeUrl: v3.RouteType, + VersionInfo: "0", + } + + for _, c := range response { + cc, _ := anypb.New(c) + out.Resources = append(out.Resources, cc) + } + + return out +} + +func sanitizeClusterAds(response []*cluster.Cluster) { + for _, r := range response { + if r.EdsClusterConfig == nil { + continue + } + path := fmt.Sprintf("/etc/config/eds/%s.yaml", SanitizeName(r.Name)) + r.EdsClusterConfig.EdsConfig = &core.ConfigSource{ + ConfigSourceSpecifier: &core.ConfigSource_Path{Path: path}, + } + } +} + +func sanitizeListenerAds(response []*listener.Listener) { + for _, c := range response { + for _, fc := range filterChains(c) { + for _, f := range fc.Filters { + if f.GetTypedConfig() == nil { + continue + } + switch f.Name { + case wellknown.HTTPConnectionManager: + h := SilentlyUnmarshalAny[hcm.HttpConnectionManager](f.GetTypedConfig()) + switch r := h.GetRouteSpecifier().(type) { + case *hcm.HttpConnectionManager_Rds: + routeName := r.Rds.RouteConfigName + path := fmt.Sprintf("/etc/config/rds/%s.yaml", SanitizeName(routeName)) + h.RouteSpecifier = &hcm.HttpConnectionManager_Rds{Rds: &hcm.Rds{ + ConfigSource: toPath(path), + RouteConfigName: "routeName", + }} + f.ConfigType = &listener.Filter_TypedConfig{TypedConfig: protoconv.MessageToAny(h)} + } + default: + } + } + } + } +} + +func toPath(p string) *core.ConfigSource { + return &core.ConfigSource{ + ConfigSourceSpecifier: &core.ConfigSource_Path{Path: p}, + } +} + +func filterChains(c *listener.Listener) []*listener.FilterChain { + var chains []*listener.FilterChain + chains = append(chains, c.FilterChains...) + if c.DefaultFilterChain != nil { + chains = append(chains, c.DefaultFilterChain) + } + return chains +} + +func ExtractListenerNames(ll []*listener.Listener) []string { + res := []string{} + for _, l := range ll { + res = append(res, l.Name) + } + return res +} + +func SilentlyUnmarshalAny[T any](a *anypb.Any) *T { + dst := any(new(T)).(proto.Message) + if err := a.UnmarshalTo(dst); err != nil { + var z *T + return z + } + return any(dst).(*T) +} + +func writeResponse(r *discovery.DiscoveryResponse, dir string, file string) { + writeBytes(MarshallYaml(r), dir, file) +} + +func writeBytes(yaml []byte, dir string, file string) { + if dir == "" { + fmt.Println(string(yaml)) + } else { + if err := os.WriteFile(path.Join(dir, file), yaml, 0o777); err != nil { + panic(err) + } + } +} + +func bootstrap(outdir string) []byte { + return []byte(fmt.Sprintf(`node: + id: node + cluster: envoy +admin: + access_log_path: /dev/stdout + address: + socket_address: + address: 0.0.0.0 + port_value: 15000 +bootstrap_extensions: +- name: envoy.bootstrap.internal_listener + typed_config: + "@type": type.googleapis.com/udpa.type.v1.TypedStruct + type_url: type.googleapis.com/envoy.extensions.bootstrap.internal_listener.v3.InternalListener +dynamic_resources: + lds_config: + path: %s/lds.yaml + cds_config: + path: %s/cds.yaml`, outdir, outdir)) +} + +func MarshallYaml(w proto.Message) []byte { + b, err := protomarshal.ToYAML(w) + if err != nil { + panic(fmt.Sprintf("failed to marshal %v:\n%+v", err, nil)) + } + return []byte(b) +} + +func SanitizeName(name string) string { + return strings.ReplaceAll(name, "|", "_.") +} diff --git a/pkg/simulation/model/model.go b/pkg/simulation/model/model.go index b485cd6..63b052e 100644 --- a/pkg/simulation/model/model.go +++ b/pkg/simulation/model/model.go @@ -135,6 +135,12 @@ func (c ClusterConfig) ApplyDefaults() ClusterConfig { return *ret } +type DumpConfig struct { + Pod string + Namespace string + OutputDir string +} + type AdscConfig struct { Count int Delay time.Duration @@ -187,6 +193,7 @@ type Args struct { ProberConfig ProberConfig Metadata map[string]string DeltaXDS bool + DumpConfig DumpConfig } type Context struct { diff --git a/pkg/simulation/simulations.go b/pkg/simulation/simulations.go index 9b7be81..b9dc954 100644 --- a/pkg/simulation/simulations.go +++ b/pkg/simulation/simulations.go @@ -8,6 +8,7 @@ import ( "syscall" "github.com/howardjohn/pilot-load/pkg/simulation/cluster" + "github.com/howardjohn/pilot-load/pkg/simulation/dump" "github.com/howardjohn/pilot-load/pkg/simulation/gateway" "github.com/howardjohn/pilot-load/pkg/simulation/impersonate" "github.com/howardjohn/pilot-load/pkg/simulation/model" @@ -15,6 +16,7 @@ import ( "github.com/howardjohn/pilot-load/pkg/simulation/reproduce" "github.com/howardjohn/pilot-load/pkg/simulation/util" "github.com/howardjohn/pilot-load/pkg/simulation/xds" + "istio.io/pkg/log" ) @@ -87,6 +89,18 @@ func Cluster(a model.Args) error { return nil } +func Dump(a model.Args) error { + sim := dump.NewSimulation(dump.DumpSpec{ + Pod: a.DumpConfig.Pod, + Namespace: a.DumpConfig.Namespace, + OutputDir: a.DumpConfig.OutputDir, + }) + if err := ExecuteOneshot(a, sim); err != nil { + return fmt.Errorf("error executing: %v", err) + } + return nil +} + func Adsc(a model.Args) error { sims := []model.Simulation{} count := a.AdsConfig.Count @@ -136,6 +150,21 @@ func ExecuteSimulations(a model.Args, simulation model.Simulation) error { return simulation.Cleanup(simulationContext) } +func ExecuteOneshot(a model.Args, simulation model.Simulation) error { + ctx, cancel := context.WithCancel(context.Background()) + go captureTermination(ctx, cancel) + defer cancel() + go monitoring.StartMonitoring(ctx, 8765) + simulationContext := model.Context{Context: ctx, Args: a, Client: a.Client, Cancel: cancel} + if err := simulation.Run(simulationContext); err != nil { + log.Errorf("failed: %v, starting cleanup", err) + cleanupErr := simulation.Cleanup(simulationContext) + return fmt.Errorf("failed to run: %v; cleanup: %v", err, cleanupErr) + } + defer log.Infof("simulation completed successfully") + return simulation.Cleanup(simulationContext) +} + func captureTermination(ctx context.Context, cancel context.CancelFunc) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) diff --git a/pkg/simulation/xds/xds.go b/pkg/simulation/xds/xds.go index fc1a9fb..c49d8ce 100644 --- a/pkg/simulation/xds/xds.go +++ b/pkg/simulation/xds/xds.go @@ -10,6 +10,7 @@ import ( type Simulation struct { Labels map[string]string + Metadata map[string]string Namespace string ServiceAccount string Name string @@ -48,6 +49,9 @@ func (x *Simulation) Run(ctx model.Context) error { meta["NAMESPACE"] = x.Namespace meta["SERVICE_ACCOUNT"] = x.ServiceAccount meta["SDS"] = "true" + for k, v := range x.Metadata { + meta[k] = v + } go func() { adsc.Connect(ctx.Args.PilotAddress, &adsc.Config{ Namespace: x.Namespace,