diff --git a/integration/helpers/helpers.go b/integration/helpers/helpers.go index 37fa6876d2255..0896621e5fb35 100644 --- a/integration/helpers/helpers.go +++ b/integration/helpers/helpers.go @@ -132,10 +132,7 @@ func ExternalSSHCommand(o CommandOptions) (*exec.Cmd, error) { } // Create an exec.Command and tell it where to find the SSH agent. - cmd, err := exec.Command(sshpath, execArgs...), nil - if err != nil { - return nil, trace.Wrap(err) - } + cmd := exec.Command(sshpath, execArgs...) cmd.Env = []string{fmt.Sprintf("SSH_AUTH_SOCK=%v", o.SocketPath)} return cmd, nil diff --git a/integration/helpers/instance.go b/integration/helpers/instance.go index 275837306b9d3..cdff68ef04e2a 100644 --- a/integration/helpers/instance.go +++ b/integration/helpers/instance.go @@ -45,8 +45,10 @@ import ( "github.com/gravitational/teleport/api/breaker" clientproto "github.com/gravitational/teleport/api/client/proto" + apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/keygen" "github.com/gravitational/teleport/lib/auth/state" @@ -1765,3 +1767,51 @@ func (i *TeleInstance) StopAll() error { i.Log.Infof("Stopped all teleport services for site %q", i.Secrets.SiteName) return trace.NewAggregate(errors...) } + +// WaitForNodeCount waits for a certain number of nodes in the provided cluster +// to be visible to the Proxy. This should be called prior to any client dialing +// of nodes to be sure that the node is registered and routable. +func (i *TeleInstance) WaitForNodeCount(ctx context.Context, cluster string, count int) error { + const ( + deadline = time.Second * 30 + iterWaitTime = time.Second + ) + + err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error { + site, err := i.Tunnel.GetSite(cluster) + if err != nil { + return trace.Wrap(err) + } + + // Validate that the site cache contains the expected count. + accessPoint, err := site.CachingAccessPoint() + if err != nil { + return trace.Wrap(err) + } + + nodes, err := accessPoint.GetNodes(ctx, apidefaults.Namespace) + if err != nil { + return trace.Wrap(err) + } + if len(nodes) != count { + return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count) + } + + // Validate that the site watcher contains the expected count. + watcher, err := site.NodeWatcher() + if err != nil { + return trace.Wrap(err) + } + + if watcher.ResourceCount() != count { + return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", watcher.ResourceCount(), count) + } + + return nil + }) + if err != nil { + return trace.Wrap(err) + } + + return nil +} diff --git a/integration/helpers/trustedclusters.go b/integration/helpers/trustedclusters.go index a883fb8635a9e..1b3f43b61507c 100644 --- a/integration/helpers/trustedclusters.go +++ b/integration/helpers/trustedclusters.go @@ -30,9 +30,7 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport" - "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/reversetunnelclient" ) @@ -112,37 +110,6 @@ func WaitForClusters(tun reversetunnelclient.Server, expected int) func() bool { } } -// WaitForNodeCount waits for a certain number of nodes to show up in the remote site. -func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string, count int) error { - const ( - deadline = time.Second * 30 - iterWaitTime = time.Second - ) - - err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error { - remoteSite, err := t.Tunnel.GetSite(clusterName) - if err != nil { - return trace.Wrap(err) - } - accessPoint, err := remoteSite.CachingAccessPoint() - if err != nil { - return trace.Wrap(err) - } - nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace) - if err != nil { - return trace.Wrap(err) - } - if len(nodes) == count { - return nil - } - return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count) - }) - if err != nil { - return trace.Wrap(err) - } - return nil -} - // WaitForActiveTunnelConnections waits for remote cluster to report a minimum number of active connections func WaitForActiveTunnelConnections(t *testing.T, tunnel reversetunnelclient.Server, clusterName string, expectedCount int) { require.EventuallyWithT(t, func(t *assert.CollectT) { diff --git a/integration/integration_test.go b/integration/integration_test.go index b00e47d7fa404..6cb45c1d6c403 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -442,27 +442,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { ctx := context.Background() - // wait 10 seconds for both nodes to show up, otherwise + // wait for both nodes to show up, otherwise // we'll have trouble connecting to the node below. - waitForNodes := func(site authclient.ClientI, count int) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - for { - select { - case <-tickCh: - nodesInSite, err := site.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - if got, want := len(nodesInSite), count; got == want { - return nil - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find %v nodes", count) - } - } - } - err = waitForNodes(site, 2) + err = teleport.WaitForNodeCount(ctx, helpers.Site, 2) require.NoError(t, err) // should have no sessions: @@ -796,8 +778,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) { teleportSvr := suite.newTeleport(t, nil, true) defer teleportSvr.StopAll() - site := teleportSvr.GetSiteAPI(helpers.Site) - // addNode adds a node to the teleport instance, returning its uuid. // All nodes added this way have the same hostname. addNode := func() (string, error) { @@ -819,36 +799,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) { uuid1, err := addNode() require.NoError(t, err) - uuid2, err := addNode() + _, err = addNode() require.NoError(t, err) - // wait up to 10 seconds for supplied node names to show up. - waitForNodes := func(site authclient.ClientI, nodes ...string) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - Outer: - for _, nodeName := range nodes { - for { - select { - case <-tickCh: - nodesInSite, err := site.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - for _, node := range nodesInSite { - if node.GetName() == nodeName { - continue Outer - } - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find node %s", nodeName) - } - } - } - return nil - } - - err = waitForNodes(site, uuid1, uuid2) + // wait for supplied node names to show up. + err = teleportSvr.WaitForNodeCount(ctx, helpers.Site, 3) require.NoError(t, err) // attempting to run a command by hostname should generate NodeIsAmbiguous error. @@ -2150,7 +2105,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT tc.concurrentConns = 1 } - waitForNodesToRegister(t, teleport, helpers.Site) + err = teleport.WaitForNodeCount(ctx, helpers.Site, 1) + require.NoError(t, err) asyncErrors := make(chan error, 1) @@ -2169,7 +2125,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT tc.clientConfigOpts(&cc) } cl, err := teleport.NewClient(cc) - require.NoError(t, err) + if err != nil { + asyncErrors <- err + return + } + cl.Stdout = person cl.Stdin = person @@ -3140,6 +3100,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus cmd := []string{"echo", "hello world"} + // Wait for nodes to be visible before attempting connections + err = main.WaitForNodeCount(ctx, clusterAux, 2) + require.NoError(t, err) + // Try and connect to a node in the Aux cluster from the Main cluster using // direct dialing. creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{ @@ -3225,6 +3189,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second, "Two clusters do not see each other: tunnels are not working.") + // Wait for nodes to be visible before attempting connections + err = main.WaitForNodeCount(ctx, clusterAux, 2) + require.NoError(t, err) + // connection and client should recover and work again output = &bytes.Buffer{} tc.Stdout = output @@ -3631,7 +3599,7 @@ func testTrustedTunnelNode(t *testing.T, suite *integrationTestSuite) { "Two clusters do not see each other: tunnels are not working.") // Wait for both nodes to show up before attempting to dial to them. - err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2) + err = main.WaitForNodeCount(ctx, clusterAux, 2) require.NoError(t, err) cmd := []string{"echo", "hello world"} @@ -4027,7 +3995,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) { helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1) helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1) - waitForNodesToRegister(t, main, "cluster-remote") + err = main.WaitForNodeCount(ctx, "cluster-remote", 1) + require.NoError(t, err) // execute the connection via first proxy cfg := helpers.ClientConfig{ @@ -4078,7 +4047,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) { helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1) helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1) - waitForNodesToRegister(t, main, "cluster-remote") + err = main.WaitForNodeCount(ctx, "cluster-remote", 1) + require.NoError(t, err) // Requests going via main proxy should succeed. output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1) @@ -4860,11 +4830,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) { require.NoError(t, err) // Wait for the node to be visible before continuing. - require.EventuallyWithT(t, func(t *assert.CollectT) { - found, err := clt.GetNodes(context.Background(), defaults.Namespace) - assert.NoError(t, err) - assert.Len(t, found, 2) - }, 10*time.Second, 100*time.Millisecond) + err = instance.WaitForNodeCount(context.Background(), helpers.Site, 2) + require.NoError(t, err) _, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1) @@ -5957,27 +5924,9 @@ func testList(t *testing.T, suite *integrationTestSuite) { clt := teleport.GetSiteAPI(helpers.Site) require.NotNil(t, clt) - // Wait 10 seconds for both nodes to show up to make sure they both have + // Wait for both nodes to show up to make sure they both have // registered themselves. - waitForNodes := func(clt authclient.ClientI, count int) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - for { - select { - case <-tickCh: - nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - if got, want := len(nodesInCluster), count; got == want { - return nil - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find %v nodes", count) - } - } - } - err = waitForNodes(clt, 2) + err = teleport.WaitForNodeCount(ctx, helpers.Site, 2) require.NoError(t, err) tests := []struct { @@ -6159,22 +6108,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) { } } -func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) { - t.Helper() - require.EventuallyWithT(t, func(t *assert.CollectT) { - // once the tunnel is established we need to wait until we have a - // connection to the remote auth - site := teleport.GetSiteAPI(site) - if !assert.NotNil(t, site) { - return - } - // we need to wait until we know about the node because direct dial to - // unregistered servers is no longer supported - _, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID) - assert.NoError(t, err) - }, time.Second*30, 250*time.Millisecond) -} - // TestDataTransfer makes sure that a "session.data" event is emitted at the // end of a session that matches the amount of data that was transferred. func testDataTransfer(t *testing.T, suite *integrationTestSuite) { @@ -6188,6 +6121,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) { main := suite.newTeleport(t, nil, true) defer main.StopAll() + err := main.WaitForNodeCount(context.Background(), helpers.Site, 1) + require.NoError(t, err) + // Create a client to the above Teleport cluster. clientConfig := helpers.ClientConfig{ Login: suite.Me.Username, @@ -6196,8 +6132,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) { Port: helpers.Port(t, main.SSH), } - waitForNodesToRegister(t, main, helpers.Site) - // Write 1 MB to stdout. command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"} output, err := runCommand(t, main, command, clientConfig, 1) @@ -7156,6 +7090,7 @@ func (s *integrationTestSuite) defaultServiceConfig() *servicecfg.Config { cfg.Log = s.Log cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig() cfg.InstanceMetadataClient = imds.NewDisabledIMDSClient() + cfg.DebugService.Enabled = false return cfg } @@ -7779,7 +7714,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) { _, err = authServer.CreateUser(ctx, moderatorUser) require.NoError(t, err) - waitForNodesToRegister(t, instance, helpers.Site) + err = instance.WaitForNodeCount(context.Background(), helpers.Site, 1) + require.NoError(t, err) // Start a shell so a moderated session is created peerClient, err := instance.NewClient(helpers.ClientConfig{ @@ -8037,7 +7973,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) { teleport.StopAll() }) - waitForNodesToRegister(t, teleport, helpers.Site) + err := teleport.WaitForNodeCount(context.Background(), helpers.Site, 1) + require.NoError(t, err) teleportClient, err := teleport.NewClient(helpers.ClientConfig{ Login: suite.Me.Username, diff --git a/integration/proxy/proxy_helpers.go b/integration/proxy/proxy_helpers.go index 422695a363d45..e0e3c7b587224 100644 --- a/integration/proxy/proxy_helpers.go +++ b/integration/proxy/proxy_helpers.go @@ -216,7 +216,7 @@ func (p *Suite) addNodeToLeafCluster(t *testing.T, tunnelNodeHostname string) { "Two clusters do not see each other: tunnels are not working.") // Wait for both nodes to show up before attempting to dial to them. - err = helpers.WaitForNodeCount(context.Background(), p.root, p.leaf.Secrets.SiteName, 2) + err = p.root.WaitForNodeCount(context.Background(), p.leaf.Secrets.SiteName, 2) require.NoError(t, err) } diff --git a/integration/proxy/proxy_test.go b/integration/proxy/proxy_test.go index 43d6254911a3b..0dcf986d70109 100644 --- a/integration/proxy/proxy_test.go +++ b/integration/proxy/proxy_test.go @@ -1614,7 +1614,7 @@ func TestALPNProxyHTTPProxyNoProxyDial(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) defer cancel() - err = helpers.WaitForNodeCount(ctx, rc, "root.example.com", 1) + err = rc.WaitForNodeCount(ctx, "root.example.com", 1) require.NoError(t, err) require.Zero(t, ph.Count()) @@ -1624,7 +1624,7 @@ func TestALPNProxyHTTPProxyNoProxyDial(t *testing.T) { require.NoError(t, os.Unsetenv("no_proxy")) _, err = rc.StartNode(makeNodeConfig("second-root-node", rcProxyAddr)) require.NoError(t, err) - err = helpers.WaitForNodeCount(ctx, rc, "root.example.com", 2) + err = rc.WaitForNodeCount(ctx, "root.example.com", 2) require.NoError(t, err) require.NotZero(t, ph.Count()) @@ -1723,7 +1723,7 @@ func TestALPNProxyHTTPProxyBasicAuthDial(t *testing.T) { startErrC <- err }() require.NoError(t, <-startErrC) - require.NoError(t, helpers.WaitForNodeCount(context.Background(), rc, rc.Secrets.SiteName, 1)) + require.NoError(t, rc.WaitForNodeCount(context.Background(), rc.Secrets.SiteName, 1)) require.Greater(t, ph.Count(), 0) }