From 4f9a9d08e5c4a437127417b058fb55c823596c2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quentin=20Guid=C3=A9e?= Date: Wed, 27 Sep 2023 12:25:40 -0400 Subject: [PATCH] Improve stderr handling with the Docker Kernel --- adapter/docker_cli.go | 10 +++++++- adapter/runner_docker.go | 49 +++++++++++++++++++++++++++------------ router/docker_kernel.go | 42 +++++++++++++++++++++++++++++---- services/docker_kernel.go | 8 +++++-- services/instance.go | 32 ++++++++++++------------- types/docker.go | 3 ++- 6 files changed, 105 insertions(+), 39 deletions(-) diff --git a/adapter/docker_cli.go b/adapter/docker_cli.go index 5cc5d236..eff131f0 100644 --- a/adapter/docker_cli.go +++ b/adapter/docker_cli.go @@ -98,9 +98,17 @@ func (a DockerCliAdapter) InfoContainer(id string) (types.InfoContainerResponse, }, nil } -func (a DockerCliAdapter) LogsContainer(id string) (io.ReadCloser, error) { +func (a DockerCliAdapter) LogsStdoutContainer(id string) (io.ReadCloser, error) { return a.cli.ContainerLogs(context.Background(), id, dockertypes.ContainerLogsOptions{ ShowStdout: true, + Timestamps: false, + Follow: true, + Tail: "0", + }) +} + +func (a DockerCliAdapter) LogsStderrContainer(id string) (io.ReadCloser, error) { + return a.cli.ContainerLogs(context.Background(), id, dockertypes.ContainerLogsOptions{ ShowStderr: true, Timestamps: false, Follow: true, diff --git a/adapter/runner_docker.go b/adapter/runner_docker.go index 8a7b72af..50c3c7cb 100644 --- a/adapter/runner_docker.go +++ b/adapter/runner_docker.go @@ -39,8 +39,7 @@ func (a RunnerDockerAdapter) Delete(instance *types.Instance) error { } func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(status string)) (io.ReadCloser, io.ReadCloser, error) { - //rErr, wErr := io.Pipe() - rErr := io.ReadCloser(nil) + rErr, wErr := io.Pipe() rOut, wOut := io.Pipe() go func() { @@ -53,7 +52,7 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat // Build var err error - var stdout io.ReadCloser + var stdout, stderr io.ReadCloser if service.Methods.Docker.Dockerfile != nil { stdout, err = a.buildImageFromDockerfile(instancePath, imageName) } else if service.Methods.Docker.Image != nil { @@ -68,7 +67,6 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat var wg sync.WaitGroup - // Send stdout to wOut wg.Add(1) go func() { defer wg.Done() @@ -80,7 +78,6 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat } }() - //Send stderr to wErr //wg.Add(1) //go func() { // defer wg.Done() @@ -192,7 +189,7 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat } setStatus(types.InstanceStatusRunning) - stdout, err = a.readLogs(id) + stdout, stderr, err = a.readLogs(id) if err != nil { return } @@ -205,9 +202,15 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat } }() - a.watchForStatusChange(id, instance, setStatus) + go func() { + _, err := io.Copy(wErr, stderr) + if err != nil { + log.Error(err) + return + } + }() - return + a.watchForStatusChange(id, instance, setStatus) }() return rOut, rErr, nil @@ -439,20 +442,36 @@ func (a RunnerDockerAdapter) watchForStatusChange(containerID string, instance * }() } -func (a RunnerDockerAdapter) readLogs(containerID string) (io.ReadCloser, error) { - req, err := requests.URL("http://localhost:6131/"). - Pathf("/api/docker/container/%s/logs", containerID). +func (a RunnerDockerAdapter) readLogs(containerID string) (stdout io.ReadCloser, stderr io.ReadCloser, err error) { + var reqStdout, reqStderr *http.Request + reqStdout, err = requests.URL("http://localhost:6131/"). + Pathf("/api/docker/container/%s/logs/stdout", containerID). Request(context.Background()) if err != nil { - return nil, err + return + } + + reqStderr, err = requests.URL("http://localhost:6131/"). + Pathf("/api/docker/container/%s/logs/stderr", containerID). + Request(context.Background()) + if err != nil { + return } var res *http.Response - res, err = http.DefaultClient.Do(req) + res, err = http.DefaultClient.Do(reqStdout) if err != nil { - return nil, err + return } - return res.Body, nil + stdout = res.Body + + res, err = http.DefaultClient.Do(reqStderr) + if err != nil { + _ = stdout.Close() + return + } + stderr = res.Body + return } func (a RunnerDockerAdapter) getPath(instance types.Instance) string { diff --git a/router/docker_kernel.go b/router/docker_kernel.go index 1d23d528..be3dd851 100644 --- a/router/docker_kernel.go +++ b/router/docker_kernel.go @@ -18,7 +18,8 @@ func addDockerKernelRoutes(r *gin.RouterGroup) { r.POST("/container/:id/start", handleStartDockerContainer) r.POST("/container/:id/stop", handleStopDockerContainer) r.GET("/container/:id/info", handleInfoDockerContainer) - r.GET("/container/:id/logs", handleLogsDockerContainer) + r.GET("/container/:id/logs/stdout", handleLogsStdoutDockerContainer) + r.GET("/container/:id/logs/stderr", handleLogsStderrDockerContainer) r.GET("/container/:id/wait/:cond", handleWaitDockerContainer) r.GET("/image/:id/info", handleInfoDockerImage) r.POST("/image/pull", handlePullDockerImage) @@ -121,10 +122,10 @@ func handleInfoDockerContainer(c *gin.Context) { c.JSON(http.StatusOK, info) } -func handleLogsDockerContainer(c *gin.Context) { +func handleLogsStdoutDockerContainer(c *gin.Context) { id := c.Param("id") - r, err := dockerKernelService.LogsContainer(id) + stdout, err := dockerKernelService.LogsStdoutContainer(id) if err != nil { _ = c.AbortWithError(http.StatusInternalServerError, types.APIError{ Code: "failed_to_get_container_logs", @@ -133,7 +134,40 @@ func handleLogsDockerContainer(c *gin.Context) { return } - scanner := bufio.NewScanner(r) + scanner := bufio.NewScanner(stdout) + + c.Stream(func(w io.Writer) bool { + if scanner.Err() != nil { + log.Error(scanner.Err()) + return false + } + + if !scanner.Scan() { + return false + } + + _, err := io.WriteString(w, scanner.Text()+"\n") + if err != nil { + log.Error(err) + return false + } + return true + }) +} + +func handleLogsStderrDockerContainer(c *gin.Context) { + id := c.Param("id") + + stderr, err := dockerKernelService.LogsStderrContainer(id) + if err != nil { + _ = c.AbortWithError(http.StatusInternalServerError, types.APIError{ + Code: "failed_to_get_container_logs", + Message: err.Error(), + }) + return + } + + scanner := bufio.NewScanner(stderr) c.Stream(func(w io.Writer) bool { if scanner.Err() != nil { diff --git a/services/docker_kernel.go b/services/docker_kernel.go index 077d68cb..641edca6 100644 --- a/services/docker_kernel.go +++ b/services/docker_kernel.go @@ -41,8 +41,12 @@ func (s DockerKernelService) InfoContainer(id string) (types.InfoContainerRespon return s.dockerAdapter.InfoContainer(id) } -func (s DockerKernelService) LogsContainer(id string) (io.ReadCloser, error) { - return s.dockerAdapter.LogsContainer(id) +func (s DockerKernelService) LogsStdoutContainer(id string) (io.ReadCloser, error) { + return s.dockerAdapter.LogsStdoutContainer(id) +} + +func (s DockerKernelService) LogsStderrContainer(id string) (io.ReadCloser, error) { + return s.dockerAdapter.LogsStderrContainer(id) } func (s DockerKernelService) WaitContainer(id string, cond types.WaitContainerCondition) error { diff --git a/services/instance.go b/services/instance.go index 60a64b3a..2bef88fc 100644 --- a/services/instance.go +++ b/services/instance.go @@ -138,11 +138,11 @@ func (s *InstanceService) Start(uuid uuid.UUID) error { s.setStatus(instance, status) } - var stdout io.ReadCloser + var stdout, stderr io.ReadCloser if instance.IsDockerized() { - stdout, _, err = s.dockerRunnerAdapter.Start(instance, setStatus) + stdout, stderr, err = s.dockerRunnerAdapter.Start(instance, setStatus) } else { - stdout, _, err = s.fsRunnerAdapter.Start(instance, setStatus) + stdout, stderr, err = s.fsRunnerAdapter.Start(instance, setStatus) } if err != nil { s.setStatus(instance, types.InstanceStatusError) @@ -165,19 +165,19 @@ func (s *InstanceService) Start(uuid uuid.UUID) error { } }() - //wg.Add(1) - //go func() { - // defer wg.Done() - // - // scanner := bufio.NewScanner(stderr) - // for scanner.Scan() { - // s.eventsAdapter.Send(types.EventInstanceLog{ - // InstanceUUID: uuid, - // Kind: types.LogKindErr, - // Message: scanner.Text(), - // }) - // } - //}() + wg.Add(1) + go func() { + defer wg.Done() + + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + s.eventsAdapter.Send(types.EventInstanceLog{ + InstanceUUID: uuid, + Kind: types.LogKindErr, + Message: scanner.Text(), + }) + } + }() wg.Wait() diff --git a/types/docker.go b/types/docker.go index 06e304e4..60320bd0 100644 --- a/types/docker.go +++ b/types/docker.go @@ -70,7 +70,8 @@ type DockerAdapterPort interface { StartContainer(id string) error StopContainer(id string) error InfoContainer(id string) (InfoContainerResponse, error) - LogsContainer(id string) (io.ReadCloser, error) + LogsStdoutContainer(id string) (io.ReadCloser, error) + LogsStderrContainer(id string) (io.ReadCloser, error) WaitContainer(id string, cond WaitContainerCondition) error InfoImage(id string) (InfoImageResponse, error) PullImage(options PullImageOptions) (io.ReadCloser, error)