Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix race conditions #4074

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// Establish starts the daemon, if necessary, and returns a client to it.
func Establish(ctx context.Context, apiClient flyutil.Client) (*Client, error) {
func Establish(ctx context.Context, apiClient wireguard.WebClient) (*Client, error) {
if err := wireguard.PruneInvalidPeers(ctx, apiClient); err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions internal/command/deploy/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ func argsFromManifest(manifest *DeployManifest, app *fly.AppCompact) MachineDepl
}

type machineDeployment struct {
apiClient flyutil.Client
flapsClient flapsutil.FlapsClient
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
app *fly.AppCompact
appConfig *appconfig.Config
img string
// apiClient is a client to use web.
apiClient webClient
// flapsClient is a client to use flaps.
flapsClient flapsutil.FlapsClient
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
app *fly.AppCompact
appConfig *appconfig.Config
img string
// machineSet is this application's machines.
machineSet machine.MachineSet
releaseCommandMachine machine.MachineSet
volumes map[string][]fly.Volume
Expand Down
50 changes: 24 additions & 26 deletions internal/command/deploy/machines_deploymachinesapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,32 +809,29 @@ func (md *machineDeployment) updateUsingRollingStrategy(parentCtx context.Contex
groupsPool.Go(func(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() (err error) {
poolSize := len(coldMachines)
if poolSize >= STOPPED_MACHINES_POOL_SIZE {
poolSize = STOPPED_MACHINES_POOL_SIZE
}

if len(coldMachines) > 0 {
// for cold machines, we can update all of them at once.
// there's no need for protection against downtime since the machines are already stopped
startIdx += len(coldMachines)
return md.updateEntriesGroup(ctx, group, coldMachines, sl, startIdx-len(coldMachines), poolSize)
}

return nil
})

eg.Go(func() (err error) {
// for warm machines, we update them in chunks of size, md.maxUnavailable.
// this is to prevent downtime/low-latency during deployments
startIdx += len(warmMachines)
poolSize := md.getPoolSize(len(warmMachines))
if len(warmMachines) > 0 {
return md.updateEntriesGroup(ctx, group, warmMachines, sl, startIdx-len(warmMachines), poolSize)
}
return nil
})
coldIdx := startIdx
if len(coldMachines) > 0 {
eg.Go(func() error {
// Capping the size just in case, it may be okay to stop all of them at once.
chunk := len(coldMachines)
if chunk >= STOPPED_MACHINES_POOL_SIZE {
chunk = STOPPED_MACHINES_POOL_SIZE
}
return md.updateEntriesGroup(ctx, group, coldMachines, sl, coldIdx, chunk)
})
}
startIdx += len(coldMachines)

warmIdx := startIdx
if len(warmMachines) > 0 {
eg.Go(func() error {
// Since these machines are still receiving traffic, the chunk size here is more conservative (lower)
// then the one above.
chunk := md.getPoolSize(len(warmMachines))
return md.updateEntriesGroup(ctx, group, warmMachines, sl, warmIdx, chunk)
})
}
startIdx += len(warmMachines)

return eg.Wait()
})
Expand Down Expand Up @@ -1106,6 +1103,7 @@ func (md *machineDeployment) spawnMachineInGroup(ctx context.Context, groupName
return lm, nil
}

// resolveProcessGroupChanges returns a diff between machines
func (md *machineDeployment) resolveProcessGroupChanges() ProcessGroupsDiff {
output := ProcessGroupsDiff{
groupsToRemove: map[string]int{},
Expand Down
40 changes: 39 additions & 1 deletion internal/command/deploy/machines_deploymachinesapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package deploy
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/superfly/fly-go"
"github.com/superfly/flyctl/internal/appconfig"
"github.com/superfly/flyctl/internal/flapsutil"
"github.com/superfly/flyctl/internal/machine"
"github.com/superfly/flyctl/internal/mock"
"github.com/superfly/flyctl/iostreams"
)

func TestUpdateExistingMachinesWRecovery(t *testing.T) {
ios, _, _, _ := iostreams.Test()
client := &mockFlapsClient{}
client.machines = []*fly.Machine{{ID: "test-machine-id"}}
client.machines = []*fly.Machine{{ID: "test-machine-id", LeaseNonce: "foobar"}}
md := &machineDeployment{
app: &fly.AppCompact{},
io: ios,
Expand All @@ -34,3 +38,37 @@ func TestUpdateExistingMachinesWRecovery(t *testing.T) {
})
assert.Error(t, err, "failed to find machine test-machine-id")
}

func TestDeployMachinesApp(t *testing.T) {
ios, _, _, _ := iostreams.Test()
client := &mockFlapsClient{}
webClient := &mock.Client{
GetAppLogsFunc: func(ctx context.Context, appName, token, region, instanceID string) (entries []fly.LogEntry, nextToken string, err error) {
return nil, "", nil
},
}
client.machines = []*fly.Machine{
{ID: "m1", LeaseNonce: "m1-lease", Config: &fly.MachineConfig{Metadata: map[string]string{fly.MachineConfigMetadataKeyFlyProcessGroup: "app"}}},
{ID: "m2", LeaseNonce: "m2-lease", Config: &fly.MachineConfig{Metadata: map[string]string{fly.MachineConfigMetadataKeyFlyProcessGroup: "app"}}},
{ID: "m3", LeaseNonce: "m3-lease", Config: &fly.MachineConfig{Metadata: map[string]string{fly.MachineConfigMetadataKeyFlyProcessGroup: "app"}}},
{ID: "m4", LeaseNonce: "m4-lease", Config: &fly.MachineConfig{Metadata: map[string]string{fly.MachineConfigMetadataKeyFlyProcessGroup: "app"}}},
}
md := &machineDeployment{
app: &fly.AppCompact{},
io: ios,
colorize: ios.ColorScheme(),
flapsClient: client,
apiClient: webClient,
strategy: "canary",
appConfig: &appconfig.Config{},
machineSet: machine.NewMachineSet(client, ios, client.machines, false),
skipSmokeChecks: true,
waitTimeout: 1 * time.Second,
}

ctx := context.Background()
ctx = iostreams.NewContext(ctx, ios)
ctx = flapsutil.NewContextWithClient(ctx, client)
err := md.deployMachinesApp(ctx)
assert.NoError(t, err)
}
54 changes: 48 additions & 6 deletions internal/command/deploy/mock_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"

fly "github.com/superfly/fly-go"
Expand All @@ -22,12 +23,19 @@ type mockFlapsClient struct {
breakUncordon bool
breakSetMetadata bool
breakList bool
breakDestroy bool
breakLease bool

machines []*fly.Machine
// mu to protect the members below.
mu sync.Mutex
machines []*fly.Machine
leases map[string]struct{}
nextMachineID int
}

func (m *mockFlapsClient) AcquireLease(ctx context.Context, machineID string, ttl *int) (*fly.MachineLease, error) {
return nil, fmt.Errorf("failed to acquire lease for %s", machineID)
nonce := fmt.Sprintf("%x-lease", machineID)
return m.RefreshLease(ctx, machineID, ttl, nonce)
}

func (m *mockFlapsClient) Cordon(ctx context.Context, machineID string, nonce string) (err error) {
Expand Down Expand Up @@ -63,7 +71,10 @@ func (m *mockFlapsClient) DeleteVolume(ctx context.Context, volumeId string) (*f
}

func (m *mockFlapsClient) Destroy(ctx context.Context, input fly.RemoveMachineInput, nonce string) (err error) {
return fmt.Errorf("failed to destroy %s", input.ID)
if m.breakDestroy {
return fmt.Errorf("failed to destroy %s", input.ID)
}
return nil
}

func (m *mockFlapsClient) Exec(ctx context.Context, machineID string, in *fly.MachineExecRequest) (*fly.MachineExecResponse, error) {
Expand Down Expand Up @@ -119,10 +130,17 @@ func (m *mockFlapsClient) Kill(ctx context.Context, machineID string) (err error
}

func (m *mockFlapsClient) Launch(ctx context.Context, builder fly.LaunchMachineInput) (*fly.Machine, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.breakLaunch {
return nil, fmt.Errorf("failed to launch %s", builder.ID)
}
return &fly.Machine{}, nil
m.nextMachineID += 1
return &fly.Machine{
ID: fmt.Sprintf("%x", m.nextMachineID),
LeaseNonce: fmt.Sprintf("%x-launch-lease", m.nextMachineID),
}, nil
}

func (m *mockFlapsClient) List(ctx context.Context, state string) ([]*fly.Machine, error) {
Expand All @@ -149,11 +167,35 @@ func (m *mockFlapsClient) NewRequest(ctx context.Context, method, path string, i
}

func (m *mockFlapsClient) RefreshLease(ctx context.Context, machineID string, ttl *int, nonce string) (*fly.MachineLease, error) {
return nil, fmt.Errorf("failed to refresh lease for %s", machineID)
m.mu.Lock()
defer m.mu.Unlock()

if m.breakLease {
return nil, fmt.Errorf("failed to acquire lease for %s", machineID)
}

if m.leases == nil {
m.leases = make(map[string]struct{})
}
m.leases[machineID] = struct{}{}

return &fly.MachineLease{
Status: "success",
Data: &fly.MachineLeaseData{Nonce: nonce},
}, nil
}

func (m *mockFlapsClient) ReleaseLease(ctx context.Context, machineID, nonce string) error {
return fmt.Errorf("failed to release lease for %s", machineID)
m.mu.Lock()
defer m.mu.Unlock()

_, exists := m.leases[machineID]
if !exists {
return fmt.Errorf("failed to release lease for %s", machineID)
}
delete(m.leases, machineID)

return nil
}

func (m *mockFlapsClient) Restart(ctx context.Context, in fly.RestartMachineInput, nonce string) (err error) {
Expand Down
3 changes: 3 additions & 0 deletions internal/command/deploy/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []
ctx, span := tracing.GetTracer().Start(ctx, "acquire_leases")

leaseGroup := errgroup.Group{}
if poolSize <= 0 {
panic("pool size must be > 0")
}
leaseGroup.SetLimit(poolSize)

for _, machineTuple := range machineTuples {
Expand Down
28 changes: 28 additions & 0 deletions internal/command/deploy/web_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package deploy

import (
"context"
"net"

fly "github.com/superfly/fly-go"
"github.com/superfly/flyctl/logs"
)

// webClient is a subset of web API that is needed for the deploy package.
type webClient interface {
AddCertificate(ctx context.Context, appName, hostname string) (*fly.AppCertificate, *fly.HostnameCheck, error)
AllocateIPAddress(ctx context.Context, appName string, addrType string, region string, org *fly.Organization, network string) (*fly.IPAddress, error)
GetIPAddresses(ctx context.Context, appName string) ([]fly.IPAddress, error)
AllocateSharedIPAddress(ctx context.Context, appName string) (net.IP, error)

LatestImage(ctx context.Context, appName string) (string, error)

CreateRelease(ctx context.Context, input fly.CreateReleaseInput) (*fly.CreateReleaseResponse, error)
UpdateRelease(ctx context.Context, input fly.UpdateReleaseInput) (*fly.UpdateReleaseResponse, error)

GetApp(ctx context.Context, appName string) (*fly.App, error)
GetOrganizationBySlug(ctx context.Context, slug string) (*fly.Organization, error)

logs.WebClient
blueGreenWebClient
}
13 changes: 12 additions & 1 deletion internal/machine/leasable_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/jpillora/backoff"
Expand Down Expand Up @@ -49,10 +50,14 @@ type leasableMachine struct {
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
machine *fly.Machine
leaseNonce string
leaseRefreshCancelFunc context.CancelFunc
destroyed bool
showLogs bool

// mu protects leaseNonce. A leasableMachine shouldn't be shared between
// goroutines, but StartBackgroundLeaseRefresh breaks the rule.
mu sync.Mutex
leaseNonce string
}

// TODO: make sure the other functions handle showLogs correctly
Expand Down Expand Up @@ -466,6 +471,9 @@ func (lm *leasableMachine) AcquireLease(ctx context.Context, duration time.Durat
}

func (lm *leasableMachine) RefreshLease(ctx context.Context, duration time.Duration) error {
lm.mu.Lock()
defer lm.mu.Unlock()

seconds := int(duration.Seconds())
refreshedLease, err := lm.flapsClient.RefreshLease(ctx, lm.machine.ID, &seconds, lm.leaseNonce)
if err != nil {
Expand Down Expand Up @@ -509,6 +517,9 @@ func (lm *leasableMachine) refreshLeaseUntilCanceled(ctx context.Context, durati
}

func (lm *leasableMachine) ReleaseLease(ctx context.Context) error {
lm.mu.Lock()
defer lm.mu.Unlock()

nonce := lm.leaseNonce
lm.resetLease()
if nonce == "" {
Expand Down
17 changes: 14 additions & 3 deletions internal/statuslogger/noninteractive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package statuslogger
import (
"fmt"
"strings"
"sync"

"github.com/superfly/flyctl/iostreams"
)

type noninteractiveLogger struct {
io *iostreams.IOStreams
lines []*noninteractiveLine
// mu protects io.
mu sync.Mutex
io *iostreams.IOStreams

logNumbers bool
showStatus bool
lines []*noninteractiveLine
}

func (nl *noninteractiveLogger) Line(i int) StatusLine {
Expand All @@ -36,7 +40,14 @@ func (line *noninteractiveLine) Log(s string) {
buf += formatIndex(line.lineNum, len(line.logger.lines)) + " "
}
buf += s
fmt.Fprintln(line.logger.io.Out, buf)

line.println(buf)
}

func (line *noninteractiveLine) println(s string) {
line.logger.mu.Lock()
defer line.logger.mu.Unlock()
fmt.Fprintln(line.logger.io.Out, s)
}

func (line *noninteractiveLine) Logf(format string, args ...interface{}) {
Expand Down
6 changes: 5 additions & 1 deletion internal/wireguard/wg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

var cleanDNSPattern = regexp.MustCompile(`[^a-zA-Z0-9\\-]`)

type WebClient interface {
ValidateWireGuardPeers(ctx context.Context, peerIPs []string) (invalid []string, err error)
}

func generatePeerName(ctx context.Context, apiClient flyutil.Client) (string, error) {
user, err := apiClient.GetCurrentUser(ctx)
if err != nil {
Expand Down Expand Up @@ -181,7 +185,7 @@ func setWireGuardStateForOrg(ctx context.Context, orgSlug, network string, s *wg
return setWireGuardState(ctx, states)
}

func PruneInvalidPeers(ctx context.Context, apiClient flyutil.Client) error {
func PruneInvalidPeers(ctx context.Context, apiClient WebClient) error {
state, err := GetWireGuardState()
if err != nil {
return nil
Expand Down
Loading
Loading