diff --git a/api/controllers/console/app/workflow_app_log.py b/api/controllers/console/app/workflow_app_log.py index cbf4e84ff0..d7ecc7c91b 100644 --- a/api/controllers/console/app/workflow_app_log.py +++ b/api/controllers/console/app/workflow_app_log.py @@ -28,6 +28,7 @@ class WorkflowAppLogApi(Resource): "created_at__after": "Filter logs created after this timestamp", "created_by_end_user_session_id": "Filter by end user session ID", "created_by_account": "Filter by account", + "detail": "Whether to return detailed logs", "page": "Page number (1-99999)", "limit": "Number of items per page (1-100)", } @@ -68,6 +69,7 @@ class WorkflowAppLogApi(Resource): required=False, default=None, ) + .add_argument("detail", type=bool, location="args", required=False, default=False) .add_argument("page", type=int_range(1, 99999), default=1, location="args") .add_argument("limit", type=int_range(1, 100), default=20, location="args") ) @@ -92,6 +94,7 @@ class WorkflowAppLogApi(Resource): created_at_after=args.created_at__after, page=args.page, limit=args.limit, + detail=args.detail, created_by_end_user_session_id=args.created_by_end_user_session_id, created_by_account=args.created_by_account, ) diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py index 7b7e634e0f..5cedaeb6cf 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py @@ -68,7 +68,6 @@ class TriggerEventNode(Node): inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) metadata = { WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: { - **inputs, "provider_id": self._node_data.provider_id, "event_name": self._node_data.event_name, "plugin_unique_identifier": self._node_data.plugin_unique_identifier, diff --git a/api/fields/workflow_app_log_fields.py b/api/fields/workflow_app_log_fields.py index 243efd817c..4cbdf6f0ca 100644 --- a/api/fields/workflow_app_log_fields.py +++ b/api/fields/workflow_app_log_fields.py @@ -8,6 +8,7 @@ from libs.helper import TimestampField workflow_app_log_partial_fields = { "id": fields.String, "workflow_run": fields.Nested(workflow_run_for_log_fields, attribute="workflow_run", allow_null=True), + "details": fields.Raw(attribute="details"), "created_from": fields.String, "created_by_role": fields.String, "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True), diff --git a/api/migrations/versions/2025_10_27_1752-5ed4b21dbb8d_trigger_log_metadata.py b/api/migrations/versions/2025_10_27_1752-5ed4b21dbb8d_trigger_log_metadata.py new file mode 100644 index 0000000000..089246d2fa --- /dev/null +++ b/api/migrations/versions/2025_10_27_1752-5ed4b21dbb8d_trigger_log_metadata.py @@ -0,0 +1,32 @@ +"""trigger_log_metadata + +Revision ID: 5ed4b21dbb8d +Revises: 132392a2635f +Create Date: 2025-10-27 17:52:35.658975 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '5ed4b21dbb8d' +down_revision = '132392a2635f' +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: + batch_op.add_column(sa.Column('trigger_metadata', sa.Text(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: + batch_op.drop_column('trigger_metadata') + + # ### end Alembic commands ### diff --git a/api/models/trigger.py b/api/models/trigger.py index d34006589a..5237a512e4 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -161,6 +161,7 @@ class WorkflowTriggerLog(Base): - workflow_id (uuid) Workflow ID - workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts - root_node_id (string) Optional - Custom starting node ID for workflow execution + - trigger_metadata (text) Optional - Trigger metadata (JSON) - trigger_type (string) Type of trigger: webhook, schedule, plugin - trigger_data (text) Full trigger data including inputs (JSON) - inputs (text) Input parameters (JSON) @@ -195,7 +196,7 @@ class WorkflowTriggerLog(Base): workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True) root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) - + trigger_metadata: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True) trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False) trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing @@ -240,6 +241,8 @@ class WorkflowTriggerLog(Base): "app_id": self.app_id, "workflow_id": self.workflow_id, "workflow_run_id": self.workflow_run_id, + "root_node_id": self.root_node_id, + "trigger_metadata": json.loads(self.trigger_metadata) if self.trigger_metadata else None, "trigger_type": self.trigger_type, "trigger_data": json.loads(self.trigger_data), "inputs": json.loads(self.inputs), diff --git a/api/services/async_workflow_service.py b/api/services/async_workflow_service.py index ab5ef7b6d4..e325028ccf 100644 --- a/api/services/async_workflow_service.py +++ b/api/services/async_workflow_service.py @@ -111,6 +111,7 @@ class AsyncWorkflowService: app_id=trigger_data.app_id, workflow_id=workflow.id, root_node_id=trigger_data.root_node_id, + trigger_metadata=trigger_data.trigger_metadata.model_dump_json(), trigger_type=trigger_data.trigger_type, trigger_data=trigger_data.model_dump_json(), inputs=json.dumps(dict(trigger_data.inputs)), diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index dd126cdef4..30abde8329 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -19,6 +19,12 @@ class AsyncTriggerStatus(StrEnum): TIMEOUT = "timeout" +class TriggerMetadata(BaseModel): + """Trigger metadata""" + + pass + + class TriggerData(BaseModel): """Base trigger data model for async workflow execution""" @@ -30,6 +36,7 @@ class TriggerData(BaseModel): files: Sequence[Mapping[str, Any]] = Field(default_factory=list) trigger_type: AppTriggerType trigger_from: WorkflowRunTriggeredFrom + trigger_metadata: TriggerMetadata = Field(default_factory=TriggerMetadata) model_config = ConfigDict(use_enum_values=True) @@ -48,6 +55,17 @@ class ScheduleTriggerData(TriggerData): trigger_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.SCHEDULE +class PluginTriggerMetadata(TriggerMetadata): + """Plugin trigger metadata""" + + plugin_id: str + endpoint_id: str + plugin_unique_identifier: str + provider_id: str + icon_url: str + icon_dark_url: str + + class PluginTriggerData(TriggerData): """Plugin webhook trigger data""" diff --git a/api/services/workflow_app_service.py b/api/services/workflow_app_service.py index 23dd436675..378562e590 100644 --- a/api/services/workflow_app_service.py +++ b/api/services/workflow_app_service.py @@ -1,3 +1,4 @@ +import json import uuid from datetime import datetime @@ -7,6 +8,35 @@ from sqlalchemy.orm import Session from core.workflow.enums import WorkflowExecutionStatus from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun from models.enums import CreatorUserRole +from models.trigger import WorkflowTriggerLog + + +# Since the workflow_app_log table has exceeded 100 million records, we use an additional details field to extend it +class LogView: + """Lightweight wrapper for WorkflowAppLog with computed details. + + - Exposes `details_` for marshalling to `details` in API response + - Proxies all other attributes to the underlying `WorkflowAppLog` + """ + + def __init__(self, log: WorkflowAppLog, details: dict | None): + self.log = log + self.details_ = details + + def __getattr__(self, name): + return getattr(self.log, name) + + +# Helpers +def _safe_json_loads(val): + if not val: + return None + if isinstance(val, str): + try: + return json.loads(val) + except Exception: + return None + return val class WorkflowAppService: @@ -21,6 +51,7 @@ class WorkflowAppService: created_at_after: datetime | None = None, page: int = 1, limit: int = 20, + detail: bool = False, created_by_end_user_session_id: str | None = None, created_by_account: str | None = None, ): @@ -34,6 +65,7 @@ class WorkflowAppService: :param created_at_after: filter logs created after this timestamp :param page: page number :param limit: items per page + :param detail: whether to return detailed logs :param created_by_end_user_session_id: filter by end user session id :param created_by_account: filter by account email :return: Pagination object @@ -43,8 +75,24 @@ class WorkflowAppService: WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id ) + if detail: + # Correlated scalar subquery: fetch latest trigger_metadata per workflow_run_id + meta_expr = ( + select(WorkflowTriggerLog.trigger_metadata) + .where( + WorkflowTriggerLog.workflow_run_id == WorkflowAppLog.workflow_run_id, + WorkflowTriggerLog.app_id == app_model.id, + WorkflowTriggerLog.tenant_id == app_model.tenant_id, + ) + .order_by(WorkflowTriggerLog.created_at.desc()) + .limit(1) + .scalar_subquery() + ) + stmt = stmt.add_columns(meta_expr) + if keyword or status: stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id) + # Join to workflow run for filtering when needed. if keyword: keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u") @@ -108,9 +156,14 @@ class WorkflowAppService: # Apply pagination limits offset_stmt = stmt.offset((page - 1) * limit).limit(limit) - # Execute query and get items - items = list(session.scalars(offset_stmt).all()) + # wrapper moved to module scope as `LogView` + # Execute query and get items + if detail: + rows = session.execute(offset_stmt).all() + items = [LogView(log, {"trigger_metadata": _safe_json_loads(meta_val)}) for log, meta_val in rows] + else: + items = [LogView(log, None) for log in session.scalars(offset_stmt).all()] return { "page": page, "limit": limit, diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index d1639375ad..cb59d835ef 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -18,6 +18,7 @@ from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerInvokeEventResponse from core.trigger.debug.event_bus import TriggerDebugEventBus from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key +from core.trigger.entities.api_entities import TriggerProviderApiEntity from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.workflow.enums import NodeType @@ -31,7 +32,7 @@ from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerHttpRequestCachingService -from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData +from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata logger = logging.getLogger(__name__) @@ -138,6 +139,7 @@ def dispatch_triggered_workflow( provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id) ) + trigger_entity: TriggerProviderApiEntity = provider_controller.to_api_entity() with Session(db.engine) as session: workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers) @@ -201,6 +203,14 @@ def dispatch_triggered_workflow( plugin_id=subscription.provider_id, endpoint_id=subscription.endpoint_id, inputs=invoke_response.variables, + trigger_metadata=PluginTriggerMetadata( + plugin_id=trigger_entity.plugin_id or "", + plugin_unique_identifier=trigger_entity.plugin_unique_identifier or "", + endpoint_id=subscription.endpoint_id, + provider_id=subscription.provider_id, + icon_url=trigger_entity.icon or "", + icon_dark_url=trigger_entity.icon_dark or "", + ), ) # Trigger async workflow