Skip to content

Commit

Permalink
Revert recent API change to TunnelServiceHandler (#13)
Browse files Browse the repository at this point in the history
Let's not make any API changes until after we get flow control implemented and released.
  • Loading branch information
jhump authored Oct 19, 2023
1 parent f912134 commit 695f991
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion internal/cmd/tunneltestsvr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
return vals[0]
},
})
tunnelpb.RegisterTunnelServiceServer(svr, tunnelSvc)
tunnelpb.RegisterTunnelServiceServer(svr, tunnelSvc.Service())
gen.RegisterTunnelTestServiceServer(svr, &tunnelTester{tunnelSvc: tunnelSvc})

// Over the tunnel, we just expose this simple test service
Expand Down
31 changes: 25 additions & 6 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
//
// See NewTunnelServiceHandler.
type TunnelServiceHandler struct {
tunnelpb.UnimplementedTunnelServiceServer

handlers grpchan.HandlerMap
noReverseTunnels bool
onReverseTunnelConnect func(TunnelChannel)
Expand Down Expand Up @@ -96,6 +94,14 @@ func (s *TunnelServiceHandler) RegisterService(desc *grpc.ServiceDesc, srv inter
s.handlers.RegisterService(desc, srv)
}

// Service returns the actual tunnel service implementation to register with a
// [grpc.ServiceRegistrar].
func (s *TunnelServiceHandler) Service() tunnelpb.TunnelServiceServer {
return &tunnelServiceHandler{
h: s,
}
}

// InitiateShutdown starts the graceful shutdown process and returns
// immediately. This should be called when the server wants to shut down. This
// complements the normal process initiated by calling the GracefulStop method
Expand All @@ -107,10 +113,10 @@ func (s *TunnelServiceHandler) InitiateShutdown() {
s.stopping.Store(true)
}

// OpenTunnel creates a forward tunnel from the RPC client to this server. Any
// openTunnel creates a forward tunnel from the RPC client to this server. Any
// services registered with this handler will be accessible to RPCs issued over
// the tunnel.
func (s *TunnelServiceHandler) OpenTunnel(stream tunnelpb.TunnelService_OpenTunnelServer) error {
func (s *TunnelServiceHandler) openTunnel(stream tunnelpb.TunnelService_OpenTunnelServer) error {
if len(s.handlers) == 0 {
return status.Error(codes.Unimplemented, "forward tunnels not supported")
}
Expand All @@ -126,12 +132,12 @@ func (s *TunnelServiceHandler) OpenTunnel(stream tunnelpb.TunnelService_OpenTunn
return serveTunnel(stream, md, s.handlers, s.stopping.Load)
}

// OpenReverseTunnel creates a reverse tunnel from this server to the RPC client.
// openReverseTunnel creates a reverse tunnel from this server to the RPC client.
// This handler can be used as an RPC client connection, via AsChannel and
// KeyAsChannel, to send RPCs to the client on the other end of the reverse
// tunnel. The RPC client acts as the server and uses NewReverseTunnelServer to
// create a server and register exposed RPC services with it.
func (s *TunnelServiceHandler) OpenReverseTunnel(stream tunnelpb.TunnelService_OpenReverseTunnelServer) error {
func (s *TunnelServiceHandler) openReverseTunnel(stream tunnelpb.TunnelService_OpenReverseTunnelServer) error {
if s.noReverseTunnels {
return status.Error(codes.Unimplemented, "reverse tunnels not supported")
}
Expand Down Expand Up @@ -183,6 +189,19 @@ func (s *TunnelServiceHandler) unregister(ch *tunnelChannel) {
}
}

type tunnelServiceHandler struct {
tunnelpb.UnimplementedTunnelServiceServer
h *TunnelServiceHandler
}

func (s *tunnelServiceHandler) OpenTunnel(stream tunnelpb.TunnelService_OpenTunnelServer) error {
return s.h.openTunnel(stream)
}

func (s *tunnelServiceHandler) OpenReverseTunnel(stream tunnelpb.TunnelService_OpenReverseTunnelServer) error {
return s.h.openReverseTunnel(stream)
}

type reverseChannels struct {
mu sync.Mutex
avail chan struct{}
Expand Down
8 changes: 4 additions & 4 deletions tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func TestTunnelServiceHandler(t *testing.T) {
grpchantesting.RegisterTestServiceServer(ts, &svr)
// recursive: tunnels can be run on top of tunnels
// (not realistic, but fun exercise to verify soundness of protocol)
tunnelpb.RegisterTunnelServiceServer(ts, ts)
tunnelpb.RegisterTunnelServiceServer(ts, ts.Service())

l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to listen")
gs := grpc.NewServer()
tunnelpb.RegisterTunnelServiceServer(gs, ts)
tunnelpb.RegisterTunnelServiceServer(gs, ts.Service())
serveDone := make(chan struct{})
go func() {
defer close(serveDone)
Expand Down Expand Up @@ -104,7 +104,7 @@ func runTests(ctx context.Context, t *testing.T, nested bool, cli tunnelpb.Tunne
revSvr := NewReverseTunnelServer(cli)
if !nested {
// we need this to run the nested/recursive tunnel test
tunnelpb.RegisterTunnelServiceServer(revSvr, ts)
tunnelpb.RegisterTunnelServiceServer(revSvr, ts.Service())
}
grpchantesting.RegisterTestServiceServer(revSvr, testSvr)
serveDone := make(chan struct{})
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestTunnelServiceHandler_Concurrency(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to listen")
gs := grpc.NewServer()
tunnelpb.RegisterTunnelServiceServer(gs, ts)
tunnelpb.RegisterTunnelServiceServer(gs, ts.Service())
serveDone := make(chan struct{})
go func() {
defer close(serveDone)
Expand Down

0 comments on commit 695f991

Please sign in to comment.