Skip to content

Commit

Permalink
implement Lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Sep 27, 2024
1 parent 8a76cef commit 42d0429
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 30 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *Client) Start(octx context.Context) (Controller, error) {
traceSink = proxy
}

incoming := make(chan ctrlRequest, c.numWorkers)
incoming := make(chan any, c.numWorkers)
outgoing := make(chan Resource, c.numWorkers)
syncoutgoing := make(chan synchronousRequest, c.numWorkers)
wg.Add(c.numWorkers)
Expand Down
86 changes: 57 additions & 29 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type Controller interface {
AddResource(Resource) error
Lookup(string) (Resource, error)
RemoveResource(string) error
Refresh(string) error
ShutdownContext(context.Context) error
Expand All @@ -20,7 +21,7 @@ type controller struct {
cancel context.CancelFunc
check *time.Ticker
// incoming accepts new control requests from external sources
incoming chan ctrlRequest
incoming chan any
// outgoing sends Syncer objects to the worker pool
outgoing chan Resource

Expand Down Expand Up @@ -60,18 +61,39 @@ func (c *controller) ShutdownContext(ctx context.Context) error {
}
}

const (
addResource = iota
rmResource
refreshResource
)

type ctrlRequest struct {
op int
reply chan error
type ctrlRequest[T any] struct {
reply chan T
resource Resource
u string
}
type lookupReply struct {
r Resource
err error
}

type addRequest ctrlRequest[error]
type rmRequest ctrlRequest[error]
type refreshRequest ctrlRequest[error]
type lookupRequest ctrlRequest[lookupReply]

// Lookup returns a resource by its URL. If the resource does not exist, it
// will return an error.
//
// Unfortunately, due to the way typed parameters are handled in Go, we can only
// return a Resource object (and not a ResourceBase[T] object). This means that
// you will either need to use the `Resource.Get()` method or use a type
// assertion to obtain a `ResourceBase[T]` to get to the actual object you are
// looking for
func (c *controller) Lookup(u string) (Resource, error) {
// to avoid having to acquire locks, we do this asynchronously
reply := make(chan lookupReply, 1)
c.incoming <- lookupRequest{
reply: reply,
u: u,
}
r := <-reply
return r.r, r.err
}

// AddResource adds a new resource to the controller. If the resource already
// exists, it will return an error.
Expand All @@ -81,8 +103,7 @@ func (c *controller) AddResource(r Resource) error {
}

reply := make(chan error, 1)
c.incoming <- ctrlRequest{
op: addResource,
c.incoming <- addRequest{
reply: reply,
resource: r,
}
Expand All @@ -93,8 +114,7 @@ func (c *controller) AddResource(r Resource) error {
// not exist, it will return an error.
func (c *controller) RemoveResource(u string) error {
reply := make(chan error, 1)
c.incoming <- ctrlRequest{
op: rmResource,
c.incoming <- rmRequest{
reply: reply,
u: u,
}
Expand All @@ -107,17 +127,16 @@ func (c *controller) RemoveResource(u string) error {
// This function is synchronous, and will block until the resource has been refreshed.
func (c *controller) Refresh(u string) error {
reply := make(chan error, 1)
c.incoming <- ctrlRequest{
op: refreshResource,
c.incoming <- refreshRequest{
reply: reply,
u: u,
}
return <-reply
}

func (c *controller) handleRequest(ctx context.Context, req ctrlRequest) {
switch req.op {
case addResource:
func (c *controller) handleRequest(ctx context.Context, req any) {
switch req := req.(type) {
case addRequest:
r := req.resource
for _, item := range c.items {
if item.URL() == r.URL() {
Expand All @@ -128,7 +147,7 @@ func (c *controller) handleRequest(ctx context.Context, req ctrlRequest) {
}

c.items = append(c.items, r)
sendReply(ctx, req.reply, nil)
closeReply(req.reply)

// force the next check to happen immediately
if d := r.ConstantInterval(); d > 0 {
Expand All @@ -138,7 +157,7 @@ func (c *controller) handleRequest(ctx context.Context, req ctrlRequest) {
}

c.check.Reset(time.Nanosecond)
case rmResource:
case rmRequest:
u := req.u
minInterval := oneDay
loc := -1
Expand All @@ -158,9 +177,9 @@ func (c *controller) handleRequest(ctx context.Context, req ctrlRequest) {
}

c.items = slices.Delete(c.items, loc, loc+1)
sendReply(ctx, req.reply, nil)
closeReply[error](req.reply)
c.check.Reset(minInterval)
case refreshResource:
case refreshRequest:
u := req.u
for _, item := range c.items {
if item.URL() != u {
Expand All @@ -174,6 +193,15 @@ func (c *controller) handleRequest(ctx context.Context, req ctrlRequest) {
return
}
sendReply(ctx, req.reply, errResourceNotFound)
case lookupRequest:
u := req.u
for _, item := range c.items {
if item.URL() == u {
sendReply(ctx, req.reply, lookupReply{r: item})
return
}
}
sendReply(ctx, req.reply, lookupReply{err: errResourceNotFound})
}
}

Expand All @@ -193,15 +221,15 @@ func sendWorkerSynchronous(ctx context.Context, ch chan synchronousRequest, r sy
}
}

func sendReply(ctx context.Context, ch chan error, err error) {
defer close(ch)
if err == nil {
return
}
func closeReply[T any](ch chan T) {
close(ch)
}

func sendReply[T any](ctx context.Context, ch chan T, v T) {
defer closeReply[T](ch)
select {
case <-ctx.Done():
case ch <- err:
case ch <- v:
}
}

Expand Down
8 changes: 8 additions & 0 deletions httprc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ func TestClient(t *testing.T) {
require.Equal(t, tc.Expected, dst, `r.Get should return expected value`)
})
}

for _, tc := range testcases {
t.Run("Lookup "+tc.URL, func(t *testing.T) {
r, err := ctrl.Lookup(tc.URL)
require.NoError(t, err, `ctrl.Lookup should succeed`)
require.Equal(t, tc.URL, r.URL(), `r.URL should return expected value`)
})
}
}

func TestRefresh(t *testing.T) {
Expand Down

0 comments on commit 42d0429

Please sign in to comment.