From ead8831325328029dae814b29284e698fb78a234 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 15:24:59 -0700 Subject: [PATCH 01/66] cue/parser: embed the template into parser package as exported variable --- cue/parser/parser.go | 49 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/cue/parser/parser.go b/cue/parser/parser.go index 78e608b9..aa250be3 100644 --- a/cue/parser/parser.go +++ b/cue/parser/parser.go @@ -4,6 +4,55 @@ import ( "cuelang.org/go/cue" ) +var ( + // CueTemplate contains the bare cue source template used to generate + // cue files + CueTemplate = ` + // defines a set of nodes of size 1 or higher + // a "node" is simply an EC2 instance provisioned of the given type + // and there may be more than 1 node in a group, however there must always be 1 + Group :: { + // must be greater than or equal to 1 + // default value of this field is 1 + size: >=1 | *1 + instanceType: string + region: string + // labels is an optional field + labels?: [...string] + } + + // a cluster is a collection of 1 or more groups of nodes + // that will be participating in a given benchmark + Cluster :: { + groups: [...Group] + } + + // an object is a particular data format to be used in benchmarking + // typically these are container images + object :: [Name=_]: { + type: string + source: string + } + + Scenario :: { + objects: [...object] + seed: { ... } + // enable any fields for benchmark + benchmark: { ... } + } + + Trial :: { + cluster: Cluster + scenario: Scenario + } + + Experiment :: { + trials: [...Trial] + // trials: [ ...[Cluster,Scenario]] + } +` +) + // Parser bundles the cue runtime with helper functions // to enable parsing of cue source files type Parser struct { From 714cce8c3bedc1ef9d23c1db7ae1d77af755abe6 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 15:25:19 -0700 Subject: [PATCH 02/66] experiments: modfiy the Parser function to use cue not json --- experiments/definition.go | 19 +++++++++++-------- experiments/experiments_test.go | 14 +------------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/experiments/definition.go b/experiments/definition.go index 5b208172..8a82e2ec 100644 --- a/experiments/definition.go +++ b/experiments/definition.go @@ -15,23 +15,26 @@ package experiments import ( - "encoding/json" "io/ioutil" + "github.com/Netflix/p2plab/cue/parser" "github.com/Netflix/p2plab/metadata" ) +// Parse reads the cue source file at filename and converts it to a +// metadata.ExperimentDefinition type func Parse(filename string) (metadata.ExperimentDefinition, error) { - var edef metadata.ExperimentDefinition content, err := ioutil.ReadFile(filename) if err != nil { - return edef, err + return metadata.ExperimentDefinition{}, err } - - err = json.Unmarshal(content, &edef) + psr := parser.NewParser([]string{parser.CueTemplate}) + inst, err := psr.Compile( + "p2plab_experiment", + string(content), + ) if err != nil { - return edef, err + return metadata.ExperimentDefinition{}, err } - - return edef, nil + return inst.ToExperimentDefinition() } diff --git a/experiments/experiments_test.go b/experiments/experiments_test.go index bfe9a76f..e4595773 100644 --- a/experiments/experiments_test.go +++ b/experiments/experiments_test.go @@ -2,12 +2,10 @@ package experiments import ( "context" - "io/ioutil" "os" "strings" "testing" - parser "github.com/Netflix/p2plab/cue/parser" "github.com/Netflix/p2plab/metadata" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -85,17 +83,7 @@ func TestExperimentDefinition(t *testing.T) { } func newTestExperiment(t *testing.T, sourceFile, name string) metadata.Experiment { - data, err := ioutil.ReadFile("../cue/cue.mod/p2plab.cue") - require.NoError(t, err) - sourceData, err := ioutil.ReadFile(sourceFile) - require.NoError(t, err) - psr := parser.NewParser([]string{string(data)}) - inst, err := psr.Compile( - name, - string(sourceData), - ) - require.NoError(t, err) - edef, err := inst.ToExperimentDefinition() + edef, err := Parse(sourceFile) require.NoError(t, err) return metadata.Experiment{ ID: uuid.New().String(), From 92d9d756f29743b78a7762b586155d98fc6d8db9 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 15:30:23 -0700 Subject: [PATCH 03/66] cmd/labctl/command: add dry run for debug purposes --- cmd/labctl/command/experiment.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/labctl/command/experiment.go b/cmd/labctl/command/experiment.go index b8169eaf..26c62cfa 100644 --- a/cmd/labctl/command/experiment.go +++ b/cmd/labctl/command/experiment.go @@ -16,6 +16,7 @@ package command import ( "errors" + "fmt" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/experiments" @@ -42,6 +43,10 @@ var experimentCommand = cli.Command{ Name: "name", Usage: "Name of the experiment, by default takes the name of the experiment definition.", }, + &cli.BoolFlag{ + Name: "dry.run", + Usage: "dry run the epxeriment creation, parsing the cue file and printing it to stdout", + }, }, }, { @@ -111,7 +116,14 @@ func createExperimentAction(c *cli.Context) error { if err != nil { return err } - + if c.Bool("dry.run") { + jedef, err := edef.ToJSON() + if err != nil { + return err + } + fmt.Printf("%+v\n", string(jedef)) + return nil + } control, err := ResolveControl(c) if err != nil { return err From 1dc0c6e81456e6bf4ec5fd113d48175276cc709d Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 16:36:38 -0700 Subject: [PATCH 04/66] labd/routers/experimentrouter: commit current state --- labd/routers/experimentrouter/router.go | 48 ++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 45a28b92..2c89345f 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -16,8 +16,11 @@ package experimentrouter import ( "context" + "fmt" + "io/ioutil" "net/http" "strings" + "sync" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" @@ -28,6 +31,7 @@ import ( "github.com/Netflix/p2plab/query" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" + "github.com/google/uuid" "github.com/pkg/errors" "github.com/rs/zerolog" ) @@ -79,7 +83,49 @@ func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, } func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - return errors.New("unimplemented") + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + var edef metadata.ExperimentDefinition + if err := edef.FromJSON(data); err != nil { + return err + } + exp, err := s.db.CreateExperiment(ctx, metadata.Experiment{ + ID: uuid.New().String(), + Definition: edef, + Status: metadata.ExperimentRunning, + }) + if err != nil { + return err + } + wg := &sync.WaitGroup{} + wg.Add(len(exp.Definition.TrialDefinition)) + logger := zerolog.Ctx(ctx).With().Str("experiment", exp.ID).Logger() + logger.Info().Msg("creating trial goroutines") + for _, t := range exp.Definition.TrialDefinition { + go func(trial metadata.TrialDefinition) { + defer wg.Done() + benchmark := metadata.Benchmark{ + ID: uuid.New().String(), + Status: metadata.BenchmarkRunning, + Cluster: metadata.Cluster{ + ID: uuid.New().String(), + Definition: trial.Cluster, + }, + Scenario: metadata.Scenario{ + ID: uuid.New().String(), + Definition: trial.Scenario, + }, + } + fmt.Printf("%+v\n", benchmark) + }(t) + } + logger.Info().Msg("waiting for trials to finish") + wg.Wait() + logger.Info().Msg("finished trials") + return nil } func (s *router) putExperimentsLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { From 4bcdabb6e9751230e49d63e626cfd3e7dbc9f72f Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 16:43:41 -0700 Subject: [PATCH 05/66] labd/routers/experimentrouter: commit the panic code --- labd/routers/experimentrouter/router.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 2c89345f..390834ad 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io/ioutil" + "log" "net/http" "strings" "sync" @@ -102,29 +103,18 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite } wg := &sync.WaitGroup{} wg.Add(len(exp.Definition.TrialDefinition)) - logger := zerolog.Ctx(ctx).With().Str("experiment", exp.ID).Logger() - logger.Info().Msg("creating trial goroutines") for _, t := range exp.Definition.TrialDefinition { go func(trial metadata.TrialDefinition) { defer wg.Done() - benchmark := metadata.Benchmark{ - ID: uuid.New().String(), - Status: metadata.BenchmarkRunning, - Cluster: metadata.Cluster{ - ID: uuid.New().String(), - Definition: trial.Cluster, - }, - Scenario: metadata.Scenario{ - ID: uuid.New().String(), - Definition: trial.Scenario, - }, + nodeGroup, err := s.provider.CreateNodeGroup(ctx, uuid.New().String(), trial.Cluster) + if err != nil { + log.Println(err) + return } - fmt.Printf("%+v\n", benchmark) + fmt.Printf("%+v\n", nodeGroup) }(t) } - logger.Info().Msg("waiting for trials to finish") wg.Wait() - logger.Info().Msg("finished trials") return nil } From cf58c94a71d01db4d170e2be8e0ef0c8ac2fa6a2 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 16:52:11 -0700 Subject: [PATCH 06/66] labd/routers/experimentrouter: something works? --- labd/routers/experimentrouter/router.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 390834ad..9a01a76e 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "io/ioutil" - "log" "net/http" "strings" "sync" @@ -30,6 +29,7 @@ import ( "github.com/Netflix/p2plab/pkg/httputil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" + "github.com/Netflix/p2plab/scenarios" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" "github.com/google/uuid" @@ -106,12 +106,14 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite for _, t := range exp.Definition.TrialDefinition { go func(trial metadata.TrialDefinition) { defer wg.Done() - nodeGroup, err := s.provider.CreateNodeGroup(ctx, uuid.New().String(), trial.Cluster) + plan, queries, err := scenarios.Plan( + ctx, trial.Scenario, s.ts, s.seeder, nil, + ) if err != nil { - log.Println(err) return } - fmt.Printf("%+v\n", nodeGroup) + fmt.Printf("plans\t%+v\n", plan) + fmt.Printf("queries\t%+v\n", queries) }(t) } wg.Wait() From 91691b464f06c5d962c304583bf8b9205f6928ae Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 17:00:17 -0700 Subject: [PATCH 07/66] labd/routers/experimentrouter: fix panic --- labd/routers/experimentrouter/router.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 9a01a76e..6935662d 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -105,9 +105,10 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite wg.Add(len(exp.Definition.TrialDefinition)) for _, t := range exp.Definition.TrialDefinition { go func(trial metadata.TrialDefinition) { + lset := query.NewLabeledSet() defer wg.Done() plan, queries, err := scenarios.Plan( - ctx, trial.Scenario, s.ts, s.seeder, nil, + ctx, trial.Scenario, s.ts, s.seeder, lset, ) if err != nil { return From 1aa007d2fef55678b46d615f3e933edce38318f7 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 17:09:48 -0700 Subject: [PATCH 08/66] labd/routers/experimentrouter: continue making some minor modifications note that unless we use properly formatted cue files they cause errors, i will be committing them next --- labd/routers/experimentrouter/router.go | 37 +++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 6935662d..03141400 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io/ioutil" + "log" "net/http" "strings" "sync" @@ -111,10 +112,42 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite ctx, trial.Scenario, s.ts, s.seeder, lset, ) if err != nil { + log.Println(err) return } - fmt.Printf("plans\t%+v\n", plan) - fmt.Printf("queries\t%+v\n", queries) + benchmark := metadata.Benchmark{ + ID: uuid.New().String(), + Status: metadata.BenchmarkRunning, + Cluster: metadata.Cluster{ + Definition: trial.Cluster, + }, + Scenario: metadata.Scenario{ + Definition: trial.Scenario, + }, + Plan: plan, + } + benchmark, err = s.db.CreateBenchmark(ctx, benchmark) + if err != nil { + log.Println(err) + return + } + var seederAddrs []string + for _, addr := range s.seeder.Host().Addrs() { + seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) + } + execution, err := scenarios.Run(ctx, lset, plan, seederAddrs) + if err != nil { + log.Println(err) + return + } + report := metadata.Report{ + Summary: metadata.ReportSummary{ + TotalTime: execution.End.Sub(execution.Start), + }, + Nodes: execution.Report, + Queries: queries, + } + fmt.Printf("%+v\n", report) }(t) } wg.Wait() From 9b975ed41e32eb44c5bc9dad27ce0422b14cccfe Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 17:10:32 -0700 Subject: [PATCH 09/66] cue/cue.mod: add some fixes to labels --- cue/cue.mod/p2plab_example1.cue | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cue/cue.mod/p2plab_example1.cue b/cue/cue.mod/p2plab_example1.cue index 2822cb3b..8da3290e 100644 --- a/cue/cue.mod/p2plab_example1.cue +++ b/cue/cue.mod/p2plab_example1.cue @@ -23,7 +23,7 @@ clust1:: Cluster & { size: 2 instanceType: "t3.medium" region: "us-east-1" - labels: [ "neighbors" ] + labels: [ "'neighbors'" ] } ] } @@ -31,17 +31,17 @@ clust1:: Cluster & { scen1:: Scenario & { objects: [ items ] seed: { - "neighbors": "golang" + "\"neighbors\"": "golang" } benchmark: { - "(not neighbors)": "golang" + "(not 'neighbors')": "golang" } } scen2:: Scenario & { objects: [ items ] seed: { - "neighbors": "golang" + "\"neighbors\"": "golang" } benchmark: { "(neighbors)": "golang" @@ -61,4 +61,4 @@ experiment: Experiment & { } ] // trials: [[clust1,scen1],[clust1,scen2]] -} \ No newline at end of file +} From ee184a9ed91ea6fce97268fbe1cdbb3c13a5e518 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 18:57:14 -0700 Subject: [PATCH 10/66] labd/routers/experimentrouter: switch to use errgorup instead of wait --- labd/routers/experimentrouter/router.go | 68 ++++++------------------- 1 file changed, 16 insertions(+), 52 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 03141400..058dc91a 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -18,10 +18,8 @@ import ( "context" "fmt" "io/ioutil" - "log" "net/http" "strings" - "sync" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" @@ -30,12 +28,12 @@ import ( "github.com/Netflix/p2plab/pkg/httputil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" - "github.com/Netflix/p2plab/scenarios" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" "github.com/google/uuid" "github.com/pkg/errors" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" ) type router struct { @@ -102,56 +100,22 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if err != nil { return err } - wg := &sync.WaitGroup{} - wg.Add(len(exp.Definition.TrialDefinition)) - for _, t := range exp.Definition.TrialDefinition { - go func(trial metadata.TrialDefinition) { - lset := query.NewLabeledSet() - defer wg.Done() - plan, queries, err := scenarios.Plan( - ctx, trial.Scenario, s.ts, s.seeder, lset, - ) - if err != nil { - log.Println(err) - return - } - benchmark := metadata.Benchmark{ - ID: uuid.New().String(), - Status: metadata.BenchmarkRunning, - Cluster: metadata.Cluster{ - Definition: trial.Cluster, - }, - Scenario: metadata.Scenario{ - Definition: trial.Scenario, - }, - Plan: plan, - } - benchmark, err = s.db.CreateBenchmark(ctx, benchmark) - if err != nil { - log.Println(err) - return - } - var seederAddrs []string - for _, addr := range s.seeder.Host().Addrs() { - seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) - } - execution, err := scenarios.Run(ctx, lset, plan, seederAddrs) - if err != nil { - log.Println(err) - return - } - report := metadata.Report{ - Summary: metadata.ReportSummary{ - TotalTime: execution.End.Sub(execution.Start), - }, - Nodes: execution.Report, - Queries: queries, - } - fmt.Printf("%+v\n", report) - }(t) + errg, ctx := errgroup.WithContext(ctx) + for _, trial := range exp.Definition.TrialDefinition { + trial := trial + errg.Go(func() error { + // just temporary to silence error + _ = trial + fmt.Println("creating cluster") + defer func() { + fmt.Println("cluster tearing down") + }() + fmt.Println("creating plan from scenario") + fmt.Println("running plan on cluster") + return nil + }) } - wg.Wait() - return nil + return errg.Wait() } func (s *router) putExperimentsLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { From 150de93d576baeea626863b6794b08b14ac38b08 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 19:14:43 -0700 Subject: [PATCH 11/66] labd/routers: add a helper package to be reused by multiple routers --- labd/routers/helpers/doc.go | 2 + labd/routers/helpers/helpers.go | 94 +++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 labd/routers/helpers/doc.go create mode 100644 labd/routers/helpers/helpers.go diff --git a/labd/routers/helpers/doc.go b/labd/routers/helpers/doc.go new file mode 100644 index 00000000..574d96fe --- /dev/null +++ b/labd/routers/helpers/doc.go @@ -0,0 +1,2 @@ +// Package helpers contains helper functions to be reused by multiple routers +package helpers diff --git a/labd/routers/helpers/helpers.go b/labd/routers/helpers/helpers.go new file mode 100644 index 00000000..c15a236e --- /dev/null +++ b/labd/routers/helpers/helpers.go @@ -0,0 +1,94 @@ +package helpers + +import ( + "context" + "net/http" + + "github.com/Netflix/p2plab" + "github.com/Netflix/p2plab/labd/controlapi" + "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/nodes" + "github.com/Netflix/p2plab/pkg/httputil" + "github.com/Netflix/p2plab/pkg/logutil" + "github.com/rs/zerolog" + bolt "go.etcd.io/bbolt" +) + +// TODO(bonedaddy): not sure if this is the best way to go about sharing code between routers + +// Helper abstracts commonly used functions to be shared by any router +type Helper struct { + db metadata.DB + provider p2plab.NodeProvider + client *httputil.Client +} + +// New instantiates our helper type +func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client) *Helper { + return &Helper{db, provider, client} +} + +// CreateCluster enables creating the nodes in a cluster, waiting for them to be healthy before returning +func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinition, name string, w http.ResponseWriter) (metadata.Cluster, error) { + var ( + cluster = metadata.Cluster{ + ID: name, + Status: metadata.ClusterCreating, + Definition: cdef, + Labels: append([]string{ + name, + }, cdef.GenerateLabels()...), + } + err error + ) + + ctx, logger := logutil.WithResponseLogger(ctx, w) + logger.UpdateContext(func(c zerolog.Context) zerolog.Context { + return c.Str("name", name) + }) + + cluster, err = h.db.CreateCluster(ctx, cluster) + if err != nil { + return cluster, err + } + + zerolog.Ctx(ctx).Info().Msg("creating node group") + ng, err := h.provider.CreateNodeGroup(ctx, name, cdef) + if err != nil { + return cluster, err + } + + zerolog.Ctx(ctx).Info().Msg("updating metadata with new nodes") + var mns []metadata.Node + cluster.Status = metadata.ClusterConnecting + if err := h.db.Update(ctx, func(tx *bolt.Tx) error { + var err error + tctx := metadata.WithTransactionContext(ctx, tx) + cluster, err = h.db.UpdateCluster(tctx, cluster) + if err != nil { + return err + } + + mns, err = h.db.CreateNodes(tctx, cluster.ID, ng.Nodes) + if err != nil { + return err + } + + return nil + }); err != nil { + return cluster, err + } + + var ns = make([]p2plab.Node, len(mns)) + for i, n := range mns { + ns[i] = controlapi.NewNode(h.client, n) + } + + if err := nodes.WaitHealthy(ctx, ns); err != nil { + return cluster, err + } + + zerolog.Ctx(ctx).Info().Msg("updating cluster metadata") + cluster.Status = metadata.ClusterCreated + return h.db.UpdateCluster(ctx, cluster) +} From 09227648967fa5a4ab7d15a7b65523a20300e20e Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 19:19:55 -0700 Subject: [PATCH 12/66] labd/routers: use the helper functions --- labd/routers/clusterrouter/router.go | 84 ++++--------------------- labd/routers/experimentrouter/router.go | 19 +++++- 2 files changed, 28 insertions(+), 75 deletions(-) diff --git a/labd/routers/clusterrouter/router.go b/labd/routers/clusterrouter/router.go index b0471042..f3bb3e24 100644 --- a/labd/routers/clusterrouter/router.go +++ b/labd/routers/clusterrouter/router.go @@ -22,26 +22,30 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" - "github.com/Netflix/p2plab/labd/controlapi" + "github.com/Netflix/p2plab/labd/routers/helpers" "github.com/Netflix/p2plab/metadata" - "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/pkg/httputil" "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" "github.com/pkg/errors" - "github.com/rs/zerolog" - bolt "go.etcd.io/bbolt" ) type router struct { db metadata.DB provider p2plab.NodeProvider client *httputil.Client + rhelper *helpers.Helper } +// New returns a new clutser router initialized with the router helpers func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client) daemon.Router { - return &router{db, provider, client} + return &router{ + db, + provider, + client, + helpers.New(db, provider, client), + } } func (s *router) Routes() []daemon.Route { @@ -83,74 +87,8 @@ func (s *router) postClustersCreate(ctx context.Context, w http.ResponseWriter, if err != nil { return err } - - name := r.FormValue("name") - ctx, logger := logutil.WithResponseLogger(ctx, w) - logger.UpdateContext(func(c zerolog.Context) zerolog.Context { - return c.Str("name", name) - }) - - cluster := metadata.Cluster{ - ID: name, - Status: metadata.ClusterCreating, - Definition: cdef, - Labels: append([]string{ - name, - }, cdef.GenerateLabels()...), - } - - cluster, err = s.db.CreateCluster(ctx, cluster) - if err != nil { - return err - } - w.Header().Add(controlapi.ResourceID, name) - - zerolog.Ctx(ctx).Info().Msg("Creating node group") - ng, err := s.provider.CreateNodeGroup(ctx, name, cdef) - if err != nil { - return err - } - - zerolog.Ctx(ctx).Info().Msg("Updating metadata with new nodes") - var mns []metadata.Node - cluster.Status = metadata.ClusterConnecting - err = s.db.Update(ctx, func(tx *bolt.Tx) error { - var err error - tctx := metadata.WithTransactionContext(ctx, tx) - cluster, err = s.db.UpdateCluster(tctx, cluster) - if err != nil { - return err - } - - mns, err = s.db.CreateNodes(tctx, cluster.ID, ng.Nodes) - if err != nil { - return err - } - - return nil - }) - if err != nil { - return err - } - - var ns []p2plab.Node - for _, n := range mns { - ns = append(ns, controlapi.NewNode(s.client, n)) - } - - err = nodes.WaitHealthy(ctx, ns) - if err != nil { - return err - } - - zerolog.Ctx(ctx).Info().Msg("Updating cluster metadata") - cluster.Status = metadata.ClusterCreated - _, err = s.db.UpdateCluster(ctx, cluster) - if err != nil { - return err - } - - return nil + _, err = s.rhelper.CreateCluster(ctx, cdef, r.FormValue("name"), w) + return err } func (s *router) putClustersLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 058dc91a..cfe6ea85 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -23,6 +23,7 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" + "github.com/Netflix/p2plab/labd/routers/helpers" "github.com/Netflix/p2plab/metadata" "github.com/Netflix/p2plab/peer" "github.com/Netflix/p2plab/pkg/httputil" @@ -43,10 +44,20 @@ type router struct { ts *transformers.Transformers seeder *peer.Peer builder p2plab.Builder + rhelper *helpers.Helper } +// New returns a new experiment router initialized with the router helpers func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client, ts *transformers.Transformers, seeder *peer.Peer, builder p2plab.Builder) daemon.Router { - return &router{db, provider, client, ts, seeder, builder} + return &router{ + db, + provider, + client, + ts, + seeder, + builder, + helpers.New(db, provider, client), + } } func (s *router) Routes() []daemon.Route { @@ -106,7 +117,11 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite errg.Go(func() error { // just temporary to silence error _ = trial - fmt.Println("creating cluster") + cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, uuid.New().String(), w) + if err != nil { + return err + } + fmt.Printf("%+v\n", cluster) defer func() { fmt.Println("cluster tearing down") }() From c8b0f772fb9d04c7dd57329881d2e8bfb0bfd1d3 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 19:23:20 -0700 Subject: [PATCH 13/66] labd/routers/helpers: remove usage of logutil --- labd/routers/helpers/helpers.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/labd/routers/helpers/helpers.go b/labd/routers/helpers/helpers.go index c15a236e..f99a1768 100644 --- a/labd/routers/helpers/helpers.go +++ b/labd/routers/helpers/helpers.go @@ -9,7 +9,6 @@ import ( "github.com/Netflix/p2plab/metadata" "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/pkg/httputil" - "github.com/Netflix/p2plab/pkg/logutil" "github.com/rs/zerolog" bolt "go.etcd.io/bbolt" ) @@ -42,11 +41,6 @@ func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinit err error ) - ctx, logger := logutil.WithResponseLogger(ctx, w) - logger.UpdateContext(func(c zerolog.Context) zerolog.Context { - return c.Str("name", name) - }) - cluster, err = h.db.CreateCluster(ctx, cluster) if err != nil { return cluster, err From 1aaa9bcda31245b6821a1c7bbaeb9643f6fb705e Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 20:47:25 -0700 Subject: [PATCH 14/66] labd/routers/experimentrouter: remove nil assign --- labd/routers/experimentrouter/router.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index cfe6ea85..18535a72 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -115,8 +115,6 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite for _, trial := range exp.Definition.TrialDefinition { trial := trial errg.Go(func() error { - // just temporary to silence error - _ = trial cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, uuid.New().String(), w) if err != nil { return err From 0f855d4b8283203d92e71f648b5931c146b17bb0 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 20:47:41 -0700 Subject: [PATCH 15/66] labd/routers/helpers: set default peer def if peer def is empty --- labd/routers/helpers/helpers.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/labd/routers/helpers/helpers.go b/labd/routers/helpers/helpers.go index f99a1768..2413ebe4 100644 --- a/labd/routers/helpers/helpers.go +++ b/labd/routers/helpers/helpers.go @@ -40,12 +40,18 @@ func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinit } err error ) - + // TODO(bonedaddy): need a better way to set the peer definition + // - with cue we might want to include this as a configurable param + for i, def := range cluster.Definition.Groups { + if def.Peer == nil { + def.Peer = &metadata.DefaultPeerDefinition + cluster.Definition.Groups[i] = def + } + } cluster, err = h.db.CreateCluster(ctx, cluster) if err != nil { return cluster, err } - zerolog.Ctx(ctx).Info().Msg("creating node group") ng, err := h.provider.CreateNodeGroup(ctx, name, cdef) if err != nil { From 72fead1e2efc6646541fd0f64c3c52ec7b9263d9 Mon Sep 17 00:00:00 2001 From: postables Date: Wed, 29 Apr 2020 21:12:12 -0700 Subject: [PATCH 16/66] labd/routers/experimentrouter: check in current code --- labd/routers/experimentrouter/router.go | 35 +++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 18535a72..980e2892 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -29,6 +29,7 @@ import ( "github.com/Netflix/p2plab/pkg/httputil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" + "github.com/Netflix/p2plab/scenarios" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" "github.com/google/uuid" @@ -123,8 +124,38 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite defer func() { fmt.Println("cluster tearing down") }() - fmt.Println("creating plan from scenario") - fmt.Println("running plan on cluster") + plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, query.NewLabeledSet()) + if err != nil { + return err + } + if _, err = s.db.CreateBenchmark(ctx, metadata.Benchmark{ + ID: uuid.New().String(), + Status: metadata.BenchmarkRunning, + Cluster: cluster, + Scenario: metadata.Scenario{ + Definition: trial.Scenario, + }, + Plan: plan, + Labels: cluster.Labels, + }); err != nil { + return err + } + var seederAddrs []string + for _, addr := range s.seeder.Host().Addrs() { + seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) + } + execution, err := scenarios.Run(ctx, query.NewLabeledSet(), plan, seederAddrs) + if err != nil { + return errors.Wrap(err, "failed to run scenario plan") + } + report := metadata.Report{ + Summary: metadata.ReportSummary{ + TotalTime: execution.End.Sub(execution.Start), + }, + Nodes: execution.Report, + Queries: queries, + } + fmt.Printf("%+v\n", report) return nil }) } From 853ec43a05f2bdaa118501f5f12c3f4149bb99e5 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 11:46:06 -0700 Subject: [PATCH 17/66] metadata: add a ToJSONIndent helper for experiment definitions --- metadata/experiment.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metadata/experiment.go b/metadata/experiment.go index 9a96d18b..43aa09cc 100644 --- a/metadata/experiment.go +++ b/metadata/experiment.go @@ -52,6 +52,10 @@ type ExperimentDefinition struct { type IndependentVariable map[string]interface{} +func (ed *ExperimentDefinition) ToJSONIndent() ([]byte, error) { + return json.MarshalIndent(ed, "", " ") +} + // ToJSON is a helper function to convert an ExperimentDefinition // into it's JSON representation func (ed *ExperimentDefinition) ToJSON() ([]byte, error) { From 79e7cbf94984380745600b41410443687e08bcae Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 12:11:32 -0700 Subject: [PATCH 18/66] cue/cue.mod: fix labels accordingly --- cue/cue.mod/p2plab_example1.cue | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cue/cue.mod/p2plab_example1.cue b/cue/cue.mod/p2plab_example1.cue index 8da3290e..4cee7a74 100644 --- a/cue/cue.mod/p2plab_example1.cue +++ b/cue/cue.mod/p2plab_example1.cue @@ -23,7 +23,7 @@ clust1:: Cluster & { size: 2 instanceType: "t3.medium" region: "us-east-1" - labels: [ "'neighbors'" ] + labels: [ "neighbors" ] } ] } @@ -31,7 +31,7 @@ clust1:: Cluster & { scen1:: Scenario & { objects: [ items ] seed: { - "\"neighbors\"": "golang" + "neighbors": "golang" } benchmark: { "(not 'neighbors')": "golang" @@ -41,11 +41,11 @@ scen1:: Scenario & { scen2:: Scenario & { objects: [ items ] seed: { - "\"neighbors\"": "golang" + "neighbors": "golang" } benchmark: { - "(neighbors)": "golang" - "(not neighbors)": "mysql" + "('neighbors')": "golang" + "(not 'neighbors')": "mysql" } } From 078017db20eb0af0d39c549fd771ca35a75497e3 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 12:21:20 -0700 Subject: [PATCH 19/66] transformers/oci: add default case to return unhandled media type --- transformers/oci/oci.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/transformers/oci/oci.go b/transformers/oci/oci.go index d32dfe73..29733a32 100644 --- a/transformers/oci/oci.go +++ b/transformers/oci/oci.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "net/http" "path/filepath" @@ -202,6 +203,8 @@ func ConvertHandler(conversions map[digest.Digest]ocispec.Descriptor, peer p2pla defer rc.Close() target.Digest, err = AddBlob(ctx, peer, rc, opts...) + default: + return nil, fmt.Errorf("unhandled media type %s", desc.MediaType) } if err != nil { return nil, errors.Wrapf(err, "failed to convert %q [%s]", desc.Digest, desc.MediaType) From 57931eeaf35a4c26fb25699053b6064a031a001f Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 12:23:22 -0700 Subject: [PATCH 20/66] cue/cue.mod: fix incorrect function name --- cue/cue.mod/p2plab_example1.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cue/cue.mod/p2plab_example1.cue b/cue/cue.mod/p2plab_example1.cue index 4cee7a74..6278dcac 100644 --- a/cue/cue.mod/p2plab_example1.cue +++ b/cue/cue.mod/p2plab_example1.cue @@ -44,7 +44,7 @@ scen2:: Scenario & { "neighbors": "golang" } benchmark: { - "('neighbors')": "golang" + "(and 'neighbors')": "golang" "(not 'neighbors')": "mysql" } } From 80f6dcb0f9ccc752ba26adc9c968b05f3f25fe7d Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 12:24:30 -0700 Subject: [PATCH 21/66] cmd/labctl/command: use indent json marshal for dry.run --- cmd/labctl/command/experiment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/labctl/command/experiment.go b/cmd/labctl/command/experiment.go index 26c62cfa..9f9109fb 100644 --- a/cmd/labctl/command/experiment.go +++ b/cmd/labctl/command/experiment.go @@ -117,7 +117,7 @@ func createExperimentAction(c *cli.Context) error { return err } if c.Bool("dry.run") { - jedef, err := edef.ToJSON() + jedef, err := edef.ToJSONIndent() if err != nil { return err } From bdd4c0c9ddd81dee2d842a55b3e5911a2e28fced Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 12:35:28 -0700 Subject: [PATCH 22/66] transformers/oci: add an error log --- transformers/oci/oci.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/transformers/oci/oci.go b/transformers/oci/oci.go index 29733a32..cd118a23 100644 --- a/transformers/oci/oci.go +++ b/transformers/oci/oci.go @@ -213,6 +213,12 @@ func ConvertHandler(conversions map[digest.Digest]ocispec.Descriptor, peer p2pla if zerolog.Ctx(ctx).GetLevel() == zerolog.DebugLevel { c, err := digestconv.DigestToCid(target.Digest) if err != nil { + zerolog.Ctx(ctx).Error().Str("mediaType", desc.MediaType). + Str("source", desc.Digest.String()). + Str("cid", c.String()). + Int64("size", desc.Size). + Err(err). + Msg("failed to retrieve CID from digest") return nil, err } zerolog.Ctx(ctx).Debug().Str("mediaType", desc.MediaType).Str("source", desc.Digest.String()).Str("cid", c.String()).Int64("size", desc.Size).Msg("Added blob to peer") From a324555e66d88fce4f23087881fe09c304bf183c Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 13:01:23 -0700 Subject: [PATCH 23/66] labd/routers/experimentrouter: make sure to retrieve nodes --- labd/routers/experimentrouter/router.go | 50 +++++++++++++++++++------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 980e2892..39fb0dbe 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -23,6 +23,7 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" + "github.com/Netflix/p2plab/labd/controlapi" "github.com/Netflix/p2plab/labd/routers/helpers" "github.com/Netflix/p2plab/metadata" "github.com/Netflix/p2plab/peer" @@ -113,30 +114,55 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return err } errg, ctx := errgroup.WithContext(ctx) - for _, trial := range exp.Definition.TrialDefinition { + for i, trial := range exp.Definition.TrialDefinition { trial := trial + name := fmt.Sprintf("%s-%v", uuid.New().String(), i) errg.Go(func() error { - cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, uuid.New().String(), w) + cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name, w) + if err != nil { + return err + } + zerolog.Ctx(ctx).Info().Msg("creating scenario") + scenID := uuid.New().String() + scenario := metadata.Scenario{ + ID: scenID, + Definition: trial.Scenario, + Labels: []string{ + name, + }, + } + scenario, err = s.db.CreateScenario(ctx, scenario) if err != nil { return err } fmt.Printf("%+v\n", cluster) defer func() { - fmt.Println("cluster tearing down") + zerolog.Ctx(ctx).Info().Msg("tearing down cluster") }() - plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, query.NewLabeledSet()) + mns, err := s.db.ListNodes(ctx, cluster.ID) + if err != nil { + return err + } + var ( + ns []p2plab.Node + lset = query.NewLabeledSet() + ) + for _, n := range mns { + node := controlapi.NewNode(s.client, n) + lset.Add(node) + ns = append(ns, node) + } + plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, lset) if err != nil { return err } if _, err = s.db.CreateBenchmark(ctx, metadata.Benchmark{ - ID: uuid.New().String(), - Status: metadata.BenchmarkRunning, - Cluster: cluster, - Scenario: metadata.Scenario{ - Definition: trial.Scenario, - }, - Plan: plan, - Labels: cluster.Labels, + ID: uuid.New().String(), + Status: metadata.BenchmarkRunning, + Cluster: cluster, + Scenario: scenario, + Plan: plan, + Labels: cluster.Labels, }); err != nil { return err } From 440ae0840a706a0faa27f76def6f60385d5d6cfd Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 13:33:49 -0700 Subject: [PATCH 24/66] labd/routers/experimentrouter: use lset --- labd/routers/experimentrouter/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 39fb0dbe..5f2a261f 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -170,7 +170,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite for _, addr := range s.seeder.Host().Addrs() { seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) } - execution, err := scenarios.Run(ctx, query.NewLabeledSet(), plan, seederAddrs) + execution, err := scenarios.Run(ctx, lset, plan, seederAddrs) if err != nil { return errors.Wrap(err, "failed to run scenario plan") } From e60abe573ea0a860ba960913ac6694bf78a21ee6 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 13:54:55 -0700 Subject: [PATCH 25/66] labd/routers/experimentrouter: fix seed issues --- labd/routers/experimentrouter/router.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 5f2a261f..f868a4e4 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -19,8 +19,11 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" "strings" + "github.com/Netflix/p2plab/nodes" + "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" "github.com/Netflix/p2plab/labd/controlapi" @@ -96,6 +99,15 @@ func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, } func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + noReset := false + if r.FormValue("no-reset") != "" { + var err error + noReset, err = strconv.ParseBool(r.FormValue("no-reset")) + if err != nil { + return err + } + } + defer r.Body.Close() data, err := ioutil.ReadAll(r.Body) if err != nil { @@ -152,6 +164,14 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite lset.Add(node) ns = append(ns, node) } + if !noReset { + if err := nodes.Update(ctx, s.builder, ns); err != nil { + return errors.Wrap(err, "failed to update cluster") + } + if err := nodes.Connect(ctx, ns); err != nil { + return errors.Wrap(err, "failed to connect cluster") + } + } plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, lset) if err != nil { return err From 9285c9643d78465696ef3cef2b9df9b0ea7a4b9a Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 14:24:59 -0700 Subject: [PATCH 26/66] labagent/supervisor: log context error, dont sigterm if nil --- labagent/supervisor/supervisor.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index 168b3364..a101a185 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -165,9 +165,14 @@ func (s *supervisor) wait(ctx context.Context, flags []string) error { go func() { <-ctx.Done() - err := s.app.Process.Signal(syscall.SIGTERM) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to SIGTERM labapp") + if ctx.Err() != nil { + zerolog.Ctx(ctx).Error().Err(ctx.Err()).Msg("context error is not nil") + } + if s.app.Process != nil { + err := s.app.Process.Signal(syscall.SIGTERM) + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to SIGTERM labapp") + } } }() From 25193d5f3887e7b66c8330010a12ab6aa3a59217 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 14:25:25 -0700 Subject: [PATCH 27/66] labd/routers/experimentrouter: make sure to cleanup cluster --- labd/routers/experimentrouter/router.go | 35 ++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index f868a4e4..cbd1d840 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -19,10 +19,13 @@ import ( "fmt" "io/ioutil" "net/http" + "os" "strconv" "strings" "github.com/Netflix/p2plab/nodes" + "github.com/Netflix/p2plab/reports" + "github.com/uber/jaeger-client-go" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" @@ -148,8 +151,25 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return err } fmt.Printf("%+v\n", cluster) - defer func() { + defer func() error { zerolog.Ctx(ctx).Info().Msg("tearing down cluster") + cluster, err := s.db.GetCluster(ctx, name) + if err != nil { + return errors.Wrap(err, "failed to get cluster'") + } + ns, err := s.db.ListNodes(ctx, cluster.ID) + if err != nil { + return errors.Wrap(err, "failed to list nodes") + } + ng := &p2plab.NodeGroup{ + ID: cluster.ID, + Nodes: ns, + } + if err := s.provider.DestroyNodeGroup(ctx, ng); err != nil { + return errors.Wrap(err, "failed to destroy node group") + } + zerolog.Ctx(ctx).Info().Msg("tore down cluster") + return nil }() mns, err := s.db.ListNodes(ctx, cluster.ID) if err != nil { @@ -176,14 +196,15 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if err != nil { return err } - if _, err = s.db.CreateBenchmark(ctx, metadata.Benchmark{ + benchmark := metadata.Benchmark{ ID: uuid.New().String(), Status: metadata.BenchmarkRunning, Cluster: cluster, Scenario: scenario, Plan: plan, Labels: cluster.Labels, - }); err != nil { + } + if benchmark, err = s.db.CreateBenchmark(ctx, benchmark); err != nil { return err } var seederAddrs []string @@ -201,6 +222,14 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite Nodes: execution.Report, Queries: queries, } + report.Aggregates = reports.ComputeAggregates(report.Nodes) + jaegerUI := os.Getenv("JAEGER_UI") + if jaegerUI != "" { + sc, ok := execution.Span.Context().(jaeger.SpanContext) + if ok { + report.Summary.Trace = fmt.Sprintf("%s/trace/%s", jaegerUI, sc.TraceID()) + } + } fmt.Printf("%+v\n", report) return nil }) From 8e4721b6be8fe3731e77379c66bf42bf832a04e6 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 14:39:44 -0700 Subject: [PATCH 28/66] labd/routers/experimentrouter: make sure to update benchmark --- labd/routers/experimentrouter/router.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index cbd1d840..02c25454 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -26,6 +26,7 @@ import ( "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/reports" "github.com/uber/jaeger-client-go" + bolt "go.etcd.io/bbolt" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" @@ -230,8 +231,24 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite report.Summary.Trace = fmt.Sprintf("%s/trace/%s", jaegerUI, sc.TraceID()) } } - fmt.Printf("%+v\n", report) - return nil + zerolog.Ctx(ctx).Info().Msg("Updating benchmark metadata") + err = s.db.Update(ctx, func(tx *bolt.Tx) error { + tctx := metadata.WithTransactionContext(ctx, tx) + + err := s.db.CreateReport(tctx, benchmark.ID, report) + if err != nil { + return errors.Wrap(err, "failed to create report") + } + + benchmark.Status = metadata.BenchmarkDone + _, err = s.db.UpdateBenchmark(tctx, benchmark) + if err != nil { + return errors.Wrap(err, "failed to update benchmark") + } + + return nil + }) + return err }) } return errg.Wait() From 63532d3b6145c4041dcad282f1037c4232b311ca Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 15:12:03 -0700 Subject: [PATCH 29/66] labd/routers/experimentrouter: temporarily get single benchmark working --- labd/routers/experimentrouter/router.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 02c25454..27c0cb86 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -131,6 +131,13 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite } errg, ctx := errgroup.WithContext(ctx) for i, trial := range exp.Definition.TrialDefinition { + // TODO(bonedaddy): if we dont do this, then we run into some issues + // with port numbers being re-used. For example when we start the goroutines + // for each of the benchmarks they use the same ports for all peers + // and as such we run into port issues + if i > 0 { + break + } trial := trial name := fmt.Sprintf("%s-%v", uuid.New().String(), i) errg.Go(func() error { @@ -197,13 +204,18 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if err != nil { return err } + bid := uuid.New().String() benchmark := metadata.Benchmark{ - ID: uuid.New().String(), + ID: bid, Status: metadata.BenchmarkRunning, Cluster: cluster, Scenario: scenario, Plan: plan, - Labels: cluster.Labels, + Labels: []string{ + cluster.ID, + scenario.ID, + bid, + }, } if benchmark, err = s.db.CreateBenchmark(ctx, benchmark); err != nil { return err From 9378097e8e7501a9899352c15399964c5582fa1b Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 15:18:45 -0700 Subject: [PATCH 30/66] labd/routers/experimentrouter: temporarily log the report --- labd/routers/experimentrouter/router.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 27c0cb86..423cc2db 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -16,6 +16,7 @@ package experimentrouter import ( "context" + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -260,6 +261,13 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return nil }) + if err == nil { + data, err := json.MarshalIndent(report, "", " ") + if err != nil { + return err + } + fmt.Println(string(data)) + } return err }) } From cfcdcac204fa172b5f463df9a72a60c4a7041423 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 15:44:15 -0700 Subject: [PATCH 31/66] labd/routers/experimentrouter: write report via json --- labd/routers/experimentrouter/router.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 423cc2db..28d64505 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -16,7 +16,6 @@ package experimentrouter import ( "context" - "encoding/json" "fmt" "io/ioutil" "net/http" @@ -261,13 +260,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return nil }) - if err == nil { - data, err := json.MarshalIndent(report, "", " ") - if err != nil { - return err - } - fmt.Println(string(data)) - } + daemon.WriteJSON(w, report) return err }) } From 4a53b28b44a497c3c5208241f160fff7e7f4e69e Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 16:12:59 -0700 Subject: [PATCH 32/66] labd/controlapi: use json indent helper --- labd/controlapi/experiment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/labd/controlapi/experiment.go b/labd/controlapi/experiment.go index 7ec69f70..e2072924 100644 --- a/labd/controlapi/experiment.go +++ b/labd/controlapi/experiment.go @@ -32,7 +32,7 @@ type experimentAPI struct { } func (a *experimentAPI) Create(ctx context.Context, id string, edef metadata.ExperimentDefinition) (p2plab.Experiment, error) { - content, err := json.MarshalIndent(&edef, "", " ") + content, err := edef.ToJSONIndent() if err != nil { return nil, err } From 3ecf563520f69c62a11a47f128161439754a585b Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 16:33:04 -0700 Subject: [PATCH 33/66] cue/cue.mod: comment out a test that is being problematic?? --- cue/cue.mod/p2plab_example1.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cue/cue.mod/p2plab_example1.cue b/cue/cue.mod/p2plab_example1.cue index 6278dcac..76bfee9f 100644 --- a/cue/cue.mod/p2plab_example1.cue +++ b/cue/cue.mod/p2plab_example1.cue @@ -44,7 +44,7 @@ scen2:: Scenario & { "neighbors": "golang" } benchmark: { - "(and 'neighbors')": "golang" +// "(and 'neighbors')": "golang" "(not 'neighbors')": "mysql" } } From 327a3ecc08a9023c9daa9388f09bc7c168547368 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 16:56:28 -0700 Subject: [PATCH 34/66] metadata: add reports to the experiment struct --- metadata/experiment.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata/experiment.go b/metadata/experiment.go index 43aa09cc..05f81144 100644 --- a/metadata/experiment.go +++ b/metadata/experiment.go @@ -34,6 +34,8 @@ type Experiment struct { Labels []string CreatedAt, UpdatedAt time.Time + + Reports []Report } type ExperimentStatus string From 7902abbce45788bc9b7ebbebc966ff62f2d5654d Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 17:02:27 -0700 Subject: [PATCH 35/66] labd/routers/experimentrouter: write json after all benchmarks are ran --- labd/routers/experimentrouter/router.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 28d64505..5ee26c18 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -260,10 +260,11 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return nil }) - daemon.WriteJSON(w, report) + exp.Reports = append(exp.Reports, report) return err }) } + daemon.WriteJSON(w, &exp) return errg.Wait() } From d111b4ec9fcef3a555c4956f0ee2b03855cb7425 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 4 May 2020 17:26:57 -0700 Subject: [PATCH 36/66] labd/routers/experimentrouter: use derived context to help cleanups --- labd/routers/experimentrouter/router.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 5ee26c18..1e91e1fb 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -103,6 +103,8 @@ func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, } func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() noReset := false if r.FormValue("no-reset") != "" { var err error From 135203b8d917d2f6570a68dff5cf4e9e6c140a6b Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 12:43:24 -0700 Subject: [PATCH 37/66] metadata: add helper functions to experiment --- metadata/experiment.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/metadata/experiment.go b/metadata/experiment.go index 05f81144..0b1345d7 100644 --- a/metadata/experiment.go +++ b/metadata/experiment.go @@ -38,6 +38,22 @@ type Experiment struct { Reports []Report } +// ToJSONIndent is like to ToJSON but formats the json data +func (e *Experiment) ToJSONIndent() ([]byte, error) { + return json.MarshalIndent(e, "", " ") +} + +// ToJSON is a helper function to convert an Experiment +// into it's JSON representation +func (e *Experiment) ToJSON() ([]byte, error) { + return json.Marshal(e) +} + +// FromJSON loads the experiment definition with the values from data +func (e *Experiment) FromJSON(data []byte) error { + return json.Unmarshal(data, e) +} + type ExperimentStatus string var ( @@ -54,6 +70,7 @@ type ExperimentDefinition struct { type IndependentVariable map[string]interface{} +// ToJSONIndent is like to ToJSON but formats the json data func (ed *ExperimentDefinition) ToJSONIndent() ([]byte, error) { return json.MarshalIndent(ed, "", " ") } From 4858dec4af9c866cf795eb60bbbc6bec91ec4874 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 13:14:12 -0700 Subject: [PATCH 38/66] cmd/labapp/command: temporarily override the logger we use --- cmd/labapp/command/root.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/labapp/command/root.go b/cmd/labapp/command/root.go index 1249912b..6fc1963c 100644 --- a/cmd/labapp/command/root.go +++ b/cmd/labapp/command/root.go @@ -19,6 +19,7 @@ import ( "context" "encoding/base64" "os" + "time" "github.com/Netflix/p2plab/labapp" "github.com/Netflix/p2plab/metadata" @@ -105,7 +106,10 @@ func appAction(c *cli.Context) error { if err != nil { return err } - + fh, err := os.Create("root/" + time.Now().String()) + if err != nil { + return err + } ctx := cliutil.CommandContext(c) ctx, tracer, closer := traceutil.New(ctx, "labapp", nil) defer closer.Close() @@ -125,8 +129,8 @@ func appAction(c *cli.Context) error { ctx = opentracing.ContextWithSpan(ctx, span) } } - - app, err := labapp.New(ctx, root, c.GlobalString("address"), c.GlobalInt("libp2p-port"), zerolog.Ctx(ctx), metadata.PeerDefinition{ + logger := zerolog.Ctx(ctx).Output(fh) + app, err := labapp.New(ctx, root, c.GlobalString("address"), c.GlobalInt("libp2p-port"), &logger, metadata.PeerDefinition{ Transports: c.GlobalStringSlice("libp2p-transports"), Muxers: c.GlobalStringSlice("libp2p-muxers"), SecurityTransports: c.GlobalStringSlice("libp2p-security-transports"), From f7155440b7c77e4995213fd3abb75b859bd988bd Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 15:14:43 -0700 Subject: [PATCH 39/66] cmd/labapp/command: close the file handler when context is cancelled --- cmd/labapp/command/root.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/labapp/command/root.go b/cmd/labapp/command/root.go index 6fc1963c..34461035 100644 --- a/cmd/labapp/command/root.go +++ b/cmd/labapp/command/root.go @@ -113,7 +113,10 @@ func appAction(c *cli.Context) error { ctx := cliutil.CommandContext(c) ctx, tracer, closer := traceutil.New(ctx, "labapp", nil) defer closer.Close() - + go func() { + <-ctx.Done() + fh.Close() + }() if c.IsSet("trace") { trace, err := base64.StdEncoding.DecodeString(c.GlobalString("trace")) if err != nil { From 26207cedfca0c1da41a9c3b0be4f9a4a956b282c Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 16:06:39 -0700 Subject: [PATCH 40/66] labagent: redirect agent log to disk --- labagent/labagent.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/labagent/labagent.go b/labagent/labagent.go index b6496661..b05478f3 100644 --- a/labagent/labagent.go +++ b/labagent/labagent.go @@ -17,6 +17,7 @@ package labagent import ( "context" "io" + "os" "path/filepath" "github.com/Netflix/p2plab/daemon" @@ -41,8 +42,13 @@ func New(root, addr, appRoot, appAddr string, logger *zerolog.Logger, opts ...La return nil, err } } - - client, err := httputil.NewClient(httputil.NewHTTPClient(), httputil.WithLogger(logger)) + os.MkdirAll(root, 0711) + fh, err := os.Create(root + "/labagent.log") + if err != nil { + return nil, err + } + loggr := logger.Output(fh) + client, err := httputil.NewClient(httputil.NewHTTPClient(), httputil.WithLogger(&loggr)) if err != nil { return nil, err } @@ -56,7 +62,8 @@ func New(root, addr, appRoot, appAddr string, logger *zerolog.Logger, opts ...La } var closers []io.Closer - daemon, err := daemon.New("labagent", addr, logger, + closers = append(closers, fh) + daemon, err := daemon.New("labagent", addr, &loggr, healthcheckrouter.New(), agentrouter.New(appAddr, s), ) From 4ff7fa0bad8afe3ee19495b758876a8c6629ad38 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 16:07:30 -0700 Subject: [PATCH 41/66] labagent: set to debug log level --- labagent/labagent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/labagent/labagent.go b/labagent/labagent.go index b05478f3..0b5d528c 100644 --- a/labagent/labagent.go +++ b/labagent/labagent.go @@ -47,7 +47,7 @@ func New(root, addr, appRoot, appAddr string, logger *zerolog.Logger, opts ...La if err != nil { return nil, err } - loggr := logger.Output(fh) + loggr := logger.Output(fh).Level(zerolog.DebugLevel) client, err := httputil.NewClient(httputil.NewHTTPClient(), httputil.WithLogger(&loggr)) if err != nil { return nil, err From b9dc57af40dfcc2791eb9127b0df0b04f46e5cd4 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 16:18:19 -0700 Subject: [PATCH 42/66] cmd/labapp/command: revert the logger changes --- cmd/labapp/command/root.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/cmd/labapp/command/root.go b/cmd/labapp/command/root.go index 34461035..59b71cd4 100644 --- a/cmd/labapp/command/root.go +++ b/cmd/labapp/command/root.go @@ -19,7 +19,6 @@ import ( "context" "encoding/base64" "os" - "time" "github.com/Netflix/p2plab/labapp" "github.com/Netflix/p2plab/metadata" @@ -106,17 +105,10 @@ func appAction(c *cli.Context) error { if err != nil { return err } - fh, err := os.Create("root/" + time.Now().String()) - if err != nil { - return err - } ctx := cliutil.CommandContext(c) ctx, tracer, closer := traceutil.New(ctx, "labapp", nil) defer closer.Close() - go func() { - <-ctx.Done() - fh.Close() - }() + if c.IsSet("trace") { trace, err := base64.StdEncoding.DecodeString(c.GlobalString("trace")) if err != nil { @@ -132,8 +124,7 @@ func appAction(c *cli.Context) error { ctx = opentracing.ContextWithSpan(ctx, span) } } - logger := zerolog.Ctx(ctx).Output(fh) - app, err := labapp.New(ctx, root, c.GlobalString("address"), c.GlobalInt("libp2p-port"), &logger, metadata.PeerDefinition{ + app, err := labapp.New(ctx, root, c.GlobalString("address"), c.GlobalInt("libp2p-port"), zerolog.Ctx(ctx), metadata.PeerDefinition{ Transports: c.GlobalStringSlice("libp2p-transports"), Muxers: c.GlobalStringSlice("libp2p-muxers"), SecurityTransports: c.GlobalStringSlice("libp2p-security-transports"), From c522c0bd3ff3e03f01b62a8532b60babe92d4fef Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 16:22:59 -0700 Subject: [PATCH 43/66] labd/routers/experimentrouter: us xid --- labd/routers/experimentrouter/router.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 1e91e1fb..a9328c8e 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -40,8 +40,8 @@ import ( "github.com/Netflix/p2plab/scenarios" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" - "github.com/google/uuid" "github.com/pkg/errors" + "github.com/rs/xid" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -124,7 +124,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return err } exp, err := s.db.CreateExperiment(ctx, metadata.Experiment{ - ID: uuid.New().String(), + ID: xid.New().String(), Definition: edef, Status: metadata.ExperimentRunning, }) @@ -141,14 +141,14 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite break } trial := trial - name := fmt.Sprintf("%s-%v", uuid.New().String(), i) + name := fmt.Sprintf("%s-%v", xid.New().String(), i) errg.Go(func() error { cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name, w) if err != nil { return err } zerolog.Ctx(ctx).Info().Msg("creating scenario") - scenID := uuid.New().String() + scenID := xid.New().String() scenario := metadata.Scenario{ ID: scenID, Definition: trial.Scenario, @@ -206,7 +206,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if err != nil { return err } - bid := uuid.New().String() + bid := xid.New().String() benchmark := metadata.Benchmark{ ID: bid, Status: metadata.BenchmarkRunning, From 1ce20c72c682efcc77eecc11e4f551df7fae8e99 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 17:21:39 -0700 Subject: [PATCH 44/66] labd/routers/experimentrouter: fix daemon not writing properly --- labd/routers/experimentrouter/router.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index a9328c8e..951b707a 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io/ioutil" + "log" "net/http" "os" "strconv" @@ -262,12 +263,17 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return nil }) + fmt.Printf("%v\n", report) exp.Reports = append(exp.Reports, report) return err }) } + if len(exp.Reports) == 0 { + log.Fatal("no reports found") + } + err := errg.Wait() daemon.WriteJSON(w, &exp) - return errg.Wait() + return err } func (s *router) putExperimentsLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { From 8939bc7acbffe58eded0e823a19a6b2aeca1c0e4 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 17:23:54 -0700 Subject: [PATCH 45/66] labd/routers/experimentrouter: fix bad assign --- labd/routers/experimentrouter/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 951b707a..f6928271 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -271,7 +271,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if len(exp.Reports) == 0 { log.Fatal("no reports found") } - err := errg.Wait() + err = errg.Wait() daemon.WriteJSON(w, &exp) return err } From 56cbee4034d860ec20ca517352fa4a542c9aaa64 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 17:26:36 -0700 Subject: [PATCH 46/66] labd/routers/experimentrouter: remove early exit --- labd/routers/experimentrouter/router.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index f6928271..6a230ead 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "io/ioutil" - "log" "net/http" "os" "strconv" @@ -268,9 +267,6 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return err }) } - if len(exp.Reports) == 0 { - log.Fatal("no reports found") - } err = errg.Wait() daemon.WriteJSON(w, &exp) return err From e8acbf72fcbd7e470837a5cf49ec267cb6076fd6 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 17:30:32 -0700 Subject: [PATCH 47/66] labd/routers/experimentrouter: remove useless print --- labd/routers/experimentrouter/router.go | 1 - 1 file changed, 1 deletion(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 6a230ead..1571ade6 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -262,7 +262,6 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return nil }) - fmt.Printf("%v\n", report) exp.Reports = append(exp.Reports, report) return err }) From 44530a55324be9be444578aa03a69ad4e883a794 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 17:30:54 -0700 Subject: [PATCH 48/66] labagent/supervisor: does this fix it? --- labagent/supervisor/supervisor.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index a101a185..48812c18 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -77,8 +77,10 @@ func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders } func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) error { + zerolog.Ctx(ctx).Info().Msg("starting supervise process") err := s.kill(ctx) if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to kill processes") return err } @@ -125,6 +127,7 @@ func (s *supervisor) peerDefinitionToFlags(id string, pdef metadata.PeerDefiniti } func (s *supervisor) start(ctx context.Context, flags []string) error { + zerolog.Ctx(ctx).Info().Msg("entered start") var actx context.Context actx, s.cancel = context.WithCancel(context.Background()) s.app = s.cmd(actx, flags...) @@ -134,7 +137,7 @@ func (s *supervisor) start(ctx context.Context, flags []string) error { } v := new(bytes.Buffer) - versionCmd := s.cmdWithStdio(ctx, v, ioutil.Discard, "--version") + versionCmd := s.cmdWithStdio(actx, v, ioutil.Discard, "--version") err = versionCmd.Run() if err != nil { return err From be65c583ef3abcf33fb5d93e3962e54cb3a983e5 Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 5 May 2020 18:52:43 -0700 Subject: [PATCH 49/66] labagent/supervisor: remove no longer needed logs --- labagent/supervisor/supervisor.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index 48812c18..eb040f2d 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -77,10 +77,8 @@ func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders } func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) error { - zerolog.Ctx(ctx).Info().Msg("starting supervise process") err := s.kill(ctx) if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to kill processes") return err } @@ -127,7 +125,6 @@ func (s *supervisor) peerDefinitionToFlags(id string, pdef metadata.PeerDefiniti } func (s *supervisor) start(ctx context.Context, flags []string) error { - zerolog.Ctx(ctx).Info().Msg("entered start") var actx context.Context actx, s.cancel = context.WithCancel(context.Background()) s.app = s.cmd(actx, flags...) From bb676948419bb15b07e08fab3066760a37d8ad48 Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 7 May 2020 17:37:58 -0700 Subject: [PATCH 50/66] labd/routers/experimentrouter: add more logs for debugging purposes --- labd/routers/experimentrouter/router.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 1571ade6..61469449 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -143,11 +143,12 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite trial := trial name := fmt.Sprintf("%s-%v", xid.New().String(), i) errg.Go(func() error { + info := zerolog.Ctx(ctx).Info() cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name, w) if err != nil { return err } - zerolog.Ctx(ctx).Info().Msg("creating scenario") + info.Msg("creating scenario") scenID := xid.New().String() scenario := metadata.Scenario{ ID: scenID, @@ -160,9 +161,8 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if err != nil { return err } - fmt.Printf("%+v\n", cluster) defer func() error { - zerolog.Ctx(ctx).Info().Msg("tearing down cluster") + info.Msg("tearing down cluster") cluster, err := s.db.GetCluster(ctx, name) if err != nil { return errors.Wrap(err, "failed to get cluster'") @@ -178,9 +178,10 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite if err := s.provider.DestroyNodeGroup(ctx, ng); err != nil { return errors.Wrap(err, "failed to destroy node group") } - zerolog.Ctx(ctx).Info().Msg("tore down cluster") + info.Msg("tore down cluster") return nil }() + info.Msg("creating nodes") mns, err := s.db.ListNodes(ctx, cluster.ID) if err != nil { return err @@ -195,13 +196,16 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite ns = append(ns, node) } if !noReset { + info.Msg("updating nodes") if err := nodes.Update(ctx, s.builder, ns); err != nil { return errors.Wrap(err, "failed to update cluster") } + info.Msg("connecting nodes") if err := nodes.Connect(ctx, ns); err != nil { return errors.Wrap(err, "failed to connect cluster") } } + info.Msg("generating scenario plan") plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, lset) if err != nil { return err @@ -219,6 +223,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite bid, }, } + info.Msg("creating benchmark") if benchmark, err = s.db.CreateBenchmark(ctx, benchmark); err != nil { return err } @@ -226,6 +231,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite for _, addr := range s.seeder.Host().Addrs() { seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) } + info.Msg("running scenario") execution, err := scenarios.Run(ctx, lset, plan, seederAddrs) if err != nil { return errors.Wrap(err, "failed to run scenario plan") @@ -237,6 +243,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite Nodes: execution.Report, Queries: queries, } + info.Msg("aggregating results") report.Aggregates = reports.ComputeAggregates(report.Nodes) jaegerUI := os.Getenv("JAEGER_UI") if jaegerUI != "" { @@ -245,7 +252,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite report.Summary.Trace = fmt.Sprintf("%s/trace/%s", jaegerUI, sc.TraceID()) } } - zerolog.Ctx(ctx).Info().Msg("Updating benchmark metadata") + info.Msg("Updating benchmark metadata") err = s.db.Update(ctx, func(tx *bolt.Tx) error { tctx := metadata.WithTransactionContext(ctx, tx) From 79cb1be6599f352ac59111f677741027b4ddd806 Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 7 May 2020 18:21:20 -0700 Subject: [PATCH 51/66] labd/routers/experimentrouter: add another log --- labd/routers/experimentrouter/router.go | 1 + 1 file changed, 1 insertion(+) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 61469449..881c92fe 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -269,6 +269,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return nil }) + info.Msg("updating reports") exp.Reports = append(exp.Reports, report) return err }) From 7f1efea5dc7bd6e0803b246737e8b23d94228ddd Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 7 May 2020 18:54:06 -0700 Subject: [PATCH 52/66] labagent/supervisor: comment our kill to demonstrate bug --- labagent/supervisor/supervisor.go | 6 +++--- labd/routers/experimentrouter/router.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index eb040f2d..2b086f9b 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -76,12 +76,12 @@ func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders }, nil } -func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) error { - err := s.kill(ctx) +func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) (err error) { + /*err := s.kill(ctx) if err != nil { return err } - + */ flags := s.peerDefinitionToFlags(id, pdef) if link != "" { err = s.atomicReplaceBinary(ctx, link) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 881c92fe..091987ef 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -137,9 +137,9 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite // with port numbers being re-used. For example when we start the goroutines // for each of the benchmarks they use the same ports for all peers // and as such we run into port issues - if i > 0 { - break - } + //if i > 0 { + // break + //} trial := trial name := fmt.Sprintf("%s-%v", xid.New().String(), i) errg.Go(func() error { From 2ca83149afa10159f80393e6a4bab28cfed8d114 Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 7 May 2020 19:21:17 -0700 Subject: [PATCH 53/66] labagent/supervisor: remove the kill cancel out --- labagent/supervisor/supervisor.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index 2b086f9b..1a78d270 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -76,12 +76,11 @@ func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders }, nil } -func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) (err error) { - /*err := s.kill(ctx) +func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) error { + err := s.kill(ctx) if err != nil { return err } - */ flags := s.peerDefinitionToFlags(id, pdef) if link != "" { err = s.atomicReplaceBinary(ctx, link) From 23ccb6042b323efff5ed95168f942f5b7cd4f063 Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 7 May 2020 19:21:29 -0700 Subject: [PATCH 54/66] labd/routeres/experimentrouter: use xid to generate name --- labd/routers/experimentrouter/router.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 091987ef..631459e6 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -132,7 +132,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite return err } errg, ctx := errgroup.WithContext(ctx) - for i, trial := range exp.Definition.TrialDefinition { + for _, trial := range exp.Definition.TrialDefinition { // TODO(bonedaddy): if we dont do this, then we run into some issues // with port numbers being re-used. For example when we start the goroutines // for each of the benchmarks they use the same ports for all peers @@ -141,7 +141,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite // break //} trial := trial - name := fmt.Sprintf("%s-%v", xid.New().String(), i) + name := xid.New().String() errg.Go(func() error { info := zerolog.Ctx(ctx).Info() cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name, w) From 5a8a4d78f3f8f707d25d832662e3dc351149a56c Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 7 May 2020 22:51:44 -0700 Subject: [PATCH 55/66] providers/inmemory: add a port helper --- providers/inmemory/port_helper.go | 32 +++++++++++++++++++++++++++++++ providers/inmemory/provider.go | 23 +++++++++++----------- 2 files changed, 43 insertions(+), 12 deletions(-) create mode 100644 providers/inmemory/port_helper.go diff --git a/providers/inmemory/port_helper.go b/providers/inmemory/port_helper.go new file mode 100644 index 00000000..0aa23173 --- /dev/null +++ b/providers/inmemory/port_helper.go @@ -0,0 +1,32 @@ +package inmemory + +import ( + "sync" + + "github.com/phayes/freeport" +) + +// portHelper enables synchronizing access to free ports +// across multiple different trials and benchmarks, and prevents +// accidental port re-use. +type portHelper struct { + inUse map[int]bool + mux sync.Mutex +} + +func (ph *portHelper) getPorts(num int) (int, int, error) { + ph.mux.Lock() + defer ph.mux.Unlock() + for { + freePorts, err := freeport.GetFreePorts(2) + if err != nil { + return 0, 0, err + } + if ph.inUse[freePorts[0]] || ph.inUse[freePorts[1]] { + continue + } + ph.inUse[freePorts[0]] = true + ph.inUse[freePorts[1]] = true + return freePorts[0], freePorts[1], nil + } +} diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index f77ecc6d..cb139655 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -23,16 +23,16 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/labagent" "github.com/Netflix/p2plab/metadata" - "github.com/phayes/freeport" "github.com/rs/xid" "github.com/rs/zerolog" ) type provider struct { - root string - nodes map[string][]*node - logger *zerolog.Logger - agentOpts []labagent.LabagentOption + root string + nodes map[string][]*node + logger *zerolog.Logger + agentOpts []labagent.LabagentOption + portHelper *portHelper } func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labagent.LabagentOption) (p2plab.NodeProvider, error) { @@ -42,10 +42,11 @@ func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labag } p := &provider{ - root: root, - nodes: make(map[string][]*node), - logger: logger, - agentOpts: agentOpts, + root: root, + nodes: make(map[string][]*node), + logger: logger, + agentOpts: agentOpts, + portHelper: &portHelper{inUse: make(map[int]bool)}, } ctx := context.Background() @@ -76,12 +77,10 @@ func (p *provider) CreateNodeGroup(ctx context.Context, id string, cdef metadata var ns []metadata.Node for _, group := range cdef.Groups { for i := 0; i < group.Size; i++ { - freePorts, err := freeport.GetFreePorts(2) + agentPort, appPort, err := p.portHelper.getPorts(2) if err != nil { return nil, err } - agentPort, appPort := freePorts[0], freePorts[1] - id := xid.New().String() n, err := p.newNode(id, agentPort, appPort) if err != nil { From a87dcf8e49f06fc159d520593884f72ab212fadb Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 8 May 2020 14:04:52 -0700 Subject: [PATCH 56/66] providers/inmemory: log free ports --- providers/inmemory/port_helper.go | 7 ++++++- providers/inmemory/provider.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/providers/inmemory/port_helper.go b/providers/inmemory/port_helper.go index 0aa23173..9ebf6c91 100644 --- a/providers/inmemory/port_helper.go +++ b/providers/inmemory/port_helper.go @@ -1,9 +1,11 @@ package inmemory import ( + "context" "sync" "github.com/phayes/freeport" + "github.com/rs/zerolog" ) // portHelper enables synchronizing access to free ports @@ -14,17 +16,20 @@ type portHelper struct { mux sync.Mutex } -func (ph *portHelper) getPorts(num int) (int, int, error) { +func (ph *portHelper) getPorts(ctx context.Context, num int) (int, int, error) { ph.mux.Lock() defer ph.mux.Unlock() for { + zerolog.Ctx(ctx).Info().Msg("getting free ports") freePorts, err := freeport.GetFreePorts(2) if err != nil { return 0, 0, err } if ph.inUse[freePorts[0]] || ph.inUse[freePorts[1]] { + zerolog.Ctx(ctx).Warn().Msgf("got in use ports, %v and %v, trying again", freePorts[0], freePorts[1]) continue } + zerolog.Ctx(ctx).Info().Msg("found available ports") ph.inUse[freePorts[0]] = true ph.inUse[freePorts[1]] = true return freePorts[0], freePorts[1], nil diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index cb139655..e068f75c 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -77,7 +77,7 @@ func (p *provider) CreateNodeGroup(ctx context.Context, id string, cdef metadata var ns []metadata.Node for _, group := range cdef.Groups { for i := 0; i < group.Size; i++ { - agentPort, appPort, err := p.portHelper.getPorts(2) + agentPort, appPort, err := p.portHelper.getPorts(ctx, 2) if err != nil { return nil, err } From 5ac5d6cf8b2e331a4d5b6038a7604f147fa16c1c Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 8 May 2020 14:30:51 -0700 Subject: [PATCH 57/66] providers/inmemory: return ports when not in use --- providers/inmemory/port_helper.go | 19 +++++++++++++++++++ providers/inmemory/provider.go | 1 + 2 files changed, 20 insertions(+) diff --git a/providers/inmemory/port_helper.go b/providers/inmemory/port_helper.go index 9ebf6c91..d1de9873 100644 --- a/providers/inmemory/port_helper.go +++ b/providers/inmemory/port_helper.go @@ -35,3 +35,22 @@ func (ph *portHelper) getPorts(ctx context.Context, num int) (int, int, error) { return freePorts[0], freePorts[1], nil } } + +func (ph *portHelper) returnPorts(ctx context.Context, ports []int) { + ph.mux.Lock() + defer ph.mux.Unlock() + returned := 0 + for _, port := range ports { + if !ph.inUse[port] { + zerolog.Ctx(ctx).Warn().Msg("trying to return unused port") + continue + } + ph.inUse[port] = false + returned++ + } + if returned == 0 { + zerolog.Ctx(ctx).Warn().Msg("no ports retruend") + } else { + zerolog.Ctx(ctx).Info().Msgf("successfully returned %v ports", returned) + } +} diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index e068f75c..b1b8f7ee 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -115,6 +115,7 @@ func (p *provider) DestroyNodeGroup(ctx context.Context, ng *p2plab.NodeGroup) e if err != nil { return err } + p.portHelper.returnPorts(ctx, []int{n.AgentPort, n.AppPort}) } delete(p.nodes, ng.ID) From 38c4381dd1a3713a795e7b70f2045e886a70793d Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 8 May 2020 14:52:59 -0700 Subject: [PATCH 58/66] labd/routers/experimentrouter: remove comment add log --- labd/routers/experimentrouter/router.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index 631459e6..a83b63d0 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -133,17 +133,11 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite } errg, ctx := errgroup.WithContext(ctx) for _, trial := range exp.Definition.TrialDefinition { - // TODO(bonedaddy): if we dont do this, then we run into some issues - // with port numbers being re-used. For example when we start the goroutines - // for each of the benchmarks they use the same ports for all peers - // and as such we run into port issues - //if i > 0 { - // break - //} trial := trial name := xid.New().String() errg.Go(func() error { info := zerolog.Ctx(ctx).Info() + info.Msg("creating cluster") cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name, w) if err != nil { return err From 3d489065ab0dd952d45a6881bc7c73145cbc3267 Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 8 May 2020 15:33:54 -0700 Subject: [PATCH 59/66] providers/inmemory: temporarily force provider ot log to disk --- providers/inmemory/provider.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index b1b8f7ee..bdd15fae 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -33,6 +33,7 @@ type provider struct { logger *zerolog.Logger agentOpts []labagent.LabagentOption portHelper *portHelper + logFile *os.File } func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labagent.LabagentOption) (p2plab.NodeProvider, error) { @@ -40,13 +41,19 @@ func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labag if err != nil { return nil, err } - + fh, err := os.Create(root + "/provider.log") + if err != nil { + return nil, err + } + lgr := logger.Output(fh).Level(zerolog.DebugLevel) + loggr := &lgr p := &provider{ root: root, nodes: make(map[string][]*node), - logger: logger, + logger: loggr, agentOpts: agentOpts, portHelper: &portHelper{inUse: make(map[int]bool)}, + logFile: fh, } ctx := context.Background() From daa1bd4e79b77e4c9c99d06af995935b9a5757cd Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 8 May 2020 15:38:18 -0700 Subject: [PATCH 60/66] providers/inmemory: log if an error is encountered while closing ano de group --- providers/inmemory/provider.go | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index bdd15fae..f45711cd 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -120,6 +120,7 @@ func (p *provider) DestroyNodeGroup(ctx context.Context, ng *p2plab.NodeGroup) e for _, n := range p.nodes[ng.ID] { err := n.Close() if err != nil { + p.logger.Error().Err(err).Str("node.id", n.ID).Msg("error encountered while destroying node group") return err } p.portHelper.returnPorts(ctx, []int{n.AgentPort, n.AppPort}) From b93d6e6dea4e87b76416b51e9e9e3be120e9f2b6 Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 8 May 2020 16:00:32 -0700 Subject: [PATCH 61/66] pkg/digestconv: prevent empty digest from causing a panic --- pkg/digestconv/digestconv.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/digestconv/digestconv.go b/pkg/digestconv/digestconv.go index aecfe75f..60835a28 100644 --- a/pkg/digestconv/digestconv.go +++ b/pkg/digestconv/digestconv.go @@ -24,6 +24,9 @@ import ( ) func DigestToCid(dgst digest.Digest) (cid.Cid, error) { + if dgst.String() == "" { + return cid.Cid{}, errors.New("bad digest") + } data, err := hex.DecodeString(dgst.Hex()) if err != nil { return cid.Cid{}, errors.Wrap(err, "failed to decode digest hex") From 506333f6fb25935a1b80c6c4bb661eb2b0dd3642 Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 11 May 2020 15:31:50 -0700 Subject: [PATCH 62/66] labagent: add latest changes --- labagent/labagent.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/labagent/labagent.go b/labagent/labagent.go index 0b5d528c..1f5800e9 100644 --- a/labagent/labagent.go +++ b/labagent/labagent.go @@ -43,12 +43,7 @@ func New(root, addr, appRoot, appAddr string, logger *zerolog.Logger, opts ...La } } os.MkdirAll(root, 0711) - fh, err := os.Create(root + "/labagent.log") - if err != nil { - return nil, err - } - loggr := logger.Output(fh).Level(zerolog.DebugLevel) - client, err := httputil.NewClient(httputil.NewHTTPClient(), httputil.WithLogger(&loggr)) + client, err := httputil.NewClient(httputil.NewHTTPClient(), httputil.WithLogger(logger)) if err != nil { return nil, err } @@ -62,8 +57,7 @@ func New(root, addr, appRoot, appAddr string, logger *zerolog.Logger, opts ...La } var closers []io.Closer - closers = append(closers, fh) - daemon, err := daemon.New("labagent", addr, &loggr, + daemon, err := daemon.New("labagent", addr, logger, healthcheckrouter.New(), agentrouter.New(appAddr, s), ) From 2d68c7867532757ff3a5dd8dac191f96352b2522 Mon Sep 17 00:00:00 2001 From: Edgar Lee Date: Wed, 13 May 2020 17:52:37 -0700 Subject: [PATCH 63/66] Fix experiment router race conditions and implementation problems --- cmd/labctl/command/debug.go | 54 +++++++ cmd/labctl/command/experiment.go | 21 ++- cue/cue.mod/p2plab_example1.cue | 4 +- cue/cue.mod/p2plab_example2.cue | 5 +- cue/parser/p2plab_instance.go | 2 +- cue/parser/parser.go | 9 ++ experiment.go | 2 +- labagent/labagent.go | 3 +- labagent/supervisor/supervisor.go | 33 ++-- labd/controlapi/experiment.go | 23 +-- labd/routers/clusterrouter/router.go | 45 +----- labd/routers/experimentrouter/router.go | 203 ++++++++++-------------- labd/routers/helpers/helpers.go | 52 +++++- metadata/experiment.go | 51 +++--- nodes/connect.go | 7 +- nodes/session.go | 43 ++--- nodes/update.go | 4 +- providers/inmemory/port_helper.go | 56 ------- providers/inmemory/provider.go | 63 +++++--- 19 files changed, 352 insertions(+), 328 deletions(-) delete mode 100644 providers/inmemory/port_helper.go diff --git a/cmd/labctl/command/debug.go b/cmd/labctl/command/debug.go index 445952e8..62076774 100644 --- a/cmd/labctl/command/debug.go +++ b/cmd/labctl/command/debug.go @@ -19,8 +19,11 @@ import ( "errors" "fmt" + "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/pkg/cliutil" + "github.com/Netflix/p2plab/query" "github.com/urfave/cli" ) @@ -58,6 +61,19 @@ var debugCommand = cli.Command{ }, }, }, + { + Name: "connect", + Aliases: []string{"c"}, + Usage: "Connects a cluster together", + ArgsUsage: "", + Action: connectAction, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "query,q", + Usage: "Runs a query to filter the listed nodes.", + }, + }, + }, }, } @@ -103,3 +119,41 @@ func runTaskAction(c *cli.Context) error { return nil } + +func connectAction(c *cli.Context) error { + if c.NArg() != 1 { + return errors.New("cluster id must be provided") + } + + control, err := ResolveControl(c) + if err != nil { + return err + } + + var opts []p2plab.ListOption + ctx := cliutil.CommandContext(c) + if c.IsSet("query") { + q, err := query.Parse(ctx, c.String("query")) + if err != nil { + return err + } + + opts = append(opts, p2plab.WithQuery(q.String())) + } + + cluster := c.Args().First() + ns, err := control.Node().List(ctx, cluster) + if err != nil { + return err + } + if len(ns) == 0 { + return fmt.Errorf("No nodes found for %q", cluster) + } + + err = nodes.WaitHealthy(ctx, ns) + if err != nil { + return err + } + + return nodes.Connect(ctx, ns) +} diff --git a/cmd/labctl/command/experiment.go b/cmd/labctl/command/experiment.go index 9f9109fb..5ac08df2 100644 --- a/cmd/labctl/command/experiment.go +++ b/cmd/labctl/command/experiment.go @@ -34,7 +34,7 @@ var experimentCommand = cli.Command{ Subcommands: []cli.Command{ { Name: "create", - Aliases: []string{"s"}, + Aliases: []string{"c"}, Usage: "Creates an experiment from a definition file", ArgsUsage: "", Action: createExperimentAction, @@ -44,7 +44,7 @@ var experimentCommand = cli.Command{ Usage: "Name of the experiment, by default takes the name of the experiment definition.", }, &cli.BoolFlag{ - Name: "dry.run", + Name: "dry-run", Usage: "dry run the epxeriment creation, parsing the cue file and printing it to stdout", }, }, @@ -101,7 +101,7 @@ func createExperimentAction(c *cli.Context) error { return errors.New("experiment definition must be provided") } - p, err := CommandPrinter(c, printer.OutputID) + p, err := CommandPrinter(c, printer.OutputJSON) if err != nil { return err } @@ -116,26 +116,33 @@ func createExperimentAction(c *cli.Context) error { if err != nil { return err } - if c.Bool("dry.run") { - jedef, err := edef.ToJSONIndent() + + if c.Bool("dry-run") { + dt, err := edef.ToJSON() if err != nil { return err } - fmt.Printf("%+v\n", string(jedef)) + fmt.Println(string(dt)) return nil } + control, err := ResolveControl(c) if err != nil { return err } ctx := cliutil.CommandContext(c) - experiment, err := control.Experiment().Create(ctx, name, edef) + id, err := control.Experiment().Create(ctx, name, edef) if err != nil { return err } + experiment, err := control.Experiment().Get(ctx, id) + if err != nil { + return err + } zerolog.Ctx(ctx).Info().Msgf("Completed experiment %q", experiment.Metadata().ID) + return p.Print(experiment.Metadata()) } diff --git a/cue/cue.mod/p2plab_example1.cue b/cue/cue.mod/p2plab_example1.cue index 76bfee9f..361d8d0b 100644 --- a/cue/cue.mod/p2plab_example1.cue +++ b/cue/cue.mod/p2plab_example1.cue @@ -41,10 +41,9 @@ scen1:: Scenario & { scen2:: Scenario & { objects: [ items ] seed: { - "neighbors": "golang" + "neighbors": "mysql" } benchmark: { -// "(and 'neighbors')": "golang" "(not 'neighbors')": "mysql" } } @@ -60,5 +59,4 @@ experiment: Experiment & { scenario: scen2 } ] -// trials: [[clust1,scen1],[clust1,scen2]] } diff --git a/cue/cue.mod/p2plab_example2.cue b/cue/cue.mod/p2plab_example2.cue index 29d80a4a..3ad0171a 100644 --- a/cue/cue.mod/p2plab_example2.cue +++ b/cue/cue.mod/p2plab_example2.cue @@ -21,12 +21,13 @@ experiment: Experiment & { "neighbors": "image" } benchmark: { - "(not neighbors)": "image" + "(not 'neighbors')": "image" } } } for o in objects ] } + objects :: [ [{ image: { @@ -40,4 +41,4 @@ objects :: [ source: "docker.io/library/mysql:latest" } }], -] \ No newline at end of file +] diff --git a/cue/parser/p2plab_instance.go b/cue/parser/p2plab_instance.go index 5d91a1a3..8189fd21 100644 --- a/cue/parser/p2plab_instance.go +++ b/cue/parser/p2plab_instance.go @@ -20,7 +20,7 @@ func (p *P2PLabInstance) ToExperimentDefinition() (metadata.ExperimentDefinition return metadata.ExperimentDefinition{}, err } return metadata.ExperimentDefinition{ - TrialDefinition: trials, + Trials: trials, }, nil } diff --git a/cue/parser/parser.go b/cue/parser/parser.go index aa250be3..8f9b4456 100644 --- a/cue/parser/parser.go +++ b/cue/parser/parser.go @@ -19,6 +19,15 @@ var ( region: string // labels is an optional field labels?: [...string] + peer?: Peer + } + + Peer :: { + gitReference: string | *"HEAD" + transports: [...string] | *["tcp"] + muxers: [...string] | *["mplex"] + securityTransports: [...string] | *["secio"] + routing: string | *"nil" } // a cluster is a collection of 1 or more groups of nodes diff --git a/experiment.go b/experiment.go index 25a8f70b..dd9a42ec 100644 --- a/experiment.go +++ b/experiment.go @@ -23,7 +23,7 @@ import ( // ExperimentAPI is an unimplemented layer to run experiments, a collection // of benchmarks while varying some aspect. type ExperimentAPI interface { - Create(ctx context.Context, id string, edef metadata.ExperimentDefinition) (Experiment, error) + Create(ctx context.Context, name string, edef metadata.ExperimentDefinition) (id string, err error) Get(ctx context.Context, id string) (Experiment, error) diff --git a/labagent/labagent.go b/labagent/labagent.go index 1f5800e9..b6496661 100644 --- a/labagent/labagent.go +++ b/labagent/labagent.go @@ -17,7 +17,6 @@ package labagent import ( "context" "io" - "os" "path/filepath" "github.com/Netflix/p2plab/daemon" @@ -42,7 +41,7 @@ func New(root, addr, appRoot, appAddr string, logger *zerolog.Logger, opts ...La return nil, err } } - os.MkdirAll(root, 0711) + client, err := httputil.NewClient(httputil.NewHTTPClient(), httputil.WithLogger(logger)) if err != nil { return nil, err diff --git a/labagent/supervisor/supervisor.go b/labagent/supervisor/supervisor.go index 1a78d270..914f2d16 100644 --- a/labagent/supervisor/supervisor.go +++ b/labagent/supervisor/supervisor.go @@ -26,6 +26,7 @@ import ( "os" "os/exec" "path/filepath" + "sync" "syscall" "github.com/Netflix/p2plab/downloaders" @@ -42,13 +43,14 @@ type Supervisor interface { } type supervisor struct { - root string - appRoot string - appPort string - client *httputil.Client - fs *downloaders.Downloaders - app *exec.Cmd - cancel func() + root string + appRoot string + appPort string + app *exec.Cmd + client *httputil.Client + fs *downloaders.Downloaders + mu sync.Mutex + cancel func() } func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders.Downloaders) (Supervisor, error) { @@ -68,19 +70,23 @@ func New(root, appRoot, appAddr string, client *httputil.Client, fs *downloaders } return &supervisor{ - root: root, - appRoot: appRoot, - appPort: appPort, - client: client, - fs: fs, + root: root, + appRoot: appRoot, + appPort: appPort, + client: client, + fs: fs, }, nil } func (s *supervisor) Supervise(ctx context.Context, id, link string, pdef metadata.PeerDefinition) error { + s.mu.Lock() + defer s.mu.Unlock() + err := s.kill(ctx) if err != nil { return err } + flags := s.peerDefinitionToFlags(id, pdef) if link != "" { err = s.atomicReplaceBinary(ctx, link) @@ -168,6 +174,7 @@ func (s *supervisor) wait(ctx context.Context, flags []string) error { zerolog.Ctx(ctx).Error().Err(ctx.Err()).Msg("context error is not nil") } if s.app.Process != nil { + zerolog.Ctx(ctx).Info().Msg("Forwarding kill signal to labapp") err := s.app.Process.Signal(syscall.SIGTERM) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("failed to SIGTERM labapp") @@ -224,7 +231,7 @@ func (s *supervisor) atomicReplaceBinary(ctx context.Context, link string) error defer span.Finish() span.SetTag("link", link) - zerolog.Ctx(ctx).Debug().Msg("Atomically replacing binary") + zerolog.Ctx(ctx).Debug().Str("root", s.root).Msg("Atomically replacing binary") u, err := url.Parse(link) if err != nil { return err diff --git a/labd/controlapi/experiment.go b/labd/controlapi/experiment.go index e2072924..17531a45 100644 --- a/labd/controlapi/experiment.go +++ b/labd/controlapi/experiment.go @@ -23,6 +23,7 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/metadata" "github.com/Netflix/p2plab/pkg/httputil" + "github.com/Netflix/p2plab/pkg/logutil" "github.com/pkg/errors" ) @@ -31,29 +32,31 @@ type experimentAPI struct { url urlFunc } -func (a *experimentAPI) Create(ctx context.Context, id string, edef metadata.ExperimentDefinition) (p2plab.Experiment, error) { - content, err := edef.ToJSONIndent() +func (a *experimentAPI) Create(ctx context.Context, name string, edef metadata.ExperimentDefinition) (id string, err error) { + content, err := edef.ToJSON() if err != nil { - return nil, err + return id, err } req := a.client.NewRequest("POST", a.url("/experiments/create"), httputil.WithRetryMax(0)). - Option("id", id). + Option("id", name). Body(bytes.NewReader(content)) resp, err := req.Send(ctx) if err != nil { - return nil, err + return id, err } defer resp.Body.Close() - e := experiment{client: a.client} - err = json.NewDecoder(resp.Body).Decode(&e.metadata) - if err != nil { - return nil, err + logWriter := logutil.LogWriter(ctx) + if logWriter != nil { + err = logutil.WriteRemoteLogs(ctx, resp.Body, logWriter) + if err != nil { + return id, err + } } - return &e, nil + return resp.Header.Get(ResourceID), nil } func (a *experimentAPI) Get(ctx context.Context, id string) (p2plab.Experiment, error) { diff --git a/labd/routers/clusterrouter/router.go b/labd/routers/clusterrouter/router.go index f3bb3e24..e4dc413e 100644 --- a/labd/routers/clusterrouter/router.go +++ b/labd/routers/clusterrouter/router.go @@ -28,7 +28,6 @@ import ( "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" - "github.com/pkg/errors" ) type router struct { @@ -87,7 +86,8 @@ func (s *router) postClustersCreate(ctx context.Context, w http.ResponseWriter, if err != nil { return err } - _, err = s.rhelper.CreateCluster(ctx, cdef, r.FormValue("name"), w) + + _, err = s.rhelper.CreateCluster(ctx, cdef, r.FormValue("name")) return err } @@ -111,49 +111,14 @@ func (s *router) putClustersLabel(ctx context.Context, w http.ResponseWriter, r func (s *router) deleteClusters(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { names := strings.Split(r.FormValue("names"), ",") - ctx, logger := logutil.WithResponseLogger(ctx, w) + ctx, _ = logutil.WithResponseLogger(ctx, w) // TODO: parallelize with different color loggers? for _, name := range names { - logger := logger.With().Str("name", name).Logger() - ctx = logger.WithContext(ctx) - - cluster, err := s.db.GetCluster(ctx, name) + err := s.rhelper.DeleteCluster(ctx, name) if err != nil { - return errors.Wrapf(err, "failed to get cluster %q", name) - } - - if cluster.Status != metadata.ClusterDestroying { - cluster.Status = metadata.ClusterDestroying - cluster, err = s.db.UpdateCluster(ctx, cluster) - if err != nil { - return errors.Wrap(err, "failed to update cluster status to destroying") - } - } - - ns, err := s.db.ListNodes(ctx, cluster.ID) - if err != nil { - return errors.Wrap(err, "failed to list nodes") - } - - ng := &p2plab.NodeGroup{ - ID: cluster.ID, - Nodes: ns, - } - - logger.Info().Msg("Destroying node group") - err = s.provider.DestroyNodeGroup(ctx, ng) - if err != nil { - return errors.Wrap(err, "failed to destroy node group") - } - - logger.Info().Msg("Deleting cluster metadata") - err = s.db.DeleteCluster(ctx, cluster.ID) - if err != nil { - return errors.Wrap(err, "failed to delete cluster metadata") + return err } - - logger.Info().Msg("Destroyed cluster") } return nil diff --git a/labd/routers/experimentrouter/router.go b/labd/routers/experimentrouter/router.go index a83b63d0..e1b7742a 100644 --- a/labd/routers/experimentrouter/router.go +++ b/labd/routers/experimentrouter/router.go @@ -16,33 +16,33 @@ package experimentrouter import ( "context" + "encoding/json" "fmt" - "io/ioutil" "net/http" "os" - "strconv" "strings" - - "github.com/Netflix/p2plab/nodes" - "github.com/Netflix/p2plab/reports" - "github.com/uber/jaeger-client-go" - bolt "go.etcd.io/bbolt" + "sync" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/daemon" "github.com/Netflix/p2plab/labd/controlapi" "github.com/Netflix/p2plab/labd/routers/helpers" "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/peer" "github.com/Netflix/p2plab/pkg/httputil" + "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/stringutil" "github.com/Netflix/p2plab/query" + "github.com/Netflix/p2plab/reports" "github.com/Netflix/p2plab/scenarios" "github.com/Netflix/p2plab/transformers" "github.com/containerd/containerd/errdefs" "github.com/pkg/errors" "github.com/rs/xid" "github.com/rs/zerolog" + jaeger "github.com/uber/jaeger-client-go" + bolt "go.etcd.io/bbolt" "golang.org/x/sync/errgroup" ) @@ -73,7 +73,7 @@ func (s *router) Routes() []daemon.Route { return []daemon.Route{ // GET daemon.NewGetRoute("/experiments/json", s.getExperiments), - daemon.NewGetRoute("/experiments/{id}/json", s.getExperimentByName), + daemon.NewGetRoute("/experiments/{id}/json", s.getExperimentByID), // POST daemon.NewPostRoute("/experiments/create", s.postExperimentsCreate), // PUT @@ -92,8 +92,8 @@ func (s *router) getExperiments(ctx context.Context, w http.ResponseWriter, r *h return daemon.WriteJSON(w, &experiments) } -func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - id := vars["name"] +func (s *router) getExperimentByID(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + id := vars["id"] experiment, err := s.db.GetExperiment(ctx, id) if err != nil { return err @@ -103,83 +103,57 @@ func (s *router) getExperimentByName(ctx context.Context, w http.ResponseWriter, } func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - noReset := false - if r.FormValue("no-reset") != "" { - var err error - noReset, err = strconv.ParseBool(r.FormValue("no-reset")) - if err != nil { - return err - } - } - - defer r.Body.Close() - data, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } var edef metadata.ExperimentDefinition - if err := edef.FromJSON(data); err != nil { + err := json.NewDecoder(r.Body).Decode(&edef) + if err != nil { return err } - exp, err := s.db.CreateExperiment(ctx, metadata.Experiment{ - ID: xid.New().String(), + + eid := xid.New().String() + w.Header().Add(controlapi.ResourceID, eid) + + ctx, logger := logutil.WithResponseLogger(ctx, w) + logger.UpdateContext(func(c zerolog.Context) zerolog.Context { + return c.Str("eid", eid) + }) + + experiment, err := s.db.CreateExperiment(ctx, metadata.Experiment{ + ID: eid, Definition: edef, Status: metadata.ExperimentRunning, }) if err != nil { return err } - errg, ctx := errgroup.WithContext(ctx) - for _, trial := range exp.Definition.TrialDefinition { - trial := trial - name := xid.New().String() - errg.Go(func() error { - info := zerolog.Ctx(ctx).Info() - info.Msg("creating cluster") - cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name, w) - if err != nil { - return err - } - info.Msg("creating scenario") - scenID := xid.New().String() - scenario := metadata.Scenario{ - ID: scenID, - Definition: trial.Scenario, - Labels: []string{ - name, - }, - } - scenario, err = s.db.CreateScenario(ctx, scenario) + + var seederAddrs []string + for _, addr := range s.seeder.Host().Addrs() { + seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) + } + + var mu sync.Mutex + experiment.Reports = make([]metadata.Report, len(experiment.Definition.Trials)) + + eg, ctx := errgroup.WithContext(ctx) + for i, trial := range experiment.Definition.Trials { + i, trial := i, trial + name := fmt.Sprintf("experiment_%s_trial_%d", eid, i) + + eg.Go(func() error { + cluster, err := s.rhelper.CreateCluster(ctx, trial.Cluster, name) if err != nil { return err } + defer func() error { - info.Msg("tearing down cluster") - cluster, err := s.db.GetCluster(ctx, name) - if err != nil { - return errors.Wrap(err, "failed to get cluster'") - } - ns, err := s.db.ListNodes(ctx, cluster.ID) - if err != nil { - return errors.Wrap(err, "failed to list nodes") - } - ng := &p2plab.NodeGroup{ - ID: cluster.ID, - Nodes: ns, - } - if err := s.provider.DestroyNodeGroup(ctx, ng); err != nil { - return errors.Wrap(err, "failed to destroy node group") - } - info.Msg("tore down cluster") - return nil + return s.rhelper.DeleteCluster(ctx, name) }() - info.Msg("creating nodes") + mns, err := s.db.ListNodes(ctx, cluster.ID) if err != nil { return err } + var ( ns []p2plab.Node lset = query.NewLabeledSet() @@ -189,47 +163,33 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite lset.Add(node) ns = append(ns, node) } - if !noReset { - info.Msg("updating nodes") - if err := nodes.Update(ctx, s.builder, ns); err != nil { - return errors.Wrap(err, "failed to update cluster") - } - info.Msg("connecting nodes") - if err := nodes.Connect(ctx, ns); err != nil { - return errors.Wrap(err, "failed to connect cluster") - } + + var ids []string + for _, labeled := range lset.Slice() { + ids = append(ids, labeled.ID()) } - info.Msg("generating scenario plan") - plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, lset) + zerolog.Ctx(ctx).Info().Int("trial", i).Strs("ids", ids).Msg("Created cluster for experiment") + + err = nodes.Update(ctx, s.builder, ns) if err != nil { - return err + return errors.Wrap(err, "failed to update cluster") } - bid := xid.New().String() - benchmark := metadata.Benchmark{ - ID: bid, - Status: metadata.BenchmarkRunning, - Cluster: cluster, - Scenario: scenario, - Plan: plan, - Labels: []string{ - cluster.ID, - scenario.ID, - bid, - }, + + err = nodes.Connect(ctx, ns) + if err != nil { + return errors.Wrap(err, "failed to connect cluster") } - info.Msg("creating benchmark") - if benchmark, err = s.db.CreateBenchmark(ctx, benchmark); err != nil { + + plan, queries, err := scenarios.Plan(ctx, trial.Scenario, s.ts, s.seeder, lset) + if err != nil { return err } - var seederAddrs []string - for _, addr := range s.seeder.Host().Addrs() { - seederAddrs = append(seederAddrs, fmt.Sprintf("%s/p2p/%s", addr, s.seeder.Host().ID())) - } - info.Msg("running scenario") + execution, err := scenarios.Run(ctx, lset, plan, seederAddrs) if err != nil { - return errors.Wrap(err, "failed to run scenario plan") + return errors.Wrapf(err, "failed to run scenario plan for %q", cluster.ID) } + report := metadata.Report{ Summary: metadata.ReportSummary{ TotalTime: execution.End.Sub(execution.Start), @@ -237,7 +197,7 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite Nodes: execution.Report, Queries: queries, } - info.Msg("aggregating results") + report.Aggregates = reports.ComputeAggregates(report.Nodes) jaegerUI := os.Getenv("JAEGER_UI") if jaegerUI != "" { @@ -246,31 +206,28 @@ func (s *router) postExperimentsCreate(ctx context.Context, w http.ResponseWrite report.Summary.Trace = fmt.Sprintf("%s/trace/%s", jaegerUI, sc.TraceID()) } } - info.Msg("Updating benchmark metadata") - err = s.db.Update(ctx, func(tx *bolt.Tx) error { - tctx := metadata.WithTransactionContext(ctx, tx) - - err := s.db.CreateReport(tctx, benchmark.ID, report) - if err != nil { - return errors.Wrap(err, "failed to create report") - } - - benchmark.Status = metadata.BenchmarkDone - _, err = s.db.UpdateBenchmark(tctx, benchmark) - if err != nil { - return errors.Wrap(err, "failed to update benchmark") - } - return nil - }) - info.Msg("updating reports") - exp.Reports = append(exp.Reports, report) + mu.Lock() + experiment.Reports[i] = report + mu.Unlock() return err }) } - err = errg.Wait() - daemon.WriteJSON(w, &exp) - return err + + err = eg.Wait() + if err != nil { + return err + } + + experiment.Status = metadata.ExperimentDone + return s.db.Update(ctx, func(tx *bolt.Tx) error { + tctx := metadata.WithTransactionContext(ctx, tx) + _, err := s.db.UpdateExperiment(tctx, experiment) + if err != nil { + return err + } + return nil + }) } func (s *router) putExperimentsLabel(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { diff --git a/labd/routers/helpers/helpers.go b/labd/routers/helpers/helpers.go index 2413ebe4..6fd9d1e0 100644 --- a/labd/routers/helpers/helpers.go +++ b/labd/routers/helpers/helpers.go @@ -2,13 +2,13 @@ package helpers import ( "context" - "net/http" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/labd/controlapi" "github.com/Netflix/p2plab/metadata" "github.com/Netflix/p2plab/nodes" "github.com/Netflix/p2plab/pkg/httputil" + "github.com/pkg/errors" "github.com/rs/zerolog" bolt "go.etcd.io/bbolt" ) @@ -28,7 +28,7 @@ func New(db metadata.DB, provider p2plab.NodeProvider, client *httputil.Client) } // CreateCluster enables creating the nodes in a cluster, waiting for them to be healthy before returning -func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinition, name string, w http.ResponseWriter) (metadata.Cluster, error) { +func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinition, name string) (metadata.Cluster, error) { var ( cluster = metadata.Cluster{ ID: name, @@ -52,13 +52,13 @@ func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinit if err != nil { return cluster, err } - zerolog.Ctx(ctx).Info().Msg("creating node group") + + zerolog.Ctx(ctx).Info().Str("cid", name).Msg("Creating node group") ng, err := h.provider.CreateNodeGroup(ctx, name, cdef) if err != nil { return cluster, err } - zerolog.Ctx(ctx).Info().Msg("updating metadata with new nodes") var mns []metadata.Node cluster.Status = metadata.ClusterConnecting if err := h.db.Update(ctx, func(tx *bolt.Tx) error { @@ -88,7 +88,49 @@ func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinit return cluster, err } - zerolog.Ctx(ctx).Info().Msg("updating cluster metadata") cluster.Status = metadata.ClusterCreated return h.db.UpdateCluster(ctx, cluster) } + +func (h *Helper) DeleteCluster(ctx context.Context, name string) error { + logger := zerolog.Ctx(ctx).With().Str("name", name).Logger() + ctx = logger.WithContext(ctx) + + cluster, err := h.db.GetCluster(ctx, name) + if err != nil { + return errors.Wrapf(err, "failed to get cluster %q", name) + } + + if cluster.Status != metadata.ClusterDestroying { + cluster.Status = metadata.ClusterDestroying + cluster, err = h.db.UpdateCluster(ctx, cluster) + if err != nil { + return errors.Wrap(err, "failed to update cluster status to destroying") + } + } + + ns, err := h.db.ListNodes(ctx, cluster.ID) + if err != nil { + return errors.Wrap(err, "failed to list nodes") + } + + ng := &p2plab.NodeGroup{ + ID: cluster.ID, + Nodes: ns, + } + + logger.Info().Msg("Destroying node group") + err = h.provider.DestroyNodeGroup(ctx, ng) + if err != nil { + return errors.Wrap(err, "failed to destroy node group") + } + + logger.Info().Msg("Deleting cluster metadata") + err = h.db.DeleteCluster(ctx, cluster.ID) + if err != nil { + return errors.Wrap(err, "failed to delete cluster metadata") + } + + logger.Info().Msg("Destroyed cluster") + return nil +} diff --git a/metadata/experiment.go b/metadata/experiment.go index 0b1345d7..5cbb5af7 100644 --- a/metadata/experiment.go +++ b/metadata/experiment.go @@ -17,6 +17,7 @@ package metadata import ( "context" "encoding/json" + "strconv" "time" "github.com/Netflix/p2plab/errdefs" @@ -31,22 +32,17 @@ type Experiment struct { Definition ExperimentDefinition + Reports []Report + Labels []string CreatedAt, UpdatedAt time.Time - - Reports []Report -} - -// ToJSONIndent is like to ToJSON but formats the json data -func (e *Experiment) ToJSONIndent() ([]byte, error) { - return json.MarshalIndent(e, "", " ") } // ToJSON is a helper function to convert an Experiment // into it's JSON representation func (e *Experiment) ToJSON() ([]byte, error) { - return json.Marshal(e) + return json.MarshalIndent(e, "", " ") } // FromJSON loads the experiment definition with the values from data @@ -64,21 +60,13 @@ var ( // ExperimentDefinition defines an experiment. type ExperimentDefinition struct { - IndependentVariable []IndependentVariable - TrialDefinition []TrialDefinition -} - -type IndependentVariable map[string]interface{} - -// ToJSONIndent is like to ToJSON but formats the json data -func (ed *ExperimentDefinition) ToJSONIndent() ([]byte, error) { - return json.MarshalIndent(ed, "", " ") + Trials []TrialDefinition } // ToJSON is a helper function to convert an ExperimentDefinition // into it's JSON representation func (ed *ExperimentDefinition) ToJSON() ([]byte, error) { - return json.Marshal(ed) + return json.MarshalIndent(ed, "", " ") } // FromJSON loads the experiment definition with the values from data @@ -273,6 +261,21 @@ func readExperiment(bkt *bolt.Bucket, experiment *Experiment) error { return err } + i := 0 + rbkt := bkt.Bucket([]byte(strconv.Itoa(i))) + for rbkt != nil { + var report Report + err = readReport(rbkt, &report) + if err != nil { + return err + } + + experiment.Reports = append(experiment.Reports, report) + + i++ + rbkt = bkt.Bucket([]byte(strconv.Itoa(i))) + } + return bkt.ForEach(func(k, v []byte) error { if v == nil { return nil @@ -318,6 +321,18 @@ func writeExperiment(bkt *bolt.Bucket, experiment *Experiment) error { return err } + for i, report := range experiment.Reports { + rbkt, err := bkt.CreateBucket([]byte(strconv.Itoa(i))) + if err != nil { + return err + } + + err = writeReport(rbkt, report) + if err != nil { + return err + } + } + for _, f := range []field{ {bucketKeyID, []byte(experiment.ID)}, {bucketKeyStatus, []byte(experiment.Status)}, diff --git a/nodes/connect.go b/nodes/connect.go index 389f6619..5f64fd80 100644 --- a/nodes/connect.go +++ b/nodes/connect.go @@ -132,5 +132,10 @@ func Connect(ctx context.Context, ns []p2plab.Node) error { } } - return connectPeers.Wait() + err = connectPeers.Wait() + if err != nil { + return err + } + + return nil } diff --git a/nodes/session.go b/nodes/session.go index 8a998736..89ffecc5 100644 --- a/nodes/session.go +++ b/nodes/session.go @@ -19,11 +19,9 @@ import ( "time" "github.com/Netflix/p2plab" - "github.com/Netflix/p2plab/errdefs" "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/traceutil" opentracing "github.com/opentracing/opentracing-go" - "github.com/pkg/errors" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -33,27 +31,32 @@ func Session(ctx context.Context, ns []p2plab.Node, fn func(context.Context) err defer span.Finish() sctx := opentracing.ContextWithSpan(ctx, span) + var ids []string + for _, n := range ns { + ids = append(ids, n.ID()) + } + eg, gctx := errgroup.WithContext(sctx) zerolog.Ctx(ctx).Info().Msg("Starting a session for benchmarking") go logutil.Elapsed(gctx, 20*time.Second, "Starting a session for benchmarking") - cancels := make([]context.CancelFunc, len(ns)) - for i, n := range ns { - i, n := i, n - eg.Go(func() error { - lctx, cancel := context.WithCancel(gctx) - cancels[i] = cancel + // cancels := make([]context.CancelFunc, len(ns)) + // for i, n := range ns { + // i, n := i, n + // eg.Go(func() error { + // lctx, cancel := context.WithCancel(context.Background()) + // cancels[i] = cancel - pdef := n.Metadata().Peer - err := n.Update(lctx, n.ID(), "", pdef) - if err != nil && !errdefs.IsCancelled(err) { - return errors.Wrapf(err, "failed to update node %q", n.ID()) - } + // pdef := n.Metadata().Peer + // err := n.Update(lctx, n.ID(), "", pdef) + // if err != nil && !errdefs.IsCancelled(err) { + // return errors.Wrapf(err, "failed to update node %q", n.ID()) + // } - return nil - }) - } + // return nil + // }) + // } err := WaitHealthy(ctx, ns) if err != nil { @@ -65,10 +68,10 @@ func Session(ctx context.Context, ns []p2plab.Node, fn func(context.Context) err return nil, err } - zerolog.Ctx(ctx).Info().Msg("Ending the session") - for _, cancel := range cancels { - cancel() - } + // zerolog.Ctx(ctx).Info().Strs("nodes", ids).Msg("Ending the session") + // for _, cancel := range cancels { + // cancel() + // } err = eg.Wait() if err != nil { diff --git a/nodes/update.go b/nodes/update.go index 8f88024d..573ac7c3 100644 --- a/nodes/update.go +++ b/nodes/update.go @@ -60,9 +60,9 @@ func Update(ctx context.Context, builder p2plab.Builder, ns []p2plab.Node) error for _, n := range ns { n := n + pdef := n.Metadata().Peer + link := linkByCommit[commitByRef[pdef.GitReference]] updatePeers.Go(func() error { - pdef := n.Metadata().Peer - link := linkByCommit[commitByRef[pdef.GitReference]] return n.Update(gctx, n.ID(), link, pdef) }) } diff --git a/providers/inmemory/port_helper.go b/providers/inmemory/port_helper.go deleted file mode 100644 index d1de9873..00000000 --- a/providers/inmemory/port_helper.go +++ /dev/null @@ -1,56 +0,0 @@ -package inmemory - -import ( - "context" - "sync" - - "github.com/phayes/freeport" - "github.com/rs/zerolog" -) - -// portHelper enables synchronizing access to free ports -// across multiple different trials and benchmarks, and prevents -// accidental port re-use. -type portHelper struct { - inUse map[int]bool - mux sync.Mutex -} - -func (ph *portHelper) getPorts(ctx context.Context, num int) (int, int, error) { - ph.mux.Lock() - defer ph.mux.Unlock() - for { - zerolog.Ctx(ctx).Info().Msg("getting free ports") - freePorts, err := freeport.GetFreePorts(2) - if err != nil { - return 0, 0, err - } - if ph.inUse[freePorts[0]] || ph.inUse[freePorts[1]] { - zerolog.Ctx(ctx).Warn().Msgf("got in use ports, %v and %v, trying again", freePorts[0], freePorts[1]) - continue - } - zerolog.Ctx(ctx).Info().Msg("found available ports") - ph.inUse[freePorts[0]] = true - ph.inUse[freePorts[1]] = true - return freePorts[0], freePorts[1], nil - } -} - -func (ph *portHelper) returnPorts(ctx context.Context, ports []int) { - ph.mux.Lock() - defer ph.mux.Unlock() - returned := 0 - for _, port := range ports { - if !ph.inUse[port] { - zerolog.Ctx(ctx).Warn().Msg("trying to return unused port") - continue - } - ph.inUse[port] = false - returned++ - } - if returned == 0 { - zerolog.Ctx(ctx).Warn().Msg("no ports retruend") - } else { - zerolog.Ctx(ctx).Info().Msgf("successfully returned %v ports", returned) - } -} diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index f45711cd..3dd36b7d 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -19,21 +19,22 @@ import ( "fmt" "os" "path/filepath" + "sync" "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/labagent" "github.com/Netflix/p2plab/metadata" + "github.com/phayes/freeport" "github.com/rs/xid" "github.com/rs/zerolog" ) type provider struct { - root string - nodes map[string][]*node - logger *zerolog.Logger - agentOpts []labagent.LabagentOption - portHelper *portHelper - logFile *os.File + root string + nodes map[string][]*node + logger *zerolog.Logger + agentOpts []labagent.LabagentOption + mu sync.Mutex } func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labagent.LabagentOption) (p2plab.NodeProvider, error) { @@ -41,19 +42,11 @@ func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labag if err != nil { return nil, err } - fh, err := os.Create(root + "/provider.log") - if err != nil { - return nil, err - } - lgr := logger.Output(fh).Level(zerolog.DebugLevel) - loggr := &lgr p := &provider{ - root: root, - nodes: make(map[string][]*node), - logger: loggr, - agentOpts: agentOpts, - portHelper: &portHelper{inUse: make(map[int]bool)}, - logFile: fh, + root: root, + nodes: make(map[string][]*node), + logger: logger, + agentOpts: agentOpts, } ctx := context.Background() @@ -81,13 +74,28 @@ func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labag } func (p *provider) CreateNodeGroup(ctx context.Context, id string, cdef metadata.ClusterDefinition) (*p2plab.NodeGroup, error) { - var ns []metadata.Node + p.mu.Lock() + defer p.mu.Unlock() + + numPorts := 0 + for _, group := range cdef.Groups { + numPorts += group.Size + } + + freePorts, err := freeport.GetFreePorts(numPorts * 2) + if err != nil { + return nil, err + } + + var ( + ns []metadata.Node + portIndex = 0 + ) for _, group := range cdef.Groups { for i := 0; i < group.Size; i++ { - agentPort, appPort, err := p.portHelper.getPorts(ctx, 2) - if err != nil { - return nil, err - } + agentPort, appPort := freePorts[portIndex], freePorts[portIndex+1] + portIndex += 2 + id := xid.New().String() n, err := p.newNode(id, agentPort, appPort) if err != nil { @@ -123,7 +131,6 @@ func (p *provider) DestroyNodeGroup(ctx context.Context, ng *p2plab.NodeGroup) e p.logger.Error().Err(err).Str("node.id", n.ID).Msg("error encountered while destroying node group") return err } - p.portHelper.returnPorts(ctx, []int{n.AgentPort, n.AppPort}) } delete(p.nodes, ng.ID) @@ -141,9 +148,17 @@ type node struct { func (p *provider) newNode(id string, agentPort, appPort int) (*node, error) { agentRoot := filepath.Join(p.root, id, "labagent") agentAddr := fmt.Sprintf(":%d", agentPort) + err := os.MkdirAll(agentRoot, 0711) + if err != nil { + return nil, err + } appRoot := filepath.Join(p.root, id, "labapp") appAddr := fmt.Sprintf("http://localhost:%d", appPort) + err = os.MkdirAll(appRoot, 0711) + if err != nil { + return nil, err + } la, err := labagent.New(agentRoot, agentAddr, appRoot, appAddr, p.logger, p.agentOpts...) if err != nil { From 63e3fc797507c91ec5757bdfae9e53ccb13a9784 Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 15 May 2020 16:04:51 -0700 Subject: [PATCH 64/66] cue: remove unused comments, change peer from optional to specify.... --- cue/cue.mod/Makefile | 2 +- cue/cue.mod/p2plab.cue | 11 +++++++++++ cue/parser/parser.go | 5 +++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/cue/cue.mod/Makefile b/cue/cue.mod/Makefile index 2419c931..043ec4f4 100644 --- a/cue/cue.mod/Makefile +++ b/cue/cue.mod/Makefile @@ -1,3 +1,3 @@ .PHONY: build build: - cue export p2plab_example.cue p2plab.cue + cue export p2plab_example1.cue p2plab.cue diff --git a/cue/cue.mod/p2plab.cue b/cue/cue.mod/p2plab.cue index cffc3516..ee84b627 100644 --- a/cue/cue.mod/p2plab.cue +++ b/cue/cue.mod/p2plab.cue @@ -9,6 +9,17 @@ Group :: { region: string // labels is an optional field labels?: [...string] + // although not optional if left unspecified + // then we use the default values of Peer + peer: Peer | *Peer +} + +Peer :: { + gitReference: string | *"HEAD" + transports: [...string] | *["tcp"] + muxers: [...string] | *["mplex"] + securityTransports: [...string] | *["secio"] + routing: string | *"nil" } // a cluster is a collection of 1 or more groups of nodes diff --git a/cue/parser/parser.go b/cue/parser/parser.go index 8f9b4456..cd21dd11 100644 --- a/cue/parser/parser.go +++ b/cue/parser/parser.go @@ -19,7 +19,9 @@ var ( region: string // labels is an optional field labels?: [...string] - peer?: Peer + // although not optional if left unspecified + // then we use the default values of Peer + peer: Peer | *Peer } Peer :: { @@ -57,7 +59,6 @@ var ( Experiment :: { trials: [...Trial] - // trials: [ ...[Cluster,Scenario]] } ` ) From e9a907820d95f5d076a3352e82060b6d48e61fed Mon Sep 17 00:00:00 2001 From: postables Date: Fri, 15 May 2020 16:05:06 -0700 Subject: [PATCH 65/66] labd/routers/helpers: comment out handling which is now done by cue --- labd/routers/helpers/helpers.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/labd/routers/helpers/helpers.go b/labd/routers/helpers/helpers.go index 6fd9d1e0..c3625b9c 100644 --- a/labd/routers/helpers/helpers.go +++ b/labd/routers/helpers/helpers.go @@ -40,14 +40,16 @@ func (h *Helper) CreateCluster(ctx context.Context, cdef metadata.ClusterDefinit } err error ) + // temporarily remove until hearing back from hinshun about this change + // if change is undesirable simply uncomment this // TODO(bonedaddy): need a better way to set the peer definition // - with cue we might want to include this as a configurable param - for i, def := range cluster.Definition.Groups { + /*for i, def := range cluster.Definition.Groups { if def.Peer == nil { def.Peer = &metadata.DefaultPeerDefinition cluster.Definition.Groups[i] = def } - } + }*/ cluster, err = h.db.CreateCluster(ctx, cluster) if err != nil { return cluster, err From 1515eaf264c5791668034f7dec22870fc20c08b7 Mon Sep 17 00:00:00 2001 From: postables Date: Thu, 21 May 2020 15:12:30 -0700 Subject: [PATCH 66/66] nodes: I fixed the session bug??? --- nodes/session.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/nodes/session.go b/nodes/session.go index 89ffecc5..c4b5822c 100644 --- a/nodes/session.go +++ b/nodes/session.go @@ -19,9 +19,11 @@ import ( "time" "github.com/Netflix/p2plab" + "github.com/Netflix/p2plab/errdefs" "github.com/Netflix/p2plab/pkg/logutil" "github.com/Netflix/p2plab/pkg/traceutil" opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -40,23 +42,21 @@ func Session(ctx context.Context, ns []p2plab.Node, fn func(context.Context) err zerolog.Ctx(ctx).Info().Msg("Starting a session for benchmarking") go logutil.Elapsed(gctx, 20*time.Second, "Starting a session for benchmarking") + var cancels = make([]context.CancelFunc, 0, len(ns)) + for _, n := range ns { + n := n + eg.Go(func() error { + lctx, cancel := context.WithCancel(gctx) + cancels = append(cancels, cancel) + pdef := n.Metadata().Peer + err := n.Update(lctx, n.ID(), "", pdef) + if err != nil && !errdefs.IsCancelled(err) { + return errors.Wrapf(err, "failed to update node %q", n.ID()) + } - // cancels := make([]context.CancelFunc, len(ns)) - // for i, n := range ns { - // i, n := i, n - // eg.Go(func() error { - // lctx, cancel := context.WithCancel(context.Background()) - // cancels[i] = cancel - - // pdef := n.Metadata().Peer - // err := n.Update(lctx, n.ID(), "", pdef) - // if err != nil && !errdefs.IsCancelled(err) { - // return errors.Wrapf(err, "failed to update node %q", n.ID()) - // } - - // return nil - // }) - // } + return nil + }) + } err := WaitHealthy(ctx, ns) if err != nil { @@ -68,10 +68,10 @@ func Session(ctx context.Context, ns []p2plab.Node, fn func(context.Context) err return nil, err } - // zerolog.Ctx(ctx).Info().Strs("nodes", ids).Msg("Ending the session") - // for _, cancel := range cancels { - // cancel() - // } + zerolog.Ctx(ctx).Info().Strs("nodes", ids).Msg("Ending the session") + for _, cancel := range cancels { + cancel() + } err = eg.Wait() if err != nil {