From e0b00800ee70739fe1a76a89d7f093ca7e8b835f Mon Sep 17 00:00:00 2001 From: Joshua Horwitz Date: Wed, 13 Jul 2016 18:33:15 -0400 Subject: [PATCH] Fixes #246 - moved event stream processing to engine api Signed-off-by: Joshua Horwitz --- client/events.go | 46 ++++++++++++++ client/events_test.go | 104 +++++++++++++++++++++++++++++++ types/client.go | 1 + types/events/event_utils_test.go | 73 ++++++++++++++++++++++ types/events/events_utils.go | 94 ++++++++++++++++++++++++++++ 5 files changed, 318 insertions(+) create mode 100644 types/events/event_utils_test.go create mode 100644 types/events/events_utils.go diff --git a/client/events.go b/client/events.go index f22a18e1..e0b27ec0 100644 --- a/client/events.go +++ b/client/events.go @@ -8,6 +8,7 @@ import ( "golang.org/x/net/context" "github.com/docker/engine-api/types" + eventtypes "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" timetypes "github.com/docker/engine-api/types/time" ) @@ -46,3 +47,48 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io. } return serverResponse.body, nil } + +// EventsWithHandler decodes the stream of events handing them off to the appropriate +// handlers for processing. It's up to the caller to stop the processing by canceling +// the context. The error channel will be closed once the end of the input stream has been +// reached. If an error is received all processing will be stopped. +func (cli *Client) EventsWithHandler(ctx context.Context, options types.EventsOptions, handler eventtypes.EventHandler) <-chan error { + errChan := make(chan error) + + go func() { + defer close(errChan) + + stream, err := cli.Events(ctx, options) + if err != nil { + errChan <- err + return + } + + defer stream.Close() + + eventsq, errq := eventtypes.DecodeEvents(ctx, stream) + + for { + select { + case <-ctx.Done(): + // exit + return + case err := <-errq: + if err == io.EOF { + // end of input stream reached + return + } + + errChan <- err + return + case event := <-eventsq: + processer := handler.Get(event.Action) + if processer != nil { + processer(event) + } + } + } + }() + + return errChan +} diff --git a/client/events_test.go b/client/events_test.go index cb459384..62a63f01 100644 --- a/client/events_test.go +++ b/client/events_test.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -11,6 +12,7 @@ import ( "golang.org/x/net/context" "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" ) @@ -124,3 +126,105 @@ func TestEvents(t *testing.T) { } } } + +func TestEventsWithHandler(t *testing.T) { + + expectedURL := "/events" + + filters := filters.NewArgs() + filters.Add("type", events.ContainerEventType) + expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType) + + eventsCases := []struct { + options types.EventsOptions + events []events.Message + expectedEvents map[string]bool + expectedQueryParams map[string]string + }{ + { + options: types.EventsOptions{ + Filters: filters, + }, + expectedQueryParams: map[string]string{ + "filters": expectedFiltersJSON, + }, + events: []events.Message{ + events.Message{ + Type: "container", + ID: "1", + Action: "create", + }, + events.Message{ + Type: "container", + ID: "2", + Action: "die", + }, + events.Message{ + Type: "container", + ID: "3", + Action: "create", + }, + }, + expectedEvents: map[string]bool{ + "1": true, + "2": true, + "3": true, + }, + }, + } + + for _, eventsCase := range eventsCases { + client := &Client{ + transport: newMockClient(nil, func(req *http.Request) (*http.Response, error) { + if !strings.HasPrefix(req.URL.Path, expectedURL) { + return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL) + } + query := req.URL.Query() + for key, expected := range eventsCase.expectedQueryParams { + actual := query.Get(key) + if actual != expected { + return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual) + } + } + + buffer := new(bytes.Buffer) + + for _, e := range eventsCase.events { + b, _ := json.Marshal(e) + buffer.Write(b) + } + + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(buffer), + }, nil + }), + } + + handler := events.InitEventHandler() + + cb := func(e events.Message) { + + _, ok := eventsCase.expectedEvents[e.ID] + if !ok { + t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID) + } + + delete(eventsCase.expectedEvents, e.ID) + } + + handler.Handle("create", cb) + handler.Handle("die", cb) + + errChan := client.EventsWithHandler(context.Background(), eventsCase.options, handler) + + err := <-errChan + if err != nil { + t.Fatal(err) + } + + if len(eventsCase.expectedEvents) != 0 { + t.Fatalf("expected all events to have been called but still have left: %v", eventsCase.expectedEvents) + } + } +} diff --git a/types/client.go b/types/client.go index def3f061..0432099e 100644 --- a/types/client.go +++ b/types/client.go @@ -46,6 +46,7 @@ type ContainerExecInspect struct { // ContainerListOptions holds parameters to list containers with. type ContainerListOptions struct { Quiet bool + Health bool Size bool All bool Latest bool diff --git a/types/events/event_utils_test.go b/types/events/event_utils_test.go new file mode 100644 index 00000000..b84ec28b --- /dev/null +++ b/types/events/event_utils_test.go @@ -0,0 +1,73 @@ +package events + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "reflect" + "testing" + + "golang.org/x/net/context" +) + +func TestEventHandler(t *testing.T) { + eh := InitEventHandler() + + overriddenCalled := false + eh.Handle("action", func(event Message) { + overriddenCalled = true + }) + + actionCalled := false + eh.Handle("action", func(event Message) { + actionCalled = true + }) + + eh.Get("action")(Message{}) + + if overriddenCalled { + t.Fatal("Expected handler that got overridden not to be called but was") + } + + if !actionCalled { + t.Fatal("Expected action handler to be called but it was not") + } +} + +func TestDecodeEvents(t *testing.T) { + event := Message{ + ID: "id", + Status: "status", + From: "from", + Type: "type", + Action: "action", + Actor: Actor{ + ID: "id", + Attributes: map[string]string{ + "label": "foo=bar", + }, + }, + Time: 1, + TimeNano: 1, + } + + data, err := json.Marshal(event) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + eventq, errq := DecodeEvents(ctx, ioutil.NopCloser(bytes.NewBuffer(data))) + + select { + case e := <-eventq: + if !reflect.DeepEqual(event, e) { + t.Fatal("Expected serialized event to be equal to the decoded event") + } + cancel() + case err := <-errq: + if err != nil { + t.Fatal(err) + } + } +} diff --git a/types/events/events_utils.go b/types/events/events_utils.go new file mode 100644 index 00000000..04f8fbde --- /dev/null +++ b/types/events/events_utils.go @@ -0,0 +1,94 @@ +package events + +import ( + "encoding/json" + "io" + "sync" + + "golang.org/x/net/context" +) + +// EventProcessor gets called when an event comes in +// for a registered action +type EventProcessor func(event Message) + +// EventHandler is abstract interface for user to customize +// own handle functions of each type of events +type EventHandler interface { + // Handle registers the event processor for the given action. + // Multiple calls to handle with same action will result in overriding + // the pervious event processor for the same action. + Handle(action string, ep EventProcessor) + + // Get the handler for the given action or nil if the action is not registered. + Get(action string) EventProcessor +} + +type eventHandler struct { + handlers map[string]EventProcessor + mu sync.Mutex +} + +func (eh *eventHandler) Handle(action string, ep EventProcessor) { + eh.mu.Lock() + eh.handlers[action] = ep + eh.mu.Unlock() +} + +func (eh *eventHandler) Get(action string) EventProcessor { + eh.mu.Lock() + defer eh.mu.Unlock() + return eh.handlers[action] +} + +// InitEventHandler initializes and returns a new EventHandler +func InitEventHandler() EventHandler { + return &eventHandler{ + handlers: make(map[string]EventProcessor), + } +} + +// DecodeEvents decodes events from the input stream asynchronously. It's the responsibility of the +// caller to close the input reader. Processing can be stopped by canceling the context. +// All decoded events will be placed on the returned message channel in order +// they are received. Any errors will be returned on the error +// channel. When an error is encountered processing will be stopped. An io.EOF will be +// returned on the error channel once the end of the input has been reached. +func DecodeEvents(ctx context.Context, input io.Reader) (<-chan Message, <-chan error) { + errChan := make(chan error) + eventsq := make(chan Message) + + go func(rdr io.Reader) { + + defer close(errChan) + defer close(eventsq) + + dec := json.NewDecoder(rdr) + var event Message + + for { + select { + case <-ctx.Done(): + // exit + return + default: + // Halt processing on any errors to signal the stream needs + // to be restarted. + if err := dec.Decode(&event); err != nil { + errChan <- err + return + } + + select { + case eventsq <- event: + case <-ctx.Done(): + // exit without sending decoded message + return + + } + } + } + }(input) + + return eventsq, errChan +}