fix(trigger): enhance error handling and refactor end user creation in trigger workflows
Some checks are pending
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions

- Improved error handling in `TriggerSubscriptionListApi` to return a 404 response for ValueErrors.
- Refactored end user creation logic in `service_api/wraps.py` to use `get_or_create_end_user` for better clarity and consistency.
- Introduced a new method `create_end_user_batch` for batch creation of end users, optimizing database interactions.
- Updated various trigger-related services to utilize the new end user handling, ensuring proper user context during trigger dispatching.
This commit is contained in:
Harry 2025-10-17 21:00:43 +08:00
parent 8a5174d078
commit 80f2c1be67
17 changed files with 429 additions and 323 deletions

View File

@ -1114,7 +1114,10 @@ class DraftWorkflowTriggerRunAllApi(Resource):
try:
trigger_debug_event: TriggerDebugEvent | None = select_trigger_debug_events(
draft_workflow=draft_workflow, app_model=app_model, user_id=current_user.id, node_ids=node_ids
draft_workflow=draft_workflow,
app_model=app_model,
user_id=current_user.id,
node_ids=node_ids,
)
except ValueError as e:
raise e

View File

@ -81,6 +81,8 @@ class TriggerSubscriptionListApi(Resource):
tenant_id=user.current_tenant_id, provider_id=TriggerProviderID(provider)
)
)
except ValueError as e:
return jsonable_encoder({"error": str(e)}), 404
except Exception as e:
logger.exception("Error listing trigger providers", exc_info=e)
raise

View File

@ -1,5 +1,5 @@
import time
from collections.abc import Callable
from collections.abc import Callable, Mapping
from datetime import timedelta
from enum import StrEnum, auto
from functools import wraps
@ -13,6 +13,7 @@ from sqlalchemy import select, update
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.datetime_utils import naive_utc_now
@ -83,7 +84,7 @@ def validate_app_token(view: Callable[P, R] | None = None, *, fetch_user_arg: Fe
if user_id:
user_id = str(user_id)
end_user = create_or_update_end_user_for_user_id(app_model, user_id)
end_user = get_or_create_end_user(app_model, user_id)
kwargs["end_user"] = end_user
# Set EndUser as current logged-in user for flask_login.current_user
@ -308,10 +309,13 @@ def validate_and_get_api_token(scope: str | None = None):
return api_token
def create_or_update_end_user_for_user_id(app_model: App, user_id: str | None = None) -> EndUser:
"""
Create or update session terminal based on user ID.
"""
def get_or_create_end_user(app_model: App, user_id: str | None = None) -> EndUser:
return get_or_create_end_user_by_type(InvokeFrom.SERVICE_API, app_model.tenant_id, app_model.id, user_id)
def get_or_create_end_user_by_type(
type: InvokeFrom, tenant_id: str, app_id: str, user_id: str | None = None
) -> EndUser:
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
@ -319,21 +323,22 @@ def create_or_update_end_user_for_user_id(app_model: App, user_id: str | None =
end_user = (
session.query(EndUser)
.where(
EndUser.tenant_id == app_model.tenant_id,
EndUser.app_id == app_model.id,
EndUser.tenant_id == tenant_id,
EndUser.app_id == app_id,
EndUser.session_id == user_id,
EndUser.type == "service_api",
EndUser.type == type,
)
.first()
)
if end_user is None:
end_user = EndUser(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type="service_api",
tenant_id=tenant_id,
app_id=app_id,
type=type,
is_anonymous=user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID,
session_id=user_id,
external_user_id=user_id,
)
session.add(end_user)
session.commit()
@ -341,6 +346,87 @@ def create_or_update_end_user_for_user_id(app_model: App, user_id: str | None =
return end_user
def create_end_user_batch(type: InvokeFrom, tenant_id: str, app_ids: list[str], user_id: str) -> Mapping[str, EndUser]:
"""Create end users in batch.
Creates end users in batch for the specified tenant and application IDs in O(1) time.
This batch creation is necessary because trigger subscriptions can span multiple applications,
and trigger events may be dispatched to multiple applications simultaneously.
For each app_id in app_ids, check if an `EndUser` with the given
`user_id` (as session_id/external_user_id) already exists for the
tenant/app and type `type`. If it exists, return it; otherwise,
create it. Operates with minimal DB I/O by querying and inserting in
batches.
Returns a mapping of `app_id -> EndUser`.
"""
# Normalize user_id to default if empty
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
# Deduplicate app_ids while preserving input order
seen: set[str] = set()
unique_app_ids: list[str] = []
for app_id in app_ids:
if app_id not in seen:
seen.add(app_id)
unique_app_ids.append(app_id)
# Result is a simple app_id -> EndUser mapping
result: dict[str, EndUser] = {}
if not unique_app_ids:
return result
with Session(db.engine, expire_on_commit=False) as session:
# Fetch existing end users for all target apps in a single query
existing_end_users: list[EndUser] = (
session.query(EndUser)
.where(
EndUser.tenant_id == tenant_id,
EndUser.app_id.in_(unique_app_ids),
EndUser.session_id == user_id,
EndUser.type == type,
)
.all()
)
found_app_ids: set[str] = set()
for eu in existing_end_users:
# If duplicates exist due to weak DB constraints, prefer the first
if eu.app_id not in result:
result[eu.app_id] = eu
found_app_ids.add(eu.app_id)
# Determine which apps still need an EndUser created
missing_app_ids = [app_id for app_id in unique_app_ids if app_id not in found_app_ids]
if missing_app_ids:
new_end_users: list[EndUser] = []
is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID
for app_id in missing_app_ids:
new_end_users.append(
EndUser(
tenant_id=tenant_id,
app_id=app_id,
type=type,
is_anonymous=is_anonymous,
session_id=user_id,
external_user_id=user_id,
)
)
session.add_all(new_end_users)
session.commit()
for eu in new_end_users:
result[eu.app_id] = eu
return result
class DatasetApiResource(Resource):
method_decorators = [validate_dataset_token]

View File

@ -32,6 +32,10 @@ class InvokeFrom(StrEnum):
# https://docs.dify.ai/en/guides/application-publishing/launch-your-webapp-quickly/README
WEB_APP = "web-app"
# TRIGGER indicates that this invocation is from a trigger.
# this is used for plugin trigger and webhook trigger.
TRIGGER = "trigger"
# EXPLORE indicates that this invocation is from
# the workflow (or chatflow) explore page.
EXPLORE = "explore"
@ -65,6 +69,8 @@ class InvokeFrom(StrEnum):
return "dev"
elif self == InvokeFrom.EXPLORE:
return "explore_app"
elif self == InvokeFrom.TRIGGER:
return "trigger"
elif self == InvokeFrom.SERVICE_API:
return "api"

View File

@ -4,7 +4,7 @@ from typing import Union
from sqlalchemy import select
from sqlalchemy.orm import Session
from controllers.service_api.wraps import create_or_update_end_user_for_user_id
from controllers.service_api.wraps import get_or_create_end_user
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
@ -64,7 +64,7 @@ class PluginAppBackwardsInvocation(BaseBackwardsInvocation):
"""
app = cls._get_app(app_id, tenant_id)
if not user_id:
user = create_or_update_end_user_for_user_id(app)
user = get_or_create_end_user(app)
else:
user = cls._get_user(user_id)

View File

@ -247,6 +247,7 @@ class TriggerInvokeEventResponse(BaseModel):
class PluginTriggerDispatchResponse(BaseModel):
user_id: str
events: list[str]
raw_http_response: str
@ -260,9 +261,11 @@ class TriggerValidateProviderCredentialsResponse(BaseModel):
class TriggerDispatchResponse:
user_id: str
events: list[str]
response: Response
def __init__(self, events: list[str], response: Response):
def __init__(self, user_id: str, events: list[str], response: Response):
self.user_id = user_id
self.events = events
self.response = response

View File

@ -156,7 +156,6 @@ class PluginTriggerManager(BasePluginClient):
def dispatch_event(
self,
tenant_id: str,
user_id: str,
provider: str,
subscription: Mapping[str, Any],
request: Request,
@ -173,7 +172,6 @@ class PluginTriggerManager(BasePluginClient):
path=f"plugin/{tenant_id}/dispatch/trigger/dispatch_event",
type_=PluginTriggerDispatchResponse,
data={
"user_id": user_id,
"data": {
"provider": provider_id.provider_name,
"subscription": subscription,
@ -191,6 +189,7 @@ class PluginTriggerManager(BasePluginClient):
for resp in response:
return TriggerDispatchResponse(
user_id=resp.user_id or "",
events=resp.events,
response=deserialize_response(binascii.unhexlify(resp.raw_http_response.encode())),
)

View File

@ -118,6 +118,7 @@ class TriggerDebugEventBus:
pool_key,
address_id,
)
logger.info("event_data: %s", event_data)
return event_type.model_validate_json(json_data=event_data) if event_data else None
except RedisError:
logger.exception("Failed to poll event from pool: %s", pool_key)

View File

@ -15,8 +15,6 @@ from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from models.model import App
from models.provider_ids import TriggerProviderID
from models.workflow import Workflow
from services.trigger.trigger_service import TriggerService
from services.trigger.webhook_service import WebhookService
logger = logging.getLogger(__name__)
@ -47,6 +45,8 @@ class TriggerDebugEventPoller(ABC):
class PluginTriggerDebugEventPoller(TriggerDebugEventPoller):
def poll(self) -> TriggerDebugEvent | None:
from services.trigger.trigger_service import TriggerService
plugin_trigger_data = TriggerEventNodeData.model_validate(self.node_config.get("data", {}))
provider_id = TriggerProviderID(plugin_trigger_data.provider_id)
pool_key: str = PluginTriggerDebugEvent.build_pool_key(
@ -67,7 +67,7 @@ class PluginTriggerDebugEventPoller(TriggerDebugEventPoller):
return None
trigger_event_response: TriggerInvokeEventResponse = TriggerService.invoke_trigger_event(
event=plugin_trigger_event,
user_id=self.user_id,
user_id=plugin_trigger_event.user_id,
tenant_id=self.tenant_id,
node_config=self.node_config,
)
@ -103,6 +103,8 @@ class WebhookTriggerDebugEventPoller(TriggerDebugEventPoller):
if not webhook_event:
return None
from services.trigger.webhook_service import WebhookService
payload = webhook_event.payload or {}
workflow_inputs = payload.get("inputs")
if workflow_inputs is None:

View File

@ -71,6 +71,7 @@ class PluginTriggerDebugEvent(BaseDebugEvent):
"""Debug event for plugin triggers."""
name: str
user_id: str = Field(description="This is end user id, only for trigger the event. no related with account user id")
request_id: str
subscription_id: str
provider_id: str

View File

@ -267,7 +267,6 @@ class PluginTriggerProviderController:
def dispatch(
self,
user_id: str,
request: Request,
subscription: Subscription,
credentials: Mapping[str, str],
@ -288,7 +287,6 @@ class PluginTriggerProviderController:
response: TriggerDispatchResponse = manager.dispatch_event(
tenant_id=self.tenant_id,
user_id=user_id,
provider=str(provider_id),
subscription=subscription.model_dump(),
request=request,

View File

@ -13,7 +13,7 @@ import contexts
from configs import dify_config
from core.plugin.entities.plugin_daemon import CredentialType, PluginTriggerProviderEntity
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.plugin.impl.exc import PluginDaemonError, PluginInvokeError
from core.plugin.impl.exc import PluginDaemonError, PluginInvokeError, PluginNotFoundError
from core.plugin.impl.trigger import PluginTriggerManager
from core.trigger.entities.entities import (
EventEntity,
@ -100,13 +100,13 @@ class TriggerManager:
if provider_id_str in plugin_trigger_providers:
return plugin_trigger_providers[provider_id_str]
manager = PluginTriggerManager()
provider = manager.fetch_trigger_provider(tenant_id, provider_id)
if not provider:
raise ValueError(f"Trigger provider {provider_id} not found")
try:
manager = PluginTriggerManager()
provider = manager.fetch_trigger_provider(tenant_id, provider_id)
if not provider:
raise ValueError(f"Trigger provider {provider_id} not found")
controller = PluginTriggerProviderController(
entity=provider.declaration,
plugin_id=provider.plugin_id,
@ -116,6 +116,8 @@ class TriggerManager:
)
plugin_trigger_providers[provider_id_str] = controller
return controller
except PluginNotFoundError as e:
raise ValueError(f"Trigger provider {provider_id} not found") from e
except PluginDaemonError as e:
raise e
except Exception as e:

View File

@ -1,12 +1,12 @@
import logging
import time
import uuid
from collections.abc import Mapping, Sequence
from collections.abc import Mapping
from typing import Any
from flask import Request, Response
from pydantic import BaseModel
from sqlalchemy import and_, func, select
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
from core.plugin.entities.plugin_daemon import CredentialType
@ -14,25 +14,21 @@ from core.plugin.entities.request import TriggerDispatchResponse, TriggerInvokeE
from core.plugin.impl.exc import PluginNotFoundError
from core.plugin.utils.http_parser import deserialize_request, serialize_request
from core.trigger.debug.events import PluginTriggerDebugEvent
from core.trigger.entities.entities import EventEntity
from core.trigger.provider import PluginTriggerProviderController
from core.trigger.trigger_manager import TriggerManager
from core.trigger.utils.encryption import create_trigger_provider_encrypter_for_subscription
from core.workflow.enums import NodeType
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from core.workflow.nodes.trigger_schedule.exc import TenantOwnerNotFoundError
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.account import Account, TenantAccountJoin, TenantAccountRole
from models.enums import WorkflowRunTriggeredFrom
from models.model import App
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription
from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger
from services.async_workflow_service import AsyncWorkflowService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData
from services.workflow.entities import PluginTriggerDispatchData
from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
logger = logging.getLogger(__name__)
@ -77,147 +73,6 @@ class TriggerService:
request=request,
)
@classmethod
def _get_latest_workflows_by_app_ids(
cls, session: Session, subscribers: Sequence[WorkflowPluginTrigger]
) -> Mapping[str, Workflow]:
"""Get the latest workflows by app_ids"""
workflow_query = (
select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at"))
.where(
Workflow.app_id.in_({t.app_id for t in subscribers}),
Workflow.version != Workflow.VERSION_DRAFT,
)
.group_by(Workflow.app_id)
.subquery()
)
workflows = session.scalars(
select(Workflow).join(
workflow_query,
(Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at),
)
).all()
return {w.app_id: w for w in workflows}
@classmethod
def _get_tenant_owner(cls, session: Session, tenant_id: str) -> Account:
"""Get the tenant owner account for workflow execution."""
owner = session.scalar(
select(Account)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == TenantAccountRole.OWNER)
)
if not owner:
raise TenantOwnerNotFoundError(f"Tenant owner not found for tenant {tenant_id}")
return owner
@classmethod
def dispatch_triggered_workflows(
cls, subscription: TriggerSubscription, event: EventEntity, request_id: str
) -> int:
"""Process triggered workflows.
Args:
subscription: The trigger subscription
event: The trigger entity that was activated
request_id: The ID of the stored request in storage system
"""
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
if not request:
logger.error("Request not found for request_id %s", request_id)
return 0
subscribers: list[WorkflowPluginTrigger] = cls.get_subscriber_triggers(
tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event.identity.name
)
if not subscribers:
logger.warning(
"No workflows found for trigger event '%s' in subscription '%s'",
event.identity.name,
subscription.id,
)
return 0
dispatched_count = 0
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id)
)
with Session(db.engine) as session:
tenant_owner = cls._get_tenant_owner(session, subscription.tenant_id)
workflows = cls._get_latest_workflows_by_app_ids(session, subscribers)
for plugin_trigger in subscribers:
# Get workflow from mapping
workflow = workflows.get(plugin_trigger.app_id)
if not workflow:
logger.error(
"Workflow not found for app %s",
plugin_trigger.app_id,
)
continue
# Find the trigger node in the workflow
event_node = None
for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
if node_id == plugin_trigger.node_id:
event_node = node_config
break
if not event_node:
logger.error("Trigger event node not found for app %s", plugin_trigger.app_id)
continue
# invoke triger
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node)
invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event(
tenant_id=subscription.tenant_id,
user_id=subscription.user_id,
provider_id=TriggerProviderID(subscription.provider_id),
event_name=event.identity.name,
parameters=node_data.resolve_parameters(
parameter_schemas=provider_controller.get_event_parameters(event_name=event.identity.name)
),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),
request=request,
)
if invoke_response.cancelled:
logger.info(
"Trigger ignored for app %s with trigger event %s",
plugin_trigger.app_id,
event.identity.name,
)
continue
# Create trigger data for async execution
trigger_data = PluginTriggerData(
app_id=plugin_trigger.app_id,
tenant_id=subscription.tenant_id,
workflow_id=workflow.id,
root_node_id=plugin_trigger.node_id,
trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
plugin_id=subscription.provider_id,
endpoint_id=subscription.endpoint_id,
inputs=invoke_response.variables,
)
# Trigger async workflow
try:
AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data)
dispatched_count += 1
logger.info(
"Triggered workflow for app %s with trigger event %s",
plugin_trigger.app_id,
event.identity.name,
)
except Exception:
logger.exception(
"Failed to trigger workflow for app %s",
plugin_trigger.app_id,
)
return dispatched_count
@classmethod
def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None:
"""
@ -249,7 +104,6 @@ class TriggerService:
subscription=subscription,
)
dispatch_response: TriggerDispatchResponse = controller.dispatch(
user_id=subscription.user_id,
request=request,
subscription=subscription.to_entity(),
credentials=encrypter.decrypt(subscription.credentials),
@ -261,10 +115,20 @@ class TriggerService:
serialized_request = serialize_request(request)
storage.save(f"triggers/{request_id}", serialized_request)
# Production dispatch
from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
# Validate event names
for event_name in dispatch_response.events:
if controller.get_event(event_name) is None:
logger.error(
"Event name %s not found in provider %s for endpoint %s",
event_name,
subscription.provider_id,
endpoint_id,
)
raise ValueError(f"Event name {event_name} not found in provider {subscription.provider_id}")
plugin_trigger_dispatch_data = PluginTriggerDispatchData(
user_id=dispatch_response.user_id,
tenant_id=subscription.tenant_id,
endpoint_id=endpoint_id,
provider_id=subscription.provider_id,
subscription_id=subscription.id,

View File

@ -459,7 +459,6 @@ class TriggerSubscriptionBuilderService:
)
try:
dispatch_response: TriggerDispatchResponse = controller.dispatch(
user_id=subscription_builder.user_id,
request=request,
subscription=subscription_builder.to_subscription(),
credentials={},

View File

@ -12,6 +12,8 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import RequestEntityTooLarge
from configs import dify_config
from controllers.service_api.wraps import get_or_create_end_user_by_type
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import FileTransferMethod
from core.tools.tool_file_manager import ToolFileManager
from core.variables.types import SegmentType
@ -19,7 +21,6 @@ from core.workflow.enums import NodeType
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from factories import file_factory
from models.account import Account, TenantAccountJoin, TenantAccountRole
from models.enums import WorkflowRunTriggeredFrom
from models.model import App
from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger
@ -704,20 +705,6 @@ class WebhookService:
"""
try:
with Session(db.engine) as session:
# Get tenant owner as the user for webhook execution
tenant_owner = session.scalar(
select(Account)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(
TenantAccountJoin.tenant_id == webhook_trigger.tenant_id,
TenantAccountJoin.role == TenantAccountRole.OWNER,
)
)
if not tenant_owner:
logger.error("Tenant owner not found for tenant %s", webhook_trigger.tenant_id)
raise ValueError("Tenant owner not found")
# Prepare inputs for the webhook node
# The webhook node expects webhook_data in the inputs
workflow_inputs = cls.build_workflow_inputs(webhook_data)
@ -732,10 +719,17 @@ class WebhookService:
tenant_id=webhook_trigger.tenant_id,
)
end_user = get_or_create_end_user_by_type(
type=InvokeFrom.TRIGGER,
tenant_id=webhook_trigger.tenant_id,
app_id=webhook_trigger.app_id,
user_id=None,
)
# Trigger workflow execution asynchronously
AsyncWorkflowService.trigger_workflow_async(
session,
tenant_owner,
end_user,
trigger_data,
)

View File

@ -61,6 +61,8 @@ class PluginTriggerData(TriggerData):
class PluginTriggerDispatchData(BaseModel):
"""Plugin trigger dispatch data for Celery tasks"""
user_id: str
tenant_id: str
endpoint_id: str
provider_id: str
subscription_id: str

View File

@ -6,25 +6,32 @@ to avoid blocking the main request thread.
"""
import logging
from collections.abc import Mapping, Sequence
from celery import shared_task
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.plugin.utils.http_parser import deserialize_request
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import PluginTriggerDebugEvent
from core.trigger.entities.entities import EventEntity
from core.trigger.provider import PluginTriggerProviderController
from core.trigger.trigger_manager import TriggerManager
from core.trigger.utils.encryption import (
create_trigger_provider_encrypter_for_properties,
create_trigger_provider_encrypter_for_subscription,
)
from core.workflow.enums import NodeType
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.enums import WorkflowRunTriggeredFrom
from models.model import EndUser
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription
from services.trigger.trigger_service import TriggerService
from services.workflow.entities import PluginTriggerDispatchData
from models.workflow import Workflow, WorkflowPluginTrigger
from services.async_workflow_service import AsyncWorkflowService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData
logger = logging.getLogger(__name__)
@ -32,6 +39,230 @@ logger = logging.getLogger(__name__)
TRIGGER_QUEUE = "triggered_workflow_dispatcher"
def dispatch_trigger_debug_event(
events: list[str],
user_id: str,
timestamp: int,
request_id: str,
subscription: TriggerSubscription,
) -> int:
debug_dispatched = 0
try:
for event_name in events:
pool_key: str = PluginTriggerDebugEvent.build_pool_key(
name=event_name,
tenant_id=subscription.tenant_id,
subscription_id=subscription.id,
provider_id=subscription.provider_id,
)
trigger_debug_event: PluginTriggerDebugEvent = PluginTriggerDebugEvent(
timestamp=timestamp,
user_id=user_id,
name=event_name,
request_id=request_id,
subscription_id=subscription.id,
provider_id=subscription.provider_id,
)
debug_dispatched += TriggerDebugEventBus.dispatch(
tenant_id=subscription.tenant_id,
event=trigger_debug_event,
pool_key=pool_key,
)
logger.debug(
"Trigger debug dispatched %d sessions to pool %s for event %s for subscription %s provider %s",
debug_dispatched,
pool_key,
event_name,
subscription.id,
subscription.provider_id,
)
return debug_dispatched
except Exception:
logger.exception("Failed to dispatch to debug sessions")
return 0
def _get_latest_workflows_by_app_ids(
session: Session, subscribers: Sequence[WorkflowPluginTrigger]
) -> Mapping[str, Workflow]:
"""Get the latest workflows by app_ids"""
workflow_query = (
select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at"))
.where(
Workflow.app_id.in_({t.app_id for t in subscribers}),
Workflow.version != Workflow.VERSION_DRAFT,
)
.group_by(Workflow.app_id)
.subquery()
)
workflows = session.scalars(
select(Workflow).join(
workflow_query,
(Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at),
)
).all()
return {w.app_id: w for w in workflows}
def dispatch_triggered_workflow(
user_id: str,
subscription: TriggerSubscription,
event_name: str,
request_id: str,
) -> int:
"""Process triggered workflows.
Args:
subscription: The trigger subscription
event: The trigger entity that was activated
request_id: The ID of the stored request in storage system
"""
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
if not request:
logger.error("Request not found for request_id %s", request_id)
return 0
from services.trigger.trigger_service import TriggerService
subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers(
tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name
)
if not subscribers:
logger.warning(
"No workflows found for trigger event '%s' in subscription '%s'",
event_name,
subscription.id,
)
return 0
dispatched_count = 0
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id)
)
with Session(db.engine) as session:
workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
# Lazy import to avoid circular import during app initialization
from controllers.service_api.wraps import create_end_user_batch
end_users: Mapping[str, EndUser] = create_end_user_batch(
type=InvokeFrom.TRIGGER,
tenant_id=subscription.tenant_id,
app_ids=[plugin_trigger.app_id for plugin_trigger in subscribers],
user_id=user_id,
)
for plugin_trigger in subscribers:
# Get workflow from mapping
workflow: Workflow | None = workflows.get(plugin_trigger.app_id)
if not workflow:
logger.error(
"Workflow not found for app %s",
plugin_trigger.app_id,
)
continue
# Find the trigger node in the workflow
event_node = None
for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
if node_id == plugin_trigger.node_id:
event_node = node_config
break
if not event_node:
logger.error("Trigger event node not found for app %s", plugin_trigger.app_id)
continue
# invoke triger
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node)
invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event(
tenant_id=subscription.tenant_id,
user_id=user_id,
provider_id=TriggerProviderID(subscription.provider_id),
event_name=event_name,
parameters=node_data.resolve_parameters(
parameter_schemas=provider_controller.get_event_parameters(event_name=event_name)
),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),
request=request,
)
if invoke_response.cancelled:
logger.info(
"Trigger ignored for app %s with trigger event %s",
plugin_trigger.app_id,
event_name,
)
continue
# Create trigger data for async execution
trigger_data = PluginTriggerData(
app_id=plugin_trigger.app_id,
tenant_id=subscription.tenant_id,
workflow_id=workflow.id,
root_node_id=plugin_trigger.node_id,
trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
plugin_id=subscription.provider_id,
endpoint_id=subscription.endpoint_id,
inputs=invoke_response.variables,
)
# Trigger async workflow
try:
end_user = end_users.get(plugin_trigger.app_id)
if not end_user:
raise ValueError(f"End user not found for app {plugin_trigger.app_id}")
AsyncWorkflowService.trigger_workflow_async(session=session, user=end_user, trigger_data=trigger_data)
dispatched_count += 1
logger.info(
"Triggered workflow for app %s with trigger event %s",
plugin_trigger.app_id,
event_name,
)
except Exception:
logger.exception(
"Failed to trigger workflow for app %s",
plugin_trigger.app_id,
)
return dispatched_count
def dispatch_triggered_workflows(
user_id: str,
events: list[str],
subscription: TriggerSubscription,
request_id: str,
) -> int:
dispatched_count = 0
for event_name in events:
try:
dispatched_count += dispatch_triggered_workflow(
user_id=user_id,
subscription=subscription,
event_name=event_name,
request_id=request_id,
)
except Exception:
logger.exception(
"Failed to dispatch trigger '%s' for subscription %s and provider %s. Continuing...",
event_name,
subscription.id,
subscription.provider_id,
)
# Continue processing other triggers even if one fails
continue
logger.info(
"Completed async trigger dispatching: processed %d/%d triggers for subscription %s and provider %s",
dispatched_count,
len(events),
subscription.id,
subscription.provider_id,
)
return dispatched_count
@shared_task(queue=TRIGGER_QUEUE)
def dispatch_triggered_workflows_async(
dispatch_data: dict,
@ -51,6 +282,8 @@ def dispatch_triggered_workflows_async(
dict: Execution result with status and dispatched trigger count
"""
dispatch_params: PluginTriggerDispatchData = PluginTriggerDispatchData.model_validate(dispatch_data)
user_id = dispatch_params.user_id
tenant_id = dispatch_params.tenant_id
endpoint_id = dispatch_params.endpoint_id
provider_id = dispatch_params.provider_id
subscription_id = dispatch_params.subscription_id
@ -60,7 +293,8 @@ def dispatch_triggered_workflows_async(
try:
logger.info(
"Starting trigger dispatching endpoint=%s, events=%s, request_id=%s, subscription_id=%s, provider_id=%s",
"Starting trigger dispatching uid=%s, endpoint=%s, events=%s, req_id=%s, sub_id=%s, provider_id=%s",
user_id,
endpoint_id,
events,
request_id,
@ -68,125 +302,35 @@ def dispatch_triggered_workflows_async(
provider_id,
)
# Verify request exists in storage
try:
serialized_request = storage.load_once(f"triggers/{request_id}")
# Just verify it exists, we don't need to deserialize it here
if not serialized_request:
raise ValueError("Request not found in storage")
except Exception as e:
logger.exception("Failed to load request %s", request_id, exc_info=e)
return {"status": "failed", "error": f"Failed to load request: {str(e)}"}
subscription: TriggerSubscription | None = TriggerProviderService.get_subscription_by_id(
tenant_id=tenant_id,
subscription_id=subscription_id,
)
if not subscription:
logger.error("Subscription not found: %s", subscription_id)
return {"status": "failed", "error": "Subscription not found"}
with Session(db.engine) as session:
# Get subscription
subscription: TriggerSubscription | None = (
session.query(TriggerSubscription).filter_by(id=subscription_id).first()
)
if not subscription:
logger.error("Subscription not found: %s", subscription_id)
return {"status": "failed", "error": "Subscription not found"}
workflow_dispatched = dispatch_triggered_workflows(
user_id=user_id,
events=events,
subscription=subscription,
request_id=request_id,
)
# Get controller
controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
subscription.tenant_id, TriggerProviderID(provider_id)
)
if not controller:
logger.error("Controller not found for provider: %s", provider_id)
return {"status": "failed", "error": "Controller not found"}
debug_dispatched = dispatch_trigger_debug_event(
events=events,
user_id=user_id,
timestamp=timestamp,
request_id=request_id,
subscription=subscription,
)
credential_encrypter, _ = create_trigger_provider_encrypter_for_subscription(
tenant_id=subscription.tenant_id,
controller=controller,
subscription=subscription,
)
subscription.credentials = credential_encrypter.decrypt(subscription.credentials)
properties_encrypter, _ = create_trigger_provider_encrypter_for_properties(
tenant_id=subscription.tenant_id,
controller=controller,
subscription=subscription,
)
subscription.properties = properties_encrypter.decrypt(subscription.properties)
# Dispatch each trigger
dispatched_count = 0
for event_name in events:
try:
event: EventEntity | None = controller.get_event(event_name)
if event is None:
logger.error(
"Trigger '%s' not found in provider '%s'",
event_name,
provider_id,
)
continue
dispatched_count += TriggerService.dispatch_triggered_workflows(
subscription=subscription,
event=event,
request_id=request_id,
)
except Exception:
logger.exception(
"Failed to dispatch trigger '%s' for subscription %s and provider %s. Continuing...",
event_name,
subscription_id,
provider_id,
)
# Continue processing other triggers even if one fails
continue
# Dispatch to debug sessions after processing all triggers
debug_dispatched = 0
try:
for event_name in events:
pool_key: str = PluginTriggerDebugEvent.build_pool_key(
name=event_name,
tenant_id=subscription.tenant_id,
subscription_id=subscription_id,
provider_id=provider_id,
)
event = PluginTriggerDebugEvent(
provider_id=provider_id,
subscription_id=subscription_id,
request_id=request_id,
timestamp=timestamp,
name=event_name,
)
debug_dispatched += TriggerDebugEventBus.dispatch(
tenant_id=subscription.tenant_id,
event=event,
pool_key=pool_key,
)
logger.debug(
"Trigger debug dispatched %d sessions to pool %s for event %s for subscription %s provider %s",
debug_dispatched,
pool_key,
event_name,
subscription_id,
provider_id,
)
except Exception:
# Silent failure for debug dispatch
logger.exception("Failed to dispatch to debug sessions")
logger.info(
"Completed async trigger dispatching: processed %d/%d triggers for subscription %s and provider %s",
dispatched_count,
len(events),
subscription_id,
provider_id,
)
return {
"status": "completed",
"total_count": len(events),
"dispatched_count": dispatched_count,
"debug_dispatched_count": debug_dispatched,
}
return {
"status": "completed",
"total_count": len(events),
"workflows": workflow_dispatched,
"debug_events": debug_dispatched,
}
except Exception as e:
logger.exception(