diff --git a/docker-compose.yaml b/docker-compose.yaml index 60942d1c..213f9063 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -7,7 +7,7 @@ networks: services: # CB-Tumblebug cb-tumblebug: - image: cloudbaristaorg/cb-tumblebug:0.10.2 + image: cloudbaristaorg/cb-tumblebug:0.10.3 container_name: cb-tumblebug build: context: . diff --git a/src/core/infra/provisioning.go b/src/core/infra/provisioning.go index ad8f14fb..f18a2625 100644 --- a/src/core/infra/provisioning.go +++ b/src/core/infra/provisioning.go @@ -30,8 +30,6 @@ import ( validator "github.com/go-playground/validator/v10" "github.com/go-resty/resty/v2" "github.com/rs/zerolog/log" - "go.etcd.io/etcd/client/v3/concurrency" - "golang.org/x/net/context" ) // TbMciReqStructLevelValidation is func to validate fields in TbMciReqStruct @@ -958,54 +956,16 @@ func CreateMciDynamic(reqID string, nsId string, req *model.TbMciDynamicReq, dep return emptyMci, err } - // Create a persistent session for distributed lock - ctx := context.TODO() - session, err := kvstore.NewSession(ctx) - if err != nil { - log.Error().Err(err).Msg("Failed to create a session for dist-lock") - } - defer func() { - err := session.Close() - if err != nil { - log.Error().Err(err).Msg("Failed to close a session for dist-lock") - } - }() - log.Debug().Msgf("Created a session for dist-lock: %v", session) - - //If not, generate default resources dynamically. - // Parallel processing of VM requests - var wg sync.WaitGroup - var mutex sync.Mutex - errChan := make(chan error, len(vmRequest)) // Error channel to collect errors - - for _, k := range vmRequest { - wg.Add(1) - - // Launch a goroutine for each VM request - go func(vmReq model.TbVmDynamicReq) { - defer wg.Done() - - req, err := getVmReqFromDynamicReq(ctx, session, reqID, nsId, &vmReq) - // req, err := getVmReqFromDynamicReq(reqID, nsId, &vmReq) - if err != nil { - log.Error().Err(err).Msg("Failed to prepare resources for dynamic MCI creation") - errChan <- err - return - } - - // Safely append to the shared mciReq.Vm slice - mutex.Lock() - mciReq.Vm = append(mciReq.Vm, *req) - mutex.Unlock() - }(k) - } - - // Wait for all goroutines to complete - wg.Wait() - close(errChan) // Close the error channel after processing + /* + * [NOTE] + * 1. Generate default resources first + * 2. And then, parallel processing of VM requests + */ - // Check for any errors from the goroutines - for err := range errChan { + // Check if vmRequest has elements + if len(vmRequest) > 0 { + // Process the first vmRequest[0] synchronously + req, err := getVmReqFromDynamicReq(reqID, nsId, &vmRequest[0]) if err != nil { // Rollback if any error occurs log.Info().Msg("Rolling back created default resources") @@ -1017,6 +977,54 @@ func CreateMciDynamic(reqID string, nsId string, req *model.TbMciDynamicReq, dep return emptyMci, fmt.Errorf("rollback results [%s]: %w", ids, err) } } + mciReq.Vm = append(mciReq.Vm, *req) + + // Process the rest of vmRequest[1:] in goroutines + if len(vmRequest) > 1 { + var wg sync.WaitGroup + var mutex sync.Mutex + errChan := make(chan error, len(vmRequest)-1) // Error channel to collect errors + + for _, k := range vmRequest[1:] { + wg.Add(1) + + // Launch a goroutine for each VM request + go func(vmReq model.TbVmDynamicReq) { + defer wg.Done() + + req, err := getVmReqFromDynamicReq(reqID, nsId, &vmReq) + if err != nil { + log.Error().Err(err).Msg("Failed to prepare resources for dynamic MCI creation") + errChan <- err + return + } + + // Safely append to the shared mciReq.Vm slice + mutex.Lock() + mciReq.Vm = append(mciReq.Vm, *req) + mutex.Unlock() + }(k) + } + + // Wait for all goroutines to complete + wg.Wait() + close(errChan) // Close the error channel after processing + + // Check for any errors from the goroutines + for err := range errChan { + if err != nil { + // Rollback if any error occurs + log.Info().Msg("Rolling back created default resources") + time.Sleep(5 * time.Second) + if rollbackResult, rollbackErr := resource.DelAllSharedResources(nsId); rollbackErr != nil { + return emptyMci, fmt.Errorf("failed in rollback operation: %w", rollbackErr) + } else { + ids := strings.Join(rollbackResult.IdList, ", ") + return emptyMci, fmt.Errorf("rollback results [%s]: %w", ids, err) + } + } + } + } } // Log the prepared MCI request and update the progress @@ -1052,9 +1060,7 @@ func CreateMciVmDynamic(nsId string, mciId string, req *model.TbVmDynamicReq) (* return emptyMci, err } - ctx := context.TODO() - - vmReq, err := getVmReqFromDynamicReq(ctx, nil, "", nsId, req) + vmReq, err := getVmReqFromDynamicReq("", nsId, req) if err != nil { log.Error().Err(err).Msg("") return emptyMci, err @@ -1111,7 +1117,7 @@ func checkCommonResAvailableForVmDynamicReq(req *model.TbVmDynamicReq, nsId stri } // getVmReqForDynamicMci is func to getVmReqFromDynamicReq -func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, reqID string, nsId string, req *model.TbVmDynamicReq) (*model.TbVmReq, error) { +func getVmReqFromDynamicReq(reqID string, nsId string, req *model.TbVmDynamicReq) (*model.TbVmReq, error) { onDemand := true @@ -1158,39 +1164,6 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r } } - /* - * [Critial Section] - * - Verify and create vNets in parallel - * - Use distributed-lock, considering running multiple cb-tumblebugs. - */ - - var lock *concurrency.Mutex - if session != nil { - // Generate a resource key for vNet - vNetKey := common.GenResourceKey(nsId, model.StrVNet, resourceName) - lockKey := "/dist-lock/" + vNetKey - - lock, err = kvstore.NewLock(ctx, session, lockKey) - if err != nil { - log.Error().Err(err).Msg("Failed to get a dist-lock") - } - log.Debug().Msgf("Created a dist-lock: %v", lock) - - // Lock - err = lock.Lock(ctx) - if err != nil { - log.Error().Err(err).Msg("Failed to acquire a dist-lock") - } - // Unlock the lock when the function exits - defer func() { - err := lock.Unlock(ctx) - if err != nil { - log.Error().Err(err).Msg("Failed to release a dist-lock") - } - }() - log.Debug().Msgf("Acquired a dist-lock: %v", lock) - } - common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Setting vNet:" + resourceName, Time: time.Now()}) vmReq.VNetId = resourceName @@ -1202,27 +1175,25 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r return &model.TbVmReq{}, err } common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default vNet:" + resourceName, Time: time.Now()}) - err2 := resource.CreateSharedResource(nsId, model.StrVNet, vmReq.ConnectionName) - if err2 != nil { - log.Error().Err(err2).Msg("Failed to create new default vNet " + vmReq.VNetId + " from " + vmReq.ConnectionName) - return &model.TbVmReq{}, err2 - } else { - log.Info().Msg("Created new default vNet: " + vmReq.VNetId) + + // Check if the default vNet exists + _, err := resource.GetResource(nsId, model.StrVNet, vmReq.ConnectionName) + log.Debug().Err(err).Msg("checked if the default vNet does NOT exist") + // Create a new default vNet if it does not exist + if err != nil && strings.Contains(err.Error(), "does not exist") { + err2 := resource.CreateSharedResource(nsId, model.StrVNet, vmReq.ConnectionName) + if err2 != nil { + log.Error().Err(err2).Msg("Failed to create new default vNet " + vmReq.VNetId + " from " + vmReq.ConnectionName) + return &model.TbVmReq{}, err2 + } else { + log.Info().Msg("Created new default vNet: " + vmReq.VNetId) + } } } else { log.Info().Msg("Found and utilize default vNet: " + vmReq.VNetId) } vmReq.SubnetId = resourceName - if session != nil { - // Unlock the lock - err := lock.Unlock(ctx) - if err != nil { - log.Error().Err(err).Msg("Failed to release the dist-lock") - } - log.Debug().Msgf("Released the dist-lock: %v", lock) - } - common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Setting SSHKey:" + resourceName, Time: time.Now()}) vmReq.SshKeyId = resourceName _, err = resource.GetResource(nsId, model.StrSSHKey, vmReq.SshKeyId) @@ -1233,12 +1204,19 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r return &model.TbVmReq{}, err } common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default SSHKey:" + resourceName, Time: time.Now()}) - err2 := resource.CreateSharedResource(nsId, model.StrSSHKey, vmReq.ConnectionName) - if err2 != nil { - log.Error().Err(err2).Msg("Failed to create new default SSHKey " + vmReq.SshKeyId + " from " + vmReq.ConnectionName) - return &model.TbVmReq{}, err2 - } else { - log.Info().Msg("Created new default SSHKey: " + vmReq.VNetId) + + // Check if the default SSHKey exists + _, err := resource.GetResource(nsId, model.StrSSHKey, vmReq.ConnectionName) + log.Debug().Err(err).Msg("checked if the default SSHKey does NOT exist") + // Create a new default SSHKey if it does not exist + if err != nil && strings.Contains(err.Error(), "does not exist") { + err2 := resource.CreateSharedResource(nsId, model.StrSSHKey, vmReq.ConnectionName) + if err2 != nil { + log.Error().Err(err2).Msg("Failed to create new default SSHKey " + vmReq.SshKeyId + " from " + vmReq.ConnectionName) + return &model.TbVmReq{}, err2 + } else { + log.Info().Msg("Created new default SSHKey: " + vmReq.VNetId) + } } } else { log.Info().Msg("Found and utilize default SSHKey: " + vmReq.VNetId) @@ -1255,12 +1233,19 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r return &model.TbVmReq{}, err } common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default securityGroup:" + resourceName, Time: time.Now()}) - err2 := resource.CreateSharedResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName) - if err2 != nil { - log.Error().Err(err2).Msg("Failed to create new default securityGroup " + securityGroup + " from " + vmReq.ConnectionName) - return &model.TbVmReq{}, err2 - } else { - log.Info().Msg("Created new default securityGroup: " + securityGroup) + + // Check if the default security group exists + _, err := resource.GetResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName) + // Create a new default security group if it does not exist + log.Debug().Err(err).Msg("checked if the default security group does NOT exist") + if err != nil && strings.Contains(err.Error(), "does not exist") { + err2 := resource.CreateSharedResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName) + if err2 != nil { + log.Error().Err(err2).Msg("Failed to create new default securityGroup " + securityGroup + " from " + vmReq.ConnectionName) + return &model.TbVmReq{}, err2 + } else { + log.Info().Msg("Created new default securityGroup: " + securityGroup) + } } } else { log.Info().Msg("Found and utilize default securityGroup: " + securityGroup)