Skip to content

Commit

Permalink
Merge branch 'feat/workflow-parallel-support' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
zxhlyh committed Aug 30, 2024
2 parents 5cc1bcb + 7c9081a commit 61917e6
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 19 deletions.
1 change: 1 addition & 0 deletions web/app/components/workflow/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ export const ITERATION_PADDING = {
left: 16,
}
export const PARALLEL_LIMIT = 10
export const PARALLEL_DEPTH_LIMIT = 3

export const RETRIEVAL_OUTPUT_STRUCT = `{
"content": "",
Expand Down
60 changes: 52 additions & 8 deletions web/app/components/workflow/hooks/use-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
useWorkflowStore,
} from '../store'
import {
// PARALLEL_DEPTH_LIMIT,
PARALLEL_LIMIT,
SUPPORT_OUTPUT_VARS_NODE,
} from '../constants'
Expand Down Expand Up @@ -279,6 +280,53 @@ export const useWorkflow = () => {
return isUsed
}, [isVarUsedInNodes])

const checkParallelLimit = useCallback((nodeId: string) => {
const {
getNodes,
edges,
} = store.getState()
const nodes = getNodes()
const currentNode = nodes.find(node => node.id === nodeId)!
const sourceNodeOutgoers = getOutgoers(currentNode, nodes, edges)
if (sourceNodeOutgoers.length > PARALLEL_LIMIT - 1) {
const { setShowTips } = workflowStore.getState()
setShowTips(t('workflow.common.parallelTip.limit', { num: PARALLEL_LIMIT }))
return false
}
// if (sourceNodeOutgoers.length > 0) {
// let hasOverDepth = false
// let parallelDepth = 1
// const traverse = (root: Node, depth: number) => {
// if (depth > PARALLEL_DEPTH_LIMIT) {
// hasOverDepth = true
// return
// }
// if (depth > parallelDepth)
// parallelDepth = depth

// const incomerNodes = getIncomers(root, nodes, edges)

// if (incomerNodes.length) {
// incomerNodes.forEach((incomer) => {
// const incomerOutgoers = getOutgoers(incomer, nodes, edges)

// if (incomerOutgoers.length > 1)
// traverse(incomer, depth + 1)
// else
// traverse(incomer, depth)
// })
// }
// }
// traverse(currentNode, parallelDepth)
// if (hasOverDepth) {
// const { setShowTips } = workflowStore.getState()
// setShowTips(t('workflow.common.parallelTip.depthLimit', { num: PARALLEL_DEPTH_LIMIT }))
// return false
// }
// }
return true
}, [store, workflowStore, t])

const isValidConnection = useCallback(({ source, target }: Connection) => {
const {
edges,
Expand All @@ -288,16 +336,11 @@ export const useWorkflow = () => {
const sourceNode: Node = nodes.find(node => node.id === source)!
const targetNode: Node = nodes.find(node => node.id === target)!

if (sourceNode.type === CUSTOM_NOTE_NODE || targetNode.type === CUSTOM_NOTE_NODE)
if (!checkParallelLimit(source!))
return false

const sourceNodeOutgoers = getOutgoers(sourceNode, nodes, edges)

if (sourceNodeOutgoers.length > PARALLEL_LIMIT - 1) {
const { setShowTips } = workflowStore.getState()
setShowTips(t('workflow.common.parallelTip.limit', { num: PARALLEL_LIMIT }))
if (sourceNode.type === CUSTOM_NOTE_NODE || targetNode.type === CUSTOM_NOTE_NODE)
return false
}

if (sourceNode && targetNode) {
const sourceNodeAvailableNextNodes = nodesExtraData[sourceNode.data.type].availableNextNodes
Expand Down Expand Up @@ -325,7 +368,7 @@ export const useWorkflow = () => {
}

return !hasCycle(targetNode)
}, [store, nodesExtraData])
}, [store, nodesExtraData, checkParallelLimit])

const formatTimeFromNow = useCallback((time: number) => {
return dayjs(time).locale(locale === 'zh-Hans' ? 'zh-cn' : locale).fromNow()
Expand All @@ -348,6 +391,7 @@ export const useWorkflow = () => {
isVarUsedInNodes,
removeUsedVarInNodes,
isNodeVarsUsedInNodes,
checkParallelLimit,
isValidConnection,
formatTimeFromNow,
getNode,
Expand Down
7 changes: 7 additions & 0 deletions web/app/components/workflow/limit-tips.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@ const LimitTips = () => {

return (
<div className='absolute bottom-16 left-1/2 -translate-x-1/2 flex items-center rounded-xl p-2 h-10 border border-components-panel-border bg-components-panel-bg-blur shadow-md z-[9]'>
<div
className='absolute inset-0 opacity-[0.4] rounded-xl'
style={{
background: 'linear-gradient(92deg, rgba(247, 144, 9, 0.25) 0%, rgba(255, 255, 255, 0.00) 100%)',
}}
></div>
<div className='flex items-center justify-center w-5 h-5'>
<RiAlertFill className='w-4 h-4 text-text-warning-secondary' />
</div>
<div className='mx-1 px-1 system-xs-medium text-text-primary'>
{showTips}
</div>
<ActionButton
className='z-[1]'
onClick={() => setShowTips('')}
>
<RiCloseLine className='w-4 h-4' />
Expand Down
17 changes: 6 additions & 11 deletions web/app/components/workflow/nodes/_base/components/node-handle.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ import {
useIsChatMode,
useNodesInteractions,
useNodesReadOnly,
useWorkflow,
} from '../../../hooks'
import {
useStore,
useWorkflowStore,
} from '../../../store'
import { PARALLEL_LIMIT } from '../../../constants'
import Tooltip from '@/app/components/base/tooltip'

type NodeHandleProps = {
Expand Down Expand Up @@ -119,27 +118,23 @@ export const NodeSourceHandle = memo(({
}: NodeHandleProps) => {
const { t } = useTranslation()
const notInitialWorkflow = useStore(s => s.notInitialWorkflow)
const workflowStore = useWorkflowStore()
const [open, setOpen] = useState(false)
const { handleNodeAdd } = useNodesInteractions()
const { getNodesReadOnly } = useNodesReadOnly()
const { availableNextBlocks } = useAvailableBlocks(data.type, data.isInIteration)
const isConnectable = !!availableNextBlocks.length
const isChatMode = useIsChatMode()
const { checkParallelLimit } = useWorkflow()

const connected = data._connectedSourceHandleIds?.includes(handleId)
const handleOpenChange = useCallback((v: boolean) => {
setOpen(v)
}, [])
const handleHandleClick = useCallback((e: MouseEvent) => {
e.stopPropagation()
if (data._connectedSourceHandleIds!.length > PARALLEL_LIMIT - 1) {
const { setShowTips } = workflowStore.getState()
setShowTips(t('workflow.common.parallelTip.limit', { num: PARALLEL_LIMIT }))
return
}
setOpen(v => !v)
}, [data._connectedSourceHandleIds])
if (checkParallelLimit(id))
setOpen(v => !v)
}, [checkParallelLimit, id])
const handleSelect = useCallback((type: BlockEnum, toolDefaultValue?: ToolDefaultValue) => {
handleNodeAdd(
{
Expand All @@ -156,7 +151,7 @@ export const NodeSourceHandle = memo(({
useEffect(() => {
if (notInitialWorkflow && data.type === BlockEnum.Start && !isChatMode)
setOpen(true)
}, [notInitialWorkflow, data.type])
}, [notInitialWorkflow, data.type, isChatMode])

return (
<Tooltip
Expand Down
1 change: 1 addition & 0 deletions web/app/components/workflow/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export function getIterationStartNode(iterationId: string): Node {
title: '',
desc: '',
type: BlockEnum.IterationStart,
isInIteration: true,
},
position: {
x: 24,
Expand Down
1 change: 1 addition & 0 deletions web/i18n/en-US/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const translation = {
desc: ' to connect',
},
limit: 'Parallelism is limited to {{num}} branches.',
depthLimit: 'Parallel nesting layer limit of {{num}} layers',
},
},
env: {
Expand Down
1 change: 1 addition & 0 deletions web/i18n/zh-Hans/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const translation = {
desc: '连接节点',
},
limit: '并行分支限制为 {{num}} 个',
depthLimit: '并行嵌套层数限制 {{num}} 层',
},
},
env: {
Expand Down

0 comments on commit 61917e6

Please sign in to comment.