From f8d626b9f43ac83184af53a5cc06524e36f4c118 Mon Sep 17 00:00:00 2001
From: Tim Ross
Date: Thu, 5 Dec 2024 17:11:00 -0500
Subject: [PATCH] Attempt to reduce flakiness of integration tests

Closes #47156.

All of the tests suffering from issues dialing hosts and failing with a
`failed to dial target host` error were incorrectly waiting for nodes to
become visible before establishing connections. The main culprit for
most of the failures was `waitForNodesToRegister`, though a few tests
had a very similar hand-rolled variant, which incorrectly returned when
the nodes appeared in Auth. However, since the Proxy is the one
performing the dialing, they should have waited for the nodes to appear
in the Proxy.

To resolve this, `waitForNodesToRegister` and all hand-rolled
equivalents have been removed in favor of `helpers.WaitForNodeCount`,
which correctly uses the `CachingAccessPoint` of the RemoteSite instead
of `GetClient`. Additionally, `helpers.WaitForNodeCount` was updated to
validate that the node watcher used for routing in the Proxy also
contains the expected number of nodes.
---
 integration/helpers/trustedclusters.go |  20 +++-
 integration/integration_test.go        | 135 +++++++------------------
 2 files changed, 53 insertions(+), 102 deletions(-)

diff --git a/integration/helpers/trustedclusters.go b/integration/helpers/trustedclusters.go
index a883fb8635a9e..790aabdb78957 100644
--- a/integration/helpers/trustedclusters.go
+++ b/integration/helpers/trustedclusters.go
@@ -124,18 +124,32 @@ func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string,
 		if err != nil {
 			return trace.Wrap(err)
 		}
+
+		// Validate that the site cache contains the expected count.
 		accessPoint, err := remoteSite.CachingAccessPoint()
 		if err != nil {
 			return trace.Wrap(err)
 		}
+
 		nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace)
 		if err != nil {
 			return trace.Wrap(err)
 		}
-		if len(nodes) == count {
-			return nil
+		if len(nodes) != count {
+			return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count)
 		}
-		return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count)
+
+		// Validate that the site watcher contains the expected count.
+		watcher, err := remoteSite.NodeWatcher()
+		if err != nil {
+			return trace.Wrap(err)
+		}
+
+		if watcher.ResourceCount() != count {
+			return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", watcher.ResourceCount(), count)
+		}
+
+		return nil
 	})
 	if err != nil {
 		return trace.Wrap(err)
diff --git a/integration/integration_test.go b/integration/integration_test.go
index b00e47d7fa404..cfc71f959eb1f 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -442,27 +442,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
 
 	ctx := context.Background()
 
-	// wait 10 seconds for both nodes to show up, otherwise
+	// wait for both nodes to show up, otherwise
 	// we'll have trouble connecting to the node below.
-	waitForNodes := func(site authclient.ClientI, count int) error {
-		tickCh := time.Tick(500 * time.Millisecond)
-		stopCh := time.After(10 * time.Second)
-		for {
-			select {
-			case <-tickCh:
-				nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
-				if err != nil && !trace.IsNotFound(err) {
-					return trace.Wrap(err)
-				}
-				if got, want := len(nodesInSite), count; got == want {
-					return nil
-				}
-			case <-stopCh:
-				return trace.BadParameter("waited 10s, did find %v nodes", count)
-			}
-		}
-	}
-	err = waitForNodes(site, 2)
+	err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 2)
 	require.NoError(t, err)
 
 	// should have no sessions:
@@ -796,8 +778,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
 	teleportSvr := suite.newTeleport(t, nil, true)
 	defer teleportSvr.StopAll()
 
-	site := teleportSvr.GetSiteAPI(helpers.Site)
-
 	// addNode adds a node to the teleport instance, returning its uuid.
 	// All nodes added this way have the same hostname.
 	addNode := func() (string, error) {
@@ -819,36 +799,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
 	uuid1, err := addNode()
 	require.NoError(t, err)
 
-	uuid2, err := addNode()
+	_, err = addNode()
 	require.NoError(t, err)
 
-	// wait up to 10 seconds for supplied node names to show up.
-	waitForNodes := func(site authclient.ClientI, nodes ...string) error {
-		tickCh := time.Tick(500 * time.Millisecond)
-		stopCh := time.After(10 * time.Second)
-	Outer:
-		for _, nodeName := range nodes {
-			for {
-				select {
-				case <-tickCh:
-					nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
-					if err != nil && !trace.IsNotFound(err) {
-						return trace.Wrap(err)
-					}
-					for _, node := range nodesInSite {
-						if node.GetName() == nodeName {
-							continue Outer
-						}
-					}
-				case <-stopCh:
-					return trace.BadParameter("waited 10s, did find node %s", nodeName)
-				}
-			}
-		}
-		return nil
-	}
-
-	err = waitForNodes(site, uuid1, uuid2)
+	// wait for supplied node names to show up.
+	err = helpers.WaitForNodeCount(ctx, teleportSvr, helpers.Site, 3)
 	require.NoError(t, err)
 
 	// attempting to run a command by hostname should generate NodeIsAmbiguous error.
@@ -2150,7 +2105,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
 		tc.concurrentConns = 1
 	}
 
-	waitForNodesToRegister(t, teleport, helpers.Site)
+	err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 1)
+	require.NoError(t, err)
 
 	asyncErrors := make(chan error, 1)
 
@@ -2169,7 +2125,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
 			tc.clientConfigOpts(&cc)
 		}
 		cl, err := teleport.NewClient(cc)
-		require.NoError(t, err)
+		if err != nil {
+			asyncErrors <- err
+			return
+		}
+
 		cl.Stdout = person
 		cl.Stdin = person
 
@@ -3140,6 +3100,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus
 
 	cmd := []string{"echo", "hello world"}
 
+	// Wait for nodes to be visible before attempting connections
+	err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
+	require.NoError(t, err)
+
 	// Try and connect to a node in the Aux cluster from the Main cluster using
 	// direct dialing.
 	creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{
@@ -3225,6 +3189,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus
 	require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second,
 		"Two clusters do not see each other: tunnels are not working.")
 
+	// Wait for nodes to be visible before attempting connections
+	err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
+	require.NoError(t, err)
+
 	// connection and client should recover and work again
 	output = &bytes.Buffer{}
 	tc.Stdout = output
@@ -4027,7 +3995,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
 	helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
 	helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)
 
-	waitForNodesToRegister(t, main, "cluster-remote")
+	err = helpers.WaitForNodeCount(ctx, main, "cluster-remote", 1)
+	require.NoError(t, err)
 
 	// execute the connection via first proxy
 	cfg := helpers.ClientConfig{
@@ -4078,7 +4047,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
 	helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
 	helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)
 
-	waitForNodesToRegister(t, main, "cluster-remote")
+	err = helpers.WaitForNodeCount(ctx, main, "cluster-remote", 1)
+	require.NoError(t, err)
 
 	// Requests going via main proxy should succeed.
 	output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1)
@@ -4860,11 +4830,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) {
 	require.NoError(t, err)
 
 	// Wait for the node to be visible before continuing.
-	require.EventuallyWithT(t, func(t *assert.CollectT) {
-		found, err := clt.GetNodes(context.Background(), defaults.Namespace)
-		assert.NoError(t, err)
-		assert.Len(t, found, 2)
-	}, 10*time.Second, 100*time.Millisecond)
+	err = helpers.WaitForNodeCount(context.Background(), instance, helpers.Site, 2)
+	require.NoError(t, err)
 
 	_, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1)
 
@@ -5957,27 +5924,9 @@ func testList(t *testing.T, suite *integrationTestSuite) {
 	clt := teleport.GetSiteAPI(helpers.Site)
 	require.NotNil(t, clt)
 
-	// Wait 10 seconds for both nodes to show up to make sure they both have
+	// Wait for both nodes to show up to make sure they both have
 	// registered themselves.
-	waitForNodes := func(clt authclient.ClientI, count int) error {
-		tickCh := time.Tick(500 * time.Millisecond)
-		stopCh := time.After(10 * time.Second)
-		for {
-			select {
-			case <-tickCh:
-				nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace)
-				if err != nil && !trace.IsNotFound(err) {
-					return trace.Wrap(err)
-				}
-				if got, want := len(nodesInCluster), count; got == want {
-					return nil
-				}
-			case <-stopCh:
-				return trace.BadParameter("waited 10s, did find %v nodes", count)
-			}
-		}
-	}
-	err = waitForNodes(clt, 2)
+	err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 2)
 	require.NoError(t, err)
 
 	tests := []struct {
@@ -6159,22 +6108,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {
 	}
 }
 
-func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) {
-	t.Helper()
-	require.EventuallyWithT(t, func(t *assert.CollectT) {
-		// once the tunnel is established we need to wait until we have a
-		// connection to the remote auth
-		site := teleport.GetSiteAPI(site)
-		if !assert.NotNil(t, site) {
-			return
-		}
-		// we need to wait until we know about the node because direct dial to
-		// unregistered servers is no longer supported
-		_, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID)
-		assert.NoError(t, err)
-	}, time.Second*30, 250*time.Millisecond)
-}
-
 // TestDataTransfer makes sure that a "session.data" event is emitted at the
 // end of a session that matches the amount of data that was transferred.
 func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
@@ -6188,6 +6121,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
 	main := suite.newTeleport(t, nil, true)
 	defer main.StopAll()
 
+	err := helpers.WaitForNodeCount(context.Background(), main, helpers.Site, 1)
+	require.NoError(t, err)
+
 	// Create a client to the above Teleport cluster.
 	clientConfig := helpers.ClientConfig{
 		Login:   suite.Me.Username,
@@ -6196,8 +6132,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
 		Port:    helpers.Port(t, main.SSH),
 	}
 
-	waitForNodesToRegister(t, main, helpers.Site)
-
 	// Write 1 MB to stdout.
 	command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"}
 	output, err := runCommand(t, main, command, clientConfig, 1)
@@ -7156,6 +7090,7 @@ func (s *integrationTestSuite) defaultServiceConfig() *servicecfg.Config {
 	cfg.Log = s.Log
 	cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig()
 	cfg.InstanceMetadataClient = imds.NewDisabledIMDSClient()
+	cfg.DebugService.Enabled = false
 	return cfg
 }
 
@@ -7779,7 +7714,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) {
 	_, err = authServer.CreateUser(ctx, moderatorUser)
 	require.NoError(t, err)
 
-	waitForNodesToRegister(t, instance, helpers.Site)
+	err = helpers.WaitForNodeCount(context.Background(), instance, helpers.Site, 1)
+	require.NoError(t, err)
 
 	// Start a shell so a moderated session is created
 	peerClient, err := instance.NewClient(helpers.ClientConfig{
@@ -8037,7 +7973,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) {
 		teleport.StopAll()
 	})
 
-	waitForNodesToRegister(t, teleport, helpers.Site)
+	err := helpers.WaitForNodeCount(context.Background(), teleport, helpers.Site, 1)
+	require.NoError(t, err)
 
 	teleportClient, err := teleport.NewClient(helpers.ClientConfig{
 		Login: suite.Me.Username,
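
Below is a minimal sketch, not part of the patch, of the wait-before-dial pattern the tests now follow. The helper and its signature come from the change above; the package clause, import paths, and the test function name are assumptions for illustration only.

package integration

import (
	"context"
	"testing"

	"github.com/gravitational/teleport/integration/helpers"
	"github.com/stretchr/testify/require"
)

// testWaitBeforeDial is a hypothetical example of the pattern this patch
// moves every flaky test onto: block until the Proxy's view of the cluster
// (RemoteSite cache and node watcher) reports the expected node count, and
// only then create a client and dial. Polling Auth with GetNodes is not
// sufficient because the Proxy is the component that performs the routing.
func testWaitBeforeDial(t *testing.T, teleport *helpers.TeleInstance) {
	// Wait until the single registered node is visible to the Proxy;
	// dialing earlier can fail with "failed to dial target host".
	err := helpers.WaitForNodeCount(context.Background(), teleport, helpers.Site, 1)
	require.NoError(t, err)

	// Only now is it safe to connect, e.g. via
	// teleport.NewClient(helpers.ClientConfig{...}).
}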