Skip to content

Commit

Permalink
feat: service support persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Aug 28, 2024
1 parent 4ea1f0b commit e139f38
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 51 deletions.
26 changes: 26 additions & 0 deletions internal/core/dify_invocation/types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package dify_invocation

import (
"github.com/go-playground/validator/v10"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/app_entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
"github.com/langgenius/dify-plugin-daemon/internal/types/validators"
)

type BaseInvokeDifyRequest struct {
Expand All @@ -23,6 +25,7 @@ const (
INVOKE_TYPE_TOOL InvokeType = "tool"
INVOKE_TYPE_NODE InvokeType = "node"
INVOKE_TYPE_APP InvokeType = "app"
INVOKE_TYPE_STORAGE InvokeType = "storage"
)

type InvokeLLMRequest struct {
Expand Down Expand Up @@ -71,6 +74,29 @@ type InvokeAppSchema struct {
Files []*app_entities.FileVar `json:"files" validate:"omitempty,dive"`
}

type StorageOpt string

const (
STORAGE_OPT_GET StorageOpt = "get"
STORAGE_OPT_SET StorageOpt = "set"
STORAGE_OPT_DEL StorageOpt = "del"
)

func isStorageOpt(fl validator.FieldLevel) bool {
opt := StorageOpt(fl.Field().String())
return opt == STORAGE_OPT_GET || opt == STORAGE_OPT_SET || opt == STORAGE_OPT_DEL
}

func init() {
validators.GlobalEntitiesValidator.RegisterValidation("storage_opt", isStorageOpt)
}

type InvokeStorageRequest struct {
Opt StorageOpt `json:"opt" validate:"required,storage_opt"`
Key string `json:"key" validate:"required"`
Value string `json:"value"` // encoded in hex, optional
}

type InvokeAppRequest struct {
BaseInvokeDifyRequest
requests.BaseRequestInvokeModel
Expand Down
14 changes: 10 additions & 4 deletions internal/core/persistence/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
)

func InitPersistence(config *app.Config) *Persistence {
var (
persistence *Persistence
)

func InitPersistence(config *app.Config) {
if config.PersistenceStorageType == "s3" {
s3, err := NewS3Wrapper(
config.PersistenceStorageS3Region,
Expand All @@ -17,16 +21,18 @@ func InitPersistence(config *app.Config) *Persistence {
log.Panic("Failed to initialize S3 wrapper: %v", err)
}

return &Persistence{
persistence = &Persistence{
storage: s3,
}
} else if config.PersistenceStorageType == "local" {
return &Persistence{
persistence = &Persistence{
storage: NewLocalWrapper(config.PersistenceStorageLocalPath),
}
} else {
log.Panic("Invalid persistence storage type: %s", config.PersistenceStorageType)
}
}

return nil
func GetPersistence() *Persistence {
return persistence
}
22 changes: 11 additions & 11 deletions internal/core/persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ const (
CACHE_KEY_PREFIX = "persistence:cache"
)

func (c *Persistence) getCacheKey(tenant_id string, plugin_checksum string, key string) string {
return fmt.Sprintf("%s:%s:%s:%s", CACHE_KEY_PREFIX, tenant_id, plugin_checksum, key)
func (c *Persistence) getCacheKey(tenant_id string, plugin_identity string, key string) string {
return fmt.Sprintf("%s:%s:%s:%s", CACHE_KEY_PREFIX, tenant_id, plugin_identity, key)
}

func (c *Persistence) Save(tenant_id string, plugin_checksum string, key string, data []byte) error {
func (c *Persistence) Save(tenant_id string, plugin_identity string, key string, data []byte) error {
if len(key) > 64 {
return fmt.Errorf("key length must be less than 64 characters")
}

return c.storage.Save(tenant_id, plugin_checksum, key, data)
return c.storage.Save(tenant_id, plugin_identity, key, data)
}

func (c *Persistence) Load(tenant_id string, plugin_checksum string, key string) ([]byte, error) {
func (c *Persistence) Load(tenant_id string, plugin_identity string, key string) ([]byte, error) {
// check if the key exists in cache
h, err := cache.GetString(c.getCacheKey(tenant_id, plugin_checksum, key))
h, err := cache.GetString(c.getCacheKey(tenant_id, plugin_identity, key))
if err != nil && err != cache.ErrNotFound {
return nil, err
}
Expand All @@ -39,22 +39,22 @@ func (c *Persistence) Load(tenant_id string, plugin_checksum string, key string)
}

// load from storage
data, err := c.storage.Load(tenant_id, plugin_checksum, key)
data, err := c.storage.Load(tenant_id, plugin_identity, key)
if err != nil {
return nil, err
}

// add to cache
cache.Store(c.getCacheKey(tenant_id, plugin_checksum, key), hex.EncodeToString(data), time.Minute*5)
cache.Store(c.getCacheKey(tenant_id, plugin_identity, key), hex.EncodeToString(data), time.Minute*5)

return data, nil
}

func (c *Persistence) Delete(tenant_id string, plugin_checksum string, key string) error {
func (c *Persistence) Delete(tenant_id string, plugin_identity string, key string) error {
// delete from cache and storage
err := cache.Del(c.getCacheKey(tenant_id, plugin_checksum, key))
err := cache.Del(c.getCacheKey(tenant_id, plugin_identity, key))
if err != nil {
return err
}
return c.storage.Delete(tenant_id, plugin_checksum, key)
return c.storage.Delete(tenant_id, plugin_identity, key)
}
16 changes: 8 additions & 8 deletions internal/core/persistence/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ func TestPersistenceStoreAndLoad(t *testing.T) {
}
defer cache.Close()

p := InitPersistence(&app.Config{
InitPersistence(&app.Config{
PersistenceStorageType: "local",
PersistenceStorageLocalPath: "./persistence_storage",
})

key := strings.RandomString(10)

if err := p.Save("tenant_id", "plugin_checksum", key, []byte("data")); err != nil {
if err := persistence.Save("tenant_id", "plugin_checksum", key, []byte("data")); err != nil {
t.Fatalf("Failed to save data: %v", err)
}

data, err := p.Load("tenant_id", "plugin_checksum", key)
data, err := persistence.Load("tenant_id", "plugin_checksum", key)
if err != nil {
t.Fatalf("Failed to load data: %v", err)
}
Expand Down Expand Up @@ -65,14 +65,14 @@ func TestPersistenceSaveAndLoadWithLongKey(t *testing.T) {
}
defer cache.Close()

p := InitPersistence(&app.Config{
InitPersistence(&app.Config{
PersistenceStorageType: "local",
PersistenceStorageLocalPath: "./persistence_storage",
})

key := strings.RandomString(65)

if err := p.Save("tenant_id", "plugin_checksum", key, []byte("data")); err == nil {
if err := persistence.Save("tenant_id", "plugin_checksum", key, []byte("data")); err == nil {
t.Fatalf("Expected error, got nil")
}
}
Expand All @@ -84,18 +84,18 @@ func TestPersistenceDelete(t *testing.T) {
}
defer cache.Close()

p := InitPersistence(&app.Config{
InitPersistence(&app.Config{
PersistenceStorageType: "local",
PersistenceStorageLocalPath: "./persistence_storage",
})

key := strings.RandomString(10)

if err := p.Save("tenant_id", "plugin_checksum", key, []byte("data")); err != nil {
if err := persistence.Save("tenant_id", "plugin_checksum", key, []byte("data")); err != nil {
t.Fatalf("Failed to save data: %v", err)
}

if err := p.Delete("tenant_id", "plugin_checksum", key); err != nil {
if err := persistence.Delete("tenant_id", "plugin_checksum", key); err != nil {
t.Fatalf("Failed to delete data: %v", err)
}

Expand Down
80 changes: 72 additions & 8 deletions internal/core/plugin_daemon/backwards_invocation/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backwards_invocation

import (
"encoding/hex"
"fmt"

"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
Expand Down Expand Up @@ -169,28 +170,31 @@ func prepareDifyInvocationArguments(
var (
dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeToolRequest](handle, executeDifyInvocationToolTask)
genericDispatchTask(handle, executeDifyInvocationToolTask)
},
dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeLLMRequest](handle, executeDifyInvocationLLMTask)
genericDispatchTask(handle, executeDifyInvocationLLMTask)
},
dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeTextEmbeddingRequest](handle, executeDifyInvocationTextEmbeddingTask)
genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
},
dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeRerankRequest](handle, executeDifyInvocationRerankTask)
genericDispatchTask(handle, executeDifyInvocationRerankTask)
},
dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeTTSRequest](handle, executeDifyInvocationTTSTask)
genericDispatchTask(handle, executeDifyInvocationTTSTask)
},
dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeSpeech2TextRequest](handle, executeDifyInvocationSpeech2TextTask)
genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
},
dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeModerationRequest](handle, executeDifyInvocationModerationTask)
genericDispatchTask(handle, executeDifyInvocationModerationTask)
},
dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
genericDispatchTask[dify_invocation.InvokeAppRequest](handle, executeDifyInvocationAppTask)
genericDispatchTask(handle, executeDifyInvocationAppTask)
},
dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
genericDispatchTask(handle, executeDifyInvocationStorageTask)
},
}
)
Expand Down Expand Up @@ -356,3 +360,63 @@ func executeDifyInvocationAppTask(
handle.WriteResponse("stream", t)
})
}

func executeDifyInvocationStorageTask(
handle *BackwardsInvocation,
request *dify_invocation.InvokeStorageRequest,
) {
if handle.session == nil {
handle.WriteError(fmt.Errorf("session not found"))
return
}

persistence := handle.session.Storage()
if persistence == nil {
handle.WriteError(fmt.Errorf("persistence not found"))
return
}

tenant_id, err := handle.TenantID()
if err != nil {
handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
return
}

plugin_id := handle.session.PluginIdentity

if request.Opt == dify_invocation.STORAGE_OPT_GET {
data, err := persistence.Load(tenant_id, plugin_id, request.Key)
if err != nil {
handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
return
}

handle.WriteResponse("struct", map[string]any{
"data": hex.EncodeToString(data),
})
} else if request.Opt == dify_invocation.STORAGE_OPT_SET {
data, err := hex.DecodeString(request.Value)
if err != nil {
handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
return
}

if err := persistence.Save(tenant_id, plugin_id, request.Key, data); err != nil {
handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
return
}

handle.WriteResponse("struct", map[string]any{
"data": "ok",
})
} else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
if err := persistence.Delete(tenant_id, plugin_id, request.Key); err != nil {
handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
return
}

handle.WriteResponse("struct", map[string]any{
"data": "ok",
})
}
}
12 changes: 10 additions & 2 deletions internal/core/session_manager/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
Expand All @@ -21,8 +22,9 @@ var (

// session need to implement the backwards_invocation.BackwardsInvocationWriter interface
type Session struct {
ID string `json:"id"`
runtime plugin_entities.PluginRuntimeInterface `json:"-"`
ID string `json:"id"`
runtime plugin_entities.PluginRuntimeInterface `json:"-"`
persistence *persistence.Persistence `json:"-"`

TenantID string `json:"tenant_id"`
UserID string `json:"user_id"`
Expand All @@ -45,6 +47,7 @@ func NewSession(
invoke_from access_types.PluginAccessType,
action access_types.PluginAccessAction,
declaration *plugin_entities.PluginDeclaration,
persistence *persistence.Persistence,
) *Session {
s := &Session{
ID: uuid.New().String(),
Expand All @@ -55,6 +58,7 @@ func NewSession(
InvokeFrom: invoke_from,
Action: action,
Declaration: declaration,
persistence: persistence,
}

session_lock.Lock()
Expand Down Expand Up @@ -97,6 +101,10 @@ func (s *Session) Runtime() plugin_entities.PluginRuntimeInterface {
return s.runtime
}

func (s *Session) Storage() *persistence.Persistence {
return s.persistence
}

type PLUGIN_IN_STREAM_EVENT string

const (
Expand Down
4 changes: 0 additions & 4 deletions internal/server/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package server

import (
"github.com/langgenius/dify-plugin-daemon/internal/cluster"
"github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
)

Expand All @@ -18,7 +17,4 @@ type App struct {
// aws transaction handler
// accept aws transaction request and forward to the plugin daemon
aws_transaction_handler *transaction.AWSTransactionHandler

// persistence
persistence *persistence.Persistence
}
2 changes: 1 addition & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (a *App) Run(config *app.Config) {
plugin_manager.InitGlobalPluginManager(a.cluster, config)

// init persistence
a.persistence = persistence.InitPersistence(config)
persistence.InitPersistence(config)

// launch cluster
a.cluster.Launch()
Expand Down
Loading

0 comments on commit e139f38

Please sign in to comment.