WIP: huamninput email sending

This commit is contained in:
QuantumGhost 2025-12-08 09:26:41 +08:00
parent e6fbf3a198
commit 203a3a68af
10 changed files with 532 additions and 31 deletions

View File

@ -1,3 +1,4 @@
import logging
import time
from collections.abc import Mapping, Sequence
from typing import Any, cast
@ -54,6 +55,7 @@ from core.workflow.graph_events import (
NodeRunSucceededEvent,
)
from core.workflow.graph_events.graph import GraphRunAbortedEvent
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.nodes import NodeType
from core.workflow.nodes.node_factory import DifyNodeFactory
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
@ -61,9 +63,12 @@ from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
from core.workflow.workflow_entry import WorkflowEntry
from tasks.mail_human_input_delivery_task import dispatch_human_input_email_task
from models.enums import UserFrom
from models.workflow import Workflow
logger = logging.getLogger(__name__)
class WorkflowBasedAppRunner:
def __init__(
@ -367,6 +372,7 @@ class WorkflowBasedAppRunner:
elif isinstance(event, GraphRunPausedEvent):
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()
self._enqueue_human_input_notifications(event.reasons)
self._publish_event(
QueueWorkflowPausedEvent(
reasons=event.reasons,
@ -580,5 +586,19 @@ class WorkflowBasedAppRunner:
)
)
def _enqueue_human_input_notifications(self, reasons: Sequence[object]) -> None:
for reason in reasons:
if not isinstance(reason, HumanInputRequired):
continue
if not reason.form_id:
continue
try:
dispatch_human_input_email_task.apply_async(
kwargs={"form_id": reason.form_id, "node_title": reason.node_title},
queue="mail",
)
except Exception: # pragma: no cover - defensive logging
logger.exception("Failed to enqueue human input email task for form %s", reason.form_id)
def _publish_event(self, event: AppQueueEvent):
self._queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER)

View File

@ -8,7 +8,7 @@ import uuid
from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta
from enum import StrEnum
from typing import Annotated, Any, Literal, Optional, Self
from typing import Annotated, Any, ClassVar, Literal, Optional, Self
from pydantic import BaseModel, Field, field_validator, model_validator
@ -105,10 +105,26 @@ class EmailRecipients(BaseModel):
class EmailDeliveryConfig(BaseModel):
"""Configuration for email delivery method."""
URL_PLACEHOLDER: ClassVar[str] = "{{#url#}}"
recipients: EmailRecipients
# the subject of email
subject: str
# Body is the content of email, it may contain the speical placeholder `{{#url#}}`, which
# represent the url to submit the form.
body: str
@classmethod
def replace_url_placeholder(cls, body: str, url: str | None) -> str:
"""Replace the url placeholder with provided value."""
return body.replace(cls.URL_PLACEHOLDER, url or "")
def body_with_url(self, url: str | None) -> str:
"""Return body content with url placeholder replaced."""
return self.replace_url_placeholder(self.body, url)
class _DeliveryMethodBase(BaseModel):
"""Base delivery method configuration."""

View File

@ -0,0 +1,49 @@
"""
Email template rendering helpers with configurable safety modes.
"""
import time
from collections.abc import Mapping
from typing import Any
from flask import render_template_string
from jinja2.runtime import Context
from jinja2.sandbox import ImmutableSandboxedEnvironment
from configs import dify_config
from configs.feature import TemplateMode
class SandboxedEnvironment(ImmutableSandboxedEnvironment):
"""Sandboxed environment with execution timeout."""
def __init__(self, timeout: int, *args: Any, **kwargs: Any):
self._deadline = time.time() + timeout if timeout else None
super().__init__(*args, **kwargs)
def call(self, context: Context, obj: Any, *args: Any, **kwargs: Any) -> Any:
if self._deadline is not None and time.time() > self._deadline:
raise TimeoutError("Template rendering timeout")
return super().call(context, obj, *args, **kwargs)
def render_email_template(template: str, substitutions: Mapping[str, str]) -> str:
"""
Render email template content according to the configured template mode.
In unsafe mode, Jinja expressions are evaluated directly.
In sandbox mode, a sandboxed environment with timeout is used.
In disabled mode, the template is returned without rendering.
"""
mode = dify_config.MAIL_TEMPLATING_MODE
timeout = dify_config.MAIL_TEMPLATING_TIMEOUT
if mode == TemplateMode.UNSAFE:
return render_template_string(template, **substitutions)
if mode == TemplateMode.SANDBOX:
env = SandboxedEnvironment(timeout=timeout)
tmpl = env.from_string(template)
return tmpl.render(substitutions)
if mode == TemplateMode.DISABLED:
return template
raise ValueError(f"Unsupported mail templating mode: {mode}")

View File

@ -0,0 +1,177 @@
import json
import logging
import time
from dataclasses import dataclass
from typing import Any
import click
from celery import shared_task
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from core.workflow.nodes.human_input.entities import EmailDeliveryConfig, EmailDeliveryMethod
from extensions.ext_database import db
from extensions.ext_mail import mail
from libs.email_template_renderer import render_email_template
from models.human_input import (
DeliveryMethodType,
HumanInputDelivery,
HumanInputForm,
HumanInputFormRecipient,
RecipientType,
)
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class _EmailRecipient:
email: str
token: str
@dataclass(frozen=True)
class _EmailDeliveryJob:
form_id: str
workflow_run_id: str
subject: str
body: str
form_content: str
recipients: list[_EmailRecipient]
def _build_form_link(token: str | None) -> str | None:
if not token:
return None
base_url = dify_config.CONSOLE_WEB_URL
if not base_url:
return None
return f"{base_url.rstrip('/')}/api/form/human_input/{token}"
def _parse_recipient_payload(payload: str) -> tuple[str | None, RecipientType | None]:
try:
payload_dict: dict[str, Any] = json.loads(payload)
except Exception:
logger.exception("Failed to parse recipient payload")
return None, None
return payload_dict.get("email"), payload_dict.get("TYPE")
def _load_email_jobs(session: Session, form_id: str) -> list[_EmailDeliveryJob]:
form = session.get(HumanInputForm, form_id)
if form is None:
logger.warning("Human input form not found, form_id=%s", form_id)
return []
deliveries = session.scalars(
select(HumanInputDelivery).where(
HumanInputDelivery.form_id == form_id,
HumanInputDelivery.delivery_method_type == DeliveryMethodType.EMAIL,
)
).all()
jobs: list[_EmailDeliveryJob] = []
for delivery in deliveries:
delivery_config = EmailDeliveryMethod.model_validate_json(delivery.channel_payload)
recipients = session.scalars(
select(HumanInputFormRecipient).where(HumanInputFormRecipient.delivery_id == delivery.id)
).all()
recipient_entities: list[_EmailRecipient] = []
for recipient in recipients:
email, recipient_type = _parse_recipient_payload(recipient.recipient_payload)
if recipient_type not in {RecipientType.EMAIL_MEMBER, RecipientType.EMAIL_EXTERNAL}:
continue
if not email:
continue
token = recipient.access_token
if not token:
continue
recipient_entities.append(_EmailRecipient(email=email, token=token))
if not recipient_entities:
continue
jobs.append(
_EmailDeliveryJob(
form_id=form_id,
workflow_run_id=form.workflow_run_id,
subject=delivery_config.config.subject,
body=delivery_config.config.body,
form_content=form.rendered_content,
recipients=recipient_entities,
)
)
return jobs
def _render_subject(subject_template: str, substitutions: dict[str, str]) -> str:
return render_email_template(subject_template, substitutions)
def _render_body(body_template: str, substitutions: dict[str, str]) -> str:
templated_body = EmailDeliveryConfig.replace_url_placeholder(body_template, substitutions.get("form_link"))
return render_email_template(templated_body, substitutions)
def _build_substitutions(
*,
job: _EmailDeliveryJob,
recipient: _EmailRecipient,
node_title: str | None,
) -> dict[str, str]:
raw_values: dict[str, str | None] = {
"form_id": job.form_id,
"workflow_run_id": job.workflow_run_id,
"node_title": node_title,
"form_token": recipient.token,
"form_link": _build_form_link(recipient.token),
"form_content": job.form_content,
"recipient_email": recipient.email,
}
return {key: value or "" for key, value in raw_values.items()}
def _open_session(session_factory: sessionmaker | Session | None):
if session_factory is None:
return Session(db.engine)
if isinstance(session_factory, Session):
return session_factory
return session_factory()
@shared_task(queue="mail")
def dispatch_human_input_email_task(form_id: str, node_title: str | None = None, session_factory=None):
if not mail.is_inited():
return
logger.info(click.style(f"Start human input email delivery for form {form_id}", fg="green"))
start_at = time.perf_counter()
try:
with _open_session(session_factory) as session:
jobs = _load_email_jobs(session, form_id)
for job in jobs:
for recipient in job.recipients:
substitutions = _build_substitutions(job=job, recipient=recipient, node_title=node_title)
subject = _render_subject(job.subject, substitutions)
body = _render_body(job.body, substitutions)
mail.send(
to=recipient.email,
subject=subject,
html=body,
)
end_at = time.perf_counter()
logger.info(
click.style(
f"Human input email delivery succeeded for form {form_id}: latency: {end_at - start_at}", fg="green"
)
)
except Exception:
logger.exception("Send human input email failed, form_id=%s", form_id)

View File

@ -5,42 +5,15 @@ from typing import Any
import click
from celery import shared_task
from flask import render_template_string
from jinja2.runtime import Context
from jinja2.sandbox import ImmutableSandboxedEnvironment
from configs import dify_config
from configs.feature import TemplateMode
from extensions.ext_mail import mail
from libs.email_template_renderer import render_email_template
from libs.email_i18n import get_email_i18n_service
logger = logging.getLogger(__name__)
class SandboxedEnvironment(ImmutableSandboxedEnvironment):
def __init__(self, timeout: int, *args: Any, **kwargs: Any):
self._timeout_time = time.time() + timeout
super().__init__(*args, **kwargs)
def call(self, context: Context, obj: Any, *args: Any, **kwargs: Any) -> Any:
if time.time() > self._timeout_time:
raise TimeoutError("Template rendering timeout")
return super().call(context, obj, *args, **kwargs)
def _render_template_with_strategy(body: str, substitutions: Mapping[str, str]) -> str:
mode = dify_config.MAIL_TEMPLATING_MODE
timeout = dify_config.MAIL_TEMPLATING_TIMEOUT
if mode == TemplateMode.UNSAFE:
return render_template_string(body, **substitutions)
if mode == TemplateMode.SANDBOX:
tmpl = SandboxedEnvironment(timeout=timeout).from_string(body)
return tmpl.render(substitutions)
if mode == TemplateMode.DISABLED:
return body
raise ValueError(f"Unsupported mail templating mode: {mode}")
@shared_task(queue="mail")
def send_inner_email_task(to: list[str], subject: str, body: str, substitutions: Mapping[str, str]):
if not mail.is_inited():
@ -50,7 +23,7 @@ def send_inner_email_task(to: list[str], subject: str, body: str, substitutions:
start_at = time.perf_counter()
try:
html_content = _render_template_with_strategy(body, substitutions)
html_content = render_email_template(body, substitutions)
email_service = get_email_i18n_service()
email_service.send_raw_email(to=to, subject=subject, html_content=html_content)

View File

@ -0,0 +1,115 @@
from datetime import UTC, datetime
import uuid
from unittest.mock import patch
import pytest
from configs import dify_config
from core.repositories.human_input_reposotiry import FormCreateParams, HumanInputFormRepositoryImpl
from core.workflow.nodes.human_input.entities import (
EmailDeliveryConfig,
EmailDeliveryMethod,
EmailRecipients,
ExternalRecipient,
HumanInputNodeData,
MemberRecipient,
)
from models.account import Account, AccountStatus, Tenant, TenantAccountJoin, TenantAccountRole
from models.human_input import HumanInputDelivery, HumanInputForm, HumanInputFormRecipient
from tasks.mail_human_input_delivery_task import dispatch_human_input_email_task
@pytest.fixture(autouse=True)
def cleanup_database(db_session_with_containers):
db_session_with_containers.query(HumanInputFormRecipient).delete()
db_session_with_containers.query(HumanInputDelivery).delete()
db_session_with_containers.query(HumanInputForm).delete()
db_session_with_containers.query(TenantAccountJoin).delete()
db_session_with_containers.query(Tenant).delete()
db_session_with_containers.query(Account).delete()
db_session_with_containers.commit()
def _create_workspace_member(db_session_with_containers):
account = Account(
email="owner@example.com",
name="Owner",
password="password",
interface_language="en-US",
status=AccountStatus.ACTIVE,
)
account.created_at = datetime.now(UTC)
account.updated_at = datetime.now(UTC)
db_session_with_containers.add(account)
db_session_with_containers.commit()
db_session_with_containers.refresh(account)
tenant = Tenant(name="Test Tenant")
tenant.created_at = datetime.now(UTC)
tenant.updated_at = datetime.now(UTC)
db_session_with_containers.add(tenant)
db_session_with_containers.commit()
db_session_with_containers.refresh(tenant)
tenant_join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER,
)
tenant_join.created_at = datetime.now(UTC)
tenant_join.updated_at = datetime.now(UTC)
db_session_with_containers.add(tenant_join)
db_session_with_containers.commit()
return tenant, account
def _build_form(db_session_with_containers, tenant, account):
delivery_method = EmailDeliveryMethod(
config=EmailDeliveryConfig(
recipients=EmailRecipients(
whole_workspace=False,
items=[
MemberRecipient(user_id=account.id),
ExternalRecipient(email="external@example.com"),
],
),
subject="Action needed {{ node_title }}",
body="Token {{ form_token }} link {{ form_link }} content {{ form_content }}",
)
)
node_data = HumanInputNodeData(
title="Review",
form_content="Form content",
delivery_methods=[delivery_method],
)
engine = db_session_with_containers.get_bind()
repo = HumanInputFormRepositoryImpl(session_factory=engine, tenant_id=tenant.id)
params = FormCreateParams(
workflow_execution_id=str(uuid.uuid4()),
node_id="node-1",
form_config=node_data,
rendered_content="Rendered",
resolved_placeholder_values={},
)
return repo.create_form(params)
def test_dispatch_human_input_email_task_integration(monkeypatch: pytest.MonkeyPatch, db_session_with_containers):
tenant, account = _create_workspace_member(db_session_with_containers)
form_entity = _build_form(db_session_with_containers, tenant, account)
monkeypatch.setattr(dify_config, "CONSOLE_WEB_URL", "https://console.example.com")
with patch("tasks.mail_human_input_delivery_task.mail") as mock_mail:
mock_mail.is_inited.return_value = True
dispatch_human_input_email_task(form_id=form_entity.id, node_title="Approval")
assert mock_mail.send.call_count == 2
send_args = [call.kwargs for call in mock_mail.send.call_args_list]
recipients = {kwargs["to"] for kwargs in send_args}
assert recipients == {"owner@example.com", "external@example.com"}
assert all("console.example.com/api/form/human_input/" in kwargs["html"] for kwargs in send_args)

View File

@ -15,7 +15,7 @@ class TestMailInnerTask:
with (
patch("tasks.mail_inner_task.mail") as mock_mail,
patch("tasks.mail_inner_task.get_email_i18n_service") as mock_get_email_i18n_service,
patch("tasks.mail_inner_task._render_template_with_strategy") as mock_render_template,
patch("tasks.mail_inner_task.render_email_template") as mock_render_template,
):
# Setup mock mail service
mock_mail.is_inited.return_value = True

View File

@ -0,0 +1,59 @@
from unittest.mock import MagicMock
import pytest
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.app.entities.queue_entities import QueueWorkflowPausedEvent
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.graph_events.graph import GraphRunPausedEvent
class _DummyQueueManager:
def __init__(self):
self.published = []
def publish(self, event, _from):
self.published.append(event)
class _DummyRuntimeState:
def get_paused_nodes(self):
return ["node-1"]
class _DummyGraphEngine:
def __init__(self):
self.graph_runtime_state = _DummyRuntimeState()
class _DummyWorkflowEntry:
def __init__(self):
self.graph_engine = _DummyGraphEngine()
def test_handle_pause_event_enqueues_email_task(monkeypatch: pytest.MonkeyPatch):
queue_manager = _DummyQueueManager()
runner = WorkflowBasedAppRunner(queue_manager=queue_manager, app_id="app-id")
workflow_entry = _DummyWorkflowEntry()
reason = HumanInputRequired(
form_id="form-123",
form_content="content",
inputs=[],
actions=[],
node_id="node-1",
node_title="Review",
)
event = GraphRunPausedEvent(reasons=[reason], outputs={})
email_task = MagicMock()
monkeypatch.setattr("core.app.apps.workflow_app_runner.dispatch_human_input_email_task", email_task)
runner._handle_event(workflow_entry, event)
email_task.apply_async.assert_called_once()
kwargs = email_task.apply_async.call_args.kwargs["kwargs"]
assert kwargs["form_id"] == "form-123"
assert kwargs["node_title"] == "Review"
assert any(isinstance(evt, QueueWorkflowPausedEvent) for evt in queue_manager.published)

View File

@ -0,0 +1,27 @@
import pytest
from core.workflow.nodes.human_input.entities import EmailDeliveryConfig, EmailRecipients
def test_replace_url_placeholder_with_value():
config = EmailDeliveryConfig(
recipients=EmailRecipients(),
subject="Subject",
body="Click here {{#url#}} to open.",
)
result = config.body_with_url("https://example.com/link")
assert result == "Click here https://example.com/link to open."
def test_replace_url_placeholder_missing_value():
config = EmailDeliveryConfig(
recipients=EmailRecipients(),
subject="Subject",
body="No link {{#url#}} available.",
)
result = config.body_with_url(None)
assert result == "No link available."

View File

@ -0,0 +1,65 @@
import types
from collections.abc import Sequence
import pytest
from tasks import mail_human_input_delivery_task as task_module
class _DummyMail:
def __init__(self):
self.sent: list[dict[str, str]] = []
self._inited = True
def is_inited(self) -> bool:
return self._inited
def send(self, *, to: str, subject: str, html: str):
self.sent.append({"to": to, "subject": subject, "html": html})
class _DummySession:
def __enter__(self):
return None
def __exit__(self, exc_type, exc_val, exc_tb):
return False
def _build_job(recipient_count: int = 1) -> task_module._EmailDeliveryJob:
recipients: list[task_module._EmailRecipient] = []
for idx in range(recipient_count):
recipients.append(task_module._EmailRecipient(email=f"user{idx}@example.com", token=f"token-{idx}"))
return task_module._EmailDeliveryJob(
form_id="form-1",
workflow_run_id="run-1",
subject="Subject for {{ form_token }}",
body="Body for {{ form_link }}",
form_content="content",
recipients=recipients,
)
def test_dispatch_human_input_email_task_sends_to_each_recipient(monkeypatch: pytest.MonkeyPatch):
mail = _DummyMail()
def fake_render(template: str, substitutions: dict[str, str]) -> str:
return template.replace("{{ form_token }}", substitutions["form_token"]).replace(
"{{ form_link }}", substitutions["form_link"]
)
monkeypatch.setattr(task_module, "mail", mail)
monkeypatch.setattr(task_module, "render_email_template", fake_render)
jobs: Sequence[task_module._EmailDeliveryJob] = [_build_job(recipient_count=2)]
monkeypatch.setattr(task_module, "_load_email_jobs", lambda _session, _form_id: jobs)
task_module.dispatch_human_input_email_task(
form_id="form-1",
node_title="Approve",
session_factory=lambda: _DummySession(),
)
assert len(mail.sent) == 2
assert all(payload["subject"].startswith("Subject for token-") for payload in mail.sent)
assert all("Body for" in payload["html"] for payload in mail.sent)