Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Sep 9, 2024
1 parent b3b20e7 commit 8dd58b3
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 133 deletions.
4 changes: 2 additions & 2 deletions internal/cluster/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type pluginLifeTime struct {
lifetime plugin_entities.PluginRuntimeTimeLifeInterface
lifetime plugin_entities.PluginLifetime
last_scheduled_at time.Time
}

Expand All @@ -22,7 +22,7 @@ type pluginState struct {
}

// RegisterPlugin registers a plugin to the cluster, and start to be scheduled
func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginRuntimeTimeLifeInterface) error {
func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginLifetime) error {
identity, err := lifetime.Identity()
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions internal/cluster/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/google/uuid"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
)

Expand Down Expand Up @@ -38,6 +39,13 @@ func (r *fakePlugin) Wait() (<-chan bool, error) {
return nil, nil
}

func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionMessage] {
return nil
}

func (r *fakePlugin) Write(string, []byte) {
}

func getRandomPluginRuntime() fakePlugin {
return fakePlugin{
PluginRuntime: plugin_entities.PluginRuntime{
Expand Down
17 changes: 7 additions & 10 deletions internal/core/plugin_manager/aws_manager/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ var (
)

func (r *AWSPluginRuntime) InitEnvironment() error {
if err := r.initEnvironment(); err != nil {
return err
}

// init http client
r.client = &http.Client{
Transport: &http.Transport{
Expand All @@ -38,7 +34,8 @@ func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, e
return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil
}

func (r *AWSPluginRuntime) initEnvironment() error {
// UploadPlugin uploads the plugin to the AWS Lambda
func (r *AWSPluginRuntime) UploadPlugin() error {
r.Log("Starting to initialize environment")
// check if the plugin has already been initialized, at most 300s
if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
Expand All @@ -58,9 +55,9 @@ func (r *AWSPluginRuntime) initEnvironment() error {
}
} else {
// found, return directly
r.lambda_url = function.FunctionURL
r.lambda_name = function.FunctionName
r.Log(fmt.Sprintf("Found existing lambda function: %s", r.lambda_name))
r.LambdaURL = function.FunctionURL
r.LambdaName = function.FunctionName
r.Log(fmt.Sprintf("Found existing lambda function: %s", r.LambdaName))
return nil
}

Expand Down Expand Up @@ -91,9 +88,9 @@ func (r *AWSPluginRuntime) initEnvironment() error {
case Error:
return fmt.Errorf("error: %s", response.Message)
case LambdaUrl:
r.lambda_url = response.Message
r.LambdaURL = response.Message
case Lambda:
r.lambda_name = response.Message
r.LambdaName = response.Message
case Info:
r.Log(fmt.Sprintf("installing: %s", response.Message))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/aws_manager/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
return
}

url, err := url.JoinPath(r.lambda_url, "invoke")
url, err := url.JoinPath(r.LambdaURL, "invoke")
if err != nil {
r.Error(fmt.Sprintf("Error creating request: %v", err))
return
Expand Down
4 changes: 2 additions & 2 deletions internal/core/plugin_manager/aws_manager/packager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
)

type Packager struct {
runtime plugin_entities.PluginRuntimeInterface
runtime plugin_entities.PluginLifetime
decoder decoder.PluginDecoder
}

func NewPackager(runtime plugin_entities.PluginRuntimeInterface, decoder decoder.PluginDecoder) *Packager {
func NewPackager(runtime plugin_entities.PluginLifetime, decoder decoder.PluginDecoder) *Packager {
return &Packager{
runtime: runtime,
decoder: decoder,
Expand Down
4 changes: 2 additions & 2 deletions internal/core/plugin_manager/aws_manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type AWSPluginRuntime struct {
plugin_entities.PluginRuntime

// access url for the lambda function
lambda_url string
lambda_name string
LambdaURL string
LambdaName string

// listeners mapping session id to the listener
listeners mapping.Map[string, *entities.Broadcast[plugin_entities.SessionMessage]]
Expand Down
6 changes: 5 additions & 1 deletion internal/core/plugin_manager/lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
)

func (p *PluginManager) lifetime(r plugin_entities.PluginRuntimeInterface) {
func (p *PluginManager) localLifetime(r plugin_entities.PluginFullDuplexLifetime) {
configuration := r.Configuration()

log.Info("new plugin logged in: %s", configuration.Identity())
Expand Down Expand Up @@ -86,3 +86,7 @@ func (p *PluginManager) lifetime(r plugin_entities.PluginRuntimeInterface) {
r.AddRestarts()
}
}

func (p *PluginManager) serverlessLifetime(r plugin_entities.PluginServerlessLifetime, onStop func()) {
//
}
41 changes: 21 additions & 20 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package plugin_manager

import (
"fmt"
"sync"

"github.com/langgenius/dify-plugin-daemon/internal/cluster"
"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
Expand All @@ -16,7 +15,7 @@ import (
)

type PluginManager struct {
m sync.Map
m mapping.Map[string, plugin_entities.PluginLifetime]

cluster *cluster.Cluster

Expand All @@ -30,6 +29,7 @@ type PluginManager struct {
runningPluginInStorage mapping.Map[string, string]
// start process lock
startProcessLock *lock.HighGranularityLock
// serverless runtime
}

var (
Expand All @@ -54,33 +54,32 @@ func GetGlobalPluginManager() *PluginManager {
return manager
}

func (p *PluginManager) Add(plugin plugin_entities.PluginRuntimeInterface) error {
func (p *PluginManager) Add(
plugin plugin_entities.PluginLifetime,
) error {
identity, err := plugin.Identity()
if err != nil {
return err
}

p.m.Store(identity.String(), plugin)
return nil
}

func (p *PluginManager) List() []plugin_entities.PluginRuntimeInterface {
var runtimes []plugin_entities.PluginRuntimeInterface
p.m.Range(func(key, value interface{}) bool {
if v, ok := value.(plugin_entities.PluginRuntimeInterface); ok {
runtimes = append(runtimes, v)
}
return true
})
return runtimes
}

func (p *PluginManager) Get(identity plugin_entities.PluginUniqueIdentifier) plugin_entities.PluginRuntimeInterface {
func (p *PluginManager) Get(
identity plugin_entities.PluginUniqueIdentifier,
) plugin_entities.PluginLifetime {
if v, ok := p.m.Load(identity.String()); ok {
if r, ok := v.(plugin_entities.PluginRuntimeInterface); ok {
return r
}
return v
}
return nil

// check if plugin is a serverless runtime
plugin_session_interface, err := p.getServerlessPluginRuntime(identity)
if err != nil {
return nil
}

return plugin_session_interface
}

func (p *PluginManager) GetAsset(id string) ([]byte, error) {
Expand All @@ -106,7 +105,9 @@ func (p *PluginManager) Init(configuration *app.Config) {
}

// start local watcher
p.startLocalWatcher(configuration)
if configuration.Platform == app.PLATFORM_LOCAL {
p.startLocalWatcher(configuration)
}

// start remote watcher
p.startRemoteWatcher(configuration)
Expand Down
6 changes: 3 additions & 3 deletions internal/core/plugin_manager/positive_manager/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ func (r *PositivePluginRuntime) calculateChecksum() string {
}

func (r *PositivePluginRuntime) Checksum() string {
if r.checksum == "" {
r.checksum = r.calculateChecksum()
if r.InnerChecksum == "" {
r.InnerChecksum = r.calculateChecksum()
}

return r.checksum
return r.InnerChecksum
}

func (r *PositivePluginRuntime) Cleanup() {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/positive_manager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ type PositivePluginRuntime struct {
// plugin decoder used to manage the plugin
Decoder decoder.PluginDecoder

checksum string
InnerChecksum string
}
95 changes: 95 additions & 0 deletions internal/core/plugin_manager/serverless.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package plugin_manager

import (
"errors"
"fmt"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
"github.com/langgenius/dify-plugin-daemon/internal/db"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/models"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
)

const (
PLUGIN_SERVERLESS_CACHE_KEY = "serverless:runtime:%s"
)

func (p *PluginManager) getServerlessRuntimeCacheKey(
identity plugin_entities.PluginUniqueIdentifier,
) string {
return fmt.Sprintf(PLUGIN_SERVERLESS_CACHE_KEY, identity.String())
}

func (p *PluginManager) getServerlessPluginRuntime(
identity plugin_entities.PluginUniqueIdentifier,
) (plugin_entities.PluginLifetime, error) {
model, err := p.getServerlessPluginRuntimeModel(identity)
if err != nil {
return nil, err
}

declaration, err := model.GetDeclaration()
if err != nil {
return nil, err
}

// init runtime entity
runtime_entity := plugin_entities.PluginRuntime{
Config: *declaration,
}
runtime_entity.InitState()

// convert to plugin runtime
plugin_runtime := aws_manager.AWSPluginRuntime{
PositivePluginRuntime: positive_manager.PositivePluginRuntime{
BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
InnerChecksum: model.Checksum,
},
PluginRuntime: runtime_entity,
LambdaURL: model.FunctionURL,
LambdaName: model.FunctionName,
}

if err := plugin_runtime.InitEnvironment(); err != nil {
return nil, err
}

return &plugin_runtime, nil
}

func (p *PluginManager) getServerlessPluginRuntimeModel(
identity plugin_entities.PluginUniqueIdentifier,
) (*models.ServerlessRuntime, error) {
// check if plugin is a serverless runtime
runtime, err := cache.Get[models.ServerlessRuntime](
p.getServerlessRuntimeCacheKey(identity),
)
if err != nil && err != cache.ErrNotFound {
return nil, errors.New("plugin not found")
}

if err == cache.ErrNotFound {
runtime_model, err := db.GetOne[models.ServerlessRuntime](
db.Equal("plugin_unique_identifier", identity.String()),
)

if err == db.ErrDatabaseNotFound {
return nil, errors.New("plugin not found")
}

if err != nil {
return nil, fmt.Errorf("failed to load serverless runtime from db: %v", err)
}

cache.Store(p.getServerlessRuntimeCacheKey(identity), runtime_model, time.Minute*30)
runtime = &runtime_model
} else if err != nil {
return nil, fmt.Errorf("failed to load serverless runtime from cache: %v", err)
}

return runtime, nil
}
Loading

0 comments on commit 8dd58b3

Please sign in to comment.