Skip to content

Commit

Permalink
TWeak how HTTPClient is propagated, fix timing issue
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Sep 27, 2024
1 parent 4c95eb8 commit ccc0448
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 37 deletions.
22 changes: 18 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package httprc

import (
"context"
"net/http"
"sync"
"time"

Expand All @@ -13,6 +14,7 @@ import (
// Client is the main entry point for the httprc package.
type Client struct {
mu sync.Mutex
httpcl HTTPClient
numWorkers int
running bool
errSink ErrorSink
Expand All @@ -34,11 +36,14 @@ func NewClient(options ...NewClientOption) *Client {
//nolint:stylecheck
var traceSink TraceSink = tracesink.NewNop()
var wl Whitelist = InsecureWhitelist{}
var httpcl HTTPClient = http.DefaultClient

numWorkers := DefaultWorkers
//nolint:forcetypeassert
for _, option := range options {
switch option.Ident() {
case identHTTPClient{}:
httpcl = option.Value().(HTTPClient)
case identWorkers{}:
numWorkers = option.Value().(int)
case identErrorSink{}:
Expand All @@ -54,6 +59,7 @@ func NewClient(options ...NewClientOption) *Client {
numWorkers = 1
}
return &Client{
httpcl: httpcl,
numWorkers: numWorkers,
errSink: errSink,
traceSink: traceSink,
Expand Down Expand Up @@ -115,19 +121,27 @@ func (c *Client) Start(octx context.Context) (Controller, error) {
syncoutgoing := make(chan synchronousRequest, c.numWorkers)
wg.Add(c.numWorkers)
for range c.numWorkers {
go worker(ctx, &wg, outgoing, syncoutgoing, errSink, traceSink)
wrk := worker{
incoming: incoming,
next: outgoing,
nextsync: syncoutgoing,
errSink: errSink,
traceSink: traceSink,
httpcl: c.httpcl,
}
go wrk.Run(ctx, &wg)
}

tickDuration := oneDay
tickInterval := oneDay
ctrl := &controller{
cancel: cancel,
items: make(map[string]Resource),
outgoing: outgoing,
syncoutgoing: syncoutgoing,
incoming: incoming,
traceSink: traceSink,
tickDuration: tickDuration,
check: time.NewTicker(tickDuration),
tickInterval: tickInterval,
check: time.NewTicker(tickInterval),
shutdown: make(chan struct{}),
wl: c.wl,
}
Expand Down
30 changes: 25 additions & 5 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type controller struct {

syncoutgoing chan synchronousRequest
items map[string]Resource
tickDuration time.Duration
tickInterval time.Duration
shutdown chan struct{}

wl Whitelist
Expand Down Expand Up @@ -86,6 +86,9 @@ type addRequest ctrlRequest[error]
type rmRequest ctrlRequest[error]
type refreshRequest ctrlRequest[error]
type lookupRequest ctrlRequest[lookupReply]
type adjustIntervalRequest struct {
resource Resource
}

// Lookup returns a resource by its URL. If the resource does not exist, it
// will return an error.
Expand Down Expand Up @@ -190,6 +193,17 @@ func (c *controller) Refresh(ctx context.Context, u string) error {

func (c *controller) handleRequest(ctx context.Context, req any) {
switch req := req.(type) {
case adjustIntervalRequest:
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: got adjust request (time until next check: %s)", time.Until(req.resource.Next())))
interval := time.Until(req.resource.Next())
if interval < time.Second {
interval = time.Second
}
if c.tickInterval > interval {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: adjusting tick interval to %s", interval))
c.tickInterval = interval
c.check.Reset(interval)
}
case addRequest:
r := req.resource
if _, ok := c.items[r.URL()]; ok {
Expand All @@ -202,9 +216,9 @@ func (c *controller) handleRequest(ctx context.Context, req any) {

// force the next check to happen immediately
if d := r.ConstantInterval(); d > 0 {
c.tickDuration = d
c.tickInterval = d
} else if d := r.MinimumInterval(); d > 0 {
c.tickDuration = d
c.tickInterval = d
}

c.check.Reset(time.Nanosecond)
Expand Down Expand Up @@ -286,14 +300,20 @@ func (c *controller) loop(ctx context.Context, wg *sync.WaitGroup) {
case t := <-c.check.C:
// Always reset the ticker because the previous tick
// could have arrived by way of a forced tick
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: checking resources. Next check in %s", time.Now().Add(c.tickDuration)))
c.check.Reset(c.tickDuration)
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: checking resources. Next check in %s", time.Now().Add(c.tickInterval)))

minInterval := c.tickInterval
for _, item := range c.items {
if minInterval > item.MinimumInterval() {
minInterval = item.MinimumInterval()
}
if item.IsBusy() || item.Next().After(t) {
continue
}
sendWorker(ctx, c.outgoing, item)
}
c.tickInterval = minInterval
c.check.Reset(c.tickInterval)
case <-ctx.Done():
return
}
Expand Down
42 changes: 34 additions & 8 deletions httprc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,23 @@ func TestClient(t *testing.T) {
type Hello struct {
Hello string `json:"hello"`
}

start := time.Now()
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "max-age=1")
var version string
if time.Since(start) > 2*time.Second {
version = "2"
}
switch r.URL.Path {
case "/json/helloptr", "/json/hello", "/json/hellomap":
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"hello":"world"}`))
switch version {
case "2":
w.Write([]byte(`{"hello":"world2"}`))
default:
w.Write([]byte(`{"hello":"world"}`))
}
case "/int":
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(`42`))
Expand All @@ -41,38 +53,42 @@ func TestClient(t *testing.T) {
defer cancel()

options := []httprc.NewClientOption{
// httprc.WithWhitelist(httprc.NewInsecureWhitelist()),
// httprc.WithTraceSink(tracesink.NewSlog(slog.New(slog.NewJSONHandler(os.Stdout, nil)))),
}
cl := httprc.NewClient(options...)
ctrl, err := cl.Start(ctx)
require.NoError(t, err, `cl.Run should succeed`)
defer ctrl.Shutdown(time.Second)

testcases := []struct {
URL string
Create func() (httprc.Resource, error)
Expected any
URL string
Create func() (httprc.Resource, error)
Expected any
Expected2 any
}{
{
URL: srv.URL + "/json/helloptr",
Create: func() (httprc.Resource, error) {
return httprc.NewResource[*Hello](srv.URL+"/json/helloptr", httprc.JSONTransformer[*Hello]())
},
Expected: &Hello{Hello: "world"},
Expected: &Hello{Hello: "world"},
Expected2: &Hello{Hello: "world2"},
},
{
URL: srv.URL + "/json/hello",
Create: func() (httprc.Resource, error) {
return httprc.NewResource[Hello](srv.URL+"/json/hello", httprc.JSONTransformer[Hello]())
},
Expected: Hello{Hello: "world"},
Expected: Hello{Hello: "world"},
Expected2: Hello{Hello: "world2"},
},
{
URL: srv.URL + "/json/hellomap",
Create: func() (httprc.Resource, error) {
return httprc.NewResource[map[string]interface{}](srv.URL+"/json/hellomap", httprc.JSONTransformer[map[string]interface{}]())
},
Expected: map[string]interface{}{"hello": "world"},
Expected: map[string]interface{}{"hello": "world"},
Expected2: map[string]interface{}{"hello": "world2"},
},
{
URL: srv.URL + "/int",
Expand Down Expand Up @@ -117,11 +133,21 @@ func TestClient(t *testing.T) {
})
}

time.Sleep(5 * time.Second)
for _, tc := range testcases {
t.Run("Lookup "+tc.URL, func(t *testing.T) {
r, err := ctrl.Lookup(ctx, tc.URL)
require.NoError(t, err, `ctrl.Lookup should succeed`)
require.Equal(t, tc.URL, r.URL(), `r.URL should return expected value`)

var dst interface{}
require.NoError(t, r.Get(&dst), `r.Get should succeed`)

expected := tc.Expected2
if expected == nil {
expected = tc.Expected
}
require.Equal(t, dst, expected, `r.Resource should return expected value`)

Check failure on line 150 in httprc_test.go

View workflow job for this annotation

GitHub Actions / lint

expected-actual: need to reverse actual and expected values (testifylint)
})
}
}
Expand Down
19 changes: 17 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,27 @@ type newResourceOption struct {

func (newResourceOption) newResourceOption() {}

type NewClientResourceOption interface {
option.Interface
newResourceOption()
newClientOption()
}

type newClientResourceOption struct {
option.Interface
}

func (newClientResourceOption) newResourceOption() {}
func (newClientResourceOption) newClientOption() {}

type identHTTPClient struct{}

// WithHTTPClient specifies the HTTP client to use for the client.
// If not specified, the client will use http.DefaultClient.
func WithHTTPClient(cl HTTPClient) NewResourceOption {
return newResourceOption{option.New(identHTTPClient{}, cl)}
//
// This option can be passed to NewClient or NewResource.
func WithHTTPClient(cl HTTPClient) NewClientResourceOption {
return newClientResourceOption{option.New(identHTTPClient{}, cl)}
}

type identMinimumInterval struct{}
Expand Down
51 changes: 42 additions & 9 deletions resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ResourceBase[T any] struct {
// This function will return an error if the URL is not a valid URL
// (i.e. it cannot be parsed by url.Parse), or if the transformer is nil.
func NewResource[T any](s string, transformer Transformer[T], options ...NewResourceOption) (*ResourceBase[T], error) {
var httpcl HTTPClient = http.DefaultClient
var httpcl HTTPClient
var interval time.Duration
minInterval := defaultMinInterval
//nolint:forcetypeassert
Expand Down Expand Up @@ -68,6 +68,9 @@ func NewResource[T any](s string, transformer Transformer[T], options ...NewReso
interval: interval,
ready: make(chan struct{}),
}
if httpcl != nil {
r.httpcl = httpcl
}
r.minInterval.Store(int64(minInterval))
r.SetNext(time.Unix(0, 0)) // initially, it should be fetched immediately
return r, nil
Expand Down Expand Up @@ -111,8 +114,14 @@ func (r *ResourceBase[T]) Get(dst interface{}) error {
// If you would rather wait until the resource is fetched, you can use the
// `Ready()` method to wait until the resource is ready (i.e. fetched at least once).
func (r *ResourceBase[T]) Resource() T {
//nolint:forcetypeassert
return r.r.Load().(T)
v := r.r.Load()
switch v := v.(type) {
case T:
return v
default:
var zero T
return zero
}
}

func (r *ResourceBase[T]) Next() time.Time {
Expand Down Expand Up @@ -170,22 +179,40 @@ func traceSinkFromContext(ctx context.Context) TraceSink {
return tracesink.Nop{}
}

type httpClientKey struct{}

func withHTTPClient(ctx context.Context, cl HTTPClient) context.Context {
return context.WithValue(ctx, httpClientKey{}, cl)
}

func httpClientFromContext(ctx context.Context) HTTPClient {
if v := ctx.Value(httpClientKey{}); v != nil {
//nolint:forcetypeassert
return v.(HTTPClient)
}
return http.DefaultClient
}

func (r *ResourceBase[T]) Sync(ctx context.Context) error {
traceSink := traceSinkFromContext(ctx)
httpcl := r.httpcl
if httpcl == nil {
httpcl = httpClientFromContext(ctx)
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.u, nil)
if err != nil {
return fmt.Errorf(`httprc.Resource.Sync: failed to create request: %w`, err)
}

traceSink.Put(ctx, fmt.Sprintf("httprc.Resource.Sync: fetching %q", r.u))
res, err := r.httpcl.Do(req)
res, err := httpcl.Do(req)
if err != nil {
return fmt.Errorf(`httprc.Resource.Sync: failed to execute HTTP request: %w`, err)
}
defer res.Body.Close()

next := calculateNextRefreshTime(res, r.interval, r.MinimumInterval())
next := calculateNextRefreshTime(ctx, traceSink, res, r.interval, r.MinimumInterval())
traceSink.Put(ctx, fmt.Sprintf("httprc.Resource.Sync: next refresh time for %q is %v", r.u, next))
r.SetNext(next)

Expand All @@ -206,7 +233,9 @@ func (r *ResourceBase[T]) Sync(ctx context.Context) error {
}

traceSink.Put(ctx, fmt.Sprintf("httprc.Resource.Sync: storing new value for %q", r.u))
traceSink.Put(ctx, fmt.Sprintf("before store: %#v", r.r.Load()))
r.r.Store(v)
traceSink.Put(ctx, fmt.Sprintf("after store: %#v", r.r.Load()))
r.once.Do(func() { close(r.ready) })
return nil
}
Expand All @@ -222,9 +251,10 @@ func (r *ResourceBase[T]) transform(ctx context.Context, res *http.Response) (re
return r.t.Transform(ctx, res)
}

func calculateNextRefreshTime(res *http.Response, interval, minInterval time.Duration) time.Time {
func calculateNextRefreshTime(ctx context.Context, traceSink TraceSink, res *http.Response, interval, minInterval time.Duration) time.Time {
now := time.Now()
if interval > 0 {
traceSink.Put(ctx, fmt.Sprintf("Explicit interval set, using value %s", interval))
return now.Add(interval)
}

Expand All @@ -234,11 +264,14 @@ func calculateNextRefreshTime(res *http.Response, interval, minInterval time.Dur
if err == nil {
maxAge, ok := dir.MaxAge()
if ok {
traceSink.Put(ctx, fmt.Sprintf("max-age header set (%d)", maxAge))
resDuration := time.Duration(maxAge) * time.Second
if resDuration > minInterval {
return now.Add(resDuration)
if resDuration >= minInterval {
traceSink.Put(ctx, fmt.Sprintf("max-age >= minimum interval, using minimum interval %s instead", minInterval))
return now.Add(minInterval)
}
return now.Add(minInterval)
traceSink.Put(ctx, "max-age < minimum interval, using max-age")
return now.Add(resDuration)
}
// fallthrough
}
Expand Down
Loading

0 comments on commit ccc0448

Please sign in to comment.