Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve shared resource handling mechanism on MCI dynamic creation #1947

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ networks:
services:
# CB-Tumblebug
cb-tumblebug:
image: cloudbaristaorg/cb-tumblebug:0.10.2
image: cloudbaristaorg/cb-tumblebug:0.10.3
container_name: cb-tumblebug
build:
context: .
Expand Down
211 changes: 98 additions & 113 deletions src/core/infra/provisioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
validator "github.com/go-playground/validator/v10"
"github.com/go-resty/resty/v2"
"github.com/rs/zerolog/log"
"go.etcd.io/etcd/client/v3/concurrency"
"golang.org/x/net/context"
)

// TbMciReqStructLevelValidation is func to validate fields in TbMciReqStruct
Expand Down Expand Up @@ -958,54 +956,16 @@ func CreateMciDynamic(reqID string, nsId string, req *model.TbMciDynamicReq, dep
return emptyMci, err
}

// Create a persistent session for distributed lock
ctx := context.TODO()
session, err := kvstore.NewSession(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to create a session for dist-lock")
}
defer func() {
err := session.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close a session for dist-lock")
}
}()
log.Debug().Msgf("Created a session for dist-lock: %v", session)

//If not, generate default resources dynamically.
// Parallel processing of VM requests
var wg sync.WaitGroup
var mutex sync.Mutex
errChan := make(chan error, len(vmRequest)) // Error channel to collect errors

for _, k := range vmRequest {
wg.Add(1)

// Launch a goroutine for each VM request
go func(vmReq model.TbVmDynamicReq) {
defer wg.Done()

req, err := getVmReqFromDynamicReq(ctx, session, reqID, nsId, &vmReq)
// req, err := getVmReqFromDynamicReq(reqID, nsId, &vmReq)
if err != nil {
log.Error().Err(err).Msg("Failed to prepare resources for dynamic MCI creation")
errChan <- err
return
}

// Safely append to the shared mciReq.Vm slice
mutex.Lock()
mciReq.Vm = append(mciReq.Vm, *req)
mutex.Unlock()
}(k)
}

// Wait for all goroutines to complete
wg.Wait()
close(errChan) // Close the error channel after processing
/*
* [NOTE]
* 1. Generate default resources first
* 2. And then, parallel processing of VM requests
*/

// Check for any errors from the goroutines
for err := range errChan {
// Check if vmRequest has elements
if len(vmRequest) > 0 {
// Process the first vmRequest[0] synchronously
req, err := getVmReqFromDynamicReq(reqID, nsId, &vmRequest[0])
if err != nil {
// Rollback if any error occurs
log.Info().Msg("Rolling back created default resources")
Expand All @@ -1017,6 +977,54 @@ func CreateMciDynamic(reqID string, nsId string, req *model.TbMciDynamicReq, dep
return emptyMci, fmt.Errorf("rollback results [%s]: %w", ids, err)
}
}
mciReq.Vm = append(mciReq.Vm, *req)

// Process the rest of vmRequest[1:] in goroutines
if len(vmRequest) > 1 {
var wg sync.WaitGroup
var mutex sync.Mutex
errChan := make(chan error, len(vmRequest)-1) // Error channel to collect errors

for _, k := range vmRequest[1:] {
wg.Add(1)

// Launch a goroutine for each VM request
go func(vmReq model.TbVmDynamicReq) {
defer wg.Done()

req, err := getVmReqFromDynamicReq(reqID, nsId, &vmReq)
if err != nil {
log.Error().Err(err).Msg("Failed to prepare resources for dynamic MCI creation")
errChan <- err
return
}

// Safely append to the shared mciReq.Vm slice
mutex.Lock()
mciReq.Vm = append(mciReq.Vm, *req)
mutex.Unlock()
}(k)
}

// Wait for all goroutines to complete
wg.Wait()
close(errChan) // Close the error channel after processing

// Check for any errors from the goroutines
for err := range errChan {
if err != nil {
// Rollback if any error occurs
log.Info().Msg("Rolling back created default resources")
time.Sleep(5 * time.Second)
if rollbackResult, rollbackErr := resource.DelAllSharedResources(nsId); rollbackErr != nil {
return emptyMci, fmt.Errorf("failed in rollback operation: %w", rollbackErr)
} else {
ids := strings.Join(rollbackResult.IdList, ", ")
return emptyMci, fmt.Errorf("rollback results [%s]: %w", ids, err)
}
}
}
}
}

// Log the prepared MCI request and update the progress
Expand Down Expand Up @@ -1052,9 +1060,7 @@ func CreateMciVmDynamic(nsId string, mciId string, req *model.TbVmDynamicReq) (*
return emptyMci, err
}

ctx := context.TODO()

vmReq, err := getVmReqFromDynamicReq(ctx, nil, "", nsId, req)
vmReq, err := getVmReqFromDynamicReq("", nsId, req)
if err != nil {
log.Error().Err(err).Msg("")
return emptyMci, err
Expand Down Expand Up @@ -1111,7 +1117,7 @@ func checkCommonResAvailableForVmDynamicReq(req *model.TbVmDynamicReq, nsId stri
}

// getVmReqForDynamicMci is func to getVmReqFromDynamicReq
func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, reqID string, nsId string, req *model.TbVmDynamicReq) (*model.TbVmReq, error) {
func getVmReqFromDynamicReq(reqID string, nsId string, req *model.TbVmDynamicReq) (*model.TbVmReq, error) {

onDemand := true

Expand Down Expand Up @@ -1158,39 +1164,6 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
}
}

/*
* [Critial Section]
* - Verify and create vNets in parallel
* - Use distributed-lock, considering running multiple cb-tumblebugs.
*/

var lock *concurrency.Mutex
if session != nil {
// Generate a resource key for vNet
vNetKey := common.GenResourceKey(nsId, model.StrVNet, resourceName)
lockKey := "/dist-lock/" + vNetKey

lock, err = kvstore.NewLock(ctx, session, lockKey)
if err != nil {
log.Error().Err(err).Msg("Failed to get a dist-lock")
}
log.Debug().Msgf("Created a dist-lock: %v", lock)

// Lock
err = lock.Lock(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to acquire a dist-lock")
}
// Unlock the lock when the function exits
defer func() {
err := lock.Unlock(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to release a dist-lock")
}
}()
log.Debug().Msgf("Acquired a dist-lock: %v", lock)
}

common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Setting vNet:" + resourceName, Time: time.Now()})

vmReq.VNetId = resourceName
Expand All @@ -1202,27 +1175,25 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
return &model.TbVmReq{}, err
}
common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default vNet:" + resourceName, Time: time.Now()})
err2 := resource.CreateSharedResource(nsId, model.StrVNet, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default vNet " + vmReq.VNetId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default vNet: " + vmReq.VNetId)

// Check if the default vNet exists
_, err := resource.GetResource(nsId, model.StrVNet, vmReq.ConnectionName)
log.Debug().Err(err).Msg("checked if the default vNet does NOT exist")
// Create a new default vNet if it does not exist
if err != nil && strings.Contains(err.Error(), "does not exist") {
err2 := resource.CreateSharedResource(nsId, model.StrVNet, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default vNet " + vmReq.VNetId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default vNet: " + vmReq.VNetId)
}
}
} else {
log.Info().Msg("Found and utilize default vNet: " + vmReq.VNetId)
}
vmReq.SubnetId = resourceName

if session != nil {
// Unlock the lock
err := lock.Unlock(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to release the dist-lock")
}
log.Debug().Msgf("Released the dist-lock: %v", lock)
}

common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Setting SSHKey:" + resourceName, Time: time.Now()})
vmReq.SshKeyId = resourceName
_, err = resource.GetResource(nsId, model.StrSSHKey, vmReq.SshKeyId)
Expand All @@ -1233,12 +1204,19 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
return &model.TbVmReq{}, err
}
common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default SSHKey:" + resourceName, Time: time.Now()})
err2 := resource.CreateSharedResource(nsId, model.StrSSHKey, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default SSHKey " + vmReq.SshKeyId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default SSHKey: " + vmReq.VNetId)

// Check if the default SSHKey exists
_, err := resource.GetResource(nsId, model.StrSSHKey, vmReq.ConnectionName)
log.Debug().Err(err).Msg("checked if the default SSHKey does NOT exist")
// Create a new default SSHKey if it does not exist
if err != nil && strings.Contains(err.Error(), "does not exist") {
err2 := resource.CreateSharedResource(nsId, model.StrSSHKey, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default SSHKey " + vmReq.SshKeyId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default SSHKey: " + vmReq.VNetId)
}
}
} else {
log.Info().Msg("Found and utilize default SSHKey: " + vmReq.VNetId)
Expand All @@ -1255,12 +1233,19 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
return &model.TbVmReq{}, err
}
common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default securityGroup:" + resourceName, Time: time.Now()})
err2 := resource.CreateSharedResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default securityGroup " + securityGroup + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default securityGroup: " + securityGroup)

// Check if the default security group exists
_, err := resource.GetResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName)
// Create a new default security group if it does not exist
log.Debug().Err(err).Msg("checked if the default security group does NOT exist")
if err != nil && strings.Contains(err.Error(), "does not exist") {
err2 := resource.CreateSharedResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default securityGroup " + securityGroup + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default securityGroup: " + securityGroup)
}
}
} else {
log.Info().Msg("Found and utilize default securityGroup: " + securityGroup)
Expand Down