Skip to content

Commit

Permalink
feat: invoke tool and models
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Jul 19, 2024
1 parent 3772c09 commit c9dc8aa
Show file tree
Hide file tree
Showing 36 changed files with 1,373 additions and 368 deletions.
28 changes: 14 additions & 14 deletions internal/core/dify_invocation/http_request.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
package dify_invocation

import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

func Request[T any](method string, path string, options ...requests.HttpOptions) (*T, error) {
func Request[T any](method string, path string, options ...http_requests.HttpOptions) (*T, error) {
options = append(options,
requests.HttpHeader(map[string]string{
http_requests.HttpHeader(map[string]string{
"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
}),
requests.HttpWriteTimeout(5000),
requests.HttpReadTimeout(60000),
http_requests.HttpWriteTimeout(5000),
http_requests.HttpReadTimeout(60000),
)

return requests.RequestAndParse[T](client, difyPath(path), method, options...)
return http_requests.RequestAndParse[T](client, difyPath(path), method, options...)
}

func StreamResponse[T any](method string, path string, options ...requests.HttpOptions) (*stream.StreamResponse[T], error) {
func StreamResponse[T any](method string, path string, options ...http_requests.HttpOptions) (*stream.StreamResponse[T], error) {
options = append(
options, requests.HttpHeader(map[string]string{
options, http_requests.HttpHeader(map[string]string{
"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
}),
requests.HttpWriteTimeout(5000),
requests.HttpReadTimeout(60000),
http_requests.HttpWriteTimeout(5000),
http_requests.HttpReadTimeout(60000),
)

return requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
return http_requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
}

func InvokeModel(payload *InvokeModelRequest) (*stream.StreamResponse[InvokeModelResponseChunk], error) {
return StreamResponse[InvokeModelResponseChunk]("POST", "invoke/model", requests.HttpPayloadJson(payload))
return StreamResponse[InvokeModelResponseChunk]("POST", "invoke/model", http_requests.HttpPayloadJson(payload))
}

func InvokeTool(payload *InvokeToolRequest) (*stream.StreamResponse[InvokeToolResponseChunk], error) {
return StreamResponse[InvokeToolResponseChunk]("POST", "invoke/tool", requests.HttpPayloadJson(payload))
return StreamResponse[InvokeToolResponseChunk]("POST", "invoke/tool", http_requests.HttpPayloadJson(payload))
}

func InvokeNode[T WorkflowNodeData](payload *InvokeNodeRequest[T]) (*InvokeNodeResponse, error) {
return Request[InvokeNodeResponse]("POST", "invoke/node", requests.HttpPayloadJson(payload))
return Request[InvokeNodeResponse]("POST", "invoke/node", http_requests.HttpPayloadJson(payload))
}
31 changes: 31 additions & 0 deletions internal/core/plugin_daemon/basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package plugin_daemon

type PluginAccessType string

const (
PLUGIN_ACCESS_TYPE_TOOL PluginAccessType = "tool"
PLUGIN_ACCESS_TYPE_MODEL PluginAccessType = "model"
)

type PluginAccessAction string

const (
PLUGIN_ACCESS_ACTION_INVOKE_TOOL PluginAccessAction = "invoke_tool"
PLUGIN_ACCESS_ACTION_INVOKE_LLM PluginAccessAction = "invoke_llm"
)

const (
PLUGIN_IN_STREAM_EVENT = "request"
)

func getBasicPluginAccessMap(session_id string, user_id string, access_type PluginAccessType, action PluginAccessAction) map[string]any {
return map[string]any{
"session_id": session_id,
"event": PLUGIN_IN_STREAM_EVENT,
"data": map[string]any{
"user_id": user_id,
"type": access_type,
"action": action,
},
}
}
65 changes: 0 additions & 65 deletions internal/core/plugin_daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -1,66 +1 @@
package plugin_daemon

import (
"errors"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

type ToolResponseChunk = plugin_entities.InvokeToolResponseChunk

func InvokeTool(session *session_manager.Session, provider_name string, tool_name string, tool_parameters map[string]any) (
*stream.StreamResponse[ToolResponseChunk], error,
) {
runtime := plugin_manager.Get(session.PluginIdentity())
if runtime == nil {
return nil, errors.New("plugin not found")
}

response := stream.NewStreamResponse[ToolResponseChunk](512)

listener := runtime.Listen(session.ID())
listener.AddListener(func(message []byte) {
chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
return
}

switch chunk.Type {
case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
chunk, err := parser.UnmarshalJsonBytes[ToolResponseChunk](chunk.Data)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
return
}
response.Write(chunk)
case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
invokeDify(runtime, session, chunk.Data)
case plugin_entities.SESSION_MESSAGE_TYPE_END:
response.Close()
default:
log.Error("unknown stream message type: %s", chunk.Type)
response.Close()
}
})

response.OnClose(func() {
listener.Close()
})

runtime.Write(session.ID(), []byte(parser.MarshalJson(
map[string]any{
"provider": provider_name,
"tool": tool_name,
"parameters": tool_parameters,
"session_id": session.ID(),
},
)))

return response, nil
}
3 changes: 2 additions & 1 deletion internal/core/plugin_daemon/invoke_dify.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
)

func invokeDify(runtime entities.PluginRuntimeInterface,
func invokeDify(
runtime entities.PluginRuntimeInterface,
session *session_manager.Session, data []byte,
) error {
// unmarshal invoke data
Expand Down
88 changes: 88 additions & 0 deletions internal/core/plugin_daemon/model_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package plugin_daemon

import (
"errors"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

func getInvokeModelMap(
session *session_manager.Session,
action PluginAccessAction,
request *requests.RequestInvokeLLM,
) map[string]any {
req := getBasicPluginAccessMap(session.ID(), session.UserID(), PLUGIN_ACCESS_TYPE_MODEL, action)
data := req["data"].(map[string]any)

data["provider"] = request.Provider
data["model"] = request.Model
data["model_type"] = request.ModelType
data["model_parameters"] = request.ModelParameters
data["prompt_messages"] = request.PromptMessages
data["tools"] = request.Tools
data["stop"] = request.Stop
data["stream"] = request.Stream
data["credentials"] = request.Credentials

return req
}

func InvokeLLM(
session *session_manager.Session,
request *requests.RequestInvokeLLM,
) (
*stream.StreamResponse[plugin_entities.InvokeModelResponseChunk], error,
) {
runtime := plugin_manager.Get(session.PluginIdentity())
if runtime == nil {
return nil, errors.New("plugin not found")
}

response := stream.NewStreamResponse[plugin_entities.InvokeModelResponseChunk](512)

listener := runtime.Listen(session.ID())
listener.AddListener(func(message []byte) {
chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
return
}

switch chunk.Type {
case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
chunk, err := parser.UnmarshalJsonBytes[plugin_entities.InvokeModelResponseChunk](chunk.Data)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
return
}
response.Write(chunk)
case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
invokeDify(runtime, session, chunk.Data)
case plugin_entities.SESSION_MESSAGE_TYPE_END:
response.Close()
default:
log.Error("unknown stream message type: %s", chunk.Type)
response.Close()
}
})

response.OnClose(func() {
listener.Close()
})

runtime.Write(session.ID(), []byte(parser.MarshalJson(
getInvokeModelMap(
session,
PLUGIN_ACCESS_ACTION_INVOKE_LLM,
request,
),
)))

return response, nil
}
86 changes: 86 additions & 0 deletions internal/core/plugin_daemon/tool_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package plugin_daemon

import (
"errors"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

func getInvokeToolMap(
session *session_manager.Session,
action PluginAccessAction,
request *requests.RequestInvokeTool,
) map[string]any {
req := getBasicPluginAccessMap(session.ID(), session.UserID(), PLUGIN_ACCESS_TYPE_TOOL, action)
data := req["data"].(map[string]any)

data["provider"] = request.Provider
data["tool"] = request.Tool
data["parameters"] = request.ToolParameters
data["credentials"] = request.Credentials

return req
}

func InvokeTool(
session *session_manager.Session,
request *requests.RequestInvokeTool,
) (
*stream.StreamResponse[plugin_entities.ToolResponseChunk], error,
) {
runtime := plugin_manager.Get(session.PluginIdentity())
if runtime == nil {
return nil, errors.New("plugin not found")
}

response := stream.NewStreamResponse[plugin_entities.ToolResponseChunk](512)

listener := runtime.Listen(session.ID())
listener.AddListener(func(message []byte) {
chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
return
}

switch chunk.Type {
case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
chunk, err := parser.UnmarshalJsonBytes[plugin_entities.ToolResponseChunk](chunk.Data)
if err != nil {
log.Error("unmarshal json failed: %s", err.Error())
return
}
response.Write(chunk)
case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
invokeDify(runtime, session, chunk.Data)
case plugin_entities.SESSION_MESSAGE_TYPE_END:
response.Close()
case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:
e, err := parser.UnmarshalJsonBytes[plugin_entities.ErrorResponse](chunk.Data)
if err != nil {
break
}
response.WriteError(errors.New(e.Error))
response.Close()
default:
response.WriteError(errors.New("unknown stream message type: " + string(chunk.Type)))
response.Close()
}
})

response.OnClose(func() {
listener.Close()
})

runtime.Write(session.ID(), []byte(parser.MarshalJson(
getInvokeToolMap(session, PLUGIN_ACCESS_ACTION_INVOKE_TOOL, request)),
))

return response, nil
}
4 changes: 4 additions & 0 deletions internal/core/plugin_manager/aws_manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ func (r *AWSPluginRuntime) StartPlugin() error {

return nil
}

func (r *AWSPluginRuntime) Wait() (<-chan bool, error) {
return nil, nil
}
9 changes: 9 additions & 0 deletions internal/core/plugin_manager/lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,14 @@ func lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
start_failed_times++
continue
}

// wait for plugin to stop
c, err := r.Wait()
if err == nil {
<-c
}

// restart plugin in 5s
time.Sleep(5 * time.Second)
}
}
2 changes: 1 addition & 1 deletion internal/core/plugin_manager/local_manager/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
}

// execute init command
handle := exec.Command("bash", "install.sh")
handle := exec.Command("bash", r.Config.Execution.Install)
handle.Dir = r.State.RelativePath

// get stdout and stderr
Expand Down
Loading

0 comments on commit c9dc8aa

Please sign in to comment.