Mirror of https://github.com/langgenius/dify.git, synced 2026-01-14 06:07:33 +08:00
- Modified `PluginTriggerApi` to accept `trigger_name` as a JSON argument and return encoded plugin triggers.
- Updated the `WorkflowPluginTrigger` model to replace `trigger_id` with `trigger_name` for better clarity.
- Adjusted `WorkflowPluginTriggerService` to handle the new `trigger_name` field and ensure proper error handling for subscriptions.
- Enhanced `workflow_trigger_fields` to include `trigger_name` in the plugin trigger schema.

This change improves the API's clarity and aligns the model with the updated naming conventions.
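As a rough sketch of the request shape this change implies, a caller now identifies the trigger by name in the JSON body rather than by id. Apart from `trigger_name` itself, the route and the other field names below are illustrative assumptions, not taken from this commit:

```python
# Illustrative only: the route and the companion fields are assumptions;
# "trigger_name" (replacing the former trigger_id) is what this commit introduces.
import requests

resp = requests.post(
    "https://<dify-host>/console/api/<plugin-trigger-route>",  # actual PluginTriggerApi route not shown here
    headers={"Authorization": "Bearer <console-token>"},
    json={
        "node_id": "<plugin-trigger-node-id>",  # assumed companion field
        "trigger_name": "issue_opened",         # identifies the trigger by name
    },
)
print(resp.json())  # expected to contain the encoded plugin triggers
```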
220 lines · 8.5 KiB · Python
import json
import logging
import uuid

from flask import Request, Response
from sqlalchemy import select
from sqlalchemy.orm import Session

from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.utils.http_parser import serialize_request
from core.trigger.entities.entities import TriggerEntity
from core.trigger.trigger_manager import TriggerManager
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.account import Account, TenantAccountJoin, TenantAccountRole
from models.enums import WorkflowRunTriggeredFrom
from models.trigger import TriggerSubscription
from models.workflow import Workflow, WorkflowPluginTrigger
from services.async_workflow_service import AsyncWorkflowService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow.entities import PluginTriggerData

logger = logging.getLogger(__name__)


class TriggerService:
    __TEMPORARY_ENDPOINT_EXPIRE_MS__ = 5 * 60 * 1000
    __ENDPOINT_REQUEST_CACHE_COUNT__ = 10
    __ENDPOINT_REQUEST_CACHE_EXPIRE_MS__ = 5 * 60 * 1000

    @classmethod
    def process_triggered_workflows(
        cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request
    ) -> None:
        """Run every workflow subscribed to the fired trigger for this subscription."""
        subscribers = cls._get_subscriber_triggers(subscription=subscription, trigger=trigger)
        if not subscribers:
            logger.warning(
                "No workflows found for trigger '%s' in subscription '%s'",
                trigger.identity.name,
                subscription.id,
            )
            return

        with Session(db.engine) as session:
            # Get tenant owner for workflow execution
            tenant_owner = session.scalar(
                select(Account)
                .join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
                .where(
                    TenantAccountJoin.tenant_id == subscription.tenant_id,
                    TenantAccountJoin.role == TenantAccountRole.OWNER,
                )
            )

            if not tenant_owner:
                logger.error("Tenant owner not found for tenant %s", subscription.tenant_id)
                return

            for plugin_trigger in subscribers:
                # Get the latest published (non-draft) workflow for the app
                workflow = session.scalar(
                    select(Workflow)
                    .where(
                        Workflow.app_id == plugin_trigger.app_id,
                        Workflow.version != Workflow.VERSION_DRAFT,
                    )
                    .order_by(Workflow.created_at.desc())
                )

                if not workflow:
                    logger.error(
                        "Workflow not found for app %s",
                        plugin_trigger.app_id,
                    )
                    continue

                # Get trigger parameters from node configuration
                node_config = workflow.get_node_config_by_id(plugin_trigger.node_id)
                parameters = node_config.get("data", {}).get("parameters", {}) if node_config else {}

                # Store trigger data for the async task
                storage_key = cls._store_trigger_data(request, subscription, trigger, parameters)

                # Create trigger data for async execution
                trigger_data = PluginTriggerData(
                    app_id=plugin_trigger.app_id,
                    tenant_id=subscription.tenant_id,
                    workflow_id=workflow.id,
                    root_node_id=plugin_trigger.node_id,
                    trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
                    plugin_id=subscription.provider_id,
                    webhook_url=f"trigger/endpoint/{subscription.endpoint_id}",  # For tracking
                    inputs={"storage_key": storage_key},  # Pass storage key to async task
                )

                # Trigger async workflow
                try:
                    AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data)
                    logger.info(
                        "Triggered workflow for app %s with trigger %s",
                        plugin_trigger.app_id,
                        trigger.identity.name,
                    )
                except Exception:
                    logger.exception(
                        "Failed to trigger workflow for app %s",
                        plugin_trigger.app_id,
                    )

    @classmethod
    def select_triggers(cls, controller, dispatch_response, provider_id, subscription) -> list[TriggerEntity]:
        """Resolve trigger names from a dispatch response into TriggerEntity objects."""
        triggers = []
        for trigger_name in dispatch_response.triggers:
            trigger = controller.get_trigger(trigger_name)
            if trigger is None:
                logger.error(
                    "Trigger '%s' not found in provider '%s' for tenant '%s'",
                    trigger_name,
                    provider_id,
                    subscription.tenant_id,
                )
                raise ValueError(f"Trigger '{trigger_name}' not found")
            triggers.append(trigger)
        return triggers

    @classmethod
    def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None:
        """Extract and process data from an incoming endpoint request."""
        subscription = TriggerProviderService.get_subscription_by_endpoint(endpoint_id)
        if not subscription:
            return None

        provider_id = TriggerProviderID(subscription.provider_id)
        controller = TriggerManager.get_trigger_provider(subscription.tenant_id, provider_id)
        if not controller:
            return None

        dispatch_response = controller.dispatch(
            user_id=subscription.user_id, request=request, subscription=subscription.to_entity()
        )

        if dispatch_response.triggers:
            request_id = f"trigger_request_{uuid.uuid4().hex}"
            serialized_request = serialize_request(request)
            storage.save(f"triggers/{request_id}", serialized_request)

            from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async

            dispatch_triggered_workflows_async(
                endpoint_id=endpoint_id,
                provider_id=subscription.provider_id,
                subscription_id=subscription.id,
                triggers=list(dispatch_response.triggers),
                request_id=request_id,
            )

            logger.info(
                "Queued async dispatching for %d triggers on endpoint %s with request_id %s",
                len(dispatch_response.triggers),
                endpoint_id,
                request_id,
            )

        return dispatch_response.response

    @classmethod
    def _get_subscriber_triggers(
        cls, subscription: TriggerSubscription, trigger: TriggerEntity
    ) -> list[WorkflowPluginTrigger]:
        """Get WorkflowPluginTriggers for a subscription and trigger."""
        with Session(db.engine, expire_on_commit=False) as session:
            subscribers = session.scalars(
                select(WorkflowPluginTrigger).where(
                    WorkflowPluginTrigger.tenant_id == subscription.tenant_id,
                    WorkflowPluginTrigger.subscription_id == subscription.id,
                    WorkflowPluginTrigger.trigger_name == trigger.identity.name,
                )
            ).all()
            return list(subscribers)

    @classmethod
    def _store_trigger_data(
        cls,
        request: Request,
        subscription: TriggerSubscription,
        trigger: TriggerEntity,
        parameters: dict,
    ) -> str:
        """Store trigger data in Redis and return the storage key."""
        storage_key = f"trigger_data_{uuid.uuid4().hex}"

        # Prepare data to store
        trigger_data = {
            "request": {
                "method": request.method,
                "headers": dict(request.headers),
                "query_params": dict(request.args),
                "body": request.get_data(as_text=True),
            },
            "subscription": {
                "id": subscription.id,
                "provider_id": subscription.provider_id,
                "credentials": subscription.credentials,
                "credential_type": subscription.credential_type,
            },
            "trigger": {
                "name": trigger.identity.name,
                "parameters": parameters,
            },
            "user_id": subscription.user_id,
        }

        # Store with 1 hour TTL using Redis
        redis_client.setex(storage_key, 3600, json.dumps(trigger_data))

        return storage_key
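For context, here is a minimal sketch of how `process_endpoint` could be wired to an inbound webhook route. The blueprint name and URL rule are assumptions for illustration; dify's actual endpoint controller may register this differently.

```python
# Minimal sketch, not dify's actual route registration: the blueprint and URL
# rule are assumed; only TriggerService.process_endpoint comes from the file above.
from flask import Blueprint, Response, abort, request

from services.trigger.trigger_service import TriggerService  # assumed module path

trigger_demo_bp = Blueprint("trigger_demo", __name__)


@trigger_demo_bp.route("/trigger/endpoint/<endpoint_id>", methods=["GET", "POST"])
def handle_trigger_endpoint(endpoint_id: str) -> Response:
    # Hand the raw Flask request to the subscription bound to this endpoint.
    response = TriggerService.process_endpoint(endpoint_id=endpoint_id, request=request)
    if response is None:
        # Unknown endpoint or no provider controller for the tenant.
        abort(404)
    return response
```

Similarly, a downstream consumer of the key produced by `_store_trigger_data` could read the payload back from Redis before it expires. The helper name below is hypothetical; only `redis_client` and the one-hour TTL contract come from the service above:

```python
# Hypothetical helper: reads back the JSON payload written by _store_trigger_data.
import json

from extensions.ext_redis import redis_client


def load_trigger_payload(storage_key: str) -> dict | None:
    raw = redis_client.get(storage_key)
    if raw is None:
        # The key has expired (one-hour TTL) or was never written.
        return None
    return json.loads(raw)
```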