diff --git a/cmd/labapp/command/root.go b/cmd/labapp/command/root.go index 1249912b..59b71cd4 100644 --- a/cmd/labapp/command/root.go +++ b/cmd/labapp/command/root.go @@ -105,7 +105,6 @@ func appAction(c *cli.Context) error { if err != nil { return err } - ctx := cliutil.CommandContext(c) ctx, tracer, closer := traceutil.New(ctx, "labapp", nil) defer closer.Close() @@ -125,7 +124,6 @@ func appAction(c *cli.Context) error { ctx = opentracing.ContextWithSpan(ctx, span) } } - app, err := labapp.New(ctx, root, c.GlobalString("address"), c.GlobalInt("libp2p-port"), zerolog.Ctx(ctx), metadata.PeerDefinition{ Transports: c.GlobalStringSlice("libp2p-transports"), Muxers: c.GlobalStringSlice("libp2p-muxers"), diff --git a/cmd/labctl/command/debug.go b/cmd/labctl/command/debug.go index 445952e8..62076774 100644 --- a/cmd/labctl/command/debug.go +++ b/cmd/labctl/command/debug.go @@ -19,8 +19,11 @@ import ( "errors" "fmt" + "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/pkg/cliutil" + "github.com/Netflix/p2plab/query" "github.com/urfave/cli" ) @@ -58,6 +61,19 @@ var debugCommand = cli.Command{ }, }, }, + { + Name: "connect", + Aliases: []string{"c"}, + Usage: "Connects a cluster together", + ArgsUsage: "", + Action: connectAction, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "query,q", + Usage: "Runs a query to filter the listed nodes.", + }, + }, + }, }, } @@ -103,3 +119,41 @@ func runTaskAction(c *cli.Context) error { return nil } + +func connectAction(c *cli.Context) error { + if c.NArg() != 1 { + return errors.New("cluster id must be provided") + } + + control, err := ResolveControl(c) + if err != nil { + return err + } + + var opts []p2plab.ListOption + ctx := cliutil.CommandContext(c) + if c.IsSet("query") { + q, err := query.Parse(ctx, c.String("query")) + if err != nil { + return err + } + + opts = append(opts, p2plab.WithQuery(q.String())) + } + + cluster := c.Args().First() + ns, err := control.Node().List(ctx, cluster) + if err != nil { + return err + } + if len(ns) == 0 { + return fmt.Errorf("No nodes found for %q", cluster) + } + + err = nodes.WaitHealthy(ctx, ns) + if err != nil { + return err + } + + return nodes.Connect(ctx, ns) +} diff --git a/cmd/labctl/command/experiment.go b/cmd/labctl/command/experiment.go index b8169eaf..5ac08df2 100644 --- a/cmd/labctl/command/experiment.go +++ b/cmd/labctl/command/experiment.go @@ -16,6 +16,7 @@ package command import ( "errors" + "fmt" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/experiments" @@ -33,7 +34,7 @@ var experimentCommand = cli.Command{ Subcommands: []cli.Command{ { Name: "create", - Aliases: []string{"s"}, + Aliases: []string{"c"}, Usage: "Creates an experiment from a definition file", ArgsUsage: "", Action: createExperimentAction, @@ -42,6 +43,10 @@ var experimentCommand = cli.Command{ Name: "name", Usage: "Name of the experiment, by default takes the name of the experiment definition.", }, + &cli.BoolFlag{ + Name: "dry-run", + Usage: "dry run the epxeriment creation, parsing the cue file and printing it to stdout", + }, }, }, { @@ -96,7 +101,7 @@ func createExperimentAction(c *cli.Context) error { return errors.New("experiment definition must be provided") } - p, err := CommandPrinter(c, printer.OutputID) + p, err := CommandPrinter(c, printer.OutputJSON) if err != nil { return err } @@ -112,18 +117,32 @@ func createExperimentAction(c *cli.Context) error { return err } + if c.Bool("dry-run") { + dt, err := edef.ToJSON() + if err != nil { + return err + } + fmt.Println(string(dt)) + return nil + } + control, err := ResolveControl(c) if err != nil { return err } ctx := cliutil.CommandContext(c) - experiment, err := control.Experiment().Create(ctx, name, edef) + id, err := control.Experiment().Create(ctx, name, edef) if err != nil { return err } + experiment, err := control.Experiment().Get(ctx, id) + if err != nil { + return err + } zerolog.Ctx(ctx).Info().Msgf("Completed experiment %q", experiment.Metadata().ID) + return p.Print(experiment.Metadata()) } diff --git a/cue/cue.mod/Makefile b/cue/cue.mod/Makefile index 2419c931..043ec4f4 100644 --- a/cue/cue.mod/Makefile +++ b/cue/cue.mod/Makefile @@ -1,3 +1,3 @@ .PHONY: build build: - cue export p2plab_example.cue p2plab.cue + cue export p2plab_example1.cue p2plab.cue diff --git a/cue/cue.mod/p2plab.cue b/cue/cue.mod/p2plab.cue index cffc3516..ee84b627 100644 --- a/cue/cue.mod/p2plab.cue +++ b/cue/cue.mod/p2plab.cue @@ -9,6 +9,17 @@ Group :: { region: string // labels is an optional field labels?: [...string] + // although not optional if left unspecified + // then we use the default values of Peer + peer: Peer | *Peer +} + +Peer :: { + gitReference: string | *"HEAD" + transports: [...string] | *["tcp"] + muxers: [...string] | *["mplex"] + securityTransports: [...string] | *["secio"] + routing: string | *"nil" } // a cluster is a collection of 1 or more groups of nodes diff --git a/cue/cue.mod/p2plab_example1.cue b/cue/cue.mod/p2plab_example1.cue index 2822cb3b..361d8d0b 100644 --- a/cue/cue.mod/p2plab_example1.cue +++ b/cue/cue.mod/p2plab_example1.cue @@ -34,18 +34,17 @@ scen1:: Scenario & { "neighbors": "golang" } benchmark: { - "(not neighbors)": "golang" + "(not 'neighbors')": "golang" } } scen2:: Scenario & { objects: [ items ] seed: { - "neighbors": "golang" + "neighbors": "mysql" } benchmark: { - "(neighbors)": "golang" - "(not neighbors)": "mysql" + "(not 'neighbors')": "mysql" } } @@ -60,5 +59,4 @@ experiment: Experiment & { scenario: scen2 } ] -// trials: [[clust1,scen1],[clust1,scen2]] -} \ No newline at end of file +} diff --git a/cue/cue.mod/p2plab_example2.cue b/cue/cue.mod/p2plab_example2.cue index 29d80a4a..3ad0171a 100644 --- a/cue/cue.mod/p2plab_example2.cue +++ b/cue/cue.mod/p2plab_example2.cue @@ -21,12 +21,13 @@ experiment: Experiment & { "neighbors": "image" } benchmark: { - "(not neighbors)": "image" + "(not 'neighbors')": "image" } } } for o in objects ] } + objects :: [ [{ image: { @@ -40,4 +41,4 @@ objects :: [ source: "docker.io/library/mysql:latest" } }], -] \ No newline at end of file +] diff --git a/cue/parser/p2plab_instance.go b/cue/parser/p2plab_instance.go index 5d91a1a3..8189fd21 100644 --- a/cue/parser/p2plab_instance.go +++ b/cue/parser/p2plab_instance.go @@ -20,7 +20,7 @@ func (p *P2PLabInstance) ToExperimentDefinition() (metadata.ExperimentDefinition return metadata.ExperimentDefinition{}, err } return metadata.ExperimentDefinition{ - TrialDefinition: trials, + Trials: trials, }, nil } diff --git a/cue/parser/parser.go b/cue/parser/parser.go index 78e608b9..cd21dd11 100644 --- a/cue/parser/parser.go +++ b/cue/parser/parser.go @@ -4,6 +4,65 @@ import ( "cuelang.org/go/cue" ) +var ( + // CueTemplate contains the bare cue source template used to generate + // cue files + CueTemplate = ` + // defines a set of nodes of size 1 or higher + // a "node" is simply an EC2 instance provisioned of the given type + // and there may be more than 1 node in a group, however there must always be 1 + Group :: { + // must be greater than or equal to 1 + // default value of this field is 1 + size: >=1 | *1 + instanceType: string + region: string + // labels is an optional field + labels?: [...string] + // although not optional if left unspecified + // then we use the default values of Peer + peer: Peer | *Peer + } + + Peer :: { + gitReference: string | *"HEAD" + transports: [...string] | *["tcp"] + muxers: [...string] | *["mplex"] + securityTransports: [...string] | *["secio"] + routing: string | *"nil" + } + + // a cluster is a collection of 1 or more groups of nodes + // that will be participating in a given benchmark + Cluster :: { + groups: [...Group] + } + + // an object is a particular data format to be used in benchmarking + // typically these are container images + object :: [Name=_]: { + type: string + source: string + } + + Scenario :: { + objects: [...object] + seed: { ... } + // enable any fields for benchmark + benchmark: { ... } + } + + Trial :: { + cluster: Cluster + scenario: Scenario + } + + Experiment :: { + trials: [...Trial] + } +` +) + // Parser bundles the cue runtime with helper functions // to enable parsing of cue source files type Parser struct { diff --git a/experiment.go b/experiment.go index 25a8f70b..dd9a42ec 100644 --- a/experiment.go +++ b/experiment.go @@ -23,7 +23,7 @@ import ( // ExperimentAPI is an unimplemented layer to run experiments, a collection // of benchmarks while varying some aspect. type ExperimentAPI interface { - Create(ctx context.Context, id string, edef metadata.ExperimentDefinition) (Experiment, error) + Create(ctx context.Context, name string, edef metadata.ExperimentDefinition) (id string, err error) Get(ctx context.Context, id string) (Experiment, error) diff --git a/experiments/definition.go b/experiments/definition.go index 5b208172..8a82e2ec 100644 --- a/experiments/definition.go +++ b/experiments/definition.go @@ -15,23 +15,26 @@ package experiments import ( - "encoding/json" "io/ioutil" + "github.com/Netflix/p2plab/cue/parser" "github.com/Netflix/p2plab/metadata" ) +// Parse reads the cue source file at filename and converts it to a +// metadata.ExperimentDefinition type func Parse(filename string) (metadata.ExperimentDefinition, error) { - var edef metadata.ExperimentDefinition content, err := ioutil.ReadFile(filename) if err != nil { - return edef, err + return metadata.ExperimentDefinition{}, err } - - err = json.Unmarshal(content, &edef) + psr := parser.NewParser([]string{parser.CueTemplate}) + inst, err := psr.Compile( + "p2plab_experiment", + string(content), + ) if err != nil { - return edef, err + return metadata.ExperimentDefinition{}, err } - - return edef, nil + return inst.ToExperimentDefinition() } diff --git a/experiments/experiments_test.go b/experiments/experiments_test.go index bfe9a76f..e4595773 100644 --- a/experiments/experiments_test.go +++ b/experiments/experiments_test.go @@ -2,12 +2,10 @@ package experiments import ( "context" - "io/ioutil" "os" "strings" "testing" - parser "github.com/Netflix/p2plab/cue/parser" "github.com/Netflix/p2plab/metadata" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -85,17 +83,7 @@ func TestExperimentDefinition(t *testing.T) { } func newTestExperiment(t *testing.T, sourceFile, name string) metadata.Experiment { - data, err := ioutil.ReadFile("../cue/cue.mod/p2plab.cue") - require.NoError(t, err) - sourceData, err := ioutil.ReadFile(sourceFile) - require.NoError(t, err) - psr := parser.NewParser([]string{string(data)}) - inst, err := psr.Compile( - name, - string(sourceData), - ) - require.NoError(t, err) - edef, err := inst.ToExperimentDefinition() + edef, err := Parse(sourceFile) require.NoError(t, err) return metadata.Experiment{ ID: uuid.New().String(), diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index 168b3364..914f2d16 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -26,6 +26,7 @@ import ( "os" "os/exec" "path/filepath" + "sync" "syscall" "github.com/Netflix/p2plab/downloaders" @@ -42,13 +43,14 @@ type Supervisor interface { } type supervisor struct { - root string - appRoot string - appPort string - client *httputil.Client - fs *downloaders.Downloaders - app *exec.Cmd - cancel func() + root string + appRoot string + appPort string + app *exec.Cmd + client *httputil.Client + fs *downloaders.Downloaders + mu sync.Mutex + cancel func() } func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders.Downloaders) (Supervisor, error) { @@ -68,15 +70,18 @@ func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders } return &supervisor{ - root: root, - appRoot: appRoot, - appPort: appPort, - client: client, - fs: fs, + root: root, + appRoot: appRoot, + appPort: appPort, + client: client, + fs: fs, }, nil } func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) error { + s.mu.Lock() + defer s.mu.Unlock() + err := s.kill(ctx) if err != nil { return err @@ -134,7 +139,7 @@ func (s *supervisor) start(ctx context.Context, flags []string) error { } v := new(bytes.Buffer) - versionCmd := s.cmdWithStdio(ctx, v, ioutil.Discard, "--version") + versionCmd := s.cmdWithStdio(actx, v, ioutil.Discard, "--version") err = versionCmd.Run() if err != nil { return err @@ -165,9 +170,15 @@ func (s *supervisor) wait(ctx context.Context, flags []string) error { go func() { <-ctx.Done() - err := s.app.Process.Signal(syscall.SIGTERM) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to SIGTERM labapp") + if ctx.Err() != nil { + zerolog.Ctx(ctx).Error().Err(ctx.Err()).Msg("context error is not nil") + } + if s.app.Process != nil { + zerolog.Ctx(ctx).Info().Msg("Forwarding kill signal to labapp") + err := s.app.Process.Signal(syscall.SIGTERM) + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to SIGTERM labapp") + } } }() @@ -220,7 +231,7 @@ func (s *supervisor) atomicReplaceBinary(ctx context.Context, link string) error defer span.Finish() span.SetTag("link", link) - zerolog.Ctx(ctx).Debug().Msg("Atomically replacing binary") + zerolog.Ctx(ctx).Debug().Str("root", s.root).Msg("Atomically replacing binary") u, err := url.Parse(link) if err != nil { return err diff --git a/labd/controlapi/experiment.go b/labd/controlapi/experiment.go index 7ec69f70..17531a45 100644 --- a/labd/controlapi/experiment.go +++ b/labd/controlapi/experiment.go @@ -23,6 +23,7 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/metadata" "github.com/Netflix/p2plab/pkg/httputil" + "github.com/Netflix/p2plab/pkg/logutil" "github.com/pkg/errors" ) @@ -31,29 +32,31 @@ type experimentAPI struct { url urlFunc } -func (a *experimentAPI) Create(ctx context.Context, id string, edef metadata.ExperimentDefinition) (p2plab.Experiment, error) { - content, err := json.MarshalIndent(&edef, "", " ") +func (a *experimentAPI) Create(ctx context.Context, name string, edef metadata.ExperimentDefinition) (id string, err error) { + content, err := edef.ToJSON() if err != nil { - return nil, err + return id, err } req := a.client.NewRequest("POST", a.url("/experiments/create"), httputil.WithRetryMax(0)). - Option("id", id). + Option("id", name). Body(bytes.NewReader(content)) resp, err := req.Send(ctx) if err != nil { - return nil, err + return id, err } defer resp.Body.Close() - e := experiment{client: a.client} - err = json.NewDecoder(resp.Body).Decode(&e.metadata) - if err != nil { - return nil, err + logWriter := logutil.LogWriter(ctx) + if logWriter != nil { + err = logutil.WriteRemoteLogs(ctx, resp.Body, logWriter) + if err != nil { + return id, err + } } - return &e, nil + return resp.Header.Get(ResourceID), nil } func (a *experimentAPI) Get(ctx context.Context, id string) (p2plab.Experiment, error) { diff --git a/labd/routers/clusterrouter/router.go b/labd/routers/clusterrouter/router.go index b0471042..e4dc413e 100644 --- a/labd/routers/clusterrouter/router.go +++ b/labd/routers/clusterrouter/router.go @@ -22,26 +22,29 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" - "github.com/Netflix/p2plab/labd/controlapi" + "github.com/Netflix/p2plab/labd/routers/helpers" "github.com/Netflix/p2plab/metadata" - "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/pkg/httputil" "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" - "github.com/pkg/errors" - "github.com/rs/zerolog" - bolt "go.etcd.io/bbolt" ) type router struct { db metadata.DB provider p2plab.NodeProvider client *httputil.Client + rhelper *helpers.Helper } +// New returns a new clutser router initialized with the router helpers func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client) daemon.Router { - return &router{db, provider, client} + return &router{ + db, + provider, + client, + helpers.New(db, provider, client), + } } func (s *router) Routes() []daemon.Route { @@ -84,73 +87,8 @@ func (s *router) postClustersCreate(ctx context.Context, w http.ResponseWriter, return err } - name := r.FormValue("name") - ctx, logger := logutil.WithResponseLogger(ctx, w) - logger.UpdateContext(func(c zerolog.Context) zerolog.Context { - return c.Str("name", name) - }) - - cluster := metadata.Cluster{ - ID: name, - Status: metadata.ClusterCreating, - Definition: cdef, - Labels: append([]string{ - name, - }, cdef.GenerateLabels()...), - } - - cluster, err = s.db.CreateCluster(ctx, cluster) - if err != nil { - return err - } - w.Header().Add(controlapi.ResourceID, name) - - zerolog.Ctx(ctx).Info().Msg("Creating node group") - ng, err := s.provider.CreateNodeGroup(ctx, name, cdef) - if err != nil { - return err - } - - zerolog.Ctx(ctx).Info().Msg("Updating metadata with new nodes") - var mns []metadata.Node - cluster.Status = metadata.ClusterConnecting - err = s.db.Update(ctx, func(tx *bolt.Tx) error { - var err error - tctx := metadata.WithTransactionContext(ctx, tx) - cluster, err = s.db.UpdateCluster(tctx, cluster) - if err != nil { - return err - } - - mns, err = s.db.CreateNodes(tctx, cluster.ID, ng.Nodes) - if err != nil { - return err - } - - return nil - }) - if err != nil { - return err - } - - var ns []p2plab.Node - for _, n := range mns { - ns = append(ns, controlapi.NewNode(s.client, n)) - } - - err = nodes.WaitHealthy(ctx, ns) - if err != nil { - return err - } - - zerolog.Ctx(ctx).Info().Msg("Updating cluster metadata") - cluster.Status = metadata.ClusterCreated - _, err = s.db.UpdateCluster(ctx, cluster) - if err != nil { - return err - } - - return nil + _, err = s.rhelper.CreateCluster(ctx, cdef, r.FormValue("name")) + return err } func (s *router) putClustersLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { @@ -173,49 +111,14 @@ func (s *router) putClustersLabel(ctx context.Context, w http.ResponseWriter, r func (s *router) deleteClusters(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { names := strings.Split(r.FormValue("names"), ",") - ctx, logger := logutil.WithResponseLogger(ctx, w) + ctx, _ = logutil.WithResponseLogger(ctx, w) // TODO: parallelize with different color loggers? for _, name := range names { - logger := logger.With().Str("name", name).Logger() - ctx = logger.WithContext(ctx) - - cluster, err := s.db.GetCluster(ctx, name) + err := s.rhelper.DeleteCluster(ctx, name) if err != nil { - return errors.Wrapf(err, "failed to get cluster %q", name) - } - - if cluster.Status != metadata.ClusterDestroying { - cluster.Status = metadata.ClusterDestroying - cluster, err = s.db.UpdateCluster(ctx, cluster) - if err != nil { - return errors.Wrap(err, "failed to update cluster status to destroying") - } - } - - ns, err := s.db.ListNodes(ctx, cluster.ID) - if err != nil { - return errors.Wrap(err, "failed to list nodes") - } - - ng := &p2plab.NodeGroup{ - ID: cluster.ID, - Nodes: ns, - } - - logger.Info().Msg("Destroying node group") - err = s.provider.DestroyNodeGroup(ctx, ng) - if err != nil { - return errors.Wrap(err, "failed to destroy node group") - } - - logger.Info().Msg("Deleting cluster metadata") - err = s.db.DeleteCluster(ctx, cluster.ID) - if err != nil { - return errors.Wrap(err, "failed to delete cluster metadata") + return err } - - logger.Info().Msg("Destroyed cluster") } return nil diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 45a28b92..e1b7742a 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -16,20 +16,34 @@ package experimentrouter import ( "context" + "encoding/json" + "fmt" "net/http" + "os" "strings" + "sync" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" + "github.com/Netflix/p2plab/labd/controlapi" + "github.com/Netflix/p2plab/labd/routers/helpers" "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/peer" "github.com/Netflix/p2plab/pkg/httputil" + "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" + "github.com/Netflix/p2plab/reports" + "github.com/Netflix/p2plab/scenarios" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" "github.com/pkg/errors" + "github.com/rs/xid" "github.com/rs/zerolog" + jaeger "github.com/uber/jaeger-client-go" + bolt "go.etcd.io/bbolt" + "golang.org/x/sync/errgroup" ) type router struct { @@ -39,17 +53,27 @@ type router struct { ts *transformers.Transformers seeder *peer.Peer builder p2plab.Builder + rhelper *helpers.Helper } +// New returns a new experiment router initialized with the router helpers func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client, ts *transformers.Transformers, seeder *peer.Peer, builder p2plab.Builder) daemon.Router { - return &router{db, provider, client, ts, seeder, builder} + return &router{ + db, + provider, + client, + ts, + seeder, + builder, + helpers.New(db, provider, client), + } } func (s *router) Routes() []daemon.Route { return []daemon.Route{ // GET daemon.NewGetRoute("/experiments/json", s.getExperiments), - daemon.NewGetRoute("/experiments/{id}/json", s.getExperimentByName), + daemon.NewGetRoute("/experiments/{id}/json", s.getExperimentByID), // POST daemon.NewPostRoute("/experiments/create", s.postExperimentsCreate), // PUT @@ -68,8 +92,8 @@ func (s *router) getExperiments(ctx context.Context, w http.ResponseWriter, r *h return daemon.WriteJSON(w, &experiments) } -func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - id := vars["name"] +func (s *router) getExperimentByID(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + id := vars["id"] experiment, err := s.db.GetExperiment(ctx, id) if err != nil { return err @@ -79,7 +103,131 @@ func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, } func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - return errors.New("unimplemented") + var edef metadata.ExperimentDefinition + err := json.NewDecoder(r.Body).Decode(&edef) + if err != nil { + return err + } + + eid := xid.New().String() + w.Header().Add(controlapi.ResourceID, eid) + + ctx, logger := logutil.WithResponseLogger(ctx, w) + logger.UpdateContext(func(c zerolog.Context) zerolog.Context { + return c.Str("eid", eid) + }) + + experiment, err := s.db.CreateExperiment(ctx, metadata.Experiment{ + ID: eid, + Definition: edef, + Status: metadata.ExperimentRunning, + }) + if err != nil { + return err + } + + var seederAddrs []string + for _, addr := range s.seeder.Host().Addrs() { + seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) + } + + var mu sync.Mutex + experiment.Reports = make([]metadata.Report, len(experiment.Definition.Trials)) + + eg, ctx := errgroup.WithContext(ctx) + for i, trial := range experiment.Definition.Trials { + i, trial := i, trial + name := fmt.Sprintf("experiment_%s_trial_%d", eid, i) + + eg.Go(func() error { + cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name) + if err != nil { + return err + } + + defer func() error { + return s.rhelper.DeleteCluster(ctx, name) + }() + + mns, err := s.db.ListNodes(ctx, cluster.ID) + if err != nil { + return err + } + + var ( + ns []p2plab.Node + lset = query.NewLabeledSet() + ) + for _, n := range mns { + node := controlapi.NewNode(s.client, n) + lset.Add(node) + ns = append(ns, node) + } + + var ids []string + for _, labeled := range lset.Slice() { + ids = append(ids, labeled.ID()) + } + zerolog.Ctx(ctx).Info().Int("trial", i).Strs("ids", ids).Msg("Created cluster for experiment") + + err = nodes.Update(ctx, s.builder, ns) + if err != nil { + return errors.Wrap(err, "failed to update cluster") + } + + err = nodes.Connect(ctx, ns) + if err != nil { + return errors.Wrap(err, "failed to connect cluster") + } + + plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, lset) + if err != nil { + return err + } + + execution, err := scenarios.Run(ctx, lset, plan, seederAddrs) + if err != nil { + return errors.Wrapf(err, "failed to run scenario plan for %q", cluster.ID) + } + + report := metadata.Report{ + Summary: metadata.ReportSummary{ + TotalTime: execution.End.Sub(execution.Start), + }, + Nodes: execution.Report, + Queries: queries, + } + + report.Aggregates = reports.ComputeAggregates(report.Nodes) + jaegerUI := os.Getenv("JAEGER_UI") + if jaegerUI != "" { + sc, ok := execution.Span.Context().(jaeger.SpanContext) + if ok { + report.Summary.Trace = fmt.Sprintf("%s/trace/%s", jaegerUI, sc.TraceID()) + } + } + + mu.Lock() + experiment.Reports[i] = report + mu.Unlock() + return err + }) + } + + err = eg.Wait() + if err != nil { + return err + } + + experiment.Status = metadata.ExperimentDone + return s.db.Update(ctx, func(tx *bolt.Tx) error { + tctx := metadata.WithTransactionContext(ctx, tx) + _, err := s.db.UpdateExperiment(tctx, experiment) + if err != nil { + return err + } + return nil + }) } func (s *router) putExperimentsLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { diff --git a/labd/routers/helpers/doc.go b/labd/routers/helpers/doc.go new file mode 100644 index 00000000..574d96fe --- /dev/null +++ b/labd/routers/helpers/doc.go @@ -0,0 +1,2 @@ +// Package helpers contains helper functions to be reused by multiple routers +package helpers diff --git a/labd/routers/helpers/helpers.go b/labd/routers/helpers/helpers.go new file mode 100644 index 00000000..c3625b9c --- /dev/null +++ b/labd/routers/helpers/helpers.go @@ -0,0 +1,138 @@ +package helpers + +import ( + "context" + + "github.com/Netflix/p2plab" + "github.com/Netflix/p2plab/labd/controlapi" + "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/nodes" + "github.com/Netflix/p2plab/pkg/httputil" + "github.com/pkg/errors" + "github.com/rs/zerolog" + bolt "go.etcd.io/bbolt" +) + +// TODO(bonedaddy): not sure if this is the best way to go about sharing code between routers + +// Helper abstracts commonly used functions to be shared by any router +type Helper struct { + db metadata.DB + provider p2plab.NodeProvider + client *httputil.Client +} + +// New instantiates our helper type +func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client) *Helper { + return &Helper{db, provider, client} +} + +// CreateCluster enables creating the nodes in a cluster, waiting for them to be healthy before returning +func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinition, name string) (metadata.Cluster, error) { + var ( + cluster = metadata.Cluster{ + ID: name, + Status: metadata.ClusterCreating, + Definition: cdef, + Labels: append([]string{ + name, + }, cdef.GenerateLabels()...), + } + err error + ) + // temporarily remove until hearing back from hinshun about this change + // if change is undesirable simply uncomment this + // TODO(bonedaddy): need a better way to set the peer definition + // - with cue we might want to include this as a configurable param + /*for i, def := range cluster.Definition.Groups { + if def.Peer == nil { + def.Peer = &metadata.DefaultPeerDefinition + cluster.Definition.Groups[i] = def + } + }*/ + cluster, err = h.db.CreateCluster(ctx, cluster) + if err != nil { + return cluster, err + } + + zerolog.Ctx(ctx).Info().Str("cid", name).Msg("Creating node group") + ng, err := h.provider.CreateNodeGroup(ctx, name, cdef) + if err != nil { + return cluster, err + } + + var mns []metadata.Node + cluster.Status = metadata.ClusterConnecting + if err := h.db.Update(ctx, func(tx *bolt.Tx) error { + var err error + tctx := metadata.WithTransactionContext(ctx, tx) + cluster, err = h.db.UpdateCluster(tctx, cluster) + if err != nil { + return err + } + + mns, err = h.db.CreateNodes(tctx, cluster.ID, ng.Nodes) + if err != nil { + return err + } + + return nil + }); err != nil { + return cluster, err + } + + var ns = make([]p2plab.Node, len(mns)) + for i, n := range mns { + ns[i] = controlapi.NewNode(h.client, n) + } + + if err := nodes.WaitHealthy(ctx, ns); err != nil { + return cluster, err + } + + cluster.Status = metadata.ClusterCreated + return h.db.UpdateCluster(ctx, cluster) +} + +func (h *Helper) DeleteCluster(ctx context.Context, name string) error { + logger := zerolog.Ctx(ctx).With().Str("name", name).Logger() + ctx = logger.WithContext(ctx) + + cluster, err := h.db.GetCluster(ctx, name) + if err != nil { + return errors.Wrapf(err, "failed to get cluster %q", name) + } + + if cluster.Status != metadata.ClusterDestroying { + cluster.Status = metadata.ClusterDestroying + cluster, err = h.db.UpdateCluster(ctx, cluster) + if err != nil { + return errors.Wrap(err, "failed to update cluster status to destroying") + } + } + + ns, err := h.db.ListNodes(ctx, cluster.ID) + if err != nil { + return errors.Wrap(err, "failed to list nodes") + } + + ng := &p2plab.NodeGroup{ + ID: cluster.ID, + Nodes: ns, + } + + logger.Info().Msg("Destroying node group") + err = h.provider.DestroyNodeGroup(ctx, ng) + if err != nil { + return errors.Wrap(err, "failed to destroy node group") + } + + logger.Info().Msg("Deleting cluster metadata") + err = h.db.DeleteCluster(ctx, cluster.ID) + if err != nil { + return errors.Wrap(err, "failed to delete cluster metadata") + } + + logger.Info().Msg("Destroyed cluster") + return nil +} diff --git a/metadata/experiment.go b/metadata/experiment.go index 9a96d18b..5cbb5af7 100644 --- a/metadata/experiment.go +++ b/metadata/experiment.go @@ -17,6 +17,7 @@ package metadata import ( "context" "encoding/json" + "strconv" "time" "github.com/Netflix/p2plab/errdefs" @@ -31,11 +32,24 @@ type Experiment struct { Definition ExperimentDefinition + Reports []Report + Labels []string CreatedAt, UpdatedAt time.Time } +// ToJSON is a helper function to convert an Experiment +// into it's JSON representation +func (e *Experiment) ToJSON() ([]byte, error) { + return json.MarshalIndent(e, "", " ") +} + +// FromJSON loads the experiment definition with the values from data +func (e *Experiment) FromJSON(data []byte) error { + return json.Unmarshal(data, e) +} + type ExperimentStatus string var ( @@ -46,16 +60,13 @@ var ( // ExperimentDefinition defines an experiment. type ExperimentDefinition struct { - IndependentVariable []IndependentVariable - TrialDefinition []TrialDefinition + Trials []TrialDefinition } -type IndependentVariable map[string]interface{} - // ToJSON is a helper function to convert an ExperimentDefinition // into it's JSON representation func (ed *ExperimentDefinition) ToJSON() ([]byte, error) { - return json.Marshal(ed) + return json.MarshalIndent(ed, "", " ") } // FromJSON loads the experiment definition with the values from data @@ -250,6 +261,21 @@ func readExperiment(bkt *bolt.Bucket, experiment *Experiment) error { return err } + i := 0 + rbkt := bkt.Bucket([]byte(strconv.Itoa(i))) + for rbkt != nil { + var report Report + err = readReport(rbkt, &report) + if err != nil { + return err + } + + experiment.Reports = append(experiment.Reports, report) + + i++ + rbkt = bkt.Bucket([]byte(strconv.Itoa(i))) + } + return bkt.ForEach(func(k, v []byte) error { if v == nil { return nil @@ -295,6 +321,18 @@ func writeExperiment(bkt *bolt.Bucket, experiment *Experiment) error { return err } + for i, report := range experiment.Reports { + rbkt, err := bkt.CreateBucket([]byte(strconv.Itoa(i))) + if err != nil { + return err + } + + err = writeReport(rbkt, report) + if err != nil { + return err + } + } + for _, f := range []field{ {bucketKeyID, []byte(experiment.ID)}, {bucketKeyStatus, []byte(experiment.Status)}, diff --git a/nodes/connect.go b/nodes/connect.go index 389f6619..5f64fd80 100644 --- a/nodes/connect.go +++ b/nodes/connect.go @@ -132,5 +132,10 @@ func Connect(ctx context.Context, ns []p2plab.Node) error { } } - return connectPeers.Wait() + err = connectPeers.Wait() + if err != nil { + return err + } + + return nil } diff --git a/nodes/session.go b/nodes/session.go index 8a998736..c4b5822c 100644 --- a/nodes/session.go +++ b/nodes/session.go @@ -33,18 +33,21 @@ func Session(ctx context.Context, ns []p2plab.Node, fn func(context.Context) err defer span.Finish() sctx := opentracing.ContextWithSpan(ctx, span) + var ids []string + for _, n := range ns { + ids = append(ids, n.ID()) + } + eg, gctx := errgroup.WithContext(sctx) zerolog.Ctx(ctx).Info().Msg("Starting a session for benchmarking") go logutil.Elapsed(gctx, 20*time.Second, "Starting a session for benchmarking") - - cancels := make([]context.CancelFunc, len(ns)) - for i, n := range ns { - i, n := i, n + var cancels = make([]context.CancelFunc, 0, len(ns)) + for _, n := range ns { + n := n eg.Go(func() error { lctx, cancel := context.WithCancel(gctx) - cancels[i] = cancel - + cancels = append(cancels, cancel) pdef := n.Metadata().Peer err := n.Update(lctx, n.ID(), "", pdef) if err != nil && !errdefs.IsCancelled(err) { @@ -65,7 +68,7 @@ func Session(ctx context.Context, ns []p2plab.Node, fn func(context.Context) err return nil, err } - zerolog.Ctx(ctx).Info().Msg("Ending the session") + zerolog.Ctx(ctx).Info().Strs("nodes", ids).Msg("Ending the session") for _, cancel := range cancels { cancel() } diff --git a/nodes/update.go b/nodes/update.go index 8f88024d..573ac7c3 100644 --- a/nodes/update.go +++ b/nodes/update.go @@ -60,9 +60,9 @@ func Update(ctx context.Context, builder p2plab.Builder, ns []p2plab.Node) error for _, n := range ns { n := n + pdef := n.Metadata().Peer + link := linkByCommit[commitByRef[pdef.GitReference]] updatePeers.Go(func() error { - pdef := n.Metadata().Peer - link := linkByCommit[commitByRef[pdef.GitReference]] return n.Update(gctx, n.ID(), link, pdef) }) } diff --git a/pkg/digestconv/digestconv.go b/pkg/digestconv/digestconv.go index aecfe75f..60835a28 100644 --- a/pkg/digestconv/digestconv.go +++ b/pkg/digestconv/digestconv.go @@ -24,6 +24,9 @@ import ( ) func DigestToCid(dgst digest.Digest) (cid.Cid, error) { + if dgst.String() == "" { + return cid.Cid{}, errors.New("bad digest") + } data, err := hex.DecodeString(dgst.Hex()) if err != nil { return cid.Cid{}, errors.Wrap(err, "failed to decode digest hex") diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index f77ecc6d..3dd36b7d 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/labagent" @@ -33,6 +34,7 @@ type provider struct { nodes map[string][]*node logger *zerolog.Logger agentOpts []labagent.LabagentOption + mu sync.Mutex } func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labagent.LabagentOption) (p2plab.NodeProvider, error) { @@ -40,7 +42,6 @@ func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labag if err != nil { return nil, err } - p := &provider{ root: root, nodes: make(map[string][]*node), @@ -73,14 +74,27 @@ func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labag } func (p *provider) CreateNodeGroup(ctx context.Context, id string, cdef metadata.ClusterDefinition) (*p2plab.NodeGroup, error) { - var ns []metadata.Node + p.mu.Lock() + defer p.mu.Unlock() + + numPorts := 0 + for _, group := range cdef.Groups { + numPorts += group.Size + } + + freePorts, err := freeport.GetFreePorts(numPorts * 2) + if err != nil { + return nil, err + } + + var ( + ns []metadata.Node + portIndex = 0 + ) for _, group := range cdef.Groups { for i := 0; i < group.Size; i++ { - freePorts, err := freeport.GetFreePorts(2) - if err != nil { - return nil, err - } - agentPort, appPort := freePorts[0], freePorts[1] + agentPort, appPort := freePorts[portIndex], freePorts[portIndex+1] + portIndex += 2 id := xid.New().String() n, err := p.newNode(id, agentPort, appPort) @@ -114,6 +128,7 @@ func (p *provider) DestroyNodeGroup(ctx context.Context, ng *p2plab.NodeGroup) e for _, n := range p.nodes[ng.ID] { err := n.Close() if err != nil { + p.logger.Error().Err(err).Str("node.id", n.ID).Msg("error encountered while destroying node group") return err } } @@ -133,9 +148,17 @@ type node struct { func (p *provider) newNode(id string, agentPort, appPort int) (*node, error) { agentRoot := filepath.Join(p.root, id, "labagent") agentAddr := fmt.Sprintf(":%d", agentPort) + err := os.MkdirAll(agentRoot, 0711) + if err != nil { + return nil, err + } appRoot := filepath.Join(p.root, id, "labapp") appAddr := fmt.Sprintf("http://localhost:%d", appPort) + err = os.MkdirAll(appRoot, 0711) + if err != nil { + return nil, err + } la, err := labagent.New(agentRoot, agentAddr, appRoot, appAddr, p.logger, p.agentOpts...) if err != nil { diff --git a/transformers/oci/oci.go b/transformers/oci/oci.go index d32dfe73..cd118a23 100644 --- a/transformers/oci/oci.go +++ b/transformers/oci/oci.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "net/http" "path/filepath" @@ -202,6 +203,8 @@ func ConvertHandler(conversions map[digest.Digest]ocispec.Descriptor, peer p2pla defer rc.Close() target.Digest, err = AddBlob(ctx, peer, rc, opts...) + default: + return nil, fmt.Errorf("unhandled media type %s", desc.MediaType) } if err != nil { return nil, errors.Wrapf(err, "failed to convert %q [%s]", desc.Digest, desc.MediaType) @@ -210,6 +213,12 @@ func ConvertHandler(conversions map[digest.Digest]ocispec.Descriptor, peer p2pla if zerolog.Ctx(ctx).GetLevel() == zerolog.DebugLevel { c, err := digestconv.DigestToCid(target.Digest) if err != nil { + zerolog.Ctx(ctx).Error().Str("mediaType", desc.MediaType). + Str("source", desc.Digest.String()). + Str("cid", c.String()). + Int64("size", desc.Size). + Err(err). + Msg("failed to retrieve CID from digest") return nil, err } zerolog.Ctx(ctx).Debug().Str("mediaType", desc.MediaType).Str("source", desc.Digest.String()).Str("cid", c.String()).Int64("size", desc.Size).Msg("Added blob to peer")