dify/api/core/plugin/impl/exc.py
Asuka Minato a78339a040
Some checks are pending
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
Main CI Pipeline / Check Changed Files (push) Waiting to run
Main CI Pipeline / API Tests (push) Blocked by required conditions
Main CI Pipeline / Web Tests (push) Blocked by required conditions
Main CI Pipeline / Style Check (push) Waiting to run
Main CI Pipeline / VDB Tests (push) Blocked by required conditions
Main CI Pipeline / DB Migration Test (push) Blocked by required conditions
remove bare list, dict, Sequence, None, Any (#25058)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
2025-09-06 03:32:23 +08:00

72 lines
1.9 KiB
Python

from collections.abc import Mapping
from pydantic import TypeAdapter
from extensions.ext_logging import get_request_id
class PluginDaemonError(Exception):
    """Root of the plugin daemon exception hierarchy.

    Attributes:
        description: Text supplied by the raiser; rendered by ``__str__``.
    """

    def __init__(self, description: str):
        """Store *description* on the instance."""
        self.description = description

    def __str__(self) -> str:
        """Render as ``req_id: <request id> <ClassName>: <description>``."""
        request_id = get_request_id()
        class_name = type(self).__name__
        return f"req_id: {request_id} {class_name}: {self.description}"
class PluginDaemonInternalError(PluginDaemonError):
    """Marker subclass of PluginDaemonError; adds no attributes or behavior.

    Parent of the internal-server, unauthorized and not-found error classes
    defined in this module.
    """
class PluginDaemonClientSideError(PluginDaemonError):
    """Marker subclass of PluginDaemonError; adds no attributes or behavior.

    Parent of the bad-request, invoke, unique-identifier, plugin-not-found and
    permission-denied error classes defined in this module.
    """
class PluginDaemonInternalServerError(PluginDaemonInternalError):
    """Plugin daemon internal error with the class-level description ``"Internal Server Error"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Internal Server Error"
class PluginDaemonUnauthorizedError(PluginDaemonInternalError):
    """Plugin daemon internal error with the class-level description ``"Unauthorized"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Unauthorized"
class PluginDaemonNotFoundError(PluginDaemonInternalError):
    """Plugin daemon internal error with the class-level description ``"Not Found"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Not Found"
class PluginDaemonBadRequestError(PluginDaemonClientSideError):
    """Client-side plugin daemon error with the class-level description ``"Bad Request"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Bad Request"
class PluginInvokeError(PluginDaemonClientSideError):
    """Client-side plugin error whose description may carry a JSON error object.

    The description is parsed as a JSON mapping on demand; its ``error_type``
    and ``message`` keys are exposed through :meth:`get_error_type` and
    :meth:`get_error_message`.
    """

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Invoke Error"

    def _parse_error_object(self) -> "Mapping | None":
        """Parse ``self.description`` as a JSON mapping.

        Returns:
            The parsed mapping, or ``None`` when the description cannot be
            validated as a JSON mapping.
        """
        try:
            return TypeAdapter(Mapping).validate_json(self.description)
        except Exception:
            # Deliberate best effort: an unparsable description is reported
            # as "no error object" instead of raising from an error handler.
            return None

    def _get_error_object(self) -> Mapping:
        """Return the parsed error object, or an empty mapping if parsing fails."""
        error_object = self._parse_error_object()
        return {} if error_object is None else error_object

    def get_error_type(self) -> str:
        """Return the ``error_type`` entry of the error object, or ``"unknown"``."""
        return self._get_error_object().get("error_type", "unknown")

    def get_error_message(self) -> str:
        """Return the most specific message available for this error.

        Returns:
            The ``message`` entry of the parsed error object; ``"unknown"``
            when the object parses but has no ``message`` key; the raw
            description when it cannot be parsed as a JSON mapping.
        """
        error_object = self._parse_error_object()
        if error_object is None:
            # Previously this fallback sat in an ``except`` branch that could
            # never run, because parse failures were already swallowed by
            # _get_error_object; unparsable descriptions came back as "unknown".
            return self.description
        return error_object.get("message", "unknown")
class PluginUniqueIdentifierError(PluginDaemonClientSideError):
    """Client-side plugin daemon error with the class-level description ``"Unique Identifier Error"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Unique Identifier Error"
class PluginNotFoundError(PluginDaemonClientSideError):
    """Client-side plugin daemon error with the class-level description ``"Plugin Not Found"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Plugin Not Found"
class PluginPermissionDeniedError(PluginDaemonClientSideError):
    """Client-side plugin daemon error with the class-level description ``"Permission Denied"``."""

    # Class-level default only: the inherited PluginDaemonError.__init__ assigns a
    # per-instance description that shadows this value.
    description: str = "Permission Denied"