diff --git a/derp/derp_client.go b/derp/derp_client.go index c2e7337791b60..dcee68cad78ce 100644 --- a/derp/derp_client.go +++ b/derp/derp_client.go @@ -121,6 +121,8 @@ func newClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf return c, nil } +func (c *Client) PublicKey() key.NodePublic { return c.publicKey } + func (c *Client) recvServerKey() error { var buf [40]byte t, flen, err := readFrame(c.br, 1<<10, buf[:]) diff --git a/derp/derp_test.go b/derp/derp_test.go index 72de265529ad1..9185194dd79cf 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -1311,6 +1311,72 @@ func TestLimiter(t *testing.T) { } } +// BenchmarkConcurrentStreams exercises mutex contention on a +// single Server instance with multiple concurrent client flows. +func BenchmarkConcurrentStreams(b *testing.B) { + serverPrivateKey := key.NewNode() + s := NewServer(serverPrivateKey, logger.Discard) + defer s.Close() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + b.Fatal(err) + } + defer ln.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for ctx.Err() == nil { + connIn, err := ln.Accept() + if err != nil { + if ctx.Err() != nil { + return + } + b.Error(err) + return + } + + brwServer := bufio.NewReadWriter(bufio.NewReader(connIn), bufio.NewWriter(connIn)) + go s.Accept(ctx, connIn, brwServer, "test-client") + } + }() + + newClient := func(t testing.TB) *Client { + t.Helper() + connOut, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + b.Fatal(err) + } + t.Cleanup(func() { connOut.Close() }) + + k := key.NewNode() + + brw := bufio.NewReadWriter(bufio.NewReader(connOut), bufio.NewWriter(connOut)) + client, err := NewClient(k, connOut, brw, logger.Discard) + if err != nil { + b.Fatalf("client: %v", err) + } + return client + } + + b.RunParallel(func(pb *testing.PB) { + c1, c2 := newClient(b), newClient(b) + const packetSize = 100 + msg := make([]byte, packetSize) + for pb.Next() { + if err := c1.Send(c2.PublicKey(), msg); err != nil { + b.Fatal(err) + } + _, err := c2.Recv() + if err != nil { + return + } + } + }) +} + func BenchmarkSendRecv(b *testing.B) { for _, size := range []int{10, 100, 1000, 10000} { b.Run(fmt.Sprintf("msgsize=%d", size), func(b *testing.B) { benchmarkSendRecvSize(b, size) })