diff --git a/internal/core/plugin_daemon/tool_service.go b/internal/core/plugin_daemon/tool_service.go index f54b9f0..9ac7555 100644 --- a/internal/core/plugin_daemon/tool_service.go +++ b/internal/core/plugin_daemon/tool_service.go @@ -1,12 +1,15 @@ package plugin_daemon import ( + "bytes" + "encoding/base64" "errors" "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager" "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests" "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities" + "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" "github.com/langgenius/dify-plugin-daemon/internal/utils/stream" "github.com/xeipuuv/gojsonschema" ) @@ -46,10 +49,84 @@ func InvokeTool( } } + new_response := stream.NewStream[tool_entities.ToolResponseChunk](128) + routine.Submit(func() { + files := make(map[string]*bytes.Buffer) + defer new_response.Close() + + for response.Next() { + item, err := response.Read() + if err != nil { + return + } + + if item.Type == tool_entities.ToolResponseChunkTypeBlobChunk { + id, ok := item.Message["id"].(string) + if !ok { + continue + } + + total_length, ok := item.Message["total_length"].(float64) + if !ok { + continue + } + + // convert total_length to int + total_length_int := int(total_length) + + blob, ok := item.Message["blob"].(string) + if !ok { + continue + } + + end, ok := item.Message["end"].(bool) + if !ok { + continue + } + + if _, ok := files[id]; !ok { + files[id] = bytes.NewBuffer(make([]byte, 0, total_length_int)) + } + + if end { + new_response.Write(tool_entities.ToolResponseChunk{ + Type: tool_entities.ToolResponseChunkTypeBlob, + Message: map[string]any{ + "blob": files[id].Bytes(), // bytes will be encoded to base64 finally + }, + Meta: item.Meta, + }) + } else { + if files[id].Len() > 15*1024*1024 { + // delete the file if it is too large + delete(files, id) + new_response.WriteError(errors.New("file is too large")) + return + } else { + // decode the blob using base64 + decoded, err := base64.StdEncoding.DecodeString(blob) + if err != nil { + new_response.WriteError(err) + return + } + if len(decoded) > 8192 { + // single chunk is too large, raises error + new_response.WriteError(errors.New("single file chunk is too large")) + return + } + files[id].Write(decoded) + } + } + } else { + new_response.Write(item) + } + } + }) + // bind json schema validator bindValidator(response, tool_output_schema) - return response, nil + return new_response, nil } func bindValidator( diff --git a/internal/core/plugin_manager/remote_manager/connection_key.go b/internal/core/plugin_manager/remote_manager/connection_key.go index b49ce63..d8d8380 100644 --- a/internal/core/plugin_manager/remote_manager/connection_key.go +++ b/internal/core/plugin_manager/remote_manager/connection_key.go @@ -41,7 +41,7 @@ const ( CONNECTION_KEY_MANAGER_KEY2ID_PREFIX = "remote:key:manager:key2id" CONNECTION_KEY_MANAGER_ID2KEY_PREFIX = "remote:key:manager:id2key" CONNECTION_KEY_LOCK = "connection_lock" - CONNECTION_KEY_EXPIRE_TIME = time.Minute * 15 + CONNECTION_KEY_EXPIRE_TIME = time.Hour * 15 ) // returns a random string, create it if not exists diff --git a/internal/types/entities/tool_entities/tool.go b/internal/types/entities/tool_entities/tool.go index 826a42e..2edf37b 100644 --- a/internal/types/entities/tool_entities/tool.go +++ b/internal/types/entities/tool_entities/tool.go @@ -12,6 +12,7 @@ const ( ToolResponseChunkTypeText ToolResponseChunkType = "text" ToolResponseChunkTypeFile ToolResponseChunkType = "file" ToolResponseChunkTypeBlob ToolResponseChunkType = "blob" + ToolResponseChunkTypeBlobChunk ToolResponseChunkType = "blob_chunk" ToolResponseChunkTypeJson ToolResponseChunkType = "json" ToolResponseChunkTypeLink ToolResponseChunkType = "link" ToolResponseChunkTypeImage ToolResponseChunkType = "image" @@ -25,6 +26,7 @@ func IsValidToolResponseChunkType(fl validator.FieldLevel) bool { case ToolResponseChunkTypeText, ToolResponseChunkTypeFile, ToolResponseChunkTypeBlob, + ToolResponseChunkTypeBlobChunk, ToolResponseChunkTypeJson, ToolResponseChunkTypeLink, ToolResponseChunkTypeImage,