Skip to content

Commit

Permalink
Enhance handling massive system calls
Browse files Browse the repository at this point in the history
  • Loading branch information
cb-github-robot authored Mar 25, 2024
2 parents d4ba597 + 10023ac commit 5fe2a70
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 77 deletions.
71 changes: 65 additions & 6 deletions src/core/common/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ type CacheItem[T any] struct {
ExpiresAt time.Time
}

// clientCache is a map for cache items of intenal calls
var clientCache = sync.Map{}

// clientRequestCounter is a map for request counters of intenal calls
var clientRequestCounter = sync.Map{}

const (
// ShortDuration is a duration for short-term cache
ShortDuration = 2 * time.Second
Expand All @@ -55,6 +59,30 @@ func SetUseBody(requestBody interface{}) bool {
return true
}

// limitConcurrentRequests limits the number of Concurrent requests to the given limit
func limitConcurrentRequests(requestKey string, limit int) bool {
count, _ := clientRequestCounter.LoadOrStore(requestKey, 0)
currentCount := count.(int)

if currentCount >= limit {
fmt.Printf("[%s] requests for %s \n", currentCount, requestKey)
return false
}

clientRequestCounter.Store(requestKey, currentCount+1)
return true
}

// requestDone decreases the request counter
func requestDone(requestKey string) {
count, _ := clientRequestCounter.Load(requestKey)
currentCount := count.(int)

if currentCount > 0 {
clientRequestCounter.Store(requestKey, currentCount-1)
}
}

// ExecuteHttpRequest performs the HTTP request and fills the result (var requestBody interface{} = nil for empty body)
func ExecuteHttpRequest[B any, T any](
client *resty.Client,
Expand All @@ -68,7 +96,7 @@ func ExecuteHttpRequest[B any, T any](
) error {

// Generate cache key for GET method only
cacheKey := ""
requestKey := ""
if method == "GET" {

if useBody {
Expand All @@ -78,13 +106,13 @@ func ExecuteHttpRequest[B any, T any](
return fmt.Errorf("JSON marshaling failed: %w", err)
}
// Create cache key using both URL and body
cacheKey = fmt.Sprintf("%s_%s_%s", method, url, string(bodyString))
requestKey = fmt.Sprintf("%s_%s_%s", method, url, string(bodyString))
} else {
// Create cache key using only URL
cacheKey = fmt.Sprintf("%s_%s", method, url)
requestKey = fmt.Sprintf("%s_%s", method, url)
}

if item, found := clientCache.Load(cacheKey); found {
if item, found := clientCache.Load(requestKey); found {
cachedItem := item.(CacheItem[T]) // Generic type
if time.Now().Before(cachedItem.ExpiresAt) {
fmt.Println("Cache hit! Expires: ", time.Now().Sub(cachedItem.ExpiresAt))
Expand All @@ -96,7 +124,35 @@ func ExecuteHttpRequest[B any, T any](
return nil
} else {
fmt.Println("Cache item expired!")
clientCache.Delete(cacheKey)
clientCache.Delete(requestKey)
}
}

// Limit the number of concurrent requests
concurrencyLimit := 10
retryWait := 5 * time.Second
retryLimit := 3
retryCount := 0
// try to wait for the upcomming cached result when sending que is full
for {
if !limitConcurrentRequests(requestKey, concurrencyLimit) {
if retryCount >= retryLimit {
fmt.Printf("Too many same requests: %s\n", requestKey)
return fmt.Errorf("Too many same requests: %s", requestKey)
}
time.Sleep(retryWait)

if item, found := clientCache.Load(requestKey); found {
cachedItem := item.(CacheItem[T])
*result = cachedItem.Response
// release the request count for parallel requests limit
requestDone(requestKey)
fmt.Println("Got the cached result while waiting")
return nil
}
retryCount++
} else {
break
}
}
}
Expand Down Expand Up @@ -147,11 +203,14 @@ func ExecuteHttpRequest[B any, T any](
//val := reflect.ValueOf(result).Elem()
//newCacheItem := val.Interface()

// release the request count for parallel requests limit
requestDone(requestKey)

// Check if result is nil
if result == nil {
fmt.Println("Warning: result is nil, not caching.")
} else {
clientCache.Store(cacheKey, CacheItem[T]{Response: *result, ExpiresAt: time.Now().Add(cacheDuration)})
clientCache.Store(requestKey, CacheItem[T]{Response: *result, ExpiresAt: time.Now().Add(cacheDuration)})
fmt.Println("Cached successfully!")
}
}
Expand Down
39 changes: 13 additions & 26 deletions src/core/mcis/manageInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,26 +965,29 @@ func ListMcisStatus(nsId string) ([]McisStatusInfo, error) {

// GetVmCurrentPublicIp is func to get VM public IP
func GetVmCurrentPublicIp(nsId string, mcisId string, vmId string) (TbVmStatusInfo, error) {

fmt.Println("[GetVmStatus]" + vmId)
key := common.GenMcisKey(nsId, mcisId, vmId)
errorInfo := TbVmStatusInfo{}
errorInfo.Status = StatusFailed

key := common.GenMcisKey(nsId, mcisId, vmId)
keyValue, err := common.CBStore.Get(key)
if err != nil || keyValue == nil {
fmt.Println(err)
common.CBLog.Error(err)
return errorInfo, err
}

temp := TbVmInfo{}
unmarshalErr := json.Unmarshal([]byte(keyValue.Value), &temp)
if unmarshalErr != nil {
fmt.Println("unmarshalErr:", unmarshalErr)
err = json.Unmarshal([]byte(keyValue.Value), &temp)
if err != nil {
common.CBLog.Error(err)
return errorInfo, err
}
fmt.Println("\n[Calling SPIDER] START")
fmt.Println("CspVmId: " + temp.CspViewVmDetail.IId.NameId)

cspVmId := temp.CspViewVmDetail.IId.NameId
if cspVmId == "" {
err = fmt.Errorf("cspVmId is empty (VmId: %s)", vmId)
common.CBLog.Error(err)
return errorInfo, err
}

type statusResponse struct {
Status string
Expand Down Expand Up @@ -1014,8 +1017,6 @@ func GetVmCurrentPublicIp(nsId string, mcisId string, vmId string) (TbVmStatusIn
common.MediumDuration,
)

errorInfo.Status = StatusFailed

if err != nil {
common.CBLog.Error(err)
return errorInfo, err
Expand Down Expand Up @@ -1123,7 +1124,7 @@ func FetchVmStatus(nsId string, mcisId string, vmId string) (TbVmStatusInfo, err
callResult := statusResponse{}
callResult.Status = ""

if temp.Status != StatusTerminated {
if temp.Status != StatusTerminated && cspVmId != "" {
client := resty.New()
url := common.SpiderRestUrl + "/vmstatus/" + cspVmId
method := "GET"
Expand Down Expand Up @@ -1323,20 +1324,6 @@ func GetMcisVmStatus(nsId string, mcisId string, vmId string) (*TbVmStatusInfo,
return temp, err
}

fmt.Println("[status VM]")

vmKey := common.GenMcisKey(nsId, mcisId, vmId)
vmKeyValue, err := common.CBStore.Get(vmKey)
if err != nil {
err = fmt.Errorf("in CoreGetMcisVmStatus(); CBStore.Get() returned an error")
common.CBLog.Error(err)
// return nil, err
}

if vmKeyValue == nil {
return nil, fmt.Errorf("Cannot find " + vmKey)
}

vmStatusResponse, err := FetchVmStatus(nsId, mcisId, vmId)

if err != nil {
Expand Down
45 changes: 0 additions & 45 deletions src/core/mcis/provisioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,30 +907,10 @@ func CreateMcis(nsId string, req *TbMcisReq, option string) (*TbMcisInfo, error)
// returns InvalidValidationError for bad validation input, nil or ValidationErrors ( []FieldError )
err = validate.Struct(req)
if err != nil {

// this check is only needed when your code could produce
// an invalid value for validation such as interface with nil
// value most including myself do not usually have code like this.
if _, ok := err.(*validator.InvalidValidationError); ok {
fmt.Println(err)
return nil, err
}

// for _, err := range err.(validator.ValidationErrors) {

// fmt.Println(err.Namespace()) // can differ when a custom TagNameFunc is registered or
// fmt.Println(err.Field()) // by passing alt name to ReportError like below
// fmt.Println(err.StructNamespace())
// fmt.Println(err.StructField())
// fmt.Println(err.Tag())
// fmt.Println(err.ActualTag())
// fmt.Println(err.Kind())
// fmt.Println(err.Type())
// fmt.Println(err.Value())
// fmt.Println(err.Param())
// fmt.Println()
// }

return nil, err
}

Expand Down Expand Up @@ -978,17 +958,6 @@ func CreateMcis(nsId string, req *TbMcisReq, option string) (*TbMcisInfo, error)
return nil, err
}

keyValue, err := common.CBStore.Get(key)
if err != nil {
common.CBLog.Error(err)
err = fmt.Errorf("In CreateMcis(); CBStore.Get() returned an error.")
common.CBLog.Error(err)
// return nil, err
}

fmt.Println("<" + keyValue.Key + "> \n" + keyValue.Value)
fmt.Println("===========================")

// Check whether VM names meet requirement.
for _, k := range vmRequest {
err = common.CheckString(k.Name)
Expand Down Expand Up @@ -1031,16 +1000,6 @@ func CreateMcis(nsId string, req *TbMcisReq, option string) (*TbMcisInfo, error)
if err != nil {
common.CBLog.Error(err)
}
keyValue, err := common.CBStore.Get(key)
if err != nil {
common.CBLog.Error(err)
err = fmt.Errorf("In CreateMcis(); CBStore.Get() returned an error.")
common.CBLog.Error(err)
// return nil, err
}

fmt.Println("<" + keyValue.Key + "> \n" + keyValue.Value)
fmt.Println("===========================")

}

Expand Down Expand Up @@ -1520,13 +1479,9 @@ func AddVmToMcis(wg *sync.WaitGroup, nsId string, mcisId string, vmInfoData *TbV
return err
}

fmt.Printf("\n[AddVmToMcis Before request vmInfoData]\n")

//instanceIds, publicIPs := CreateVm(&vmInfoData)
err = CreateVm(nsId, mcisId, vmInfoData, option)

fmt.Printf("\n[AddVmToMcis After request vmInfoData]\n")

if err != nil {
vmInfoData.Status = StatusFailed
vmInfoData.SystemMessage = err.Error()
Expand Down

0 comments on commit 5fe2a70

Please sign in to comment.