feat(api): implement web app api properly

This commit is contained in:
QuantumGhost 2026-01-08 10:07:16 +08:00
parent 2a6b6a873e
commit 5d4f06fa67
2 changed files with 61 additions and 21 deletions

View File

@ -23,6 +23,7 @@ from . import (
feature,
files,
forgot_password,
human_input_form,
login,
message,
passport,
@ -45,6 +46,7 @@ __all__ = [
"feature",
"files",
"forgot_password",
"human_input_form",
"login",
"message",
"passport",

View File

@ -2,21 +2,29 @@
Web App Workflow Resume APIs.
"""
import logging
import json
from collections.abc import Generator
from flask import Response
from sqlalchemy.orm import sessionmaker
from controllers.web import api
from controllers.web.error import InvalidArgumentError, NotFoundError
from controllers.web.wraps import WebApiResource
logger = logging.getLogger(__name__)
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.message_generator import MessageGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from extensions.ext_database import db
from models.enums import CreatorUserRole
from models.model import App, AppMode, EndUser
from repositories.factory import DifyAPIRepositoryFactory
class WorkflowEventsApi(WebApiResource):
"""API for getting workflow execution events after resume."""
def get(self, task_id: str):
def get(self, app_model: App, end_user: EndUser, task_id: str):
"""
Get workflow execution events stream after resume.
@ -24,29 +32,59 @@ class WorkflowEventsApi(WebApiResource):
Returns Server-Sent Events stream.
"""
workflow_run_id = task_id
session_maker = sessionmaker(db.engine)
repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
workflow_run = repo.get_workflow_run_by_id_and_tenant_id(
tenant_id=app_model.tenant_id,
run_id=workflow_run_id,
)
def generate_events() -> Generator[str, None, None]:
"""Generate SSE events for workflow execution."""
try:
# TODO: Implement actual event streaming
# This would connect to the workflow execution engine
# and stream real-time events
if workflow_run is None:
raise NotFoundError(f"WorkflowRun not found, id={workflow_run_id}")
# For demo purposes, send a basic event
yield f"data: {{'event': 'workflow_resumed', 'task_id': '{task_id}'}}\n\n"
if workflow_run.app_id != app_model.id:
raise NotFoundError(f"WorkflowRun not found, id={workflow_run_id}")
# In real implementation, this would:
# 1. Connect to workflow execution engine
# 2. Stream real-time execution events
# 3. Handle client disconnection
# 4. Clean up resources on completion
if workflow_run.created_by_role != CreatorUserRole.END_USER:
raise NotFoundError(f"WorkflowRun not created by end user, id={workflow_run_id}")
except Exception as e:
logger.exception("Error streaming events for task %s", task_id)
yield f"data: {{'error': 'Stream error: {str(e)}'}}\n\n"
if workflow_run.created_by != end_user.id:
raise NotFoundError(f"WorkflowRun not created by the current end user, id={workflow_run_id}")
if workflow_run.finished_at is not None:
response = WorkflowResponseConverter.workflow_run_result_to_finish_response(
task_id=workflow_run.id,
workflow_run=workflow_run,
creator_user=end_user,
)
payload = response.model_dump(mode="json")
payload["event"] = response.event.value
def _generate_finished_events() -> Generator[str, None, None]:
yield f"data: {json.dumps(payload)}\n\n"
event_generator = _generate_finished_events
else:
app_mode = AppMode.value_of(app_model.mode)
msg_generator = MessageGenerator()
if app_mode == AppMode.ADVANCED_CHAT:
generator = AdvancedChatAppGenerator()
elif app_mode == AppMode.WORKFLOW:
generator = WorkflowAppGenerator()
else:
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
def _generate_stream_events():
return generator.convert_to_event_stream(
msg_generator.retrieve_events(app_mode, workflow_run.id),
)
event_generator = _generate_stream_events
return Response(
generate_events(),
event_generator(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",