This repository has been archived by the owner on Nov 23, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 163
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixes #246 - moved event stream processing to engine api
Signed-off-by: Joshua Horwitz <[email protected]>
- Loading branch information
Showing
5 changed files
with
318 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package events | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"io/ioutil" | ||
"reflect" | ||
"testing" | ||
|
||
"golang.org/x/net/context" | ||
) | ||
|
||
func TestEventHandler(t *testing.T) { | ||
eh := InitEventHandler() | ||
|
||
overriddenCalled := false | ||
eh.Handle("action", func(event Message) { | ||
overriddenCalled = true | ||
}) | ||
|
||
actionCalled := false | ||
eh.Handle("action", func(event Message) { | ||
actionCalled = true | ||
}) | ||
|
||
eh.Get("action")(Message{}) | ||
|
||
if overriddenCalled { | ||
t.Fatal("Expected handler that got overridden not to be called but was") | ||
} | ||
|
||
if !actionCalled { | ||
t.Fatal("Expected action handler to be called but it was not") | ||
} | ||
} | ||
|
||
func TestDecodeEvents(t *testing.T) { | ||
event := Message{ | ||
ID: "id", | ||
Status: "status", | ||
From: "from", | ||
Type: "type", | ||
Action: "action", | ||
Actor: Actor{ | ||
ID: "id", | ||
Attributes: map[string]string{ | ||
"label": "foo=bar", | ||
}, | ||
}, | ||
Time: 1, | ||
TimeNano: 1, | ||
} | ||
|
||
data, err := json.Marshal(event) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
eventq, errq := DecodeEvents(ctx, ioutil.NopCloser(bytes.NewBuffer(data))) | ||
|
||
select { | ||
case e := <-eventq: | ||
if !reflect.DeepEqual(event, e) { | ||
t.Fatal("Expected serialized event to be equal to the decoded event") | ||
} | ||
cancel() | ||
case err := <-errq: | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package events | ||
|
||
import ( | ||
"encoding/json" | ||
"io" | ||
"sync" | ||
|
||
"golang.org/x/net/context" | ||
) | ||
|
||
// EventProcessor gets called when an event comes in | ||
// for a registered action | ||
type EventProcessor func(event Message) | ||
|
||
// EventHandler is abstract interface for user to customize | ||
// own handle functions of each type of events | ||
type EventHandler interface { | ||
// Handle registers the event processor for the given action. | ||
// Multiple calls to handle with same action will result in overriding | ||
// the pervious event processor for the same action. | ||
Handle(action string, ep EventProcessor) | ||
|
||
// Get the handler for the given action or nil if the action is not registered. | ||
Get(action string) EventProcessor | ||
} | ||
|
||
type eventHandler struct { | ||
handlers map[string]EventProcessor | ||
mu sync.Mutex | ||
} | ||
|
||
func (eh *eventHandler) Handle(action string, ep EventProcessor) { | ||
eh.mu.Lock() | ||
eh.handlers[action] = ep | ||
eh.mu.Unlock() | ||
} | ||
|
||
func (eh *eventHandler) Get(action string) EventProcessor { | ||
eh.mu.Lock() | ||
defer eh.mu.Unlock() | ||
return eh.handlers[action] | ||
} | ||
|
||
// InitEventHandler initializes and returns a new EventHandler | ||
func InitEventHandler() EventHandler { | ||
return &eventHandler{ | ||
handlers: make(map[string]EventProcessor), | ||
} | ||
} | ||
|
||
// DecodeEvents decodes events from the input stream asynchronously. It's the responsibility of the | ||
// caller to close the input reader. Processing can be stopped by canceling the context. | ||
// All decoded events will be placed on the returned message channel in order | ||
// they are received. Any errors will be returned on the error | ||
// channel. When an error is encountered processing will be stopped. An io.EOF will be | ||
// returned on the error channel once the end of the input has been reached. | ||
func DecodeEvents(ctx context.Context, input io.Reader) (<-chan Message, <-chan error) { | ||
errChan := make(chan error) | ||
eventsq := make(chan Message) | ||
|
||
go func(rdr io.Reader) { | ||
|
||
defer close(errChan) | ||
defer close(eventsq) | ||
|
||
dec := json.NewDecoder(input) | ||
var event Message | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
// exit | ||
return | ||
default: | ||
// Halt processing on any errors to signal the stream needs | ||
// to be restarted. | ||
if err := dec.Decode(&event); err != nil { | ||
errChan <- err | ||
return | ||
} | ||
|
||
select { | ||
case eventsq <- event: | ||
case <-ctx.Done(): | ||
// exit without sending decoded message | ||
return | ||
|
||
} | ||
} | ||
} | ||
}(input) | ||
|
||
return eventsq, errChan | ||
} |