Skip to content

Commit

Permalink
fix: endless restarting of local runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Nov 11, 2024
1 parent 357bc37 commit b6899ca
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
44 changes: 23 additions & 21 deletions internal/core/plugin_manager/local_manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func (r *LocalPluginRuntime) gc() {
removeStdioHandler(r.io_identity)
}

if r.wait_chan != nil {
close(r.wait_chan)
r.wait_chan = nil
if r.waitChan != nil {
close(r.waitChan)
r.waitChan = nil
}
}

Expand All @@ -46,18 +46,25 @@ func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
func (r *LocalPluginRuntime) StartPlugin() error {
defer log.Info("plugin %s stopped", r.Config.Identity())
defer func() {
r.wait_chan_lock.Lock()
for _, c := range r.wait_stopped_chan {
r.waitChanLock.Lock()
for _, c := range r.waitStoppedChan {
select {
case c <- true:
default:
}
}
r.wait_chan_lock.Unlock()
r.waitChanLock.Unlock()
}()

if r.isNotFirstStart {
r.SetRestarting()
} else {
r.SetLaunching()
r.isNotFirstStart = true
}

// reset wait chan
r.wait_chan = make(chan bool)
r.waitChan = make(chan bool)
// reset wait launched chan

// start plugin
Expand All @@ -73,37 +80,32 @@ func (r *LocalPluginRuntime) StartPlugin() error {
// get writer
stdin, err := e.StdinPipe()
if err != nil {
r.SetRestarting()
return fmt.Errorf("get stdin pipe failed: %s", err.Error())
}
defer stdin.Close()

// get stdout
stdout, err := e.StdoutPipe()
if err != nil {
r.SetRestarting()
return fmt.Errorf("get stdout pipe failed: %s", err.Error())
}
defer stdout.Close()

// get stderr
stderr, err := e.StderrPipe()
if err != nil {
r.SetRestarting()
return fmt.Errorf("get stderr pipe failed: %s", err.Error())
}
defer stderr.Close()

if err := e.Start(); err != nil {
r.SetRestarting()
return fmt.Errorf("start plugin failed: %s", err.Error())
}

defer func() {
// wait for plugin to exit
err = e.Wait()
if err != nil {
r.SetRestarting()
log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
}

Expand Down Expand Up @@ -136,14 +138,14 @@ func (r *LocalPluginRuntime) StartPlugin() error {
})

// send started event
r.wait_chan_lock.Lock()
r.waitChanLock.Lock()
for _, c := range r.wait_started_chan {
select {
case c <- true:
default:
}
}
r.wait_chan_lock.Unlock()
r.waitChanLock.Unlock()

// wait for plugin to exit
err = stdio.Wait()
Expand All @@ -159,27 +161,27 @@ func (r *LocalPluginRuntime) StartPlugin() error {

// Wait returns a channel that will be closed when the plugin stops
func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
if r.wait_chan == nil {
if r.waitChan == nil {
return nil, errors.New("plugin not started")
}
return r.wait_chan, nil
return r.waitChan, nil
}

// WaitStarted returns a channel that will receive true when the plugin starts
func (r *LocalPluginRuntime) WaitStarted() <-chan bool {
c := make(chan bool)
r.wait_chan_lock.Lock()
r.waitChanLock.Lock()
r.wait_started_chan = append(r.wait_started_chan, c)
r.wait_chan_lock.Unlock()
r.waitChanLock.Unlock()
return c
}

// WaitStopped returns a channel that will receive true when the plugin stops
func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
c := make(chan bool)
r.wait_chan_lock.Lock()
r.wait_stopped_chan = append(r.wait_stopped_chan, c)
r.wait_chan_lock.Unlock()
r.waitChanLock.Lock()
r.waitStoppedChan = append(r.waitStoppedChan, c)
r.waitChanLock.Unlock()
return c
}

Expand Down
8 changes: 5 additions & 3 deletions internal/core/plugin_manager/local_manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type LocalPluginRuntime struct {
positive_manager.PositivePluginRuntime
plugin_entities.PluginRuntime

wait_chan chan bool
waitChan chan bool
io_identity string

// python interpreter path, currently only support python
Expand All @@ -21,9 +21,11 @@ type LocalPluginRuntime struct {
// by using its venv module
default_python_interpreter_path string

wait_chan_lock sync.Mutex
waitChanLock sync.Mutex
wait_started_chan []chan bool
wait_stopped_chan []chan bool
waitStoppedChan []chan bool

isNotFirstStart bool
}

func NewLocalPluginRuntime(
Expand Down

0 comments on commit b6899ca

Please sign in to comment.