Skip to content

Commit

Permalink
feat: plugin assets
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Sep 4, 2024
1 parent 277ce20 commit d02ef5e
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 20 deletions.
4 changes: 3 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func setDefault(config *app.Config) {
setDefaultInt(&config.MaxPluginPackageSize, 52428800)
setDefaultInt(&config.MaxAWSLambdaTransactionTimeout, 150)
setDefaultInt(&config.PluginMaxExecutionTimeout, 240)
setDefaultInt(&config.PluginMediaCacheSize, 1024)
setDefaultBool(&config.PluginRemoteInstallingEnabled, true)
setDefaultBool(&config.PluginEndpointEnabled, true)
setDefaultString(&config.DBSslMode, "disable")
setDefaultString(&config.ProcessCachingPath, "/tmp/dify-plugin-daemon-subprocesses")
setDefaultString(&config.PluginMediaCachePath, "/var/dify-plugin-daemon/media-cache")
setDefaultString(&config.ProcessCachingPath, "/var/dify-plugin-daemon/subprocesses")
}

func setDefaultInt[T constraints.Integer](value *T, defaultValue T) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down Expand Up @@ -85,6 +86,7 @@ require (
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.22.0
github.com/goccy/go-json v0.10.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/joho/godotenv v1.5.1
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down
12 changes: 12 additions & 0 deletions internal/core/plugin_manager/basic_manager/remap_assets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package basic_manager

import "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"

// RemapAssets will take the assets and remap them to a media id
func (r *BasicPluginRuntime) RemapAssets(
declaration *plugin_entities.PluginDeclaration,
assets map[string][]byte,
) error {
// TODO: implement
return nil
}
11 changes: 11 additions & 0 deletions internal/core/plugin_manager/basic_manager/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package basic_manager

import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"

type BasicPluginRuntime struct {
mediaManager *media_manager.MediaManager
}

func NewBasicPluginRuntime(mediaManager *media_manager.MediaManager) BasicPluginRuntime {
return BasicPluginRuntime{mediaManager: mediaManager}
}
10 changes: 9 additions & 1 deletion internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/langgenius/dify-plugin-daemon/internal/cluster"
"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
Expand All @@ -22,6 +23,9 @@ type PluginManager struct {
maxPluginPackageSize int64
workingDirectory string

// mediaManager is used to manage media files like plugin icons, images, etc.
mediaManager *media_manager.MediaManager

// running plugin in storage contains relations between plugin packages and their running instances
runningPluginInStorage mapping.Map[string, string]
// start process lock
Expand All @@ -37,7 +41,11 @@ func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config
cluster: cluster,
maxPluginPackageSize: configuration.MaxPluginPackageSize,
workingDirectory: configuration.PluginWorkingPath,
startProcessLock: lock.NewHighGranularityLock(),
mediaManager: media_manager.NewMediaManager(
configuration.PluginMediaCachePath,
configuration.PluginMediaCacheSize,
),
startProcessLock: lock.NewHighGranularityLock(),
}
manager.Init(configuration)
}
Expand Down
71 changes: 71 additions & 0 deletions internal/core/plugin_manager/media_manager/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package media_manager

import (
"crypto/sha256"
"encoding/hex"
"os"
"path"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/strings"
)

type MediaManager struct {
storagePath string
cache *lru.Cache[string, []byte]
}

func NewMediaManager(storage_path string, cache_size uint16) *MediaManager {
// mkdir -p storage_path
if err := os.MkdirAll(storage_path, 0o755); err != nil {
log.Error("Failed to create storage path: %s", err)
}

// lru.New only raises error when cache_size is a negative number, which is impossible
cache, _ := lru.New[string, []byte](int(cache_size))

return &MediaManager{storagePath: storage_path, cache: cache}
}

// Upload uploads a file to the media manager and returns an identifier
func (m *MediaManager) Upload(file []byte) (string, error) {
// calculate checksum
checksum := sha256.Sum256(append(file, []byte(strings.RandomString(10))...))

id := hex.EncodeToString(checksum[:])

// store locally
filePath := path.Join(m.storagePath, id)
err := os.WriteFile(filePath, file, 0o644)
if err != nil {
return "", err
}

return id, nil
}

func (m *MediaManager) Get(id string) ([]byte, error) {
// check if id is in cache
data, ok := m.cache.Get(id)
if ok {
return data, nil
}

// check if id is in storage
filePath := path.Join(m.storagePath, id)
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil, err
}

// read file
file, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}

// store in cache
m.cache.Add(id, file)

return file, nil
}
7 changes: 6 additions & 1 deletion internal/core/plugin_manager/positive_manager/types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package positive_manager

import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
import (
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
)

type PositivePluginRuntime struct {
basic_manager.BasicPluginRuntime

LocalPackagePath string
WorkingPath string
// plugin decoder used to manage the plugin
Expand Down
35 changes: 35 additions & 0 deletions internal/core/plugin_manager/remote_manager/hooks.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package remote_manager

import (
"encoding/hex"
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
Expand All @@ -17,6 +19,8 @@ type DifyServer struct {

engine gnet.Engine

mediaManager *media_manager.MediaManager

// listening address
addr string
port uint16
Expand Down Expand Up @@ -221,6 +225,33 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
declaration.Endpoint = &endpoints[0]
runtime.Config = declaration
}
} else if !runtime.assets_transferred {
assets, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.RemoteAssetPayload](message)
if err != nil {
runtime.conn.Write([]byte("assets register failed\n"))
log.Error("assets register failed, error: %v", err)
runtime.conn.Close()
return
}

files := make(map[string][]byte)
for _, asset := range assets {
files[asset.Filename], err = hex.DecodeString(asset.Data)
if err != nil {
runtime.conn.Write([]byte("assets decode failed\n"))
log.Error("assets decode failed, error: %v", err)
runtime.conn.Close()
return
}
}

// remap assets
if err := runtime.RemapAssets(&runtime.Config, files); err != nil {
runtime.conn.Write([]byte("assets remap failed\n"))
log.Error("assets remap failed, error: %v", err)
runtime.conn.Close()
return
}

runtime.checksum = runtime.calculateChecksum()
runtime.InitState()
Expand All @@ -241,3 +272,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
runtime.response.Write(message)
}
}

func (s *DifyServer) onAssets(runtime *RemotePluginRuntime, assets []plugin_entities.RemoteAssetPayload) {

}
14 changes: 8 additions & 6 deletions internal/core/plugin_manager/remote_manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
"github.com/panjf2000/gnet/v2"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (r *RemotePluginServer) Launch() error {
}

// NewRemotePluginServer creates a new RemotePluginServer
func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
func NewRemotePluginServer(config *app.Config, media_manager *media_manager.MediaManager) *RemotePluginServer {
addr := fmt.Sprintf(
"tcp://%s:%d",
config.PluginRemoteInstallingHost,
Expand All @@ -90,11 +91,12 @@ func NewRemotePluginServer(config *app.Config) *RemotePluginServer {

multicore := true
s := &DifyServer{
addr: addr,
port: config.PluginRemoteInstallingPort,
multicore: multicore,
num_loops: config.PluginRemoteInstallServerEventLoopNums,
response: response,
mediaManager: media_manager,
addr: addr,
port: config.PluginRemoteInstallingPort,
multicore: multicore,
num_loops: config.PluginRemoteInstallServerEventLoopNums,
response: response,

plugins: make(map[int]*RemotePluginRuntime),
plugins_lock: &sync.RWMutex{},
Expand Down
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/remote_manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func preparePluginServer(t *testing.T) (*RemotePluginServer, uint16) {
PluginRemoteInstallingPort: port,
PluginRemoteInstallingMaxConn: 1,
PluginRemoteInstallServerEventLoopNums: 8,
}), port
}, nil), port
}

// TestLaunchAndClosePluginServer tests the launch and close of the plugin server
Expand Down
3 changes: 3 additions & 0 deletions internal/core/plugin_manager/remote_manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
"github.com/panjf2000/gnet/v2"
)

type RemotePluginRuntime struct {
basic_manager.BasicPluginRuntime
plugin_entities.PluginRuntime

// connection
Expand Down Expand Up @@ -38,6 +40,7 @@ type RemotePluginRuntime struct {
tools_registration_transferred bool
models_registration_transferred bool
endpoints_registration_transferred bool
assets_transferred bool

// tenant id
tenant_id string
Expand Down
17 changes: 10 additions & 7 deletions internal/core/plugin_manager/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
Expand All @@ -34,7 +35,7 @@ func (p *PluginManager) startLocalWatcher(config *app.Config) {
func (p *PluginManager) startRemoteWatcher(config *app.Config) {
// launch TCP debugging server if enabled
if config.PluginRemoteInstallingEnabled {
server := remote_manager.NewRemotePluginServer(config)
server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
go func() {
err := server.Launch()
if err != nil {
Expand All @@ -58,18 +59,20 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
plugin_interface = &aws_manager.AWSPluginRuntime{
PluginRuntime: plugin.Runtime,
PositivePluginRuntime: positive_manager.PositivePluginRuntime{
LocalPackagePath: plugin.Runtime.State.AbsolutePath,
WorkingPath: plugin.Runtime.State.WorkingPath,
Decoder: plugin.Decoder,
BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
LocalPackagePath: plugin.Runtime.State.AbsolutePath,
WorkingPath: plugin.Runtime.State.WorkingPath,
Decoder: plugin.Decoder,
},
}
} else if config.Platform == app.PLATFORM_LOCAL {
plugin_interface = &local_manager.LocalPluginRuntime{
PluginRuntime: plugin.Runtime,
PositivePluginRuntime: positive_manager.PositivePluginRuntime{
LocalPackagePath: plugin.Runtime.State.AbsolutePath,
WorkingPath: plugin.Runtime.State.WorkingPath,
Decoder: plugin.Decoder,
BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
LocalPackagePath: plugin.Runtime.State.AbsolutePath,
WorkingPath: plugin.Runtime.State.WorkingPath,
Decoder: plugin.Decoder,
},
}
} else {
Expand Down
8 changes: 5 additions & 3 deletions internal/types/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ type Config struct {

PluginEndpointEnabled bool `envconfig:"PLUGIN_ENDPOINT_ENABLED"`

PluginStoragePath string `envconfig:"STORAGE_PLUGIN_PATH" validate:"required"`
PluginWorkingPath string `envconfig:"PLUGIN_WORKING_PATH"`
ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"`
PluginStoragePath string `envconfig:"STORAGE_PLUGIN_PATH" validate:"required"`
PluginWorkingPath string `envconfig:"PLUGIN_WORKING_PATH"`
PluginMediaCacheSize uint16 `envconfig:"PLUGIN_MEDIA_CACHE_SIZE"`
PluginMediaCachePath string `envconfig:"PLUGIN_MEDIA_CACHE_PATH"`
ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"`

PluginMaxExecutionTimeout int `envconfig:"PLUGIN_MAX_EXECUTION_TIMEOUT" validate:"required"`

Expand Down
6 changes: 6 additions & 0 deletions internal/types/entities/plugin_entities/remote_entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package plugin_entities

type RemoteAssetPayload struct {
Filename string `json:"filename" validate:"required"`
Data string `json:"data" validate:"required"`
}

0 comments on commit d02ef5e

Please sign in to comment.