Skip to content

Commit

Permalink
Enable SDS writing
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn committed Aug 17, 2022
1 parent 1199aae commit 0f951b8
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 36 deletions.
156 changes: 120 additions & 36 deletions pkg/simulation/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"sort"
"strings"

Expand All @@ -14,10 +15,12 @@ import (
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/howardjohn/pilot-load/adsc"
"github.com/howardjohn/pilot-load/pkg/simulation/model"
"github.com/howardjohn/pilot-load/pkg/simulation/security"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -88,38 +91,24 @@ func (i *DumpSimulation) Run(ctx model.Context) error {
return err
}
log.Infof("response received")
return i.write(resp)
return nil
//xsim := xds.Simulation{
// Labels: pod.Labels,
// Metadata: meta,
// Namespace: pod.Namespace,
// Name: pod.Name,
// IP: ip,
// Cluster: "Kubernetes",
// PodType: "sidecar", // TODO: support ingress?
// GrpcOpts: ctx.Args.Auth.GrpcOptions(pod.Spec.ServiceAccountName, pod.Namespace),
// Delta: ctx.Args.DeltaXDS,
//}
//log.Infof("Starting pod %v/%v (%v)", pod.Name, pod.Namespace, ip)
//go func() {
// if err := xsim.Run(ctx); err != nil {
// log.Errorf("failed running %v: %v", ip, err)
// }
//
// close(done)
//}()
//return nil
cert, err := ctx.Args.Auth.Certificate(ctx.Client.FetchRootCert, ctx.Args.PilotAddress, pod.Spec.ServiceAccountName, pod.Namespace)
if err != nil {
return fmt.Errorf("failed to create cert: %v", err)
}
log.Infof("certificate received")
_ = cert
return i.write(resp, cert)
}

func (i *DumpSimulation) write(resp *adsc.Responses) error {
func (i *DumpSimulation) write(resp *adsc.Responses, cert security.Cert) error {
if i.Spec.OutputDir != "" {
_ = os.MkdirAll(i.Spec.OutputDir, 0o777)
_ = os.MkdirAll(i.Spec.OutputDir+"/rds", 0o777)
_ = os.MkdirAll(i.Spec.OutputDir+"/eds", 0o777)
_ = os.MkdirAll(i.Spec.OutputDir+"/sds", 0o777)
}
writeResponse(clusterResponse(transmute[*cluster.Cluster](resp.Clusters)), i.Spec.OutputDir, "cds.yaml")
writeResponse(listenerResponse(transmute[*listener.Listener](resp.Listeners)), i.Spec.OutputDir, "lds.yaml")
writeResponse(clusterResponse(i.Spec.OutputDir, transmute[*cluster.Cluster](resp.Clusters)), i.Spec.OutputDir, "cds.yaml")
writeResponse(listenerResponse(i.Spec.OutputDir, transmute[*listener.Listener](resp.Listeners)), i.Spec.OutputDir, "lds.yaml")
for name, rt := range resp.Routes {
writeResponse(routesResponse([]*route.RouteConfiguration{rt.(*route.RouteConfiguration)}), i.Spec.OutputDir, fmt.Sprintf("rds/%s.yaml", SanitizeName(name)))
}
Expand All @@ -128,6 +117,10 @@ func (i *DumpSimulation) write(resp *adsc.Responses) error {
}

writeBytes(bootstrap(i.Spec.OutputDir), i.Spec.OutputDir, "config.yaml")

writeResponse(secretResponse(i.Spec.OutputDir, cert), i.Spec.OutputDir, "sds/default.yaml")
writeResponse(secretRootResponse(i.Spec.OutputDir, cert), i.Spec.OutputDir, "sds/ROOTCA.yaml")

return nil
}

Expand Down Expand Up @@ -168,13 +161,13 @@ func endpointsResponse(response []*endpoint.ClusterLoadAssignment) *discovery.Di
return out
}

func clusterResponse(response []*cluster.Cluster) *discovery.DiscoveryResponse {
func clusterResponse(path string, response []*cluster.Cluster) *discovery.DiscoveryResponse {
out := &discovery.DiscoveryResponse{
TypeUrl: v3.ClusterType,
VersionInfo: "0",
}

sanitizeClusterAds(response)
sanitizeClusterAds(path, response)

for _, c := range response {
cc, _ := anypb.New(c)
Expand All @@ -184,13 +177,64 @@ func clusterResponse(response []*cluster.Cluster) *discovery.DiscoveryResponse {
return out
}

func listenerResponse(response []*listener.Listener) *discovery.DiscoveryResponse {
func secretResponse(path string, cert security.Cert) *discovery.DiscoveryResponse {
out := &discovery.DiscoveryResponse{
TypeUrl: v3.SecretType,
VersionInfo: "0",
}
secret := &tls.Secret{
Name: "default",
Type: &tls.Secret_TlsCertificate{
TlsCertificate: &tls.TlsCertificate{
CertificateChain: &core.DataSource{
Specifier: &core.DataSource_InlineBytes{
InlineBytes: cert.ClientCert,
},
},
PrivateKey: &core.DataSource{
Specifier: &core.DataSource_InlineBytes{
InlineBytes: cert.Key,
},
},
},
},
}
cc, _ := anypb.New(secret)
out.Resources = append(out.Resources, cc)

return out
}

func secretRootResponse(path string, cert security.Cert) *discovery.DiscoveryResponse {
out := &discovery.DiscoveryResponse{
TypeUrl: v3.SecretType,
VersionInfo: "0",
}
secret := &tls.Secret{
Name: "ROOTCA",
Type: &tls.Secret_ValidationContext{
ValidationContext: &tls.CertificateValidationContext{
TrustedCa: &core.DataSource{
Specifier: &core.DataSource_InlineBytes{
InlineBytes: cert.RootCert,
},
},
},
},
}
cc, _ := anypb.New(secret)
out.Resources = append(out.Resources, cc)

return out
}

func listenerResponse(path string, response []*listener.Listener) *discovery.DiscoveryResponse {
out := &discovery.DiscoveryResponse{
TypeUrl: v3.ListenerType,
VersionInfo: "0",
}

sanitizeListenerAds(response)
sanitizeListenerAds(path, response)

for _, c := range response {
cc, _ := anypb.New(c)
Expand All @@ -214,21 +258,57 @@ func routesResponse(response []*route.RouteConfiguration) *discovery.DiscoveryRe
return out
}

func sanitizeClusterAds(response []*cluster.Cluster) {
func sanitizeClusterAds(path string, response []*cluster.Cluster) {
for _, r := range response {
rewriteTransportSocket(path, r.TransportSocket)
for _, tsm := range r.TransportSocketMatches {
rewriteTransportSocket(path, tsm.TransportSocket)
}
if r.EdsClusterConfig == nil {
continue
}
path := fmt.Sprintf("/etc/config/eds/%s.yaml", SanitizeName(r.Name))
r.EdsClusterConfig.EdsConfig = &core.ConfigSource{
ConfigSourceSpecifier: &core.ConfigSource_Path{Path: path},
path := filepath.Join(path, "eds", SanitizeName(r.Name)+".yaml")
r.EdsClusterConfig.EdsConfig = toPath(path)
}
}

func rewriteTransportSocket(path string, s *core.TransportSocket) {
if s == nil {
return
}
if s.GetTypedConfig().TypeUrl == TypeName[*tls.DownstreamTlsContext]() {
tl := SilentlyUnmarshalAny[tls.DownstreamTlsContext](s.GetTypedConfig())
for _, sds := range tl.CommonTlsContext.TlsCertificateSdsSecretConfigs {
sds.SdsConfig = toPath(filepath.Join(path, "sds", SanitizeName(sds.Name)+".yaml"))
}
if v := tl.CommonTlsContext.GetValidationContextSdsSecretConfig(); v != nil {
v.SdsConfig = toPath(filepath.Join(path, "sds", SanitizeName(v.Name)+".yaml"))
}
if v := tl.CommonTlsContext.GetCombinedValidationContext().GetValidationContextSdsSecretConfig(); v != nil {
v.SdsConfig = toPath(filepath.Join(path, "sds", SanitizeName(v.Name)+".yaml"))
}
s.ConfigType = &core.TransportSocket_TypedConfig{TypedConfig: protoconv.MessageToAny(tl)}
return
}
if s.GetTypedConfig().TypeUrl == TypeName[*tls.UpstreamTlsContext]() {
tl := SilentlyUnmarshalAny[tls.UpstreamTlsContext](s.GetTypedConfig())
for _, sds := range tl.CommonTlsContext.TlsCertificateSdsSecretConfigs {
sds.SdsConfig = toPath(filepath.Join(path, "sds", SanitizeName(sds.Name)+".yaml"))
}
if v := tl.CommonTlsContext.GetValidationContextSdsSecretConfig(); v != nil {
v.SdsConfig = toPath(filepath.Join(path, "sds", SanitizeName(v.Name)+".yaml"))
}
if v := tl.CommonTlsContext.GetCombinedValidationContext().GetValidationContextSdsSecretConfig(); v != nil {
v.SdsConfig = toPath(filepath.Join(path, "sds", SanitizeName(v.Name)+".yaml"))
}
s.ConfigType = &core.TransportSocket_TypedConfig{TypedConfig: protoconv.MessageToAny(tl)}
}
}

func sanitizeListenerAds(response []*listener.Listener) {
func sanitizeListenerAds(path string, response []*listener.Listener) {
for _, c := range response {
for _, fc := range filterChains(c) {
rewriteTransportSocket(path, fc.TransportSocket)
for _, f := range fc.Filters {
if f.GetTypedConfig() == nil {
continue
Expand All @@ -239,9 +319,8 @@ func sanitizeListenerAds(response []*listener.Listener) {
switch r := h.GetRouteSpecifier().(type) {
case *hcm.HttpConnectionManager_Rds:
routeName := r.Rds.RouteConfigName
path := fmt.Sprintf("/etc/config/rds/%s.yaml", SanitizeName(routeName))
h.RouteSpecifier = &hcm.HttpConnectionManager_Rds{Rds: &hcm.Rds{
ConfigSource: toPath(path),
ConfigSource: toPath(filepath.Join(path, "rds", SanitizeName(routeName)+".yaml")),
RouteConfigName: "routeName",
}}
f.ConfigType = &listener.Filter_TypedConfig{TypedConfig: protoconv.MessageToAny(h)}
Expand Down Expand Up @@ -332,3 +411,8 @@ func MarshallYaml(w proto.Message) []byte {
func SanitizeName(name string) string {
return strings.ReplaceAll(name, "|", "_.")
}

func TypeName[T proto.Message]() string {
ft := new(T)
return "type.googleapis.com/" + string((*ft).ProtoReflect().Descriptor().FullName())
}
49 changes: 49 additions & 0 deletions pkg/simulation/security/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ import (
"fmt"
"net"
"strings"
"time"

"github.com/howardjohn/pilot-load/pkg/kube"
"google.golang.org/api/cloudresourcemanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"

pb "istio.io/api/security/v1alpha1"
"istio.io/istio/pkg/bootstrap/platform"
"istio.io/istio/pkg/security"
"istio.io/istio/security/pkg/nodeagent/plugin/providers/google/stsclient"
pkiutil "istio.io/istio/security/pkg/pki/util"
"istio.io/istio/security/pkg/stsservice"
"istio.io/istio/security/pkg/stsservice/server"
"istio.io/istio/security/pkg/stsservice/tokenmanager/google"
Expand Down Expand Up @@ -137,6 +141,51 @@ func (a *AuthOptions) AutoPopulate() error {
return nil
}

func (a *AuthOptions) Certificate(fetchRoot func() (string, error), addr, serviceAccount, namespace string) (Cert, error) {
rootCert, err := fetchRoot()
if err != nil {
return Cert{}, fmt.Errorf("failed to fetch root cert: %v", err)
}

token, err := GetServiceAccountToken(a.Client, "istio-ca", namespace, serviceAccount)
if err != nil {
return Cert{}, err
}

san := fmt.Sprintf("spiffe://%s/ns/%s/sa/%s", "cluster.local", namespace, serviceAccount)
options := pkiutil.CertOptions{
Host: san,
RSAKeySize: 2048,
}
// Generate the cert/key, send CSR to CA.
csrPEM, keyPEM, err := pkiutil.GenCSR(options)
if err != nil {
return Cert{}, err
}
client, err := newCitadelClient(addr, []byte(rootCert))
if err != nil {
return Cert{}, fmt.Errorf("creating citadel client: %v", err)
}
req := &pb.IstioCertificateRequest{
Csr: string(csrPEM),
ValidityDuration: int64((time.Hour * 24 * 7).Seconds()),
}
rctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("Authorization", "Bearer "+token, "ClusterID", "Kubernetes"))
resp, err := client.CreateCertificate(rctx, req)
if err != nil {
return Cert{}, fmt.Errorf("send CSR: %v", err)
}
certChain := []byte{}
for _, c := range resp.CertChain {
certChain = append(certChain, []byte(c)...)
}
return Cert{certChain, keyPEM, []byte(rootCert)}, nil
}

type Cert struct {
ClientCert, Key, RootCert []byte
}

func (a *AuthOptions) GrpcOptions(serviceAccount, namespace string) []grpc.DialOption {
insecureTls := grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
Expand Down
28 changes: 28 additions & 0 deletions pkg/simulation/security/cert.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package security

import (
"crypto/tls"
"crypto/x509"
"fmt"
"sync"
"time"

"github.com/howardjohn/pilot-load/pkg/kube"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

pb "istio.io/api/security/v1alpha1"
)

// map of SAN to jwt token. Used to avoid repetitive calls
Expand Down Expand Up @@ -43,3 +49,25 @@ func GetServiceAccountToken(c *kube.Client, aud, ns, sa string) (string, error)
cachedTokens.Store(san, token{t, exp})
return t, nil
}

// NewCitadelClient create a CA client for Citadel.
func newCitadelClient(endpoint string, rootCert []byte) (pb.IstioCertificateServiceClient, error) {
certPool := x509.NewCertPool()
ok := certPool.AppendCertsFromPEM(rootCert)
if !ok {
return nil, fmt.Errorf("failed to append certificates")
}
config := tls.Config{
RootCAs: certPool,
InsecureSkipVerify: true,
}
transportCreds := credentials.NewTLS(&config)

conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(transportCreds))
if err != nil {
return nil, fmt.Errorf("failed to connect to endpoint %s", endpoint)
}

client := pb.NewIstioCertificateServiceClient(conn)
return client, nil
}

0 comments on commit 0f951b8

Please sign in to comment.