From 3603a9c5b9a86b82944a703d21df7fd839c5989e Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Wed, 4 Sep 2024 14:39:07 +0800 Subject: [PATCH] refactor --- internal/cluster/plugin.go | 4 +-- internal/cluster/plugin_test.go | 4 +-- .../backwards_invocation/task.go | 2 +- internal/core/plugin_daemon/generic.go | 2 +- .../plugin_manager/aws_manager/environment.go | 4 +-- .../aws_manager/packager_test.go | 4 +-- .../local_manager/environment.go | 9 +++--- .../local_manager/environment_python.go | 19 +++++++++++ .../core/plugin_manager/local_manager/run.go | 5 ++- .../local_manager/stdio_handle.go | 22 ++++++------- .../local_manager/stdio_store.go | 14 ++++---- internal/core/plugin_manager/manager.go | 4 +-- .../remote_manager/environment.go | 4 +-- .../remote_manager/server_test.go | 4 --- internal/core/session_manager/session.go | 32 +++++++++---------- internal/server/controllers/endpoint.go | 20 ++++++------ internal/server/endpoint.go | 9 ++++-- internal/server/middleware.go | 12 +++---- internal/service/endpoint.go | 6 ++-- internal/service/install_service/state.go | 8 ++--- internal/service/invoke_tool.go | 6 ++-- internal/service/setup_endpoint.go | 10 +++--- .../entities/plugin_entities/identity.go | 6 ++-- .../plugin_entities/plugin_declaration.go | 8 +---- .../plugin_declaration_test.go | 4 --- .../types/entities/plugin_entities/request.go | 11 +++---- .../types/entities/plugin_entities/runtime.go | 2 +- internal/types/models/curd/atomic.go | 32 +++++++++---------- internal/types/models/installation.go | 10 +++--- internal/types/models/plugin.go | 4 +-- internal/utils/parser/identity.go | 2 +- 31 files changed, 147 insertions(+), 136 deletions(-) create mode 100644 internal/core/plugin_manager/local_manager/environment_python.go diff --git a/internal/cluster/plugin.go b/internal/cluster/plugin.go index 0fdb109..68650e7 100644 --- a/internal/cluster/plugin.go +++ b/internal/cluster/plugin.go @@ -241,7 +241,7 @@ func (c *Cluster) autoGCPlugins() error { ) } -func (c *Cluster) IsPluginNoCurrentNode(identity string) bool { - _, ok := c.plugins.Load(identity) +func (c *Cluster) IsPluginNoCurrentNode(identity plugin_entities.PluginUniqueIdentifier) bool { + _, ok := c.plugins.Load(identity.String()) return ok } diff --git a/internal/cluster/plugin_test.go b/internal/cluster/plugin_test.go index 7cfb281..5833c90 100644 --- a/internal/cluster/plugin_test.go +++ b/internal/cluster/plugin_test.go @@ -22,8 +22,8 @@ func (r *fakePlugin) Checksum() string { return "" } -func (r *fakePlugin) Identity() (plugin_entities.PluginIdentity, error) { - return plugin_entities.PluginIdentity(""), nil +func (r *fakePlugin) Identity() (plugin_entities.PluginUniqueIdentifier, error) { + return plugin_entities.PluginUniqueIdentifier(""), nil } func (r *fakePlugin) StartPlugin() error { diff --git a/internal/core/plugin_daemon/backwards_invocation/task.go b/internal/core/plugin_daemon/backwards_invocation/task.go index 8eca11b..620f04e 100644 --- a/internal/core/plugin_daemon/backwards_invocation/task.go +++ b/internal/core/plugin_daemon/backwards_invocation/task.go @@ -389,7 +389,7 @@ func executeDifyInvocationStorageTask( return } - plugin_id := handle.session.PluginIdentity + plugin_id := handle.session.PluginUniqueIdentifier if request.Opt == dify_invocation.STORAGE_OPT_GET { data, err := persistence.Load(tenant_id, plugin_id.PluginID(), request.Key) diff --git a/internal/core/plugin_daemon/generic.go b/internal/core/plugin_daemon/generic.go index 7794659..491356c 100644 --- a/internal/core/plugin_daemon/generic.go +++ b/internal/core/plugin_daemon/generic.go @@ -18,7 +18,7 @@ func genericInvokePlugin[Req any, Rsp any]( request *Req, response_buffer_size int, ) (*stream.StreamResponse[Rsp], error) { - runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity.String()) + runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginUniqueIdentifier) if runtime == nil { return nil, errors.New("plugin not found") } diff --git a/internal/core/plugin_manager/aws_manager/environment.go b/internal/core/plugin_manager/aws_manager/environment.go index 9e949f7..a25b859 100644 --- a/internal/core/plugin_manager/aws_manager/environment.go +++ b/internal/core/plugin_manager/aws_manager/environment.go @@ -34,8 +34,8 @@ func (r *AWSPluginRuntime) InitEnvironment() error { return nil } -func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginIdentity, error) { - return plugin_entities.PluginIdentity(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil +func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) { + return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil } func (r *AWSPluginRuntime) initEnvironment() error { diff --git a/internal/core/plugin_manager/aws_manager/packager_test.go b/internal/core/plugin_manager/aws_manager/packager_test.go index b1b9426..1765531 100644 --- a/internal/core/plugin_manager/aws_manager/packager_test.go +++ b/internal/core/plugin_manager/aws_manager/packager_test.go @@ -31,8 +31,8 @@ func (r *TPluginRuntime) Checksum() string { return "test_checksum" } -func (r *TPluginRuntime) Identity() (plugin_entities.PluginIdentity, error) { - return plugin_entities.PluginIdentity("test_identity"), nil +func (r *TPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) { + return plugin_entities.PluginUniqueIdentifier("test_identity"), nil } func (r *TPluginRuntime) StartPlugin() error { diff --git a/internal/core/plugin_manager/local_manager/environment.go b/internal/core/plugin_manager/local_manager/environment.go index 44c942b..f5542f1 100644 --- a/internal/core/plugin_manager/local_manager/environment.go +++ b/internal/core/plugin_manager/local_manager/environment.go @@ -20,8 +20,9 @@ func (r *LocalPluginRuntime) InitEnvironment() error { return nil } - // execute init command - handle := exec.Command("bash", r.Config.Execution.Install) + // execute init command, create + // TODO + handle := exec.Command("bash") handle.Dir = r.State.AbsolutePath handle.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -125,6 +126,6 @@ func (r *LocalPluginRuntime) InitEnvironment() error { return nil } -func (r *LocalPluginRuntime) Identity() (plugin_entities.PluginIdentity, error) { - return plugin_entities.PluginIdentity(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil +func (r *LocalPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) { + return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil } diff --git a/internal/core/plugin_manager/local_manager/environment_python.go b/internal/core/plugin_manager/local_manager/environment_python.go new file mode 100644 index 0000000..4ae563d --- /dev/null +++ b/internal/core/plugin_manager/local_manager/environment_python.go @@ -0,0 +1,19 @@ +package local_manager + +import "os/exec" + +func (p *LocalPluginRuntime) InitPythonEnvironment(requirements_txt string) error { + // create virtual env + identity, err := p.Identity() + if err != nil { + return err + } + + cmd := exec.Command("python", "-m", "venv", identity.String()) + + // set working directory + cmd.Dir = p.WorkingPath + + // TODO + return nil +} diff --git a/internal/core/plugin_manager/local_manager/run.go b/internal/core/plugin_manager/local_manager/run.go index 9d5dabe..c31384d 100644 --- a/internal/core/plugin_manager/local_manager/run.go +++ b/internal/core/plugin_manager/local_manager/run.go @@ -38,11 +38,14 @@ func (r *LocalPluginRuntime) StartPlugin() error { r.init() // start plugin - e := exec.Command("bash", r.Config.Execution.Launch) + // TODO: use exec.Command("bash") instead of exec.Command("bash", r.Config.Execution.Launch) + e := exec.Command("bash") e.Dir = r.State.AbsolutePath // add env INSTALL_METHOD=local e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH")) + // NOTE: subprocess will be taken care of by subprocess manager + // ensure all subprocess are killed when parent process exits, especially on Golang debugger process.WrapProcess(e) // get writer diff --git a/internal/core/plugin_manager/local_manager/stdio_handle.go b/internal/core/plugin_manager/local_manager/stdio_handle.go index f0cddfe..f0e13f0 100644 --- a/internal/core/plugin_manager/local_manager/stdio_handle.go +++ b/internal/core/plugin_manager/local_manager/stdio_handle.go @@ -21,15 +21,15 @@ var ( ) type stdioHolder struct { - id string - plugin_identity string - writer io.WriteCloser - reader io.ReadCloser - err_reader io.ReadCloser - l *sync.Mutex - listener map[string]func([]byte) - error_listener map[string]func([]byte) - started bool + id string + plugin_unique_identifier string + writer io.WriteCloser + reader io.ReadCloser + err_reader io.ReadCloser + l *sync.Mutex + listener map[string]func([]byte) + error_listener map[string]func([]byte) + started bool err_message string last_err_message_updated_at time.Time @@ -94,7 +94,7 @@ func (s *stdioHolder) StartStdout() { continue } - log.Info("plugin %s: %s", s.plugin_identity, logEvent.Message) + log.Info("plugin %s: %s", s.plugin_unique_identifier, logEvent.Message) } case plugin_entities.PLUGIN_EVENT_SESSION: for _, listener := range listeners { @@ -107,7 +107,7 @@ func (s *stdioHolder) StartStdout() { } } case plugin_entities.PLUGIN_EVENT_ERROR: - log.Error("plugin %s: %s", s.plugin_identity, event.Data) + log.Error("plugin %s: %s", s.plugin_unique_identifier, event.Data) case plugin_entities.PLUGIN_EVENT_HEARTBEAT: s.last_active_at = time.Now() } diff --git a/internal/core/plugin_manager/local_manager/stdio_store.go b/internal/core/plugin_manager/local_manager/stdio_store.go index 0d4a01d..10d359f 100644 --- a/internal/core/plugin_manager/local_manager/stdio_store.go +++ b/internal/core/plugin_manager/local_manager/stdio_store.go @@ -8,18 +8,18 @@ import ( ) func PutStdioIo( - plugin_identity string, writer io.WriteCloser, + plugin_unique_identifier string, writer io.WriteCloser, reader io.ReadCloser, err_reader io.ReadCloser, ) *stdioHolder { id := uuid.New().String() holder := &stdioHolder{ - plugin_identity: plugin_identity, - writer: writer, - reader: reader, - err_reader: err_reader, - id: id, - l: &sync.Mutex{}, + plugin_unique_identifier: plugin_unique_identifier, + writer: writer, + reader: reader, + err_reader: err_reader, + id: id, + l: &sync.Mutex{}, health_chan_lock: &sync.Mutex{}, health_chan: make(chan bool), diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index 4a3eb38..cfcc9eb 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -58,8 +58,8 @@ func (p *PluginManager) List() []plugin_entities.PluginRuntimeInterface { return runtimes } -func (p *PluginManager) Get(identity string) plugin_entities.PluginRuntimeInterface { - if v, ok := p.m.Load(identity); ok { +func (p *PluginManager) Get(identity plugin_entities.PluginUniqueIdentifier) plugin_entities.PluginRuntimeInterface { + if v, ok := p.m.Load(identity.String()); ok { if r, ok := v.(plugin_entities.PluginRuntimeInterface); ok { return r } diff --git a/internal/core/plugin_manager/remote_manager/environment.go b/internal/core/plugin_manager/remote_manager/environment.go index c4d3c12..8b76afb 100644 --- a/internal/core/plugin_manager/remote_manager/environment.go +++ b/internal/core/plugin_manager/remote_manager/environment.go @@ -7,9 +7,9 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" ) -func (r *RemotePluginRuntime) Identity() (plugin_entities.PluginIdentity, error) { +func (r *RemotePluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) { identity := strings.Join([]string{r.Configuration().Identity(), r.tenant_id}, ":") - return plugin_entities.PluginIdentity(fmt.Sprintf("%s@%s", identity, r.Checksum())), nil + return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", identity, r.Checksum())), nil } func (r *RemotePluginRuntime) Cleanup() { diff --git a/internal/core/plugin_manager/remote_manager/server_test.go b/internal/core/plugin_manager/remote_manager/server_test.go index 0c8532a..cc39073 100644 --- a/internal/core/plugin_manager/remote_manager/server_test.go +++ b/internal/core/plugin_manager/remote_manager/server_test.go @@ -151,10 +151,6 @@ func TestAcceptConnection(t *testing.T) { Plugins: []string{ "test", }, - Execution: plugin_entities.PluginExecution{ - Install: "echo 'hello'", - Launch: "echo 'hello'", - }, Meta: plugin_entities.PluginMeta{ Version: "0.0.1", Arch: []constants.Arch{ diff --git a/internal/core/session_manager/session.go b/internal/core/session_manager/session.go index 02ed8a4..e771374 100644 --- a/internal/core/session_manager/session.go +++ b/internal/core/session_manager/session.go @@ -26,13 +26,13 @@ type Session struct { runtime plugin_entities.PluginRuntimeInterface `json:"-"` persistence *persistence.Persistence `json:"-"` - TenantID string `json:"tenant_id"` - UserID string `json:"user_id"` - PluginIdentity plugin_entities.PluginIdentity `json:"plugin_identity"` - ClusterID string `json:"cluster_id"` - InvokeFrom access_types.PluginAccessType `json:"invoke_from"` - Action access_types.PluginAccessAction `json:"action"` - Declaration *plugin_entities.PluginDeclaration `json:"declaration"` + TenantID string `json:"tenant_id"` + UserID string `json:"user_id"` + PluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier `json:"plugin_unique_identifier"` + ClusterID string `json:"cluster_id"` + InvokeFrom access_types.PluginAccessType `json:"invoke_from"` + Action access_types.PluginAccessAction `json:"action"` + Declaration *plugin_entities.PluginDeclaration `json:"declaration"` } func sessionKey(id string) string { @@ -42,21 +42,21 @@ func sessionKey(id string) string { func NewSession( tenant_id string, user_id string, - plugin_identity plugin_entities.PluginIdentity, + plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, cluster_id string, invoke_from access_types.PluginAccessType, action access_types.PluginAccessAction, declaration *plugin_entities.PluginDeclaration, ) *Session { s := &Session{ - ID: uuid.New().String(), - TenantID: tenant_id, - UserID: user_id, - PluginIdentity: plugin_identity, - ClusterID: cluster_id, - InvokeFrom: invoke_from, - Action: action, - Declaration: declaration, + ID: uuid.New().String(), + TenantID: tenant_id, + UserID: user_id, + PluginUniqueIdentifier: plugin_unique_identifier, + ClusterID: cluster_id, + InvokeFrom: invoke_from, + Action: action, + Declaration: declaration, } session_lock.Lock() diff --git a/internal/server/controllers/endpoint.go b/internal/server/controllers/endpoint.go index 4a47153..ea586ee 100644 --- a/internal/server/controllers/endpoint.go +++ b/internal/server/controllers/endpoint.go @@ -8,18 +8,18 @@ import ( func SetupEndpoint(ctx *gin.Context) { BindRequest(ctx, func(request struct { - PluginIdentity string `json:"plugin_identity" binding:"required"` - TenantID string `json:"tenant_id" binding:"required"` - UserID string `json:"user_id" binding:"required"` - Settings map[string]any `json:"settings" binding:"omitempty"` + PluginUniqueIdentifier string `json:"plugin_unique_identifier" binding:"required"` + TenantID string `json:"tenant_id" binding:"required"` + UserID string `json:"user_id" binding:"required"` + Settings map[string]any `json:"settings" binding:"omitempty"` }) { - plugin_identity := request.PluginIdentity + plugin_unique_identifier := request.PluginUniqueIdentifier tenant_id := request.TenantID user_id := request.UserID settings := request.Settings ctx.JSON(200, service.SetupEndpoint( - tenant_id, user_id, plugin_entities.PluginIdentity(plugin_identity), settings, + tenant_id, user_id, plugin_entities.PluginUniqueIdentifier(plugin_unique_identifier), settings, )) }) } @@ -40,12 +40,12 @@ func ListEndpoints(ctx *gin.Context) { func RemoveEndpoint(ctx *gin.Context) { BindRequest(ctx, func(request struct { - PluginIdentity string `json:"plugin_identity"` - TenantID string `json:"tenant_id"` + PluginUniqueIdentifier string `json:"plugin_unique_identifier"` + TenantID string `json:"tenant_id"` }) { - plugin_identity := request.PluginIdentity + plugin_unique_identifier := request.PluginUniqueIdentifier tenant_id := request.TenantID - ctx.JSON(200, service.RemoveEndpoint(plugin_identity, tenant_id)) + ctx.JSON(200, service.RemoveEndpoint(plugin_unique_identifier, tenant_id)) }) } diff --git a/internal/server/endpoint.go b/internal/server/endpoint.go index 354395e..3888bf9 100644 --- a/internal/server/endpoint.go +++ b/internal/server/endpoint.go @@ -4,6 +4,7 @@ import ( "github.com/gin-gonic/gin" "github.com/langgenius/dify-plugin-daemon/internal/db" "github.com/langgenius/dify-plugin-daemon/internal/service" + "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/internal/types/models" "github.com/langgenius/dify-plugin-daemon/internal/utils/log" ) @@ -54,8 +55,12 @@ func (app *App) EndpointHandler(ctx *gin.Context, hook_id string, path string) { } // check if plugin exists in current node - if !app.cluster.IsPluginNoCurrentNode(plugin_installation.PluginIdentity) { - app.redirectPluginInvokeByPluginID(ctx, plugin_installation.PluginIdentity) + if !app.cluster.IsPluginNoCurrentNode( + plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier), + ) { + app.redirectPluginInvokeByPluginID(ctx, plugin_entities.PluginUniqueIdentifier( + plugin_installation.PluginUniqueIdentifier, + )) } else { service.Endpoint(ctx, &endpoint, &plugin_installation, path) } diff --git a/internal/server/middleware.go b/internal/server/middleware.go index c7fd020..bff8777 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -49,20 +49,18 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc { reader: bytes.NewReader(raw), } - identity, err := parser.UnmarshalJsonBytes[plugin_entities.InvokePluginPluginIdentity](raw) + identity, err := parser.UnmarshalJsonBytes[plugin_entities.BasePluginIdentifier](raw) if err != nil { ctx.AbortWithStatusJSON(400, gin.H{"error": "Invalid request"}) return } - plugin_id := parser.MarshalPluginIdentity(identity.PluginName, identity.PluginVersion) - // check if plugin in current node if !app.cluster.IsPluginNoCurrentNode( - plugin_id, + identity.PluginUniqueIdentifier, ) { - app.redirectPluginInvokeByPluginID(ctx, plugin_id) + app.redirectPluginInvokeByPluginID(ctx, identity.PluginUniqueIdentifier) ctx.Abort() } else { ctx.Next() @@ -70,9 +68,9 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc { } } -func (app *App) redirectPluginInvokeByPluginID(ctx *gin.Context, plugin_id string) { +func (app *App) redirectPluginInvokeByPluginID(ctx *gin.Context, plugin_id plugin_entities.PluginUniqueIdentifier) { // try find the correct node - nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_id) + nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_id.PluginID()) if err != nil { ctx.AbortWithStatusJSON(500, gin.H{"error": "Internal server error"}) log.Error("fetch plugin available nodes failed: %s", err.Error()) diff --git a/internal/service/endpoint.go b/internal/service/endpoint.go index 69d362b..249acd6 100644 --- a/internal/service/endpoint.go +++ b/internal/service/endpoint.go @@ -37,7 +37,9 @@ func Endpoint( // fetch plugin manager := plugin_manager.GetGlobalPluginManager() - runtime := manager.Get(plugin_installation.PluginIdentity) + runtime := manager.Get( + plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier), + ) if runtime == nil { ctx.JSON(404, gin.H{"error": "plugin not found"}) return @@ -74,7 +76,7 @@ func Endpoint( session := session_manager.NewSession( endpoint.TenantID, "", - plugin_entities.PluginIdentity(plugin_installation.PluginIdentity), + plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier), ctx.GetString("cluster_id"), access_types.PLUGIN_ACCESS_TYPE_ENDPOINT, access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT, diff --git a/internal/service/install_service/state.go b/internal/service/install_service/state.go index 6d8e1f1..afb9058 100644 --- a/internal/service/install_service/state.go +++ b/internal/service/install_service/state.go @@ -40,11 +40,11 @@ func InstallPlugin( func UninstallPlugin( tenant_id string, installation_id string, - plugin_identity plugin_entities.PluginIdentity, + plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, install_type plugin_entities.PluginRuntimeType, ) error { // delete the plugin from db - _, err := curd.DeletePlugin(tenant_id, plugin_identity, installation_id) + _, err := curd.DeletePlugin(tenant_id, plugin_unique_identifier, installation_id) if err != nil { return err } @@ -52,7 +52,7 @@ func UninstallPlugin( // delete endpoints if plugin is not installed through remote if install_type != plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE { if err := db.DeleteByCondition(models.Endpoint{ - PluginID: plugin_identity.PluginID(), + PluginID: plugin_unique_identifier.PluginID(), TenantID: tenant_id, }); err != nil { return err @@ -65,7 +65,7 @@ func UninstallPlugin( // setup a plugin to db, // returns the endpoint id func InstallEndpoint( - plugin_id plugin_entities.PluginIdentity, + plugin_id plugin_entities.PluginUniqueIdentifier, installation_id string, tenant_id string, user_id string, diff --git a/internal/service/invoke_tool.go b/internal/service/invoke_tool.go index 187457c..fecbd40 100644 --- a/internal/service/invoke_tool.go +++ b/internal/service/invoke_tool.go @@ -9,7 +9,6 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities" "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests" - "github.com/langgenius/dify-plugin-daemon/internal/utils/parser" "github.com/langgenius/dify-plugin-daemon/internal/utils/stream" ) @@ -19,13 +18,12 @@ func createSession[T any]( access_action access_types.PluginAccessAction, cluster_id string, ) (*session_manager.Session, error) { - plugin_identity := parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion) - runtime := plugin_manager.GetGlobalPluginManager().Get(plugin_identity) + runtime := plugin_manager.GetGlobalPluginManager().Get(r.PluginUniqueIdentifier) session := session_manager.NewSession( r.TenantId, r.UserId, - plugin_entities.PluginIdentity(plugin_identity), + r.PluginUniqueIdentifier, cluster_id, access_type, access_action, diff --git a/internal/service/setup_endpoint.go b/internal/service/setup_endpoint.go index ed9830e..f8d809d 100644 --- a/internal/service/setup_endpoint.go +++ b/internal/service/setup_endpoint.go @@ -14,13 +14,13 @@ import ( func SetupEndpoint( tenant_id string, user_id string, - plugin_identity plugin_entities.PluginIdentity, + plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, settings map[string]any, ) *entities.Response { // try find plugin installation installation, err := db.GetOne[models.PluginInstallation]( db.Equal("tenant_id", tenant_id), - db.Equal("plugin_identity", plugin_identity.String()), + db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), ) if err != nil { return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err)) @@ -28,7 +28,7 @@ func SetupEndpoint( // try get plugin plugin, err := db.GetOne[models.Plugin]( - db.Equal("plugin_identity", plugin_identity.String()), + db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), ) if err != nil { return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin: %v", err)) @@ -70,7 +70,7 @@ func SetupEndpoint( } _, err = install_service.InstallEndpoint( - plugin_identity, + plugin_unique_identifier, installation.ID, tenant_id, user_id, @@ -104,7 +104,7 @@ func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response } plugin, err := db.GetOne[models.Plugin]( - db.Equal("plugin_identity", plugin_installation.PluginIdentity), + db.Equal("plugin_unique_identifier", plugin_installation.PluginUniqueIdentifier), ) if err != nil { return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin: %v", err)) diff --git a/internal/types/entities/plugin_entities/identity.go b/internal/types/entities/plugin_entities/identity.go index a95e9a0..f7e40ba 100644 --- a/internal/types/entities/plugin_entities/identity.go +++ b/internal/types/entities/plugin_entities/identity.go @@ -2,9 +2,9 @@ package plugin_entities import "strings" -type PluginIdentity string +type PluginUniqueIdentifier string -func (p PluginIdentity) PluginID() string { +func (p PluginUniqueIdentifier) PluginID() string { // try find @ split := strings.Split(p.String(), "@") if len(split) == 2 { @@ -13,6 +13,6 @@ func (p PluginIdentity) PluginID() string { return p.String() } -func (p PluginIdentity) String() string { +func (p PluginUniqueIdentifier) String() string { return string(p) } diff --git a/internal/types/entities/plugin_entities/plugin_declaration.go b/internal/types/entities/plugin_entities/plugin_declaration.go index 7156cfb..438778b 100644 --- a/internal/types/entities/plugin_entities/plugin_declaration.go +++ b/internal/types/entities/plugin_entities/plugin_declaration.go @@ -126,11 +126,6 @@ type PluginMeta struct { Runner PluginRunner `json:"runner" yaml:"runner" validate:"required"` } -type PluginExecution struct { - Install string `json:"install" yaml:"install" validate:"omitempty"` - Launch string `json:"launch" yaml:"launch" validate:"omitempty"` -} - type PluginDeclarationWithoutAdvancedFields struct { Version string `json:"version" yaml:"version,omitempty" validate:"required,version"` Type DifyManifestType `json:"type" yaml:"type,omitempty" validate:"required,eq=plugin"` @@ -140,7 +135,6 @@ type PluginDeclarationWithoutAdvancedFields struct { CreatedAt time.Time `json:"created_at" yaml:"created_at,omitempty" validate:"required"` Resource PluginResourceRequirement `json:"resource" yaml:"resource,omitempty" validate:"required"` Plugins []string `json:"plugins" yaml:"plugins,omitempty" validate:"required,dive,max=128"` - Execution PluginExecution `json:"execution" yaml:"execution,omitempty" validate:"required"` Meta PluginMeta `json:"meta" yaml:"meta,omitempty" validate:"required"` } @@ -168,7 +162,7 @@ func isPluginName(fl validator.FieldLevel) bool { } func (p *PluginDeclaration) Identity() string { - return parser.MarshalPluginIdentity(p.Name, p.Version) + return parser.MarshalPluginUniqueIdentifier(p.Name, p.Version) } func (p *PluginDeclaration) ManifestValidate() error { diff --git a/internal/types/entities/plugin_entities/plugin_declaration_test.go b/internal/types/entities/plugin_entities/plugin_declaration_test.go index d183d4b..21cf24d 100644 --- a/internal/types/entities/plugin_entities/plugin_declaration_test.go +++ b/internal/types/entities/plugin_entities/plugin_declaration_test.go @@ -38,10 +38,6 @@ func preparePluginDeclaration() PluginDeclaration { }, }, Plugins: []string{}, - Execution: PluginExecution{ - Install: "echo 'hello'", - Launch: "echo 'hello'", - }, Meta: PluginMeta{ Version: "0.0.1", Arch: []constants.Arch{ diff --git a/internal/types/entities/plugin_entities/request.go b/internal/types/entities/plugin_entities/request.go index bd27794..848c51e 100644 --- a/internal/types/entities/plugin_entities/request.go +++ b/internal/types/entities/plugin_entities/request.go @@ -1,18 +1,17 @@ package plugin_entities -type InvokePluginPluginIdentity struct { - PluginName string `json:"plugin_name" binding:"required"` - PluginVersion string `json:"plugin_version" binding:"required"` -} - type InvokePluginUserIdentity struct { TenantId string `json:"tenant_id" binding:"required"` UserId string `json:"user_id" binding:"required"` } +type BasePluginIdentifier struct { + PluginUniqueIdentifier PluginUniqueIdentifier `json:"plugin_unique_identifier" binding:"required"` +} + type InvokePluginRequest[T any] struct { - InvokePluginPluginIdentity InvokePluginUserIdentity + BasePluginIdentifier Data T `json:"data" binding:"required"` } diff --git a/internal/types/entities/plugin_entities/runtime.go b/internal/types/entities/plugin_entities/runtime.go index 9232260..26e1083 100644 --- a/internal/types/entities/plugin_entities/runtime.go +++ b/internal/types/entities/plugin_entities/runtime.go @@ -30,7 +30,7 @@ type ( // returns the plugin configuration Configuration() *PluginDeclaration // unique identity of the plugin - Identity() (PluginIdentity, error) + Identity() (PluginUniqueIdentifier, error) // hashed identity of the plugin HashedIdentity() (string, error) // before the plugin starts, it will call this method to initialize the environment diff --git a/internal/types/models/curd/atomic.go b/internal/types/models/curd/atomic.go index 793c9ad..8fbe92b 100644 --- a/internal/types/models/curd/atomic.go +++ b/internal/types/models/curd/atomic.go @@ -16,7 +16,7 @@ import ( func CreatePlugin( tenant_id string, user_id string, - plugin_identity plugin_entities.PluginIdentity, + plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, install_type plugin_entities.PluginRuntimeType, declaration *plugin_entities.PluginDeclaration, ) ( @@ -29,19 +29,19 @@ func CreatePlugin( err := db.WithTransaction(func(tx *gorm.DB) error { p, err := db.GetOne[models.Plugin]( db.WithTransactionContext(tx), - db.Equal("plugin_identity", plugin_identity.String()), - db.Equal("plugin_id", plugin_identity.PluginID()), + db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), + db.Equal("plugin_id", plugin_unique_identifier.PluginID()), db.Equal("install_type", string(install_type)), db.WLock(), ) if err == db.ErrDatabaseNotFound { plugin := &models.Plugin{ - PluginID: plugin_identity.PluginID(), - PluginIdentity: plugin_identity.String(), - InstallType: install_type, - Refers: 1, - Declaration: parser.MarshalJson(declaration), + PluginID: plugin_unique_identifier.PluginID(), + PluginUniqueIdentifier: plugin_unique_identifier.String(), + InstallType: install_type, + Refers: 1, + Declaration: parser.MarshalJson(declaration), } err := db.Create(plugin, tx) @@ -62,10 +62,10 @@ func CreatePlugin( } installation := &models.PluginInstallation{ - PluginID: plugin_to_be_returns.PluginID, - PluginIdentity: plugin_to_be_returns.PluginIdentity, - TenantID: tenant_id, - UserID: user_id, + PluginID: plugin_to_be_returns.PluginID, + PluginUniqueIdentifier: plugin_to_be_returns.PluginUniqueIdentifier, + TenantID: tenant_id, + UserID: user_id, } err = db.Create(installation, tx) @@ -94,13 +94,13 @@ type DeletePluginResponse struct { // Delete plugin for a tenant, delete the plugin if it has never been created before // and uninstall it from the tenant, return the plugin and the installation // if the plugin has been created before, return the plugin which has been created before -func DeletePlugin(tenant_id string, plugin_identity plugin_entities.PluginIdentity, installation_id string) (*DeletePluginResponse, error) { +func DeletePlugin(tenant_id string, plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, installation_id string) (*DeletePluginResponse, error) { var plugin_to_be_returns *models.Plugin var installation_to_be_returns *models.PluginInstallation _, err := db.GetOne[models.PluginInstallation]( db.Equal("id", installation_id), - db.Equal("plugin_identity", plugin_identity.String()), + db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), db.Equal("tenant_id", tenant_id), ) @@ -115,7 +115,7 @@ func DeletePlugin(tenant_id string, plugin_identity plugin_entities.PluginIdenti err = db.WithTransaction(func(tx *gorm.DB) error { p, err := db.GetOne[models.Plugin]( db.WithTransactionContext(tx), - db.Equal("plugin_identity", plugin_identity.String()), + db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), db.WLock(), ) @@ -134,7 +134,7 @@ func DeletePlugin(tenant_id string, plugin_identity plugin_entities.PluginIdenti installation, err := db.GetOne[models.PluginInstallation]( db.WithTransactionContext(tx), - db.Equal("plugin_identity", plugin_identity.String()), + db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), db.Equal("tenant_id", tenant_id), ) diff --git a/internal/types/models/installation.go b/internal/types/models/installation.go index b8d3f0a..c6a9f61 100644 --- a/internal/types/models/installation.go +++ b/internal/types/models/installation.go @@ -6,11 +6,11 @@ type PluginInstallationStatus string type PluginInstallation struct { Model - TenantID string `json:"tenant_id" orm:"index;type:uuid;"` - UserID string `json:"user_id" orm:"index;type:uuid;"` - PluginID string `json:"plugin_id" orm:"index;size:127"` - PluginIdentity string `json:"plugin_identity" orm:"index;size:127"` - Config string `json:"config"` + TenantID string `json:"tenant_id" orm:"index;type:uuid;"` + UserID string `json:"user_id" orm:"index;type:uuid;"` + PluginID string `json:"plugin_id" orm:"index;size:127"` + PluginUniqueIdentifier string `json:"plugin_unique_identifier" orm:"index;size:127"` + Config string `json:"config"` } func (p *PluginInstallation) ConfigMap() (map[string]any, error) { diff --git a/internal/types/models/plugin.go b/internal/types/models/plugin.go index 5d36964..073c9a5 100644 --- a/internal/types/models/plugin.go +++ b/internal/types/models/plugin.go @@ -7,8 +7,8 @@ import ( type Plugin struct { Model - // PluginIdentity is a unique identifier for the plugin, it contains version and checksum - PluginIdentity string `json:"plugin_identity" orm:"index;size:127"` + // PluginUniqueIdentifier is a unique identifier for the plugin, it contains version and checksum + PluginUniqueIdentifier string `json:"plugin_unique_identifier" orm:"index;size:127"` // PluginID is the id of the plugin, only plugin name is considered PluginID string `json:"id" orm:"index;size:127"` Refers int `json:"refers" orm:"default:0"` diff --git a/internal/utils/parser/identity.go b/internal/utils/parser/identity.go index 2a26780..0ad1e83 100644 --- a/internal/utils/parser/identity.go +++ b/internal/utils/parser/identity.go @@ -2,6 +2,6 @@ package parser import "fmt" -func MarshalPluginIdentity(name string, version string) string { +func MarshalPluginUniqueIdentifier(name string, version string) string { return fmt.Sprintf("%s:%s", name, version) }