Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix already reserved panic on concurrent invokes #133

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 4 additions & 48 deletions lambda/rapidcore/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,59 +107,35 @@ type Server struct {
var _ interop.Server = (*Server)(nil)

func (s *Server) setRapidPhase(phase rapidPhase) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.rapidPhase = phase
}

func (s *Server) getRapidPhase() rapidPhase {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.rapidPhase
}

func (s *Server) setRuntimeState(state runtimeState) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.runtimeState = state
}

func (s *Server) getRuntimeState() runtimeState {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.runtimeState
}

func (s *Server) SetInvokeTimeout(timeout time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.invokeTimeout = timeout
}

func (s *Server) GetInvokeTimeout() time.Duration {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.invokeTimeout
}

func (s *Server) GetInvokeContext() *InvokeContext {
s.mutex.Lock()
defer s.mutex.Unlock()

ctx := *s.invokeCtx
return &ctx
}

func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx != nil {
return nil, ErrAlreadyReserved
}
Expand Down Expand Up @@ -226,9 +202,6 @@ func (s *Server) awaitInitCompletion() {
}

func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return "", ErrNotReserved
}
Expand All @@ -248,9 +221,6 @@ func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, err

// Release closes the invocation, making server ready for reserve again
func (s *Server) Release() error {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return ErrNotReserved
}
Expand All @@ -267,9 +237,6 @@ func (s *Server) Release() error {

// GetCurrentInvokeID
func (s *Server) GetCurrentInvokeID() string {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return ""
}
Expand Down Expand Up @@ -350,8 +317,6 @@ func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[strin

func (s *Server) SendResponse(invokeID string, resp *interop.StreamableInvokeResponse) error {
s.setRuntimeState(runtimeInvokeResponseSent)
s.mutex.Lock()
defer s.mutex.Unlock()
runtimeCalledResponse := true
return s.sendResponseUnsafe(invokeID, resp.Headers, resp.Payload, resp.Trailers, resp.Request, runtimeCalledResponse)
}
Expand All @@ -372,8 +337,6 @@ func (s *Server) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error
func (s *Server) SendErrorResponse(invokeID string, resp *interop.ErrorInvokeResponse) error {
log.Debugf("Sending Error Response: %s", resp.FunctionError.Type)
s.setRuntimeState(runtimeInvokeError)
s.mutex.Lock()
defer s.mutex.Unlock()
additionalHeaders := map[string]string{
directinvoke.ContentTypeHeader: resp.Headers.ContentType,
directinvoke.ErrorTypeHeader: string(resp.FunctionError.Type),
Expand Down Expand Up @@ -501,14 +464,10 @@ func deadlineNsFromTimeoutMs(timeoutMs int64) int64 {
}

func (s *Server) setInitFailuresChan() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.initFailures = make(chan interop.InitFailure)
}

func (s *Server) getInitFailuresChan() chan interop.InitFailure {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.initFailures
}

Expand Down Expand Up @@ -593,14 +552,10 @@ func (s *Server) FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct boo
}

func (s *Server) setCachedInitErrorResponse(errResp *interop.ErrorInvokeResponse) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.cachedInitErrorResponse = errResp
}

func (s *Server) getCachedInitErrorResponse() *interop.ErrorInvokeResponse {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.cachedInitErrorResponse
}

Expand All @@ -613,8 +568,6 @@ func (s *Server) trySendDefaultErrorResponse(resp *interop.ErrorInvokeResponse)
}

func (s *Server) CurrentToken() *interop.Token {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.invokeCtx == nil {
return nil
}
Expand All @@ -625,6 +578,9 @@ func (s *Server) CurrentToken() *interop.Token {
// Invoke is used by the Runtime Interface Emulator (Rapid Local)
// https://github.com/aws/aws-lambda-runtime-interface-emulator
func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
s.mutex.Lock()
defer s.mutex.Unlock()

resetCtx, resetCancel := context.WithCancel(context.Background())
defer resetCancel()

Expand Down Expand Up @@ -768,7 +724,7 @@ func (s *Server) awaitInitialized() (initCompletionResponse, error) {
// since it can be called twice when a caller wants to wait until init is complete
func (s *Server) AwaitInitialized() error {
if _, err := s.awaitInitialized(); err != nil {
if releaseErr := s.Release(); err != nil {
if releaseErr := s.Release(); releaseErr != nil {
log.Infof("Error releasing after init failure %s: %s", err, releaseErr)
}
s.setRuntimeState(runtimeInitFailed)
Expand Down