Skip to content

Commit

Permalink
feat: install
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Sep 10, 2024
1 parent b397e73 commit cda6574
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 117 deletions.
2 changes: 1 addition & 1 deletion internal/core/plugin_daemon/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func genericInvokePlugin[Req any, Rsp any](
request *Req,
response_buffer_size int,
) (*stream.Stream[Rsp], error) {
runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginUniqueIdentifier)
runtime := plugin_manager.Manager().Get(session.PluginUniqueIdentifier)
if runtime == nil {
return nil, errors.New("plugin not found")
}
Expand Down
11 changes: 11 additions & 0 deletions internal/core/plugin_manager/install.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package plugin_manager

import (
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

func (p *PluginManager) Install(decoder decoder.PluginDecoder) (*stream.Stream[installer.PluginInstallResponse], error) {
return p.installer(decoder)
}
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/installer/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

func AwsInstaller(decoder decoder.PluginDecoder) (*stream.Stream[string], error) {
func AwsInstaller(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error) {
return nil, nil
}
7 changes: 6 additions & 1 deletion internal/core/plugin_manager/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

type Installer func(decoder decoder.PluginDecoder) (*stream.Stream[string], error)
type PluginInstallResponse struct {
Event string `json:"event"`
Data string `json:"data"`
}

type Installer func(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error)
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/installer/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

func LocalInstaller(decoder decoder.PluginDecoder) (*stream.Stream[string], error) {
func LocalInstaller(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error) {
return nil, nil
}
6 changes: 4 additions & 2 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
Expand Down Expand Up @@ -39,7 +40,7 @@ var (
manager *PluginManager
)

func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config) {
func InitManager(cluster *cluster.Cluster, configuration *app.Config) {
manager = &PluginManager{
cluster: cluster,
maxPluginPackageSize: configuration.MaxPluginPackageSize,
Expand All @@ -53,14 +54,15 @@ func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config

if configuration.Platform == app.PLATFORM_AWS_LAMBDA {
manager.installer = installer.AwsInstaller
serverless.Init(configuration)
} else if configuration.Platform == app.PLATFORM_LOCAL {
manager.installer = installer.LocalInstaller
}

manager.Init(configuration)
}

func GetGlobalPluginManager() *PluginManager {
func Manager() *PluginManager {
return manager
}

Expand Down
2 changes: 2 additions & 0 deletions internal/core/plugin_manager/serverless/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ func Init(config *app.Config) {
if err := Ping(); err != nil {
log.Panic("Failed to ping serverless connector", err)
}

log.Info("Serverless connector initialized")
}
12 changes: 7 additions & 5 deletions internal/core/plugin_manager/serverless/packager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@ import (

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager/dockerfile"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/tmpfile"
)

type Packager struct {
runtime plugin_entities.PluginLifetime
decoder decoder.PluginDecoder
}

func NewPackager(runtime plugin_entities.PluginLifetime, decoder decoder.PluginDecoder) *Packager {
func NewPackager(decoder decoder.PluginDecoder) *Packager {
return &Packager{
runtime: runtime,
decoder: decoder,
}
}
Expand Down Expand Up @@ -63,6 +60,11 @@ func (d *dockerFileInfo) Sys() any {
// Pack takes a plugin and packs it into a tar file with dockerfile inside
// returns a *os.File with the tar file
func (p *Packager) Pack() (*os.File, error) {
declaration, err := p.decoder.Manifest()
if err != nil {
return nil, err
}

// walk through the plugin directory and add it to a tar file
// create a tmpfile
tmpfile, cleanup, err := tmpfile.CreateTempFile("plugin-aws-tar-*")
Expand Down Expand Up @@ -132,7 +134,7 @@ func (p *Packager) Pack() (*os.File, error) {
}

// add dockerfile
dockerfile, err := dockerfile.GenerateDockerfile(p.runtime.Configuration())
dockerfile, err := dockerfile.GenerateDockerfile(&declaration)
if err != nil {
return nil, err
}
Expand Down
59 changes: 1 addition & 58 deletions internal/core/plugin_manager/serverless/packager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,9 @@ import (
"path/filepath"
"testing"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
)

type TPluginRuntime struct {
plugin_entities.PluginRuntime
positive_manager.PositivePluginRuntime
}

func (r *TPluginRuntime) InitEnvironment() error {
return nil
}

func (r *TPluginRuntime) Checksum() string {
return "test_checksum"
}

func (r *TPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
return plugin_entities.PluginUniqueIdentifier("test_identity"), nil
}

func (r *TPluginRuntime) StartPlugin() error {
return nil
}

func (r *TPluginRuntime) Type() plugin_entities.PluginRuntimeType {
return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
}

func (r *TPluginRuntime) Wait() (<-chan bool, error) {
return nil, nil
}

func (r *TPluginRuntime) Listen(string) *entities.Broadcast[plugin_entities.SessionMessage] {
return nil
}

func (r *TPluginRuntime) Write(string, []byte) {
}

//go:embed packager_test_plugin/*
var test_plugin embed.FS

Expand Down Expand Up @@ -102,24 +62,7 @@ func TestPackager_Pack(t *testing.T) {
t.Fatal(err)
}

packager := NewPackager(&TPluginRuntime{
PluginRuntime: plugin_entities.PluginRuntime{
Config: plugin_entities.PluginDeclaration{
PluginDeclarationWithoutAdvancedFields: plugin_entities.PluginDeclarationWithoutAdvancedFields{
Meta: plugin_entities.PluginMeta{
Runner: plugin_entities.PluginRunner{
Language: constants.Python,
Version: "3.12",
Entrypoint: "main",
},
Arch: []constants.Arch{
constants.AMD64,
},
},
},
},
},
}, decoder)
packager := NewPackager(decoder)

f, err := packager.Pack()
if err != nil {
Expand Down
76 changes: 34 additions & 42 deletions internal/core/plugin_manager/serverless/upload.go
Original file line number Diff line number Diff line change
@@ -1,79 +1,71 @@
package serverless

import (
"fmt"
"os"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

var (
AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
)

// UploadPlugin uploads the plugin to the AWS Lambda
func UploadPlugin(r *aws_manager.AWSPluginRuntime) error {
r.Log("Starting to initialize environment")
// return the lambda url and name
func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
checksum, err := checksum.CalculateChecksum(decoder)
if err != nil {
return nil, err
}

// check if the plugin has already been initialized, at most 300s
if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
return err
if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+checksum, 300*time.Second, 300*time.Second); err != nil {
return nil, err
}
defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
r.Log("Started to initialize environment")
defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + checksum)

identity, err := r.Identity()
manifest, err := decoder.Manifest()
if err != nil {
return err
return nil, err
}
function, err := FetchLambda(identity.String(), r.Checksum())

identity := manifest.Identity()
function, err := FetchLambda(identity, checksum)
if err != nil {
if err != ErrNoLambdaFunction {
return err
return nil, err
}
} else {
// found, return directly
r.LambdaURL = function.FunctionURL
r.LambdaName = function.FunctionName
r.Log(fmt.Sprintf("Found existing lambda function: %s", r.LambdaName))
return nil
response := stream.NewStreamResponse[LaunchAWSLambdaFunctionResponse](2)
response.Write(LaunchAWSLambdaFunctionResponse{
Event: LambdaUrl,
Message: function.FunctionURL,
})
response.Write(LaunchAWSLambdaFunctionResponse{
Event: Lambda,
Message: function.FunctionName,
})
return response, nil
}

// create it if not found
r.Log("Creating new lambda function")

// create lambda function
packager := NewPackager(r, r.Decoder)
packager := NewPackager(decoder)
context, err := packager.Pack()
if err != nil {
return err
return nil, err
}
defer os.Remove(context.Name())
defer context.Close()

response, err := LaunchLambda(identity.String(), r.Checksum(), context)
response, err := LaunchLambda(identity, checksum, context)
if err != nil {
return err
}

for response.Next() {
response, err := response.Read()
if err != nil {
return err
}

switch response.Event {
case Error:
return fmt.Errorf("error: %s", response.Message)
case LambdaUrl:
r.LambdaURL = response.Message
case Lambda:
r.LambdaName = response.Message
case Info:
r.Log(fmt.Sprintf("installing: %s", response.Message))
}
return nil, err
}

return nil
return response, nil
}
27 changes: 25 additions & 2 deletions internal/server/controllers/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (

"github.com/gin-gonic/gin"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
"github.com/langgenius/dify-plugin-daemon/internal/service"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
)

func GetAsset(c *gin.Context) {
plugin_manager := plugin_manager.GetGlobalPluginManager()
plugin_manager := plugin_manager.Manager()
asset, err := plugin_manager.GetAsset(c.Param("id"))

if err != nil {
Expand All @@ -19,7 +21,28 @@ func GetAsset(c *gin.Context) {
c.Data(http.StatusOK, "application/octet-stream", asset)
}

func InstallPlugin(c *gin.Context) {
func InstallPlugin(app *app.Config) gin.HandlerFunc {
return func(c *gin.Context) {
dify_pkg_file_header, err := c.FormFile("dify_pkg")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

if dify_pkg_file_header.Size > app.MaxPluginPackageSize {
c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "File size exceeds the maximum limit"})
return
}

dify_pkg_file, err := dify_pkg_file_header.Open()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
defer dify_pkg_file.Close()

service.InstallPlugin(c, dify_pkg_file)
}
}

func UninstallPlugin(c *gin.Context) {
Expand Down
2 changes: 1 addition & 1 deletion internal/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (app *App) pluginGroup(group *gin.RouterGroup, config *app.Config) {
group.Use(CheckingKey(config.PluginInnerApiKey))

group.GET("/asset/:id", controllers.GetAsset)
group.POST("/install", controllers.InstallPlugin)
group.POST("/install", controllers.InstallPlugin(config))
group.POST("/uninstall", controllers.UninstallPlugin)
group.GET("/list", controllers.ListPlugins)
}
2 changes: 1 addition & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (app *App) Run(config *app.Config) {
process.Init(config)

// init plugin daemon
plugin_manager.InitGlobalPluginManager(app.cluster, config)
plugin_manager.InitManager(app.cluster, config)

// init persistence
persistence.InitPersistence(config)
Expand Down
2 changes: 1 addition & 1 deletion internal/service/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Endpoint(
}

// fetch plugin
manager := plugin_manager.GetGlobalPluginManager()
manager := plugin_manager.Manager()
runtime := manager.Get(
plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier),
)
Expand Down
Loading

0 comments on commit cda6574

Please sign in to comment.