Initial commit

This commit is contained in:
2025-01-16 05:14:29 +00:00
commit a5357af694
2 changed files with 1871 additions and 0 deletions
+1131
View File
File diff suppressed because it is too large Load Diff
+740
View File
@@ -0,0 +1,740 @@
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional, Dict, Any
from datetime import datetime, timezone
import os
import json
import PyPDF2
import asyncio
from concurrent.futures import ThreadPoolExecutor
import aiohttp
from pydantic import BaseModel
from redis import asyncio as aioredis
# 配置
UPLOAD_PATH = "/obscura/task/paper"
DEEPSEEK_API_KEY = "sk-3027fb3c810b4e17985fa397d41250b9"
REDIS_URL = "redis://:Obscura@2024@222.186.10.253:6379"
# Redis数据库编号
ANALYSIS_DB = 190 # 文献分析结果
QA_HISTORY_DB = 191 # 问答记录
TASK_STATUS_DB = 192 # 任务状态
REFERENCE_DB = 193 # 文献信息
app = FastAPI()
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"]
)
# 创建线程池
pdf_thread_pool = ThreadPoolExecutor(max_workers=3)
# Redis连接
async def get_redis():
redis = aioredis.from_url(
REDIS_URL,
encoding="utf-8",
decode_responses=True,
)
return redis
# 添加问答相关的模型
class QuestionModel(BaseModel):
"""问题模型"""
question: str
class TaskStatus:
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
@app.post("/upload")
async def upload_references(
files: List[UploadFile] = File(...)
):
"""批量上传文献"""
redis = await get_redis()
try:
uploaded_references = []
# 批量上传文件
for file in files:
# 验证文件类型
allowed_types = ["application/pdf", "application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"]
if file.content_type not in allowed_types:
continue # 跳过不支持的文件类型
# 确保上传目录存在
os.makedirs(UPLOAD_PATH, exist_ok=True)
# 生成安全的文件名
file_extension = os.path.splitext(file.filename)[1]
safe_filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{os.urandom(4).hex()}{file_extension}"
file_path = os.path.join(UPLOAD_PATH, safe_filename)
# 保存文件
with open(file_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
# 生成引用ID
reference_id = f"{int(datetime.now().timestamp())}_{os.urandom(4).hex()}"
# 创建引用记录
await redis.select(REFERENCE_DB)
reference = {
"reference_link": file_path,
"reference_title": file.filename,
"upload_time": datetime.now(timezone.utc).isoformat()
}
await redis.hmset(f"reference:{reference_id}", reference)
reference_info = {
"reference_id": reference_id,
"file_path": file_path,
"reference_title": reference["reference_title"]
}
uploaded_references.append(reference_info)
# 为每个文献创建初始状态
await redis.select(ANALYSIS_DB)
report_key = f"reference_report:{reference_id}"
initial_status = {
"status": "processing",
"message": "Analysis in progress"
}
await redis.set(report_key, json.dumps(initial_status))
# 在后台启动分析任务
if uploaded_references:
asyncio.create_task(process_batch_analysis(uploaded_references))
return {
"message": f"Successfully uploaded {len(uploaded_references)} files",
"uploaded_files": uploaded_references
}
except Exception as e:
print(f"Upload error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
@app.post("/{reference_id}/qa")
async def ask_reference_question(
reference_id: str,
question: QuestionModel
):
"""向文献提问(异步)"""
redis = await get_redis()
try:
# 验证文献是否存在
await redis.select(REFERENCE_DB)
reference = await redis.hgetall(f"reference:{reference_id}")
if not reference:
raise HTTPException(status_code=404, detail="文献不存在")
# 生成任务ID
task_id = f"{int(datetime.now().timestamp())}_{os.urandom(4).hex()}"
# 创建后台任务
asyncio.create_task(process_reference_question(
task_id=task_id,
reference_id=reference_id,
question=question.question,
reference=reference
))
return {
"task_id": task_id,
"status": TaskStatus.PENDING,
"message": "问题已提交,正在处理中"
}
except Exception as e:
print(f"Error in ask_reference_question: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
@app.get("/{task_id}")
async def get_task_status(task_id: str):
"""获取任务状态和结果"""
redis = await get_redis()
try:
await redis.select(TASK_STATUS_DB)
task_data = await redis.hgetall(f"task:{task_id}")
if not task_data:
raise HTTPException(status_code=404, detail="任务不存在")
response = {
"task_id": task_id,
"status": task_data.get("status", TaskStatus.PENDING)
}
# 如果任务完成,添加结果
if task_data.get("status") == TaskStatus.COMPLETED:
response.update({
"answer": task_data.get("answer"),
"reference_title": task_data.get("reference_title")
})
# 如果任务失败,添加错误信息
elif task_data.get("status") == TaskStatus.FAILED:
response.update({
"error": task_data.get("error")
})
return response
except Exception as e:
print(f"Error getting task status: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
@app.get("/{reference_id}/qa/history")
async def get_reference_qa_history(reference_id: str):
"""获取文献问答历史记录"""
redis = await get_redis()
try:
# 从Redis获取对话历史
await redis.select(QA_HISTORY_DB)
chat_history_key = f"chat_history:{reference_id}"
history_data = await redis.get(chat_history_key)
if not history_data:
return []
return json.loads(history_data)
except Exception as e:
print(f"Error getting QA history: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
@app.get("/{reference_id}/analysis")
async def get_reference_analysis(reference_id: str):
"""获取文献分析结果"""
redis = await get_redis()
try:
# 从Redis获取分析报告
await redis.select(ANALYSIS_DB)
report_key = f"reference_report:{reference_id}"
report_data = await redis.get(report_key)
if not report_data:
raise HTTPException(status_code=404, detail="文献分析报告不存在")
return json.loads(report_data)
except Exception as e:
print(f"Error getting analysis report: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
async def process_batch_analysis(references: List[dict]):
"""批量处理文献分析的后台任务"""
redis = await get_redis()
# 限制并发数量
semaphore = asyncio.Semaphore(3)
async def process_single_reference(ref: dict):
async with semaphore:
try:
reference_id = ref["reference_id"]
file_path = ref["file_path"]
if not os.path.exists(file_path):
print(f"Reference file not found: {file_path}")
return
print(f"\n开始处理文献 {ref.get('reference_title', '未知标题')}")
print(f"文献ID: {reference_id}")
print(f"文件路径: {file_path}")
# 异步读取PDF
print("\n=== 步骤1: 读取PDF文件 ===")
pdf_content = await read_pdf_async(file_path)
if not pdf_content:
raise Exception("Failed to read PDF content")
# 打印字符数
content_length = len(pdf_content)
print(f"\n=== 步骤2: 内容长度检查 ===")
print(f"PDF内容总字符数: {content_length}")
# 根据内容长度选择不同的处理方式
if content_length <= 200000:
print(f"\n=== 步骤3A: 使用直接分析方式 ===")
print(f"文档长度在处理范围内 ({content_length} <= 200000)")
# 直接分析文档内容
print("开始分析文档内容...")
document_analysis = await analyze_reference_document_async(pdf_content[:180000])
if not document_analysis:
raise Exception("Failed to analyze document")
print("文档分析完成")
else:
print(f"\n=== 步骤3B: 使用分段分析方式 ===")
print(f"文档超过200000字符 ({content_length} > 200000)")
# 分段分析长文档
print("\n--- 开始分段分析 ---")
print("正在调用 analyze_long_document_async...")
try:
analysis_results = await analyze_long_document_async(pdf_content)
except Exception as e:
print(f"分段分析过程中出错: {str(e)}")
raise
if not analysis_results:
raise Exception("Failed to analyze document in segments")
print(f"分段分析完成,共分析了 {len(analysis_results)} 个段落")
# 合并分析结果
try:
document_analysis = await merge_analysis_results(analysis_results)
except Exception as e:
raise
if not document_analysis:
raise Exception("Failed to merge analysis results")
# 等待一小段时间避免API限制
await asyncio.sleep(1)
# 异步分析文献价值
try:
value_evaluation = await analyze_reference_value_async(document_analysis)
print(f"analyze_reference_value_async 返回结果类型: {type(value_evaluation)}")
except Exception as e:
raise
if not value_evaluation:
raise Exception("Failed to evaluate value")
print("文献价值分析完成")
# 合并结果
print("\n=== 步骤5: 保存最终结果 ===")
analysis_result = {
**document_analysis,
**value_evaluation,
"status": "completed",
"completion_time": datetime.now(timezone.utc).isoformat()
}
# 保存结果
await redis.select(ANALYSIS_DB)
report_key = f"reference_report:{reference_id}"
await redis.set(report_key, json.dumps(analysis_result))
except Exception as e:
try:
await redis.select(ANALYSIS_DB)
report_key = f"reference_report:{ref['reference_id']}"
error_status = {
"status": "failed",
"message": str(e),
"error_time": datetime.now(timezone.utc).isoformat()
}
await redis.set(report_key, json.dumps(error_status))
except Exception as redis_error:
print(f"Error updating Redis status: {redis_error}")
try:
# 并发处理所有引用
await asyncio.gather(
*(process_single_reference(ref) for ref in references)
)
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
# 在全局范围创建线程池
pdf_thread_pool = ThreadPoolExecutor(max_workers=3) # 限制并发PDF处理数量
# 创建异步HTTP客户端会话
async def get_aiohttp_session():
return aiohttp.ClientSession(
base_url="https://api.deepseek.com/v1/",
headers={"Authorization": f"Bearer {DEEPSEEK_API_KEY}"}
)
async def read_pdf_async(file_path: str) -> str:
"""在线程池中异步读取PDF"""
def read_pdf():
try:
pdf_content = ""
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
content = page.extract_text()
pdf_content += content
# 根据内容长度返回不同的结果
content_length = len(pdf_content)
print(f"\nPDF原始内容字符数: {content_length}")
if content_length <= 180000:
print("文档长度 ≤ 180000,返回完整内容")
return pdf_content
elif content_length <= 200000:
print("文档长度在 180000-200000 之间,截取前 180000 个字符")
return pdf_content[:180000]
else:
print("文档长度 > 200000,返回完整内容供分段处理")
return pdf_content # 返回完整内容,由调用者处理分段
except Exception as e:
print(f"Error reading PDF: {e}")
return ""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(pdf_thread_pool, read_pdf)
async def call_deepseek_api_async(messages: list) -> dict:
"""异步调用DeepSeek API"""
async with await get_aiohttp_session() as session:
async with session.post("/chat/completions", json={
"model": "deepseek-chat",
"messages": messages,
"response_format": {"type": "json_object"}
}) as response:
if response.status == 200:
data = await response.json()
return json.loads(data["choices"][0]["message"]["content"])
else:
raise Exception(f"API调用失败: {await response.text()}")
async def analyze_long_document_async(content: str) -> List[dict]:
"""分段分析长文档"""
# 将内容分成多个段落,每段约60000个字符(估算后约50000 tokens
segments = []
content_length = len(content)
segment_size = 60000 # 减小段落大小
print(f"\n开始分段处理,总字符数: {content_length}")
print(f"每段大小: {segment_size} 字符")
for i in range(0, content_length, segment_size):
segment = content[i:i + segment_size]
segments.append(segment)
print(f"文档已分段,共 {len(segments)} 个段落")
# 对每个段落进行分析
analysis_results = []
for i, segment in enumerate(segments):
print(f"\n开始分析第 {i+1}/{len(segments)} 个段落...")
system_prompt = f"""
You are an AI assistant tasked with analyzing part {i+1} of {len(segments)} of an academic paper.
Generate a comprehensive analysis in JSON format covering both basic information and content analysis.
Note that this is part {i+1} of a longer document, so focus on the content provided.
The JSON structure must strictly follow the provided template.
"""
user_prompt = f"""Analyze the following paper segment and extract all relevant information:
Content: {segment}
Generate a JSON response with the following structure:
{{
"1. Basic Information": {{
"author": "[Author name(s) and affiliations]",
"publication_date": "[Publication date in YYYY-MM format]",
"title": "[Full title of the document]",
"journal_publisher": "[Journal name or publisher details]",
"document_type": "[Type: journal article/book/conference paper etc.]"
}},
"2. Content Analysis": {{
"abstract": "[Paper abstract]",
"research_purpose": "[Main objectives and research questions]",
"methodology": "[Research methods, data collection and analysis approaches]",
"main_arguments": "[Key theoretical frameworks and arguments]",
"conclusions": "[Primary findings and conclusions]",
"innovations": "[Novel contributions and original aspects]"
}}
}}
"""
try:
result = await call_deepseek_api_async([
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
])
if result:
print(f"{i+1} 个段落分析完成")
analysis_results.append(result)
# 等待一小段时间避免API限制
await asyncio.sleep(1)
else:
print(f"{i+1} 个段落分析失败")
except Exception as e:
print(f"分析段落时出错: {str(e)}")
continue
return analysis_results
async def merge_analysis_results(results: List[dict]) -> dict:
"""合并多个分析结果"""
if not results:
return {}
print(f"开始合并 {len(results)} 个分析结果...")
system_prompt = """
You are an AI assistant tasked with merging multiple analysis results of different parts of the same academic paper.
Generate a comprehensive merged analysis in JSON format.
The JSON structure must strictly follow the provided template.
Ensure the merged result is coherent and eliminates redundancy.
"""
user_prompt = f"""Merge the following analysis results into a single coherent analysis:
Analysis Results: {json.dumps(results, ensure_ascii=False)}
Generate a JSON response with the following structure:
{{
"1. Basic Information": {{
"author": "[Merged author information]",
"publication_date": "[Publication date]",
"title": "[Complete title]",
"journal_publisher": "[Journal/publisher information]",
"document_type": "[Document type]"
}},
"2. Content Analysis": {{
"abstract": "[Complete abstract]",
"research_purpose": "[Comprehensive research objectives]",
"methodology": "[Complete methodology description]",
"main_arguments": "[Comprehensive theoretical frameworks and arguments]",
"conclusions": "[Complete findings and conclusions]",
"innovations": "[Complete list of innovations]"
}}
}}
"""
result = await call_deepseek_api_async([
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
])
if result:
print("分析结果合并完成")
else:
print("分析结果合并失败")
return result
async def analyze_reference_document_async(content: str):
"""分析文献的基本信息和内容"""
system_prompt = """
You are an AI assistant tasked with analyzing academic paper.
Generate a comprehensive analysis in JSON format covering both basic information and content analysis.
The JSON structure must strictly follow the provided template.
"""
user_prompt = f"""Analyze the following paper and extract all relevant information:
Content: {content}
Generate a JSON response with the following structure:
{{
"1. Basic Information": {{
"author": "[Author name(s) and affiliations]",
"publication_date": "[Publication date in YYYY-MM format]",
"title": "[Full title of the document]",
"journal_publisher": "[Journal name or publisher details]",
"document_type": "[Type: journal article/book/conference paper etc.]"
}},
"2. Content Analysis": {{
"abstract": "[Paper abstract]",
"research_purpose": "[Main objectives and research questions]",
"methodology": "[Research methods, data collection and analysis approaches]",
"main_arguments": "[Key theoretical frameworks and arguments]",
"conclusions": "[Primary findings and conclusions]",
"innovations": "[Novel contributions and original aspects]"
}}
}}
"""
return await call_deepseek_api_async([
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
])
async def analyze_reference_value_async(content_analysis: dict):
"""基于内容分析结果评估文献的价值"""
system_prompt = """
You are an AI assistant tasked with evaluating the value of academic paper based on its content analysis.
Generate a comprehensive value evaluation in JSON format.
The JSON structure must strictly follow the provided template.
"""
user_prompt = f"""Based on the following content analysis, evaluate the paper's value:
Content Analysis: {json.dumps(content_analysis, ensure_ascii=False)}
Generate a JSON response with the following structure:
{{
"3. Value Evaluation": {{
"academic_contribution": "[Significance to the field of study]",
"practical_significance": "[Real-world applications and implications]",
"limitations": "[Research constraints and weaknesses]",
"implications": "[Suggestions for future research and practice]"
}}
}}
"""
return await call_deepseek_api_async([
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
])
async def process_reference_question(task_id: str, reference_id: str, question: str, reference: dict):
"""处理文献问答的后台任务"""
redis = await get_redis()
try:
# 更新任务状态为处理中
await redis.select(TASK_STATUS_DB)
await redis.hset(
f"task:{task_id}",
mapping={
"status": TaskStatus.PROCESSING,
"reference_id": reference_id,
"question": question
}
)
# 从Redis获取分析报告
await redis.select(ANALYSIS_DB)
report_key = f"reference_report:{reference_id}"
report_data = await redis.get(report_key)
if not report_data:
raise Exception("文献分析报告不存在,请先进行分析")
# 读取PDF文件内容
file_path = reference.get("reference_link")
if not file_path or not os.path.exists(file_path):
raise Exception("文献文件不存在")
# 读取PDF内容
pdf_content = ""
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
content = page.extract_text()
pdf_content += content
# 限制内容长度
MAX_CHARS = 180000
pdf_content = pdf_content[:MAX_CHARS]
# 构建系统提示和用户提示
system_prompt = """
你是一个专业的学术助手,负责回答关于学术文献的问题。
你应该基于文献内容和分析报告提供准确、专业的回答。
回答应当简洁明了,并尽可能引用文献中的具体内容。
"""
# 构建上下文
context = {
"文献内容": pdf_content,
"分析报告": json.loads(report_data)
}
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"基于以下文献内容和分析报告回答问题:\n\n文献信息:{json.dumps(context, ensure_ascii=False)}\n\n问题:{question}"}
]
# 调用DeepSeek API获取回答
response = await call_deepseek_api_async(messages)
answer = response.get("answer", "")
# 保存对话历史到Redis
await redis.select(QA_HISTORY_DB)
chat_history_key = f"chat_history:{reference_id}"
# 获取现有历史记录
existing_history = await redis.get(chat_history_key)
history = json.loads(existing_history) if existing_history else []
# 添加新的对话
history.append({
"question": question,
"answer": answer,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# 保存更新后的历史记录
await redis.set(chat_history_key, json.dumps(history))
# 更新任务状态为完成
await redis.select(TASK_STATUS_DB)
await redis.hset(
f"task:{task_id}",
mapping={
"status": TaskStatus.COMPLETED,
"answer": answer,
"reference_title": reference.get("reference_title")
}
)
except Exception as e:
print(f"Error processing question: {e}")
# 更新任务状态为失败
await redis.select(TASK_STATUS_DB)
await redis.hset(
f"task:{task_id}",
mapping={
"status": TaskStatus.FAILED,
"error": str(e)
}
)
finally:
try:
await redis.aclose()
except Exception as e:
print(f"Error closing Redis connection: {e}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=9005)