Skip to content

Commit

Permalink
feat: add workflow parallel depth limit configuration (#11460)
Browse files Browse the repository at this point in the history
Signed-off-by: -LAN- <[email protected]>
Co-authored-by: zxhlyh <[email protected]>
  • Loading branch information
2 people authored and 刘江波 committed Dec 20, 2024
1 parent 8802d8b commit 282ea0b
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 5 deletions.
1 change: 1 addition & 0 deletions api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
MAX_VARIABLE_SIZE=204800

# App configuration
Expand Down
5 changes: 5 additions & 0 deletions api/configs/feature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,11 @@ class WorkflowConfig(BaseSettings):
default=5,
)

WORKFLOW_PARALLEL_DEPTH_LIMIT: PositiveInt = Field(
description="Maximum allowed depth for nested parallel executions",
default=3,
)

MAX_VARIABLE_SIZE: PositiveInt = Field(
description="Maximum size in bytes for a single variable in workflows. Default to 200 KB.",
default=200 * 1024,
Expand Down
15 changes: 15 additions & 0 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

import services
from configs import dify_config
from controllers.console import api
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
from controllers.console.app.wraps import get_app_model
Expand Down Expand Up @@ -426,7 +427,21 @@ def post(self, app_model: App):
}


class WorkflowConfigApi(Resource):
"""Resource for workflow configuration."""

@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App):
return {
"parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
}


api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft")
api.add_resource(WorkflowConfigApi, "/apps/<uuid:app_id>/workflows/draft/config")
api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run")
api.add_resource(DraftWorkflowRunApi, "/apps/<uuid:app_id>/workflows/draft/run")
api.add_resource(WorkflowTaskStopApi, "/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop")
Expand Down
5 changes: 4 additions & 1 deletion api/core/workflow/graph_engine/entities/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pydantic import BaseModel, Field

from configs import dify_config
from core.workflow.graph_engine.entities.run_condition import RunCondition
from core.workflow.nodes import NodeType
from core.workflow.nodes.answer.answer_stream_generate_router import AnswerStreamGeneratorRouter
Expand Down Expand Up @@ -170,7 +171,9 @@ def init(cls, graph_config: Mapping[str, Any], root_node_id: Optional[str] = Non
for parallel in parallel_mapping.values():
if parallel.parent_parallel_id:
cls._check_exceed_parallel_limit(
parallel_mapping=parallel_mapping, level_limit=3, parent_parallel_id=parallel.parent_parallel_id
parallel_mapping=parallel_mapping,
level_limit=dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
parent_parallel_id=parallel.parent_parallel_id,
)

# init answer stream generate routes
Expand Down
2 changes: 2 additions & 0 deletions api/tests/unit_tests/configs/test_dify_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def test_dify_config(example_env_file):
# annotated field with configured value
assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30

assert config.WORKFLOW_PARALLEL_DEPTH_LIMIT == 3


# NOTE: If there is a `.env` file in your Workspace, this test might not succeed as expected.
# This is due to `pymilvus` loading all the variables from the `.env` file into `os.environ`.
Expand Down
1 change: 1 addition & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
MAX_VARIABLE_SIZE=204800
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
WORKFLOW_FILE_UPLOAD_LIMIT=10

# HTTP request node in workflow configuration
Expand Down
3 changes: 2 additions & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ x-shared-env: &shared-api-worker-env
LOG_DATEFORMAT: ${LOG_DATEFORMAT:-"%Y-%m-%d %H:%M:%S"}
LOG_TZ: ${LOG_TZ:-UTC}
DEBUG: ${DEBUG:-false}
SENTRY_DSN: ${SENTRY_DSN:-}
FLASK_DEBUG: ${FLASK_DEBUG:-false}
SECRET_KEY: ${SECRET_KEY:-sk-9f73s3ljTXVcMT3Blb3ljTqtsKiGHXVcMT3BlbkFJLK7U}
INIT_PASSWORD: ${INIT_PASSWORD:-}
Expand Down Expand Up @@ -260,6 +259,7 @@ x-shared-env: &shared-api-worker-env
UPLOAD_IMAGE_FILE_SIZE_LIMIT: ${UPLOAD_IMAGE_FILE_SIZE_LIMIT:-10}
UPLOAD_VIDEO_FILE_SIZE_LIMIT: ${UPLOAD_VIDEO_FILE_SIZE_LIMIT:-100}
UPLOAD_AUDIO_FILE_SIZE_LIMIT: ${UPLOAD_AUDIO_FILE_SIZE_LIMIT:-50}
SENTRY_DSN: ${SENTRY_DSN:-}
API_SENTRY_DSN: ${API_SENTRY_DSN:-}
API_SENTRY_TRACES_SAMPLE_RATE: ${API_SENTRY_TRACES_SAMPLE_RATE:-1.0}
API_SENTRY_PROFILES_SAMPLE_RATE: ${API_SENTRY_PROFILES_SAMPLE_RATE:-1.0}
Expand Down Expand Up @@ -299,6 +299,7 @@ x-shared-env: &shared-api-worker-env
WORKFLOW_MAX_EXECUTION_TIME: ${WORKFLOW_MAX_EXECUTION_TIME:-1200}
WORKFLOW_CALL_MAX_DEPTH: ${WORKFLOW_CALL_MAX_DEPTH:-5}
MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800}
WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3}
WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10}
HTTP_REQUEST_NODE_MAX_BINARY_SIZE: ${HTTP_REQUEST_NODE_MAX_BINARY_SIZE:-10485760}
HTTP_REQUEST_NODE_MAX_TEXT_SIZE: ${HTTP_REQUEST_NODE_MAX_TEXT_SIZE:-1048576}
Expand Down
9 changes: 6 additions & 3 deletions web/app/components/workflow/hooks/use-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import {
import I18n from '@/context/i18n'
import { CollectionType } from '@/app/components/tools/types'
import { CUSTOM_ITERATION_START_NODE } from '@/app/components/workflow/nodes/iteration-start/constants'
import { useWorkflowConfig } from '@/service/use-workflow'

export const useIsChatMode = () => {
const appDetail = useAppStore(s => s.appDetail)
Expand All @@ -69,7 +70,9 @@ export const useWorkflow = () => {
const { locale } = useContext(I18n)
const store = useStoreApi()
const workflowStore = useWorkflowStore()
const appId = useStore(s => s.appId)
const nodesExtraData = useNodesExtraData()
const { data: workflowConfig } = useWorkflowConfig(appId)
const setPanelWidth = useCallback((width: number) => {
localStorage.setItem('workflow-node-panel-width', `${width}`)
workflowStore.setState({ panelWidth: width })
Expand Down Expand Up @@ -336,15 +339,15 @@ export const useWorkflow = () => {
for (let i = 0; i < parallelList.length; i++) {
const parallel = parallelList[i]

if (parallel.depth > PARALLEL_DEPTH_LIMIT) {
if (parallel.depth > (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT)) {
const { setShowTips } = workflowStore.getState()
setShowTips(t('workflow.common.parallelTip.depthLimit', { num: PARALLEL_DEPTH_LIMIT }))
setShowTips(t('workflow.common.parallelTip.depthLimit', { num: (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT) }))
return false
}
}

return true
}, [t, workflowStore])
}, [t, workflowStore, workflowConfig?.parallel_depth_limit])

const isValidConnection = useCallback(({ source, sourceHandle, target }: Connection) => {
const {
Expand Down
12 changes: 12 additions & 0 deletions web/service/use-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { useQuery } from '@tanstack/react-query'
import { get } from './base'
import type { WorkflowConfigResponse } from '@/types/workflow'

const NAME_SPACE = 'workflow'

export const useWorkflowConfig = (appId: string) => {
return useQuery({
queryKey: [NAME_SPACE, 'config', appId],
queryFn: () => get<WorkflowConfigResponse>(`/apps/${appId}/workflows/draft/config`),
})
}
4 changes: 4 additions & 0 deletions web/types/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,7 @@ export type ConversationVariableResponse = {
}

export type IterationDurationMap = Record<string, number>

export type WorkflowConfigResponse = {
parallel_depth_limit: number
}

0 comments on commit 282ea0b

Please sign in to comment.