Skip to content

Commit

Permalink
refactor: local plugin runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Sep 4, 2024
1 parent b70d830 commit 1697a0e
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 109 deletions.
103 changes: 6 additions & 97 deletions internal/core/plugin_manager/local_manager/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,118 +3,27 @@ package local_manager
import (
"fmt"
"os"
"os/exec"
"path"
"strings"
"sync"
"syscall"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
)

func (r *LocalPluginRuntime) InitEnvironment() error {
if _, err := os.Stat(path.Join(r.State.AbsolutePath, ".installed")); err == nil {
return nil
}

// execute init command, create
// TODO
handle := exec.Command("bash")
handle.Dir = r.State.AbsolutePath
handle.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

// get stdout and stderr
stdout, err := handle.StdoutPipe()
if err != nil {
return err
var err error
if r.Config.Meta.Runner.Language == constants.Python {
err = r.InitPythonEnvironment()
} else {
return fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
}
defer stdout.Close()

stderr, err := handle.StderrPipe()
if err != nil {
return err
}
defer stderr.Close()

// start command
if err := handle.Start(); err != nil {
return err
}
defer func() {
if handle.Process != nil {
handle.Process.Kill()
}
}()

var err_msg strings.Builder
var wg sync.WaitGroup
wg.Add(2)

last_active_at := time.Now()

routine.Submit(func() {
defer wg.Done()
// read stdout
buf := make([]byte, 1024)
for {
n, err := stdout.Read(buf)
if err != nil {
break
}
log.Info("installing %s - %s", r.Config.Identity(), string(buf[:n]))
last_active_at = time.Now()
}
})

routine.Submit(func() {
defer wg.Done()
// read stderr
buf := make([]byte, 1024)
for {
n, err := stderr.Read(buf)
if err != nil && err != os.ErrClosed {
last_active_at = time.Now()
err_msg.WriteString(string(buf[:n]))
break
} else if err == os.ErrClosed {
break
}

if n > 0 {
err_msg.WriteString(string(buf[:n]))
last_active_at = time.Now()
}
}
})

routine.Submit(func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if handle.ProcessState != nil && handle.ProcessState.Exited() {
break
}

if time.Since(last_active_at) > 60*time.Second {
handle.Process.Kill()
err_msg.WriteString("init process exited due to long time no activity")
break
}
}
})

wg.Wait()

if err_msg.Len() > 0 {
return fmt.Errorf("install failed: %s", err_msg.String())
}

if err := handle.Wait(); err != nil {
return err
}

// create .installed file
f, err := os.Create(path.Join(r.State.AbsolutePath, ".installed"))
Expand Down
167 changes: 158 additions & 9 deletions internal/core/plugin_manager/local_manager/environment_python.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,168 @@
package local_manager

import "os/exec"
import (
"context"
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"
"time"

func (p *LocalPluginRuntime) InitPythonEnvironment(requirements_txt string) error {
// create virtual env
identity, err := p.Identity()
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
)

func (p *LocalPluginRuntime) InitPythonEnvironment() error {
// execute init command, create a virtual environment
success := false

cmd := exec.Command("bash", "-c", "python3 -m venv .venv")
cmd.Dir = p.State.WorkingPath
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to create virtual environment: %s", err)
}
defer func() {
// if init failed, remove the .venv directory
if !success {
os.RemoveAll(path.Join(p.State.WorkingPath, ".venv"))
}
}()

// wait for the virtual environment to be created
if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to create virtual environment: %s", err)
}

// try find python interpreter and pip
pip_path, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/pip"))
if err != nil {
return err
return fmt.Errorf("failed to find pip: %s", err)
}

cmd := exec.Command("python", "-m", "venv", identity.String())
python_path, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/python"))
if err != nil {
return fmt.Errorf("failed to find python: %s", err)
}

if _, err := os.Stat(pip_path); err != nil {
return fmt.Errorf("failed to find pip: %s", err)
}

// set working directory
cmd.Dir = p.WorkingPath
if _, err := os.Stat(python_path); err != nil {
return fmt.Errorf("failed to find python: %s", err)
}

p.python_interpreter_path = python_path

// try find requirements.txt
requirements_path := path.Join(p.State.WorkingPath, "requirements.txt")
if _, err := os.Stat(requirements_path); err != nil {
return fmt.Errorf("failed to find requirements.txt: %s", err)
}

// install dependencies
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

cmd = exec.CommandContext(ctx, pip_path, "install", "-r", requirements_path)
cmd.Dir = p.State.WorkingPath
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to install dependencies: %s", err)
}

// get stdout and stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to get stdout: %s", err)
}
defer stdout.Close()

stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("failed to get stderr: %s", err)
}
defer stderr.Close()

// start command
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start command: %s", err)
}
defer func() {
if cmd.Process != nil {
cmd.Process.Kill()
}
}()

var err_msg strings.Builder
var wg sync.WaitGroup
wg.Add(2)

last_active_at := time.Now()

routine.Submit(func() {
defer wg.Done()
// read stdout
buf := make([]byte, 1024)
for {
n, err := stdout.Read(buf)
if err != nil {
break
}
log.Info("installing %s - %s", p.Config.Identity(), string(buf[:n]))
last_active_at = time.Now()
}
})

routine.Submit(func() {
defer wg.Done()
// read stderr
buf := make([]byte, 1024)
for {
n, err := stderr.Read(buf)
if err != nil && err != os.ErrClosed {
last_active_at = time.Now()
err_msg.WriteString(string(buf[:n]))
break
} else if err == os.ErrClosed {
break
}

if n > 0 {
err_msg.WriteString(string(buf[:n]))
last_active_at = time.Now()
}
}
})

routine.Submit(func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
break
}

if time.Since(last_active_at) > 60*time.Second {
cmd.Process.Kill()
err_msg.WriteString("init process exited due to long time no activity")
break
}
}
})

wg.Wait()

if err_msg.Len() > 0 {
return fmt.Errorf("install failed: %s", err_msg.String())
}

if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to install dependencies: %s", err)
}

// TODO
success = true
return nil
}
21 changes: 18 additions & 3 deletions internal/core/plugin_manager/local_manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/langgenius/dify-plugin-daemon/internal/process"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
Expand All @@ -33,14 +34,28 @@ func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
}

func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
if r.Config.Meta.Runner.Language == constants.Python {
cmd := exec.Command(r.python_interpreter_path, "-m", r.Config.Meta.Runner.Entrypoint)
cmd.Dir = r.State.WorkingPath
return cmd, nil
}

return nil, fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
}

func (r *LocalPluginRuntime) StartPlugin() error {
defer log.Info("plugin %s stopped", r.Config.Identity())

r.init()

// start plugin
// TODO: use exec.Command("bash") instead of exec.Command("bash", r.Config.Execution.Launch)
e := exec.Command("bash")
e.Dir = r.State.AbsolutePath
e, err := r.getCmd()
if err != nil {
return err
}

e.Dir = r.State.WorkingPath
// add env INSTALL_METHOD=local
e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))

Expand Down
3 changes: 3 additions & 0 deletions internal/core/plugin_manager/local_manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ type LocalPluginRuntime struct {

wait_chan chan bool
io_identity string

// python interpreter path, currently only support python
python_interpreter_path string
}

0 comments on commit 1697a0e

Please sign in to comment.