Skip to content

Commit

Permalink
feat: use docker api instead of exec (fixes #1169) (#1488)
Browse files Browse the repository at this point in the history
  • Loading branch information
gak authored May 15, 2024
1 parent cad97da commit 9860ee3
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 83 deletions.
30 changes: 9 additions & 21 deletions cmd/ftl/cmd_schema_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package main
import (
"context"
"fmt"
"github.com/alecthomas/types/optional"
"net"
"os"
"path/filepath"
"strconv"

"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"

"github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/container"
"github.com/TBD54566975/ftl/internal/log"
)

Expand Down Expand Up @@ -136,47 +136,35 @@ func query(ctx context.Context, prompt string) error {
func (s *schemaImportCmd) setup(ctx context.Context) error {
logger := log.FromContext(ctx)

nameFlag := fmt.Sprintf("name=^/%s$", ollamaContainerName)
output, err := exec.Capture(ctx, ".", "docker", "ps", "-a", "--filter", nameFlag, "--format", "{{.Names}}")
exists, err := container.DoesExist(ctx, ollamaContainerName)
if err != nil {
logger.Errorf(err, "%s", output)
return err
}

port := strconv.Itoa(s.OllamaPort)

if len(output) == 0 {
if !exists {
logger.Debugf("Creating docker container '%s' for ollama", ollamaContainerName)

// check if port s.OllamaPort is already in use
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.OllamaPort))
if err == nil {
return fmt.Errorf("port %d is already in use", s.OllamaPort)
if err != nil {
return fmt.Errorf("port %d is already in use: %w", s.OllamaPort, err)
}
_ = l.Close()

err = exec.Command(ctx, log.Debug, "./", "docker", "run",
"-d", // run detached so we can follow with other commands
"-v", ollamaVolume,
"-p", port+":11434",
"--name", ollamaContainerName,
"ollama/ollama").RunBuffered(ctx)
err = container.Run(ctx, "ollama/ollama", ollamaContainerName, s.OllamaPort, 11434, optional.Some(ollamaVolume))
if err != nil {
return err
}
} else {
// Start the existing container
_, err = exec.Capture(ctx, ".", "docker", "start", ollamaContainerName)
err = container.Start(ctx, ollamaContainerName)
if err != nil {
return err
}
}

// Initialize Ollama
err = exec.Command(ctx, log.Debug, "./", "docker", "exec",
ollamaContainerName,
"ollama",
"run", ollamaModel).RunBuffered(ctx)
err = container.Exec(ctx, ollamaContainerName, "ollama", "run", ollamaModel)
if err != nil {
return err
}
Expand Down
75 changes: 13 additions & 62 deletions cmd/ftl/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
osExec "os/exec" //nolint:depguard
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

Expand All @@ -23,11 +22,11 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/container"
"github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/slices"
)

type serveCmd struct {
Expand Down Expand Up @@ -238,17 +237,15 @@ func isServeRunning(logger *log.Logger) (bool, error) {
func (s *serveCmd) setupDB(ctx context.Context) (string, error) {
logger := log.FromContext(ctx)

nameFlag := fmt.Sprintf("name=^/%s$", ftlContainerName)
output, err := exec.Capture(ctx, ".", "docker", "ps", "-a", "--filter", nameFlag, "--format", "{{.Names}}")
recreate := s.Recreate
port := s.DBPort

exists, err := container.DoesExist(ctx, ftlContainerName)
if err != nil {
logger.Errorf(err, "%s", output)
return "", err
}

recreate := s.Recreate
port := strconv.Itoa(s.DBPort)

if len(output) == 0 {
if !exists {
logger.Debugf("Creating docker container '%s' for postgres db", ftlContainerName)

// check if port s.DBPort is already in use
Expand All @@ -257,54 +254,34 @@ func (s *serveCmd) setupDB(ctx context.Context) (string, error) {
return "", fmt.Errorf("port %d is already in use", s.DBPort)
}

err = exec.Command(ctx, log.Debug, "./", "docker", "run",
"-d", // run detached so we can follow with other commands
"--name", ftlContainerName,
"--user", "postgres",
"--restart", "always",
"-e", "POSTGRES_PASSWORD=secret",
"-p", port+":5432",
"--health-cmd=pg_isready",
"--health-interval=1s",
"--health-timeout=60s",
"--health-retries=60",
"--health-start-period=80s",
"postgres:latest", "postgres",
).RunBuffered(ctx)
err = container.RunDB(ctx, ftlContainerName, s.DBPort)
if err != nil {
return "", err
}

recreate = true
} else {
// Start the existing container
_, err = exec.Capture(ctx, ".", "docker", "start", ftlContainerName)
err = container.Start(ctx, ftlContainerName)
if err != nil {
return "", err
}

// Grab the port from the existing container
portOutput, err := exec.Capture(ctx, ".", "docker", "port", ftlContainerName, "5432/tcp")
port, err = container.GetContainerPort(ctx, ftlContainerName, 5432)
if err != nil {
logger.Errorf(err, "%s", portOutput)
return "", err
}
port = slices.Reduce(strings.Split(string(portOutput), "\n"), "", func(port string, line string) string {
if parts := strings.Split(line, ":"); len(parts) == 2 {
return parts[1]
}
return port
})

logger.Debugf("Reusing existing docker container %q on port %q for postgres db", ftlContainerName, port)
logger.Debugf("Reusing existing docker container %s on port %d for postgres db", ftlContainerName, port)
}

err = pollContainerHealth(ctx, ftlContainerName, 10*time.Second)
err = container.PollContainerHealth(ctx, ftlContainerName, 10*time.Second)
if err != nil {
return "", err
return "", fmt.Errorf("db container failed to be healthy: %w", err)
}

dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%s/ftl?sslmode=disable", port)
dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%d/ftl?sslmode=disable", port)
logger.Debugf("Postgres DSN: %s", dsn)

_, err = databasetesting.CreateForDevel(ctx, dsn, recreate)
Expand All @@ -315,32 +292,6 @@ func (s *serveCmd) setupDB(ctx context.Context) (string, error) {
return dsn, nil
}

func pollContainerHealth(ctx context.Context, containerName string, timeout time.Duration) error {
logger := log.FromContext(ctx)
logger.Debugf("Waiting for %s to be healthy", containerName)

pollCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

for {
select {
case <-pollCtx.Done():
return fmt.Errorf("timed out waiting for container to be healthy: %w", pollCtx.Err())

case <-time.After(1 * time.Millisecond):
output, err := exec.Capture(pollCtx, ".", "docker", "inspect", "--format", "{{.State.Health.Status}}", containerName)
if err != nil {
return err
}

status := strings.TrimSpace(string(output))
if status == "healthy" {
return nil
}
}
}
}

// waitForControllerOnline polls the controller service until it is online.
func waitForControllerOnline(ctx context.Context, startupTimeout time.Duration, client ftlv1connect.ControllerServiceClient) error {
logger := log.FromContext(ctx)
Expand Down
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/beevik/etree v1.4.0
github.com/bmatcuk/doublestar/v4 v4.6.1
github.com/deckarep/golang-set/v2 v2.6.0
github.com/docker/docker v25.0.5+incompatible
github.com/go-logr/logr v1.4.1
github.com/gofrs/flock v0.8.1
github.com/golang/protobuf v1.5.4
Expand Down Expand Up @@ -61,14 +62,22 @@ require (
)

require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
Expand All @@ -77,7 +86,10 @@ require (
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sourcegraph/jsonrpc2 v0.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
)

Expand Down
Loading

0 comments on commit 9860ee3

Please sign in to comment.