From b6899ca462033a02382144ff456fbfa27e2dddfd Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Mon, 11 Nov 2024 13:38:22 +0800 Subject: [PATCH] fix: endless restarting of local runtime --- .../core/plugin_manager/local_manager/run.go | 44 ++++++++++--------- .../core/plugin_manager/local_manager/type.go | 8 ++-- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/internal/core/plugin_manager/local_manager/run.go b/internal/core/plugin_manager/local_manager/run.go index 1729d11..b3bbac9 100644 --- a/internal/core/plugin_manager/local_manager/run.go +++ b/internal/core/plugin_manager/local_manager/run.go @@ -20,9 +20,9 @@ func (r *LocalPluginRuntime) gc() { removeStdioHandler(r.io_identity) } - if r.wait_chan != nil { - close(r.wait_chan) - r.wait_chan = nil + if r.waitChan != nil { + close(r.waitChan) + r.waitChan = nil } } @@ -46,18 +46,25 @@ func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) { func (r *LocalPluginRuntime) StartPlugin() error { defer log.Info("plugin %s stopped", r.Config.Identity()) defer func() { - r.wait_chan_lock.Lock() - for _, c := range r.wait_stopped_chan { + r.waitChanLock.Lock() + for _, c := range r.waitStoppedChan { select { case c <- true: default: } } - r.wait_chan_lock.Unlock() + r.waitChanLock.Unlock() }() + if r.isNotFirstStart { + r.SetRestarting() + } else { + r.SetLaunching() + r.isNotFirstStart = true + } + // reset wait chan - r.wait_chan = make(chan bool) + r.waitChan = make(chan bool) // reset wait launched chan // start plugin @@ -73,7 +80,6 @@ func (r *LocalPluginRuntime) StartPlugin() error { // get writer stdin, err := e.StdinPipe() if err != nil { - r.SetRestarting() return fmt.Errorf("get stdin pipe failed: %s", err.Error()) } defer stdin.Close() @@ -81,7 +87,6 @@ func (r *LocalPluginRuntime) StartPlugin() error { // get stdout stdout, err := e.StdoutPipe() if err != nil { - r.SetRestarting() return fmt.Errorf("get stdout pipe failed: %s", err.Error()) } defer stdout.Close() @@ -89,13 +94,11 @@ func (r *LocalPluginRuntime) StartPlugin() error { // get stderr stderr, err := e.StderrPipe() if err != nil { - r.SetRestarting() return fmt.Errorf("get stderr pipe failed: %s", err.Error()) } defer stderr.Close() if err := e.Start(); err != nil { - r.SetRestarting() return fmt.Errorf("start plugin failed: %s", err.Error()) } @@ -103,7 +106,6 @@ func (r *LocalPluginRuntime) StartPlugin() error { // wait for plugin to exit err = e.Wait() if err != nil { - r.SetRestarting() log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error()) } @@ -136,14 +138,14 @@ func (r *LocalPluginRuntime) StartPlugin() error { }) // send started event - r.wait_chan_lock.Lock() + r.waitChanLock.Lock() for _, c := range r.wait_started_chan { select { case c <- true: default: } } - r.wait_chan_lock.Unlock() + r.waitChanLock.Unlock() // wait for plugin to exit err = stdio.Wait() @@ -159,27 +161,27 @@ func (r *LocalPluginRuntime) StartPlugin() error { // Wait returns a channel that will be closed when the plugin stops func (r *LocalPluginRuntime) Wait() (<-chan bool, error) { - if r.wait_chan == nil { + if r.waitChan == nil { return nil, errors.New("plugin not started") } - return r.wait_chan, nil + return r.waitChan, nil } // WaitStarted returns a channel that will receive true when the plugin starts func (r *LocalPluginRuntime) WaitStarted() <-chan bool { c := make(chan bool) - r.wait_chan_lock.Lock() + r.waitChanLock.Lock() r.wait_started_chan = append(r.wait_started_chan, c) - r.wait_chan_lock.Unlock() + r.waitChanLock.Unlock() return c } // WaitStopped returns a channel that will receive true when the plugin stops func (r *LocalPluginRuntime) WaitStopped() <-chan bool { c := make(chan bool) - r.wait_chan_lock.Lock() - r.wait_stopped_chan = append(r.wait_stopped_chan, c) - r.wait_chan_lock.Unlock() + r.waitChanLock.Lock() + r.waitStoppedChan = append(r.waitStoppedChan, c) + r.waitChanLock.Unlock() return c } diff --git a/internal/core/plugin_manager/local_manager/type.go b/internal/core/plugin_manager/local_manager/type.go index 1b3078b..a2e422b 100644 --- a/internal/core/plugin_manager/local_manager/type.go +++ b/internal/core/plugin_manager/local_manager/type.go @@ -11,7 +11,7 @@ type LocalPluginRuntime struct { positive_manager.PositivePluginRuntime plugin_entities.PluginRuntime - wait_chan chan bool + waitChan chan bool io_identity string // python interpreter path, currently only support python @@ -21,9 +21,11 @@ type LocalPluginRuntime struct { // by using its venv module default_python_interpreter_path string - wait_chan_lock sync.Mutex + waitChanLock sync.Mutex wait_started_chan []chan bool - wait_stopped_chan []chan bool + waitStoppedChan []chan bool + + isNotFirstStart bool } func NewLocalPluginRuntime(