Files
zydi-paper/paper.py
T
2025-01-19 06:58:40 +00:00

956 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Dict, List, Optional
from datetime import datetime, timezone
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
from pydantic import BaseModel, Field
from redis import asyncio as aioredis
from typing import Any
from contextlib import asynccontextmanager
import json
import os
import PyPDF2
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from openai import OpenAI
import hashlib
# Database configuration.
# Both connection strings can be overridden with the MONGODB_URL / REDIS_URL
# environment variables; the literals stay as defaults so that existing
# deployments keep working unchanged.
# SECURITY NOTE(review): the defaults embed live credentials in source control.
# Rotate them and drop the fallbacks once the environment is provisioned.
MONGODB_URL = os.environ.get(
    "MONGODB_URL",
    "mongodb://paper:SYX7cdJNMRbiytra@222.186.10.253:27017/paper",
)
REDIS_URL = os.environ.get(
    "REDIS_URL",
    "redis://:Obscura@2024@222.186.10.253:6379",
)
# Database connection
class Database:
    """Holder for the process-wide MongoDB client."""
    # Assigned by connect_to_mongo(); stays None until that has run
    client: AsyncIOMotorClient = None


# Module-level holder instance read and written by the connection helpers
db = Database()
async def get_database() -> Any:
    """Return the ``paper`` database handle from the shared MongoDB client.

    Returns:
        The result of ``db.client["paper"]`` (a database handle, not the client
        itself, which is why the return type is not the client class).

    Raises:
        RuntimeError: if the client has not been created yet, i.e.
            ``connect_to_mongo()`` has not run. Without this guard the lookup
            fails with an opaque ``TypeError`` on ``None``.
    """
    if db.client is None:
        raise RuntimeError("MongoDB client is not connected")
    return db.client["paper"]
async def connect_to_mongo():
    """Create the shared MongoDB client and check it with a ``ping``.

    The client is stored on the module-level ``db`` holder before the ping is
    sent. Any failure is printed and re-raised to the caller.
    """
    try:
        mongo_client = AsyncIOMotorClient(MONGODB_URL)
        db.client = mongo_client
        # Issue a ping so that connection problems surface here
        await mongo_client.admin.command('ping')
    except Exception as exc:
        print(f"Could not connect to MongoDB: {exc}")
        raise
    else:
        print("Successfully connected to MongoDB")
async def close_mongo_connection():
    """Close the shared MongoDB client, if one was created.

    Does nothing when ``db.client`` is still ``None`` (for example when the
    connection was never established), instead of raising ``AttributeError``.
    """
    if db.client is not None:
        db.client.close()
# Redis setup
async def get_redis():
    """Build a Redis client for ``REDIS_URL`` that decodes responses as UTF-8 text.

    Each call creates a new client object; callers in this file close it with
    ``aclose()`` when they are done.
    """
    return aioredis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: connect to MongoDB on startup, close it on shutdown.

    The ``yield`` is wrapped in ``try``/``finally`` so the MongoDB client is
    closed even when an exception is thrown into the generator while the
    application is running.
    """
    # Startup
    await connect_to_mongo()
    try:
        yield
    finally:
        # Shutdown
        await close_mongo_connection()
# 更新 FastAPI 实例化
# Application instance; startup and shutdown are handled by `lifespan`
app = FastAPI(lifespan=lifespan)
# CORS configuration: every origin, method and header is allowed, and all
# response headers are exposed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"]
)
# 在其他 Pydantic 模型后添加
class paperModel(BaseModel):
    """Paper reference model: link, title and upload timestamp."""
    paper_link: str
    paper_title: str
    # Defaults to the current UTC time when the model instance is created
    upload_time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Config:
        populate_by_name = True
        arbitrary_types_allowed = True
        # Encode ObjectId values with str() when serializing to JSON
        json_encoders = {ObjectId: str}
# 删除文献
@app.delete("/paper/delete/{paper_id}")
async def delete_paper(
    paper_id: str
):
    """Delete a paper record and its cached Redis data.

    Args:
        paper_id: Value matched against the ``file_hash`` field of the
            ``papers`` collection (despite the name it is used as a file hash).

    Returns:
        ``{"message": "paper successfully deleted"}`` on success.

    Raises:
        HTTPException: 404 when no matching paper exists; 500 for any other
            failure while deleting the database record.

    Redis cleanup (analysis report in db 190, chat history in db 191) is
    best-effort: failures are logged and do not fail the request.
    NOTE: the stored file itself is not removed by this handler.
    """
    db = await get_database()
    try:
        # Look up the paper first so that a missing one yields a 404
        paper = await db.papers.find_one({"file_hash": paper_id})
        if not paper:
            raise HTTPException(status_code=404, detail="paper not found")
        file_hash = paper.get("file_hash")
        # Remove the database record
        await db.papers.delete_one({"file_hash": paper_id})
        # Remove cached Redis data, if any
        redis = None
        try:
            redis = await get_redis()
            await redis.select(190)
            report_key = f"paper_report:{file_hash}"
            await redis.delete(report_key)
            # Remove the chat history, if any
            await redis.select(191)
            chat_history_key = f"chat_history:{paper_id}"
            await redis.delete(chat_history_key)
        except Exception as redis_error:
            print(f"Warning: Failed to delete Redis keys: {str(redis_error)}")
        finally:
            # Only close a client that was actually created
            if redis is not None:
                try:
                    await redis.aclose()
                except Exception as e:
                    print(f"Error closing Redis connection: {e}")
        return {"message": "paper successfully deleted"}
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged
        # instead of rewrapping them as a 500
        raise
    except Exception as e:
        print(f"Error deleting paper: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to delete paper: {str(e)}")
@app.get("/paper/papers")
async def get_papers():
    """List every document of the ``papers`` collection.

    Returns:
        A list of dicts with ``_id`` (as a string), ``paper_link``,
        ``paper_title``, ``upload_time`` and ``file_hash`` (``None`` if absent).

    Raises:
        HTTPException: 500 when reading from the database fails.
    """
    db = await get_database()
    try:
        return [
            {
                "_id": str(doc["_id"]),
                "paper_link": doc["paper_link"],
                "paper_title": doc["paper_title"],
                "upload_time": doc["upload_time"],
                "file_hash": doc.get("file_hash"),
            }
            async for doc in db.papers.find()
        ]
    except Exception as exc:
        print(f"Error getting papers: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/paper/report/{file_hash}")
async def get_report(
    file_hash: str
):
    """Read the saved analysis report for a file hash from Redis db 190.

    Args:
        file_hash: Hash identifying the paper; the report is stored under
            ``paper_report:{file_hash}``.

    Returns:
        The stored report, decoded from JSON.

    Raises:
        HTTPException: 404 when no report is stored for the hash; 500 for
            Redis or JSON decoding failures.
    """
    redis = await get_redis()
    try:
        # Reports live in db 190
        await redis.select(190)
        report_key = f"paper_report:{file_hash}"
        existing_report = await redis.get(report_key)
        if not existing_report:
            raise HTTPException(status_code=404, detail="No saved paper report found")
        return json.loads(existing_report)
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500
        raise
    except Exception as e:
        print(f"Error getting paper report: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
class FileUpload(BaseModel):
    """One uploaded file as described by the client: its name and its hash."""
    filename: str
    hash: str
class BatchUploadRequest(BaseModel):
    """Request body of the batch upload endpoint: a list of file descriptors."""
    files: List[FileUpload]
# Strong references to in-flight analysis tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_analysis_tasks: set = set()


@app.post("/paper/upload")
async def batch_upload(request: BatchUploadRequest):
    """Register a batch of uploaded papers and start their analysis.

    For every file a record is inserted into the ``papers`` collection and a
    ``processing`` status is stored in Redis db 190 under
    ``paper_report:{hash}``. Files that complete both steps are passed to
    ``batch_analysis`` as a single background task.

    Args:
        request: The batch of file descriptors (name and hash).

    Returns:
        A dict with a summary ``message``, the per-file ``uploaded_files``
        entries (error entries carry ``status`` and ``error_message``) and
        ``files_to_analyze``, the number of files queued for analysis.

    Raises:
        HTTPException: 500 on failures outside the per-file handling.
    """
    db = await get_database()
    redis = await get_redis()
    try:
        uploaded_papers = []
        papers_to_analyze = []
        for file in request.files:
            try:
                # 1. Create the database record
                paper = {
                    "paper_link": f"https://files.aiot.ml/pdf/{file.hash}",
                    "paper_title": file.filename,
                    "upload_time": datetime.now(timezone.utc),
                    "file_hash": file.hash
                }
                result = await db.papers.insert_one(paper)
                paper_info = {
                    "paper_id": str(result.inserted_id),
                    "file_hash": file.hash,
                    "paper_title": paper["paper_title"]
                }
                # 2. Mark the analysis as in progress
                await redis.select(190)
                report_key = f"paper_report:{file.hash}"
                initial_status = {
                    "status": "processing",
                    "message": "Analysis in progress"
                }
                await redis.set(report_key, json.dumps(initial_status))
                # Record success only after both steps worked, so a file whose
                # Redis step failed is not listed as both success and error
                uploaded_papers.append(paper_info)
                papers_to_analyze.append(paper_info)
            except Exception as file_error:
                print(f"处理文件 {file.filename} 时出错: {str(file_error)}")
                uploaded_papers.append({
                    "file_hash": file.hash,
                    "paper_title": file.filename,
                    "status": "error",
                    "error_message": str(file_error)
                })
        # Start the analysis in the background
        if papers_to_analyze:
            task = asyncio.create_task(batch_analysis(papers_to_analyze))
            _analysis_tasks.add(task)
            task.add_done_callback(_analysis_tasks.discard)
            print(f"[Redis] 开始分析新文件: {papers_to_analyze}")
        return {
            "message": f"Successfully processed {len(uploaded_papers)} files",
            "uploaded_files": uploaded_papers,
            "files_to_analyze": len(papers_to_analyze)
        }
    except Exception as e:
        print(f"批量上传错误: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
async def batch_analysis(papers: List[dict]):
    """Background task: analyze papers and store each report in Redis db 190.

    Args:
        papers: Dicts with at least ``paper_id`` and ``file_hash``
            (``paper_title`` is used for logging only).

    At most three papers are processed at the same time. For each paper the
    text is fetched from the file service, analyzed (directly or in segments,
    depending on length), evaluated, and the merged result is stored under
    ``paper_report:{file_hash}`` with ``status`` ``completed``. On failure a
    ``failed`` status with the error message is stored instead. Nothing is
    returned and no exception is propagated per paper.
    """
    # Bind the client to db 190 when it is created. The papers are processed
    # concurrently on one pooled client, and a SELECT sent through the pool
    # only affects the connection that happened to carry it, so later
    # connections could otherwise read and write db 0.
    redis = aioredis.from_url(
        REDIS_URL,
        encoding="utf-8",
        decode_responses=True,
        db=190,
    )
    # Cap the number of papers analyzed at the same time
    semaphore = asyncio.Semaphore(3)

    async def single_paper(ref: dict):
        async with semaphore:
            # Resolve the key up front so the error handler can always use it
            file_hash = ref.get("file_hash")
            report_key = f"paper_report:{file_hash}"
            try:
                paper_id = ref["paper_id"]
                if file_hash is None:
                    raise KeyError("file_hash")
                # Re-check the report in case it was completed in the meantime
                existing_report = await redis.get(report_key)
                if existing_report:
                    report_data = json.loads(existing_report)
                    if report_data.get("status") == "completed":
                        print(f"Report already exists for file hash: {file_hash}, skipping analysis")
                        return
                # Fetch the extracted PDF text from the file service
                async with aiohttp.ClientSession() as session:
                    async with session.get(f'https://files.aiot.ml/pdf/{file_hash}') as response:
                        if response.status != 200:
                            print(f"[Redis] 获取PDF内容失败: {response.status}")
                            raise Exception(f"Failed to get PDF content for file hash: {file_hash}")
                        pdf_content = await response.json()
                        if not pdf_content.get('content'):
                            raise Exception("No PDF content returned")
                print(f"\n开始处理文献 {ref.get('paper_title', '未知标题')}")
                print(f"文献ID: {paper_id}")
                print(f"文件哈希: {file_hash}")
                content = pdf_content['content']
                # A list of strings is joined into one text
                if isinstance(content, list):
                    content = '\n'.join(content)
                content_length = len(content)
                print(f"\n=== 步骤2: 内容长度检查 ===")
                print(f"PDF内容总字符数: {content_length}")
                # Choose the analysis path by length
                if content_length <= 200000:
                    print(f"\n=== 步骤3A: 使用直接分析方式 ===")
                    print(f"文档长度在处理范围内 ({content_length} <= 200000)")
                    # Only the first 180000 characters are sent to the model
                    document_analysis = await analyze_paper(content[:180000])
                    if not document_analysis:
                        raise Exception("Failed to analyze document")
                    print("文档分析完成")
                else:
                    print(f"\n=== 步骤3B: 使用分段分析方式 ===")
                    print(f"文档超过200000字符 ({content_length} > 200000)")
                    analysis_results = await analyze_long_file(content)
                    if not analysis_results:
                        raise Exception("Failed to analyze document in segments")
                    print(f"分段分析完成,共分析了 {len(analysis_results)} 个段落")
                    document_analysis = await merge_results(analysis_results)
                    if not document_analysis:
                        raise Exception("Failed to merge analysis results")
                # Short pause between API calls to stay under rate limits
                await asyncio.sleep(1)
                # Evaluate the value of the paper from the analysis
                value_evaluation = await paper_value(document_analysis)
                if not value_evaluation:
                    raise Exception("Failed to evaluate value")
                print("文献价值分析完成")
                print("\n=== 步骤5: 保存最终结果 ===")
                analysis_result = {
                    **document_analysis,
                    **value_evaluation,
                    "status": "completed"
                }
                await redis.set(report_key, json.dumps(analysis_result))
            except Exception as e:
                print(f"Error analyzing paper {file_hash}: {e}")
                if file_hash is None:
                    # No key to record the failure under
                    return
                try:
                    error_status = {
                        "status": "failed",
                        "message": str(e)
                    }
                    await redis.set(report_key, json.dumps(error_status))
                except Exception as redis_error:
                    print(f"Error updating Redis status: {redis_error}")

    try:
        # Process all papers concurrently (bounded by the semaphore)
        await asyncio.gather(
            *(single_paper(ref) for ref in papers)
        )
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
# 在全局范围创建线程池
# Shared thread pool for PDF reading; three workers cap concurrent PDF work
pdf_thread_pool = ThreadPoolExecutor(max_workers=3)
# DeepSeek API configuration (OpenAI-compatible client).
# The key can be overridden with the DEEPSEEK_API_KEY environment variable; the
# literal stays as the default so existing deployments keep working.
# SECURITY NOTE(review): the default embeds a live API key in source control.
# Rotate it and drop the fallback once the environment is provisioned.
client = OpenAI(
    base_url="https://api.deepseek.com/v1",
    api_key=os.environ.get("DEEPSEEK_API_KEY", "sk-3027fb3c810b4e17985fa397d41250b9"),
)
# 创建异步HTTP客户端会话
async def get_aiohttp_session():
    """Create an aiohttp session pre-configured for the DeepSeek API.

    The bearer token is read from the ``DEEPSEEK_API_KEY`` environment variable
    and falls back to the previously hard-coded key.
    SECURITY NOTE(review): rotate the embedded key and drop the fallback.

    Returns:
        A new ``aiohttp.ClientSession``; the caller is responsible for closing it.
    """
    api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-3027fb3c810b4e17985fa397d41250b9")
    return aiohttp.ClientSession(
        base_url="https://api.deepseek.com/v1/",  # trailing slash kept from the original
        headers={"Authorization": f"Bearer {api_key}"},
    )
async def read_pdf(file_path: str) -> str:
    """Extract the text of a PDF in the shared thread pool.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The extracted text. Text of at most 180000 characters, or of more than
        200000 characters, is returned in full (the latter for segmented
        processing by the caller); text in between is cut to its first 180000
        characters. An empty string is returned when reading fails.
    """
    def _read_sync() -> str:
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # join() avoids quadratic string concatenation; a page that
                # yields no text contributes an empty string rather than
                # aborting the whole read
                pdf_content = "".join(
                    page.extract_text() or "" for page in pdf_reader.pages
                )
            content_length = len(pdf_content)
            print(f"\nPDF原始内容字符数: {content_length}")
            if content_length <= 180000:
                print("文档长度 ≤ 180000,返回完整内容")
                return pdf_content
            if content_length <= 200000:
                print("文档长度在 180000-200000 之间,截取前 180000 个字符")
                return pdf_content[:180000]
            print("文档长度 > 200000,返回完整内容供分段处理")
            return pdf_content  # full text; the caller does the segmentation
        except Exception as e:
            print(f"Error reading PDF: {e}")
            return ""

    # get_running_loop() is the supported way to get the loop inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pdf_thread_pool, _read_sync)
async def call_deepseek(messages: list) -> dict:
    """Send a chat completion request to DeepSeek and return the parsed JSON reply.

    Args:
        messages: Chat messages (dicts with ``role`` and ``content``).

    Returns:
        The content of the first choice, decoded from JSON.

    Raises:
        Exception: when the API answers with a status other than 200 (the
            response body is included in the message). JSON decoding errors
            of the reply propagate as raised.
    """
    async with await get_aiohttp_session() as session:
        # The path is relative (no leading slash) so that it is joined onto the
        # session's base URL path; a leading slash would replace that path
        # under RFC 3986 URL joining.
        async with session.post("chat/completions", json={
            "model": "deepseek-chat",
            "messages": messages,
            "response_format": {"type": "json_object"}
        }) as response:
            if response.status != 200:
                raise Exception(f"API调用失败: {await response.text()}")
            data = await response.json()
            return json.loads(data["choices"][0]["message"]["content"])
async def analyze_long_file(content: str) -> List[dict]:
    """Analyze a long document in fixed-size segments.

    The text is cut into consecutive 60000-character segments. Each segment is
    sent to ``call_deepseek`` with a prompt asking for the analysis template,
    with a one-second pause after every successful call.

    Args:
        content: Full document text.

    Returns:
        The truthy per-segment results, in order. Segments whose call fails or
        returns a falsy value are skipped.
    """
    segment_size = 60000  # characters per segment
    content_length = len(content)
    print(f"\n开始分段处理,总字符数: {content_length}")
    print(f"每段大小: {segment_size} 字符")
    segments = [
        content[start:start + segment_size]
        for start in range(0, content_length, segment_size)
    ]
    total = len(segments)
    print(f"文档已分段,共 {total} 个段落")
    # Analyze the segments one after another
    analysis_results = []
    for part_no, segment in enumerate(segments, start=1):
        print(f"\n开始分析第 {part_no}/{total} 个段落...")
        system_prompt = f"""
You are an AI assistant tasked with analyzing part {part_no} of {total} of an academic paper.
Generate a comprehensive analysis in JSON format covering both basic information and content analysis.
Note that this is part {part_no} of a longer document, so focus on the content provided.
The JSON structure must strictly follow the provided template.
Also generate a Mermaid flowchart code to visualize the research methodology if this segment contains methodology information.
Create a detailed and comprehensive flowchart that accurately represents the paper's research methodology.
"""
        user_prompt = f"""Analyze the following paper segment and extract all relevant information:
Content: {segment}
Generate a JSON response with the following structure:
{{
"1. Basic Information": {{
"author": "[Author name(s) and affiliations]",
"publication_date": "[Publication date in YYYY-MM format]",
"title": "[Full title of the document]",
"journal_publisher": "[Journal name or publisher details]",
"document_type": "[Type: journal article/book/conference paper etc.]"
}},
"2. Content Analysis": {{
"abstract": "[Paper abstract]",
"research_purpose": "[Main objectives and research questions]",
"methodology": "[Research methods, data collection and analysis approaches]",
"main_arguments": "[Key theoretical frameworks and arguments]",
"conclusions": "[Complete findings and conclusions]",
"innovations": "[Novel contributions and original aspects]"
}},
"flowchart": "[If this segment contains methodology information, generate a Mermaid flowchart code that visualizes the research methodology. Follow these rules:
1. Use 'graph TD' for top-down flow
2. Each node should be in format: id[text] where:
- id is a unique identifier (A, B1, B2, etc.)
- text should be simple and clear, using only letters, numbers, and spaces
- DO NOT use any special characters including parentheses, colons, commas
- abbreviations should be written without parentheses, e.g., 'DNN' not '(DNN)'
- use space instead of special characters, e.g., 'Deep Learning Model' not 'Deep-Learning/Model'
3. Connections use '-->' between nodes
4. Ensure each line ends with a proper node paper
Create a detailed flowchart that shows:
- Research objectives and questions
- All major research methods used
- Data collection and analysis processes
- Key experimental or analytical steps
- Result synthesis and conclusion formation
Make the flowchart as detailed as possible while maintaining clarity.
If this segment does not contain methodology information, set this field to null.]"
}}
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]
        try:
            result = await call_deepseek(messages)
            if not result:
                print(f"{part_no} 个段落分析失败")
                continue
            print(f"{part_no} 个段落分析完成")
            analysis_results.append(result)
            # Short pause between calls to stay under API rate limits
            await asyncio.sleep(1)
        except Exception as exc:
            print(f"分析段落时出错: {exc}")
    return analysis_results
async def merge_results(results: List[dict]) -> dict:
    """Merge per-segment analysis results into one analysis via ``call_deepseek``.

    Args:
        results: Per-segment analysis dicts.

    Returns:
        ``{}`` when ``results`` is empty; otherwise whatever ``call_deepseek``
        returns for the merge prompt.
    """
    if not results:
        return {}
    print(f"开始合并 {len(results)} 个分析结果...")
    system_prompt = """
You are an AI assistant tasked with merging multiple analysis results of different parts of the same academic paper.
Generate a comprehensive merged analysis in JSON format.
The JSON structure must strictly follow the provided template.
Ensure the merged result is coherent and eliminates redundancy.
For the flowchart, select the most comprehensive one from the input results, or combine multiple flowcharts if they contain complementary information.
When combining flowcharts, ensure the result follows Mermaid syntax rules and avoids special characters.
Create a detailed and comprehensive flowchart that accurately represents the paper's complete research methodology.
"""
    results_json = json.dumps(results, ensure_ascii=False)
    user_prompt = f"""Merge the following analysis results into a single coherent analysis:
Analysis Results: {results_json}
Generate a JSON response with the following structure:
{{
"1. Basic Information": {{
"author": "[Merged author information]",
"publication_date": "[Publication date]",
"title": "[Complete title]",
"journal_publisher": "[Journal/publisher information]",
"document_type": "[Document type]"
}},
"2. Content Analysis": {{
"abstract": "[Complete abstract]",
"research_purpose": "[Comprehensive research objectives]",
"methodology": "[Complete methodology description]",
"main_arguments": "[Comprehensive theoretical frameworks and arguments]",
"conclusions": "[Complete findings and conclusions]",
"innovations": "[Complete list of innovations]"
}},
"flowchart": "[Select or combine the flowcharts following these rules:
1. Use 'graph TD' for top-down flow
2. Each node should be in format: id[text] where:
- id is a unique identifier (A, B1, B2, etc.)
- text should be simple and clear, using only letters, numbers, and spaces
- DO NOT use any special characters including parentheses, colons, commas
- abbreviations should be written without parentheses, e.g., 'DNN' not '(DNN)'
- use space instead of special characters, e.g., 'Deep Learning Model' not 'Deep-Learning/Model'
3. Connections use '-->' between nodes
4. Ensure each line ends with a proper node paper
Create a detailed flowchart that shows:
- Research objectives and questions
- All major research methods used
- Data collection and analysis processes
- Key experimental or analytical steps
- Result synthesis and conclusion formation
Make the flowchart as detailed as possible while maintaining clarity.
If no valid flowchart is found in any segment, set this field to null.]"
}}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    merged = await call_deepseek(messages)
    print("分析结果合并完成" if merged else "分析结果合并失败")
    return merged
async def analyze_paper(content: str):
    """Analyze a paper's basic information and content in a single model call.

    Args:
        content: Text of the paper, embedded verbatim in the prompt.

    Returns:
        Whatever ``call_deepseek`` returns for the analysis prompt.
    """
    system_prompt = """
You are an AI assistant tasked with analyzing academic paper.
Generate a comprehensive analysis in JSON format covering both basic information and content analysis.
The JSON structure must strictly follow the provided template.
Also generate a Mermaid flowchart code to visualize the research methodology.
Ensure the flowchart follows strict formatting rules to avoid parsing errors.
Create a detailed and comprehensive flowchart that accurately represents the paper's research methodology.
"""
    user_prompt = f"""Analyze the following paper and extract all relevant information:
Content: {content}
Generate a JSON response with the following structure:
{{
"1. Basic Information": {{
"author": "[Author name(s) and affiliations]",
"publication_date": "[Publication date in YYYY-MM format]",
"title": "[Full title of the document]",
"journal_publisher": "[Journal name or publisher details]",
"document_type": "[Type: journal article/book/conference paper etc.]"
}},
"2. Content Analysis": {{
"abstract": "[Paper abstract]",
"research_purpose": "[Main objectives and research questions]",
"methodology": "[Research methods, data collection and analysis approaches]",
"main_arguments": "[Key theoretical frameworks and arguments]",
"conclusions": "[Primary findings and conclusions]",
"innovations": "[Novel contributions and original aspects]"
}},
"flowchart": "[Generate a Mermaid flowchart code that visualizes the research methodology. Follow these rules:
1. Use 'graph TD' for top-down flow
2. Each node should be in format: id[text] where:
- id is a unique identifier (A, B1, B2, etc.)
- text should be simple and clear, using only letters, numbers, and spaces
- DO NOT use any special characters including parentheses, colons, commas
- abbreviations should be written without parentheses, e.g., 'DNN' not '(DNN)'
- use space instead of special characters, e.g., 'Deep Learning Model' not 'Deep-Learning/Model'
3. Connections use '-->' between nodes
4. Ensure each line ends with a proper node paper
Create a detailed flowchart that shows:
- Research objectives and questions
- All major research methods used
- Data collection and analysis processes
- Key experimental or analytical steps
- Result synthesis and conclusion formation
Make the flowchart as detailed as possible while maintaining clarity.
If the paper does not contain clear methodology information, set this field to null.]"
}}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    return await call_deepseek(messages)
async def paper_value(content_analysis: dict):
    """Evaluate a paper's value from its content analysis.

    Args:
        content_analysis: Analysis dict; serialized to JSON and embedded in
            the prompt.

    Returns:
        Whatever ``call_deepseek`` returns for the evaluation prompt.
    """
    system_prompt = """
You are an AI assistant tasked with evaluating the value of academic paper based on its content analysis.
Generate a comprehensive value evaluation in JSON format.
The JSON structure must strictly follow the provided template.
"""
    analysis_json = json.dumps(content_analysis, ensure_ascii=False)
    user_prompt = f"""Based on the following content analysis, evaluate the paper's value:
Content Analysis: {analysis_json}
Generate a JSON response with the following structure:
{{
"3. Value Evaluation": {{
"academic_contribution": "[Significance to the field of study]",
"practical_significance": "[Real-world applications and implications]",
"limitations": "[Research constraints and weaknesses]",
"implications": "[Suggestions for future research and practice]"
}}
}}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    return await call_deepseek(messages)
# 添加问答相关的模型
class QuestionModel(BaseModel):
    """Request body of the Q&A endpoint: the question text."""
    question: str
class TaskStatus:
    """String constants for the state of a Q&A task."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
# Strong references to in-flight Q&A tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_qa_tasks: set = set()


@app.post("/paper/{file_hash}/qa")
async def ask_reference_question(
    file_hash: str,
    question: QuestionModel
):
    """Submit a question about a paper; the answer is produced in the background.

    Args:
        file_hash: Hash identifying the paper in the ``papers`` collection.
        question: Request body carrying the question text.

    Returns:
        A dict with the new ``task_id``, the ``pending`` status and a message.

    Raises:
        HTTPException: 404 when the paper does not exist; 500 for other
            failures.
    """
    db = await get_database()
    redis = await get_redis()
    try:
        # Verify the paper exists
        paper = await db.papers.find_one({"file_hash": file_hash})
        if not paper:
            raise HTTPException(status_code=404, detail="Paper not found")
        task_id = str(ObjectId())
        # Store the pending status (db 192) before scheduling, so a status
        # lookup right after this response finds the task
        await redis.select(192)
        await redis.hset(
            f"task:{task_id}",
            mapping={
                "status": TaskStatus.PENDING,
                "file_hash": file_hash,
                "question": question.question
            }
        )
        # Schedule the background task and keep a reference to it
        task = asyncio.create_task(paper_question(
            task_id=task_id,
            file_hash=file_hash,
            question=question.question,
            paper=paper
        ))
        _qa_tasks.add(task)
        task.add_done_callback(_qa_tasks.discard)
        return {
            "task_id": task_id,
            "status": TaskStatus.PENDING,
            "message": "Question submitted, processing in progress"
        }
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500
        raise
    except Exception as e:
        print(f"Error in ask_reference_question: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
async def paper_question(task_id: str, file_hash: str, question: str, paper: dict):
    """Background task: answer a question about a paper and record the result.

    Args:
        task_id: Identifier of the task; status is stored under
            ``task:{task_id}`` in Redis db 192.
        file_hash: Hash of the paper; used for the report key (db 190), the
            file service URL and the chat history key (db 191).
        question: The question text.
        paper: The paper document; only ``paper_title`` is read.

    The task status is set to ``processing``, then to ``completed`` (with the
    answer) or ``failed`` (with the error message). Nothing is returned and no
    exception is propagated.
    """
    redis = await get_redis()
    try:
        # Update task status to processing (db 192 stores task status)
        await redis.select(192)
        await redis.hset(
            f"task:{task_id}",
            mapping={
                "status": TaskStatus.PROCESSING,
                "file_hash": file_hash,
                "question": question
            }
        )
        # Get analysis report from Redis
        await redis.select(190)
        report_key = f"paper_report:{file_hash}"
        report_data = await redis.get(report_key)
        if not report_data:
            raise Exception("Paper analysis report does not exist, please analyze first")
        # Get PDF content from file storage service
        async with aiohttp.ClientSession() as session:
            async with session.get(f'https://files.aiot.ml/pdf/{file_hash}') as response:
                if response.status != 200:
                    raise Exception(f"Failed to get PDF content: HTTP {response.status}")
                pdf_content = await response.json()
                if not pdf_content.get('content'):
                    raise Exception("No PDF content returned")
        content = pdf_content['content']
        # If content is a list, join it into a single string
        if isinstance(content, list):
            content = '\n'.join(content)
        # Limit content length
        MAX_CHARS = 180000
        content = content[:MAX_CHARS]
        system_prompt = """
You are a professional academic assistant responsible for answering questions about academic papers.
You should provide accurate and professional answers based on the paper content and analysis report.
Answers should be concise and clear, citing specific content from the paper whenever possible.
"""
        context = {
            "Paper Content": content,
            "Analysis Report": json.loads(report_data)
        }
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Answer the question based on the following paper content and analysis report:\n\nPaper Information:{json.dumps(context, ensure_ascii=False)}\n\nQuestion:{question}"}
        ]
        # The OpenAI client call is synchronous; run it in a worker thread so
        # the event loop is not blocked while the model generates the answer
        response = await asyncio.to_thread(
            client.chat.completions.create,
            model="deepseek-chat",
            messages=messages,
        )
        answer = response.choices[0].message.content
        # Save conversation history to Redis db 191
        await redis.select(191)
        chat_history_key = f"chat_history:{file_hash}"
        existing_history = await redis.get(chat_history_key)
        history = json.loads(existing_history) if existing_history else []
        history.append({
            "question": question,
            "answer": answer,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        await redis.set(chat_history_key, json.dumps(history))
        # Update task status to completed
        await redis.select(192)
        await redis.hset(
            f"task:{task_id}",
            mapping={
                "status": TaskStatus.COMPLETED,
                "answer": answer,
                # Redis hash values cannot be None; fall back to an empty title
                "paper_title": paper.get("paper_title") or ""
            }
        )
    except Exception as e:
        print(f"Error processing question: {e}")
        # Record the failure; guarded so that a Redis error here does not
        # escape from the background task
        try:
            await redis.select(192)
            await redis.hset(
                f"task:{task_id}",
                mapping={
                    "status": TaskStatus.FAILED,
                    "error": str(e)
                }
            )
        except Exception as redis_error:
            print(f"Error updating Redis status: {redis_error}")
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
@app.get("/paper/{file_hash}/qa/history")
async def get_paper_qa_history(
    file_hash: str
):
    """Return the stored Q&A history of a paper.

    The history is read from Redis db 191 under ``chat_history:{file_hash}``.

    Returns:
        The decoded history, or an empty list when nothing is stored.

    Raises:
        HTTPException: 500 on Redis or JSON decoding failures.
    """
    redis = await get_redis()
    try:
        await redis.select(191)
        raw_history = await redis.get(f"chat_history:{file_hash}")
        return json.loads(raw_history) if raw_history else []
    except Exception as exc:
        print(f"Error getting QA history: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        try:
            await redis.aclose()
        except Exception as close_error:
            print(f"Error closing Redis connection: {close_error}")
@app.get("/paper/task/{task_id}")
async def get_task_status(task_id: str):
    """Return the status of a Q&A task and, when available, its result.

    The task hash is read from Redis db 192 under ``task:{task_id}``.

    Returns:
        A dict with ``task_id`` and ``status``; completed tasks add ``answer``
        and ``paper_title``, failed tasks add ``error``.

    Raises:
        HTTPException: 404 when the task is unknown; 500 on Redis failures.
    """
    redis = await get_redis()
    try:
        await redis.select(192)
        task_data = await redis.hgetall(f"task:{task_id}")
        if not task_data:
            raise HTTPException(status_code=404, detail="Task not found")
        status = task_data.get("status", TaskStatus.PENDING)
        response = {
            "task_id": task_id,
            "status": status
        }
        if status == TaskStatus.COMPLETED:
            # Completed: include the answer
            response.update({
                "answer": task_data.get("answer"),
                "paper_title": task_data.get("paper_title")
            })
        elif status == TaskStatus.FAILED:
            # Failed: include the error message
            response.update({
                "error": task_data.get("error")
            })
        return response
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500
        raise
    except Exception as e:
        print(f"Error getting task status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
# 添加新的路由,通过哈希值获取报告
@app.get("/paper/check/{file_hash}")
async def check_report(
    file_hash: str
):
    """Look up the saved report for a file hash in Redis db 190.

    Returns:
        The decoded report, or ``{"status": "not_found"}`` when no report is
        stored under ``paper_report:{file_hash}``.

    Raises:
        HTTPException: 500 on Redis or JSON decoding failures.
    """
    redis = await get_redis()
    try:
        # Reports live in db 190
        await redis.select(190)
        stored_report = await redis.get(f"paper_report:{file_hash}")
        if stored_report:
            return json.loads(stored_report)
        return {"status": "not_found"}
    except Exception as exc:
        print(f"Error getting paper report by hash: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        try:
            await redis.aclose()
        except Exception as close_error:
            print(f"Error closing Redis connection: {close_error}")
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 9005
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=9005)