Skip to content
This repository has been archived by the owner on Nov 23, 2019. It is now read-only.

Commit

Permalink
Fixes #246 - moved event stream processing to engine api
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Horwitz <[email protected]>
  • Loading branch information
jhorwit2 committed Jul 19, 2016
1 parent 98348ad commit e0b0080
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 0 deletions.
46 changes: 46 additions & 0 deletions client/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"golang.org/x/net/context"

"github.com/docker/engine-api/types"
eventtypes "github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
timetypes "github.com/docker/engine-api/types/time"
)
Expand Down Expand Up @@ -46,3 +47,48 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
}
return serverResponse.body, nil
}

// EventsWithHandler decodes the stream of events handing them off to the appropriate
// handlers for processing. It's up to the caller to stop the processing by canceling
// the context. The error channel will be closed once the end of the input stream has been
// reached. If an error is received all processing will be stopped.
func (cli *Client) EventsWithHandler(ctx context.Context, options types.EventsOptions, handler eventtypes.EventHandler) <-chan error {
errChan := make(chan error)

go func() {
defer close(errChan)

stream, err := cli.Events(ctx, options)
if err != nil {
errChan <- err
return
}

defer stream.Close()

eventsq, errq := eventtypes.DecodeEvents(ctx, stream)

for {
select {
case <-ctx.Done():
// exit
return
case err := <-errq:
if err == io.EOF {
// end of input stream reached
return
}

errChan <- err
return
case event := <-eventsq:
processer := handler.Get(event.Action)
if processer != nil {
processer(event)
}
}
}
}()

return errChan
}
104 changes: 104 additions & 0 deletions client/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -11,6 +12,7 @@ import (
"golang.org/x/net/context"

"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
)

Expand Down Expand Up @@ -124,3 +126,105 @@ func TestEvents(t *testing.T) {
}
}
}

func TestEventsWithHandler(t *testing.T) {

expectedURL := "/events"

filters := filters.NewArgs()
filters.Add("type", events.ContainerEventType)
expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType)

eventsCases := []struct {
options types.EventsOptions
events []events.Message
expectedEvents map[string]bool
expectedQueryParams map[string]string
}{
{
options: types.EventsOptions{
Filters: filters,
},
expectedQueryParams: map[string]string{
"filters": expectedFiltersJSON,
},
events: []events.Message{
events.Message{
Type: "container",
ID: "1",
Action: "create",
},
events.Message{
Type: "container",
ID: "2",
Action: "die",
},
events.Message{
Type: "container",
ID: "3",
Action: "create",
},
},
expectedEvents: map[string]bool{
"1": true,
"2": true,
"3": true,
},
},
}

for _, eventsCase := range eventsCases {
client := &Client{
transport: newMockClient(nil, func(req *http.Request) (*http.Response, error) {
if !strings.HasPrefix(req.URL.Path, expectedURL) {
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
}
query := req.URL.Query()
for key, expected := range eventsCase.expectedQueryParams {
actual := query.Get(key)
if actual != expected {
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
}
}

buffer := new(bytes.Buffer)

for _, e := range eventsCase.events {
b, _ := json.Marshal(e)
buffer.Write(b)
}

return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(buffer),
}, nil
}),
}

handler := events.InitEventHandler()

cb := func(e events.Message) {

_, ok := eventsCase.expectedEvents[e.ID]
if !ok {
t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID)
}

delete(eventsCase.expectedEvents, e.ID)
}

handler.Handle("create", cb)
handler.Handle("die", cb)

errChan := client.EventsWithHandler(context.Background(), eventsCase.options, handler)

err := <-errChan
if err != nil {
t.Fatal(err)
}

if len(eventsCase.expectedEvents) != 0 {
t.Fatalf("expected all events to have been called but still have left: %v", eventsCase.expectedEvents)
}
}
}
1 change: 1 addition & 0 deletions types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ContainerExecInspect struct {
// ContainerListOptions holds parameters to list containers with.
type ContainerListOptions struct {
Quiet bool
Health bool
Size bool
All bool
Latest bool
Expand Down
73 changes: 73 additions & 0 deletions types/events/event_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package events

import (
"bytes"
"encoding/json"
"io/ioutil"
"reflect"
"testing"

"golang.org/x/net/context"
)

func TestEventHandler(t *testing.T) {
eh := InitEventHandler()

overriddenCalled := false
eh.Handle("action", func(event Message) {
overriddenCalled = true
})

actionCalled := false
eh.Handle("action", func(event Message) {
actionCalled = true
})

eh.Get("action")(Message{})

if overriddenCalled {
t.Fatal("Expected handler that got overridden not to be called but was")
}

if !actionCalled {
t.Fatal("Expected action handler to be called but it was not")
}
}

func TestDecodeEvents(t *testing.T) {
event := Message{
ID: "id",
Status: "status",
From: "from",
Type: "type",
Action: "action",
Actor: Actor{
ID: "id",
Attributes: map[string]string{
"label": "foo=bar",
},
},
Time: 1,
TimeNano: 1,
}

data, err := json.Marshal(event)
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
eventq, errq := DecodeEvents(ctx, ioutil.NopCloser(bytes.NewBuffer(data)))

select {
case e := <-eventq:
if !reflect.DeepEqual(event, e) {
t.Fatal("Expected serialized event to be equal to the decoded event")
}
cancel()
case err := <-errq:
if err != nil {
t.Fatal(err)
}
}
}
94 changes: 94 additions & 0 deletions types/events/events_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package events

import (
"encoding/json"
"io"
"sync"

"golang.org/x/net/context"
)

// EventProcessor gets called when an event comes in
// for a registered action
type EventProcessor func(event Message)

// EventHandler is abstract interface for user to customize
// own handle functions of each type of events
type EventHandler interface {
// Handle registers the event processor for the given action.
// Multiple calls to handle with same action will result in overriding
// the pervious event processor for the same action.
Handle(action string, ep EventProcessor)

// Get the handler for the given action or nil if the action is not registered.
Get(action string) EventProcessor
}

type eventHandler struct {
handlers map[string]EventProcessor
mu sync.Mutex
}

func (eh *eventHandler) Handle(action string, ep EventProcessor) {
eh.mu.Lock()
eh.handlers[action] = ep
eh.mu.Unlock()
}

func (eh *eventHandler) Get(action string) EventProcessor {
eh.mu.Lock()
defer eh.mu.Unlock()
return eh.handlers[action]
}

// InitEventHandler initializes and returns a new EventHandler
func InitEventHandler() EventHandler {
return &eventHandler{
handlers: make(map[string]EventProcessor),
}
}

// DecodeEvents decodes events from the input stream asynchronously. It's the responsibility of the
// caller to close the input reader. Processing can be stopped by canceling the context.
// All decoded events will be placed on the returned message channel in order
// they are received. Any errors will be returned on the error
// channel. When an error is encountered processing will be stopped. An io.EOF will be
// returned on the error channel once the end of the input has been reached.
func DecodeEvents(ctx context.Context, input io.Reader) (<-chan Message, <-chan error) {
errChan := make(chan error)
eventsq := make(chan Message)

go func(rdr io.Reader) {

defer close(errChan)
defer close(eventsq)

dec := json.NewDecoder(rdr)
var event Message

for {
select {
case <-ctx.Done():
// exit
return
default:
// Halt processing on any errors to signal the stream needs
// to be restarted.
if err := dec.Decode(&event); err != nil {
errChan <- err
return
}

select {
case eventsq <- event:
case <-ctx.Done():
// exit without sending decoded message
return

}
}
}
}(input)

return eventsq, errChan
}

0 comments on commit e0b0080

Please sign in to comment.