Skip to content

Commit

Permalink
Improve stderr handling with the Docker Kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
quentinguidee committed Sep 27, 2023
1 parent baae1b8 commit 4f9a9d0
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 39 deletions.
10 changes: 9 additions & 1 deletion adapter/docker_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,17 @@ func (a DockerCliAdapter) InfoContainer(id string) (types.InfoContainerResponse,
}, nil
}

func (a DockerCliAdapter) LogsContainer(id string) (io.ReadCloser, error) {
func (a DockerCliAdapter) LogsStdoutContainer(id string) (io.ReadCloser, error) {
return a.cli.ContainerLogs(context.Background(), id, dockertypes.ContainerLogsOptions{
ShowStdout: true,
Timestamps: false,
Follow: true,
Tail: "0",
})
}

func (a DockerCliAdapter) LogsStderrContainer(id string) (io.ReadCloser, error) {
return a.cli.ContainerLogs(context.Background(), id, dockertypes.ContainerLogsOptions{
ShowStderr: true,
Timestamps: false,
Follow: true,
Expand Down
49 changes: 34 additions & 15 deletions adapter/runner_docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (a RunnerDockerAdapter) Delete(instance *types.Instance) error {
}

func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(status string)) (io.ReadCloser, io.ReadCloser, error) {
//rErr, wErr := io.Pipe()
rErr := io.ReadCloser(nil)
rErr, wErr := io.Pipe()
rOut, wOut := io.Pipe()

go func() {
Expand All @@ -53,7 +52,7 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat

// Build
var err error
var stdout io.ReadCloser
var stdout, stderr io.ReadCloser
if service.Methods.Docker.Dockerfile != nil {
stdout, err = a.buildImageFromDockerfile(instancePath, imageName)
} else if service.Methods.Docker.Image != nil {
Expand All @@ -68,7 +67,6 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat

var wg sync.WaitGroup

// Send stdout to wOut
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -80,7 +78,6 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat
}
}()

//Send stderr to wErr
//wg.Add(1)
//go func() {
// defer wg.Done()
Expand Down Expand Up @@ -192,7 +189,7 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat
}
setStatus(types.InstanceStatusRunning)

stdout, err = a.readLogs(id)
stdout, stderr, err = a.readLogs(id)
if err != nil {
return
}
Expand All @@ -205,9 +202,15 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat
}
}()

a.watchForStatusChange(id, instance, setStatus)
go func() {
_, err := io.Copy(wErr, stderr)
if err != nil {
log.Error(err)
return
}
}()

return
a.watchForStatusChange(id, instance, setStatus)
}()

return rOut, rErr, nil
Expand Down Expand Up @@ -439,20 +442,36 @@ func (a RunnerDockerAdapter) watchForStatusChange(containerID string, instance *
}()
}

func (a RunnerDockerAdapter) readLogs(containerID string) (io.ReadCloser, error) {
req, err := requests.URL("http://localhost:6131/").
Pathf("/api/docker/container/%s/logs", containerID).
func (a RunnerDockerAdapter) readLogs(containerID string) (stdout io.ReadCloser, stderr io.ReadCloser, err error) {
var reqStdout, reqStderr *http.Request
reqStdout, err = requests.URL("http://localhost:6131/").
Pathf("/api/docker/container/%s/logs/stdout", containerID).
Request(context.Background())
if err != nil {
return nil, err
return
}

reqStderr, err = requests.URL("http://localhost:6131/").
Pathf("/api/docker/container/%s/logs/stderr", containerID).
Request(context.Background())
if err != nil {
return
}

var res *http.Response
res, err = http.DefaultClient.Do(req)
res, err = http.DefaultClient.Do(reqStdout)
if err != nil {
return nil, err
return
}
return res.Body, nil
stdout = res.Body

res, err = http.DefaultClient.Do(reqStderr)
if err != nil {
_ = stdout.Close()
return
}
stderr = res.Body
return
}

func (a RunnerDockerAdapter) getPath(instance types.Instance) string {
Expand Down
42 changes: 38 additions & 4 deletions router/docker_kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func addDockerKernelRoutes(r *gin.RouterGroup) {
r.POST("/container/:id/start", handleStartDockerContainer)
r.POST("/container/:id/stop", handleStopDockerContainer)
r.GET("/container/:id/info", handleInfoDockerContainer)
r.GET("/container/:id/logs", handleLogsDockerContainer)
r.GET("/container/:id/logs/stdout", handleLogsStdoutDockerContainer)
r.GET("/container/:id/logs/stderr", handleLogsStderrDockerContainer)
r.GET("/container/:id/wait/:cond", handleWaitDockerContainer)
r.GET("/image/:id/info", handleInfoDockerImage)
r.POST("/image/pull", handlePullDockerImage)
Expand Down Expand Up @@ -121,10 +122,10 @@ func handleInfoDockerContainer(c *gin.Context) {
c.JSON(http.StatusOK, info)
}

func handleLogsDockerContainer(c *gin.Context) {
func handleLogsStdoutDockerContainer(c *gin.Context) {
id := c.Param("id")

r, err := dockerKernelService.LogsContainer(id)
stdout, err := dockerKernelService.LogsStdoutContainer(id)
if err != nil {
_ = c.AbortWithError(http.StatusInternalServerError, types.APIError{
Code: "failed_to_get_container_logs",
Expand All @@ -133,7 +134,40 @@ func handleLogsDockerContainer(c *gin.Context) {
return
}

scanner := bufio.NewScanner(r)
scanner := bufio.NewScanner(stdout)

c.Stream(func(w io.Writer) bool {
if scanner.Err() != nil {
log.Error(scanner.Err())
return false
}

if !scanner.Scan() {
return false
}

_, err := io.WriteString(w, scanner.Text()+"\n")
if err != nil {
log.Error(err)
return false
}
return true
})
}

func handleLogsStderrDockerContainer(c *gin.Context) {
id := c.Param("id")

stderr, err := dockerKernelService.LogsStderrContainer(id)
if err != nil {
_ = c.AbortWithError(http.StatusInternalServerError, types.APIError{
Code: "failed_to_get_container_logs",
Message: err.Error(),
})
return
}

scanner := bufio.NewScanner(stderr)

c.Stream(func(w io.Writer) bool {
if scanner.Err() != nil {
Expand Down
8 changes: 6 additions & 2 deletions services/docker_kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ func (s DockerKernelService) InfoContainer(id string) (types.InfoContainerRespon
return s.dockerAdapter.InfoContainer(id)
}

func (s DockerKernelService) LogsContainer(id string) (io.ReadCloser, error) {
return s.dockerAdapter.LogsContainer(id)
func (s DockerKernelService) LogsStdoutContainer(id string) (io.ReadCloser, error) {
return s.dockerAdapter.LogsStdoutContainer(id)
}

func (s DockerKernelService) LogsStderrContainer(id string) (io.ReadCloser, error) {
return s.dockerAdapter.LogsStderrContainer(id)
}

func (s DockerKernelService) WaitContainer(id string, cond types.WaitContainerCondition) error {
Expand Down
32 changes: 16 additions & 16 deletions services/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ func (s *InstanceService) Start(uuid uuid.UUID) error {
s.setStatus(instance, status)
}

var stdout io.ReadCloser
var stdout, stderr io.ReadCloser
if instance.IsDockerized() {
stdout, _, err = s.dockerRunnerAdapter.Start(instance, setStatus)
stdout, stderr, err = s.dockerRunnerAdapter.Start(instance, setStatus)
} else {
stdout, _, err = s.fsRunnerAdapter.Start(instance, setStatus)
stdout, stderr, err = s.fsRunnerAdapter.Start(instance, setStatus)
}
if err != nil {
s.setStatus(instance, types.InstanceStatusError)
Expand All @@ -165,19 +165,19 @@ func (s *InstanceService) Start(uuid uuid.UUID) error {
}
}()

//wg.Add(1)
//go func() {
// defer wg.Done()
//
// scanner := bufio.NewScanner(stderr)
// for scanner.Scan() {
// s.eventsAdapter.Send(types.EventInstanceLog{
// InstanceUUID: uuid,
// Kind: types.LogKindErr,
// Message: scanner.Text(),
// })
// }
//}()
wg.Add(1)
go func() {
defer wg.Done()

scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
s.eventsAdapter.Send(types.EventInstanceLog{
InstanceUUID: uuid,
Kind: types.LogKindErr,
Message: scanner.Text(),
})
}
}()

wg.Wait()

Expand Down
3 changes: 2 additions & 1 deletion types/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type DockerAdapterPort interface {
StartContainer(id string) error
StopContainer(id string) error
InfoContainer(id string) (InfoContainerResponse, error)
LogsContainer(id string) (io.ReadCloser, error)
LogsStdoutContainer(id string) (io.ReadCloser, error)
LogsStderrContainer(id string) (io.ReadCloser, error)
WaitContainer(id string, cond WaitContainerCondition) error
InfoImage(id string) (InfoImageResponse, error)
PullImage(options PullImageOptions) (io.ReadCloser, error)
Expand Down

0 comments on commit 4f9a9d0

Please sign in to comment.