Skip to content

Commit

Permalink
feat: plugin debugging server
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Jul 25, 2024
1 parent d35cf3c commit 52f51e1
Show file tree
Hide file tree
Showing 24 changed files with 644 additions and 128 deletions.
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func setDefault(config *app.Config) {
setDefaultInt(&config.LifetimeCollectionHeartbeatInterval, 5)
setDefaultInt(&config.LifetimeStateGCInterval, 300)
setDefaultInt(&config.DifyInvocationConnectionIdleTimeout, 120)
setDefaultInt(&config.PluginRemoteInstallServerEventLoopNums, 8)

setDebugString(&config.ProcessCachingPath, "/tmp/dify-plugin-daemon-subprocesses")
}
Expand Down
1 change: 0 additions & 1 deletion internal/core/plugin_manager/aws_connector.go

This file was deleted.

4 changes: 0 additions & 4 deletions internal/core/plugin_manager/aws_manager/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,3 @@ func (r *AWSPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
func (r *AWSPluginRuntime) Write(session_id string, data []byte) {

}

func (r *AWSPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {
return nil, nil
}
16 changes: 8 additions & 8 deletions internal/core/plugin_manager/lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ func lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
start_failed_times := 0
configuration := r.Configuration()

// store plugin runtime
m.Store(configuration.Identity(), r)
defer m.Delete(configuration.Identity())

// update lifetime state for this pod
addLifetimeState(r)
defer func() {
// remove lifetime state after plugin if it has been stopped for $LIFETIME_STATE_GC_INTERVAL and not started again
time.AfterFunc(time.Duration(config.LifetimeStateGCInterval)*time.Second, func() {
if r.Stopped() {
deleteLifetimeState(r)
}
})
}()

// remove lifetime state after plugin if it has been stopped
defer deleteLifetimeState(r)

for !r.Stopped() {
if err := r.InitEnvironment(); err != nil {
Expand Down
1 change: 0 additions & 1 deletion internal/core/plugin_manager/local_connector.go

This file was deleted.

6 changes: 1 addition & 5 deletions internal/core/plugin_manager/local_manager/io.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package local_manager

import (
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager/stdio_holder"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
)

Expand All @@ -20,7 +20,3 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.BytesIOListener
func (r *LocalPluginRuntime) Write(session_id string, data []byte) {
stdio_holder.Write(r.io_identity, append(data, '\n'))
}

func (r *LocalPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/local_manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os/exec"
"sync"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager/stdio_holder"
"github.com/langgenius/dify-plugin-daemon/internal/process"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
Expand Down Expand Up @@ -168,7 +169,7 @@ func (s *stdioHolder) Wait() error {
case <-ticker.C:
// check heartbeat
if time.Since(s.last_active_at) > 20*time.Second {
return errors.New("plugin is not active, does not respond to heartbeat in 20 seconds")
return plugin_errors.ErrPluginNotActive
}
case <-s.health_chan:
// closed
Expand Down
7 changes: 7 additions & 0 deletions internal/core/plugin_manager/plugin_errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package plugin_errors

import "errors"

var (
ErrPluginNotActive = errors.New("plugin is not active, does not respond to heartbeat in 20 seconds")
)
47 changes: 47 additions & 0 deletions internal/core/plugin_manager/remote_manager/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package remote_manager

import (
"bytes"
"errors"

"github.com/panjf2000/gnet/v2"
)

type codec struct {
buf bytes.Buffer
}

func (w *codec) Decode(c gnet.Conn) ([][]byte, error) {
size := c.InboundBuffered()
buf := make([]byte, size)
read, err := c.Read(buf)

if err != nil {
return nil, err
}

if read < size {
return nil, errors.New("read less than size")
}

return w.getLines(buf), nil
}

func (w *codec) getLines(data []byte) [][]byte {
// write to buffer
w.buf.Write(data)

// read line by line, split by \n, remaining data will be kept in buffer
lines := bytes.Split(w.buf.Bytes(), []byte("\n"))
w.buf.Reset()

// if last line is not complete, keep it in buffer
if len(lines[len(lines)-1]) != 0 {
w.buf.Write(lines[len(lines)-1])
lines = lines[:len(lines)-1]
} else if len(lines) > 0 {
lines = lines[:len(lines)-1]
}

return lines
}
21 changes: 21 additions & 0 deletions internal/core/plugin_manager/remote_manager/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package remote_manager

import "testing"

func TestCodec(t *testing.T) {
codec := &codec{}
liens := codec.getLines([]byte("test\n"))
if len(liens) != 1 {
t.Error("getLines failed")
}

liens = codec.getLines([]byte("test\ntest"))
if len(liens) == 2 {
t.Error("getLines failed")
}

liens = codec.getLines([]byte("\n"))
if len(liens) != 1 {
t.Error("getLines failed")
}
}
106 changes: 106 additions & 0 deletions internal/core/plugin_manager/remote_manager/hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package remote_manager

import (
"sync"

"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
"github.com/panjf2000/gnet/v2"
)

type DifyServer struct {
gnet.BuiltinEventEngine

engine gnet.Engine

// listening address
addr string

// enabled multicore
multicore bool

// event loop count
num_loops int

// read new connections
response *stream.StreamResponse[*RemotePluginRuntime]

plugins map[int]*RemotePluginRuntime
plugins_lock *sync.RWMutex

shutdown_chan chan bool
}

func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
s.engine = c
return gnet.None
}

func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
// new plugin connected
c.SetContext(&codec{})
runtime := &RemotePluginRuntime{
conn: c,
response: stream.NewStreamResponse[[]byte](512),
callbacks: make(map[string][]func([]byte)),
callbacks_lock: &sync.RWMutex{},

shutdown_chan: make(chan bool),

alive: true,
}

// store plugin runtime
s.plugins_lock.Lock()
s.plugins[c.Fd()] = runtime
s.plugins_lock.Unlock()

s.response.Write(runtime)

// verified
verified := true
if verified {
return nil, gnet.None
}

return nil, gnet.Close
}

func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
// plugin disconnected
s.plugins_lock.Lock()
plugin := s.plugins[c.Fd()]
delete(s.plugins, c.Fd())
s.plugins_lock.Unlock()

// close plugin
plugin.close()

return gnet.None
}

func (s *DifyServer) OnShutdown(c gnet.Engine) {
close(s.shutdown_chan)
}

func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
codec := c.Context().(*codec)
messages, err := codec.Decode(c)
if err != nil {
return gnet.Close
}

// get plugin runtime
s.plugins_lock.RLock()
runtime, ok := s.plugins[c.Fd()]
s.plugins_lock.RUnlock()
if !ok {
return gnet.Close
}

// handle messages
for _, message := range messages {
runtime.response.Write(message)
}

return gnet.None
}
25 changes: 25 additions & 0 deletions internal/core/plugin_manager/remote_manager/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package remote_manager

import (
"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
"github.com/panjf2000/gnet/v2"
)

func (r *RemotePluginRuntime) Listen(session_id string) *entities.BytesIOListener {
listener := entities.NewIOListener[[]byte]()
listener.OnClose(func() {
r.removeCallback(session_id)
})

r.addCallback(session_id, func(data []byte) {
listener.Emit(data)
})

return listener
}

func (r *RemotePluginRuntime) Write(session_id string, data []byte) {
r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
return nil
})
}
101 changes: 101 additions & 0 deletions internal/core/plugin_manager/remote_manager/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package remote_manager

import (
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
)

func (r *RemotePluginRuntime) InitEnvironment() error {
return nil
}

func (r *RemotePluginRuntime) Stopped() bool {
return !r.alive
}

func (r *RemotePluginRuntime) Stop() {
r.alive = false
if r.conn == nil {
return
}
r.conn.Close()
}

func (r *RemotePluginRuntime) StartPlugin() error {
var exit_error error

// handle heartbeat
routine.Submit(func() {
r.last_active_at = time.Now()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if time.Since(r.last_active_at) > 20*time.Second {
// kill this connection
r.conn.Close()
exit_error = plugin_errors.ErrPluginNotActive
return
}
case <-r.shutdown_chan:
return
}
}
})

for r.response.Next() {
data, err := r.response.Read()
if err != nil {
return err
}

// handle event
event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
if err != nil {
continue
}

session_id := event.SessionId

switch event.Event {
case plugin_entities.PLUGIN_EVENT_LOG:
if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
event.Data,
)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
continue
}

log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
}
case plugin_entities.PLUGIN_EVENT_SESSION:
r.callbacks_lock.RLock()
listeners := r.callbacks[session_id][:]
r.callbacks_lock.RUnlock()

// handle session event
for _, listener := range listeners {
listener(event.Data)
}
case plugin_entities.PLUGIN_EVENT_ERROR:
log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
r.last_active_at = time.Now()
}
}

return exit_error
}

func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
return r.shutdown_chan, nil
}
Loading

0 comments on commit 52f51e1

Please sign in to comment.