Skip to content
This repository has been archived by the owner on Sep 6, 2022. It is now read-only.

Commit

Permalink
Reduce MVCC conflicts during compute plans (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aurélien Gasser authored Feb 5, 2021
1 parent f48f934 commit 006a45c
Show file tree
Hide file tree
Showing 23 changed files with 572 additions and 267 deletions.
2 changes: 1 addition & 1 deletion chaincode/algo_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestAggregateAlgo(t *testing.T) {
Checksum: inpAlgo.DescriptionChecksum,
StorageAddress: inpAlgo.DescriptionStorageAddress,
},
Owner: worker,
Owner: workerA,
Permissions: outputPermissions{
Process: Permission{Public: true, AuthorizedIDs: []string{}},
},
Expand Down
2 changes: 1 addition & 1 deletion chaincode/algo_composite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestCompositeAlgo(t *testing.T) {
Checksum: inpAlgo.DescriptionChecksum,
StorageAddress: inpAlgo.DescriptionStorageAddress,
},
Owner: worker,
Owner: workerA,
Permissions: outputPermissions{
Process: Permission{Public: true, AuthorizedIDs: []string{}},
},
Expand Down
2 changes: 1 addition & 1 deletion chaincode/algo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestAlgo(t *testing.T) {
Checksum: inpAlgo.DescriptionChecksum,
StorageAddress: inpAlgo.DescriptionStorageAddress,
},
Owner: worker,
Owner: workerA,
Permissions: outputPermissions{
Process: Permission{Public: true, AuthorizedIDs: []string{}},
},
Expand Down
175 changes: 71 additions & 104 deletions chaincode/compute_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func createComputePlanInternal(db *LedgerDB, inp inputComputePlan, tag string, m
len(inp.CompositeTraintuples) +
len(inp.Testtuples)
if count == 0 {
resp.Fill(inp.Key, computePlan, []string{})
resp.Fill(inp.Key, computePlan, []string{}, 0, 0)
return resp, nil
}
return updateComputePlanInternal(db, inp)
Expand Down Expand Up @@ -248,7 +248,12 @@ func updateComputePlanInternal(db *LedgerDB, inp inputComputePlan) (resp outputC
if err != nil {
return resp, err
}
resp.Fill(inp.Key, computePlan, NewIDs)
doneCount, tupleCount, err := computePlan.getTupleCounts(db)
if err != nil {
return resp, err
}

resp.Fill(inp.Key, computePlan, NewIDs, doneCount, tupleCount)
return resp, err
}

Expand Down Expand Up @@ -303,7 +308,12 @@ func getOutComputePlan(db *LedgerDB, key string) (resp outputComputePlan, err er
return resp, err
}

resp.Fill(key, computePlan, []string{})
doneCount, tupleCount, err := computePlan.getTupleCounts(db)
if err != nil {
return resp, err
}

resp.Fill(key, computePlan, []string{}, doneCount, tupleCount)
return resp, err
}

Expand All @@ -324,18 +334,30 @@ func cancelComputePlan(db *LedgerDB, args []string) (resp outputComputePlan, err
return outputComputePlan{}, err
}

err = db.AddComputePlanEvent(inp.Key, computeplan.State.Status, computeplan.State.IntermediaryModelsInUse)
models, err := computeplan.removeAllIntermediaryModels(db)
if err != nil {
return outputComputePlan{}, err
}
resp.Fill(inp.Key, computeplan, []string{})

err = db.AddComputePlanEvent(inp.Key, computeplan.State.Status, models)
if err != nil {
return outputComputePlan{}, err
}

doneCount, tupleCount, err := computeplan.getTupleCounts(db)
if err != nil {
return resp, err
}
resp.Fill(inp.Key, computeplan, []string{}, doneCount, tupleCount)
return resp, nil
}

// Create adds a Compute Plan to the ledger and registers it in the compute plan index
func (cp *ComputePlan) Create(db *LedgerDB, key string) error {
cp.Key = key
cp.StateKey = GetRandomHash()
cp.AssetType = ComputePlanType
cp.Workers = []string{}
err := db.Add(key, cp)
if err != nil {
return err
Expand Down Expand Up @@ -366,47 +388,69 @@ func (cp *ComputePlan) SaveState(db *LedgerDB) error {

// UpdateState check the tuple status (from an updated tuple or a new one)
// and, if required, it updates the compute plan' status and/or its doneCount.
// It returns true if there is any change to the compute plan, false otherwise.
func (cp *ComputePlan) UpdateState(tupleStatus string) bool {
// It returns true and the list of models to delete if there is any change to the compute plan, false and empty list otherwise.
func (cp *ComputePlan) UpdateState(db *LedgerDB, tupleStatus string, worker string) (bool, []string, error) {
switch cp.State.Status {
case StatusFailed, StatusCanceled:
case StatusDone:
// We might add tuples to a done compute plan
if stringInSlice(tupleStatus, []string{StatusWaiting, StatusTodo}) {
cp.State.Status = tupleStatus
return true
return true, []string{}, nil
}
case StatusDoing:
switch tupleStatus {
case StatusFailed:
cp.State.Status = tupleStatus
return true
return true, []string{}, nil
case StatusDone:
cp.State.DoneCount++
if cp.State.DoneCount == cp.State.TupleCount {
cp.State.Status = tupleStatus
// In order for the CP to transition to the "done" state, each worker must have all
// its tuples in the "done" state. Checking the state of all the workers is
// expensive and can lead to MVCC conflicts because workers each write to their
// respective states concurrently. To mitigate this issue, we first check if the
// *current* worker has finished processing all of its tuples. Only if that's the
// case do we inspect the state of other workers.
cp.incrementWorkerDoneCount(db, worker)
wStateKey := cp.getCPWorkerStateKey(worker)
wState, err := db.GetCPWorkerState(wStateKey)
if err != nil {
return false, []string{}, err
}
if wState.DoneCount == wState.TupleCount {
doneCount, tupleCount, err := cp.getTupleCounts(db)
if err != nil {
return false, []string{}, err
}
if doneCount == tupleCount {
modelsToDelete, err := cp.removeAllIntermediaryModels(db)
if err != nil {
return false, []string{}, err
}
cp.State.Status = StatusDone
return true, modelsToDelete, nil
}
}
return true
return false, []string{}, nil
}
case StatusTodo:
if tupleStatus == StatusDoing {
cp.State.Status = tupleStatus
return true
return true, []string{}, nil
}
case StatusWaiting:
if tupleStatus == StatusTodo {
cp.State.Status = tupleStatus
return true
return true, []string{}, nil
}
case "":
cp.State.Status = tupleStatus
return true
return true, []string{}, nil
}
return false
return false, []string{}, nil
}

// AddTuple add the tuple key to the compute plan and update it accordingly
func (cp *ComputePlan) AddTuple(tupleType AssetType, key, status string) {
func (cp *ComputePlan) AddTuple(db *LedgerDB, tupleType AssetType, key, status string, worker string) error {
switch tupleType {
case TraintupleType:
cp.TraintupleKeys = append(cp.TraintupleKeys, key)
Expand All @@ -417,112 +461,35 @@ func (cp *ComputePlan) AddTuple(tupleType AssetType, key, status string) {
case TesttupleType:
cp.TesttupleKeys = append(cp.TesttupleKeys, key)
}
cp.State.TupleCount++
cp.UpdateState(status)
cp.incrementWorkerTupleCount(db, worker)
_, _, err := cp.UpdateState(db, status, worker)
return err
}

// UpdateComputePlanState retreive the compute plan if the ID is not empty,
// check if the updated status change anything and save it if it's the case
func UpdateComputePlanState(db *LedgerDB, ComputePlanKey, tupleStatus, tupleKey string) error {
func UpdateComputePlanState(db *LedgerDB, ComputePlanKey, tupleStatus, tupleKey string, worker string) error {
if ComputePlanKey == "" {
return nil
}
cp, err := db.GetComputePlan(ComputePlanKey)
if err != nil {
return err
}
stateUpdated := cp.UpdateState(tupleStatus)
doneModels := []string{}
if tupleStatus == StatusDone && cp.CleanModels {
doneModels, err = cp.UpdateIntermediaryModelsInuse(db)
stateUpdated, modelsToDelete, err := cp.UpdateState(db, tupleStatus, worker)
if err != nil {
return err
}

if err != nil {
return err
}
if stateUpdated || len(doneModels) != 0 {
err = db.AddComputePlanEvent(ComputePlanKey, cp.State.Status, doneModels)
if stateUpdated || len(modelsToDelete) > 0 {
err = db.AddComputePlanEvent(ComputePlanKey, cp.State.Status, modelsToDelete)
if err != nil {
return err
}
return cp.SaveState(db)
}
return nil
}

// TryAddIntermediaryModel will reference the model key if the compute plan key
// is not empty and if it's an intermediary model meaning without any children
func TryAddIntermediaryModel(db *LedgerDB, ComputePlanKey, tupleKey, modelKey string) error {
if ComputePlanKey == "" {
return nil
}
cp, err := db.GetComputePlan(ComputePlanKey)
if err != nil {
return err
}
if !cp.CleanModels {
return nil
}
allChildKeys, err := db.GetIndexKeys("tuple~inModel~key", []string{"tuple", tupleKey})
if err != nil {
return err
}
if len(allChildKeys) == 0 {
// If a tuple has no children it's considered final and should not be
// listed in the index
return nil
}
cp.State.IntermediaryModelsInUse = append(cp.State.IntermediaryModelsInUse, modelKey)

return cp.SaveState(db)
}

// UpdateIntermediaryModelsInuse check all models listed as intermediary. If any of
// them are 'done', meaning that there is no train like tuples or testtuples
// planned to use this model. If that the case its key will be added to the
// returned slice and remove from the compute plan's one.
func (cp *ComputePlan) UpdateIntermediaryModelsInuse(db *LedgerDB) ([]string, error) {
if !cp.CleanModels {
return []string{}, nil
}
var doneModels, inUseModels []string
for _, modelKey := range cp.State.IntermediaryModelsInUse {
done := true
keys, err := db.GetIndexKeys("tuple~modelKey~key", []string{"tuple", modelKey})
if err != nil {
return []string{}, err
}
if len(keys) == 0 {
// This occurs for the keys added during the same transaction. But
// thoses models can just be added to the in use ones
inUseModels = append(inUseModels, modelKey)
continue
}
tupleKey := keys[0]
tupleChildKeys, err := db.GetIndexKeys("tuple~inModel~key", []string{"tuple", tupleKey})
if err != nil {
return []string{}, err
}
testtupleKeys, err := db.GetIndexKeys("testtuple~traintuple~certified~key", []string{"testtuple", tupleKey})
if err != nil {
return []string{}, err
}
allKeys := append(tupleChildKeys, testtupleKeys...)
for _, key := range allKeys {
tuple, err := db.GetGenericTuple(key)
if err != nil {
return []string{}, err
}
if tuple.Status != StatusDone {
inUseModels = append(inUseModels, modelKey)
done = false
break
}
}
if done {
doneModels = append(doneModels, modelKey)
}
}
cp.State.IntermediaryModelsInUse = inUseModels
return doneModels, nil
}
Loading

0 comments on commit 006a45c

Please sign in to comment.