Skip to content

Commit

Permalink
Fix some issues with logs using Vertex Kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
quentinguidee committed Sep 27, 2023
1 parent 4f9a9d0 commit a507e07
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 45 deletions.
68 changes: 49 additions & 19 deletions adapter/runner_docker.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package adapter

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net/http"
"path"
Expand Down Expand Up @@ -63,18 +65,24 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat
if err != nil {
return
}
defer stdout.Close()

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
defer stdout.Close()
_, err := io.Copy(wOut, stdout)
if err != nil {
log.Error(err)
return

scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
if scanner.Err() != nil {
log.Error(scanner.Err())
return
}
_, err := fmt.Fprintln(wOut, scanner.Text())
if err != nil {
return
}
}
}()

Expand Down Expand Up @@ -195,6 +203,9 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat
}

go func() {
defer stdout.Close()
defer wOut.Close()

_, err := io.Copy(wOut, stdout)
if err != nil {
log.Error(err)
Expand All @@ -203,7 +214,10 @@ func (a RunnerDockerAdapter) Start(instance *types.Instance, setStatus func(stat
}()

go func() {
_, err := io.Copy(wErr, stderr)
defer stderr.Close()
defer wErr.Close()

_, err := io.Copy(wOut, stdout)
if err != nil {
log.Error(err)
return
Expand Down Expand Up @@ -458,20 +472,36 @@ func (a RunnerDockerAdapter) readLogs(containerID string) (stdout io.ReadCloser,
return
}

var res *http.Response
res, err = http.DefaultClient.Do(reqStdout)
if err != nil {
return
}
stdout = res.Body
rOut, wOut := io.Pipe()
rErr, wErr := io.Pipe()

res, err = http.DefaultClient.Do(reqStderr)
if err != nil {
_ = stdout.Close()
return
}
stderr = res.Body
return
go func() {
res, err := http.DefaultClient.Do(reqStdout)
if err != nil {
return
}
defer res.Body.Close()

_, err = io.Copy(wOut, res.Body)
if err != nil {
return
}
}()

go func() {
res, err := http.DefaultClient.Do(reqStderr)
if err != nil {
return
}
defer res.Body.Close()

_, err = io.Copy(wErr, res.Body)
if err != nil {
return
}
}()

return rOut, rErr, nil
}

func (a RunnerDockerAdapter) getPath(instance types.Instance) string {
Expand Down
14 changes: 5 additions & 9 deletions router/docker_kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,19 @@ func handleLogsStdoutDockerContainer(c *gin.Context) {
})
return
}
defer stdout.Close()

scanner := bufio.NewScanner(stdout)

c.Stream(func(w io.Writer) bool {
if scanner.Err() != nil {
log.Error(scanner.Err())
return false
}

if !scanner.Scan() {
return false
}

_, err := io.WriteString(w, scanner.Text()+"\n")
_, err := fmt.Fprintln(w, scanner.Text())
if err != nil {
log.Error(err)
return false
Expand All @@ -166,20 +165,19 @@ func handleLogsStderrDockerContainer(c *gin.Context) {
})
return
}
defer stderr.Close()

scanner := bufio.NewScanner(stderr)

c.Stream(func(w io.Writer) bool {
if scanner.Err() != nil {
log.Error(scanner.Err())
return false
}

if !scanner.Scan() {
return false
}

_, err := io.WriteString(w, scanner.Text()+"\n")
_, err := fmt.Fprintln(w, scanner.Text())
if err != nil {
log.Error(err)
return false
Expand Down Expand Up @@ -244,15 +242,13 @@ func handlePullDockerImage(c *gin.Context) {

c.Stream(func(w io.Writer) bool {
if scanner.Err() != nil {
log.Error(scanner.Err())
return false
}

if !scanner.Scan() {
return false
}

_, err := io.WriteString(w, scanner.Text()+"\n")
_, err := fmt.Fprintln(w, scanner.Text())
if err != nil {
log.Error(err)
return false
Expand Down
32 changes: 15 additions & 17 deletions services/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"errors"
"fmt"
"io"
"net"
"os"
"path"
Expand Down Expand Up @@ -138,12 +137,14 @@ func (s *InstanceService) Start(uuid uuid.UUID) error {
s.setStatus(instance, status)
}

var stdout, stderr io.ReadCloser
var runner types.RunnerAdapterPort
if instance.IsDockerized() {
stdout, stderr, err = s.dockerRunnerAdapter.Start(instance, setStatus)
runner = s.dockerRunnerAdapter
} else {
stdout, stderr, err = s.fsRunnerAdapter.Start(instance, setStatus)
runner = s.fsRunnerAdapter
}

stdout, stderr, err := runner.Start(instance, setStatus)
if err != nil {
s.setStatus(instance, types.InstanceStatusError)
return err
Expand All @@ -157,6 +158,9 @@ func (s *InstanceService) Start(uuid uuid.UUID) error {

scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
if scanner.Err() != nil {
break
}
s.eventsAdapter.Send(types.EventInstanceLog{
InstanceUUID: uuid,
Kind: types.LogKindOut,
Expand All @@ -171,6 +175,9 @@ func (s *InstanceService) Start(uuid uuid.UUID) error {

scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
if scanner.Err() != nil {
break
}
s.eventsAdapter.Send(types.EventInstanceLog{
InstanceUUID: uuid,
Kind: types.LogKindErr,
Expand All @@ -179,15 +186,16 @@ func (s *InstanceService) Start(uuid uuid.UUID) error {
}
}()

// Wait for the instance until stopped
wg.Wait()

// Log stopped
s.eventsAdapter.Send(types.EventInstanceLog{
InstanceUUID: uuid,
Kind: types.LogKindVertexOut,
Message: "Instance started.",
Message: "Stopping instance...",
})

log.Info("instance started",
log.Info("stopping instance",
vlog.String("uuid", uuid.String()),
)

Expand Down Expand Up @@ -240,16 +248,6 @@ func (s *InstanceService) Stop(uuid uuid.UUID) error {
return err
}

s.eventsAdapter.Send(types.EventInstanceLog{
InstanceUUID: uuid,
Kind: types.LogKindVertexOut,
Message: "Stopping instance...",
})

log.Info("stopping instance",
vlog.String("uuid", uuid.String()),
)

if !instance.IsRunning() {
s.eventsAdapter.Send(types.EventInstanceLog{
InstanceUUID: uuid,
Expand Down

0 comments on commit a507e07

Please sign in to comment.