976 lines
38 KiB
Python
976 lines
38 KiB
Python
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import JSONResponse
|
||
from typing import Dict, List, Optional
|
||
from datetime import datetime, timezone
|
||
import asyncio
|
||
from motor.motor_asyncio import AsyncIOMotorClient
|
||
from bson import ObjectId
|
||
from pydantic import BaseModel, Field
|
||
from redis import asyncio as aioredis
|
||
from typing import Any
|
||
from contextlib import asynccontextmanager
|
||
import json
|
||
import os
|
||
import PyPDF2
|
||
import aiohttp
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from openai import OpenAI
|
||
import hashlib
|
||
|
||
# Database configuration.
# NOTE(review): real credentials were committed in source. They are kept here
# only as fallbacks so existing deployments keep working; set MONGODB_URL and
# REDIS_URL in the environment, then rotate these secrets and drop the fallbacks.
MONGODB_URL = os.environ.get(
    "MONGODB_URL",
    "mongodb://paper:SYX7cdJNMRbiytra@222.186.10.253:27017/paper",
)

REDIS_URL = os.environ.get(
    "REDIS_URL",
    "redis://:Obscura@2024@222.186.10.253:6379",
)
|
||
|
||
|
||
# Database connection holder.
class Database:
    """Process-wide holder for the MongoDB client.

    ``client`` stays ``None`` until ``connect_to_mongo`` assigns a new
    ``AsyncIOMotorClient`` to it.
    """

    client: Optional[AsyncIOMotorClient] = None


# Module-level singleton read by get_database / connect / close helpers.
db = Database()
|
||
|
||
async def get_database() -> Any:
    """Return the ``paper`` database from the shared Mongo client.

    Returns:
        The motor database handle for ``paper``. It is annotated ``Any``
        because it is a database object, not the ``AsyncIOMotorClient``
        the previous annotation claimed.

    Raises:
        RuntimeError: if called before ``connect_to_mongo`` has created the
            client. Previously this surfaced as an opaque ``TypeError`` from
            subscripting ``None``.
    """
    if db.client is None:
        raise RuntimeError("MongoDB client is not initialised; call connect_to_mongo() first")
    return db.client["paper"]
|
||
|
||
async def connect_to_mongo():
    """Create the shared Mongo client and confirm the server answers ``ping``.

    The new client is stored on ``db.client``. Any failure is printed and
    re-raised, so the application lifespan startup aborts.
    """
    try:
        mongo_client = AsyncIOMotorClient(MONGODB_URL)
        db.client = mongo_client
        # Round-trip to the server to prove the connection is usable.
        await mongo_client.admin.command('ping')
    except Exception as e:
        print(f"Could not connect to MongoDB: {e}")
        raise
    else:
        print("Successfully connected to MongoDB")
|
||
|
||
async def close_mongo_connection():
    """Close the shared Mongo client, if any, and clear the reference.

    This is safe to call when no client was ever created. The original
    raised ``AttributeError`` on ``None`` in that case.
    """
    if db.client is not None:
        db.client.close()
        db.client = None
|
||
|
||
# Redis setup
async def get_redis(db_index: Optional[int] = None):
    """Create a new asyncio Redis client for ``REDIS_URL``.

    Every call builds a fresh client with its own connection pool. The caller
    must ``await redis.aclose()`` when done.

    Args:
        db_index: Optional logical database that every pooled connection is
            bound to when it connects. Prefer this over ``await redis.select(n)``:
            SELECT only switches the one pooled connection that executes it, so
            concurrent users of the same client can end up on database 0.
            Per redis-py, a database given in the URL path or query string
            takes precedence over this argument.

    Returns:
        A ``redis.asyncio`` client with ``decode_responses=True`` (str values).
    """
    options: Dict[str, Any] = {"encoding": "utf-8", "decode_responses": True}
    if db_index is not None:
        options["db"] = db_index
    return aioredis.from_url(REDIS_URL, **options)
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the MongoDB connection on startup and always close it on shutdown.

    The ``try``/``finally`` guarantees the close also runs when an exception is
    thrown into the generator at ``yield``. The original skipped cleanup in
    that case.
    """
    await connect_to_mongo()
    try:
        yield
    finally:
        await close_mongo_connection()
|
||
|
||
# Create the FastAPI application; `lifespan` opens and closes MongoDB.
app = FastAPI(lifespan=lifespan)

# CORS configuration: all origins, methods and headers are allowed, every
# response header is exposed, and credentials are permitted.
# NOTE(review): combining allow_origins=["*"] with allow_credentials=True makes
# Starlette echo the caller's Origin on credentialed requests, i.e. any site can
# make cookie-bearing calls. Confirm this is intended or narrow allow_origins.
_cors_settings = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_settings)
|
||
|
||
|
||
# Pydantic model describing a stored paper (added after the other models).
class paperModel(BaseModel):
    """Schema of a paper document.

    The fields match the documents ``batch_upload`` inserts, minus ``file_hash``.
    NOTE(review): no route in the visible code references this model, and the
    nested class-based ``Config`` is the style pydantic v2 deprecates in favour
    of ``model_config`` — confirm before relying on it.
    """

    paper_link: str  # URL of the PDF text in the file service
    paper_title: str  # display title (batch_upload stores the filename here)
    # When not supplied, defaults to the current time in UTC.
    upload_time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Config:
        populate_by_name = True
        arbitrary_types_allowed = True
        # Serialise Mongo ObjectIds as plain strings.
        json_encoders = {ObjectId: str}
|
||
|
||
|
||
# Delete a paper.
@app.delete("/paper/delete/{paper_id}")
async def delete_paper(
    paper_id: str
):
    """Delete a paper record together with its Redis artifacts.

    The steps are:

    * Remove the Mongo ``papers`` document.
    * Remove the Q&A history at ``chat_history:{paper_id}`` (Redis db 191).
    * Remove the analysis report at ``paper_report:{file_hash}`` (Redis db 190),
      but only when no other paper record references the same file hash.
      ``batch_upload`` inserts one record per upload, so several papers can
      share one report.

    Redis cleanup is best effort: failures are printed, not raised.

    Raises:
        HTTPException 400: ``paper_id`` is not a valid ObjectId.
        HTTPException 404: no paper with that id. Previously this was
            swallowed by the generic handler and returned as a 500.
        HTTPException 500: any other failure.
    """
    if not ObjectId.is_valid(paper_id):
        raise HTTPException(status_code=400, detail="Invalid paper id")

    database = await get_database()

    try:
        object_id = ObjectId(paper_id)
        paper = await database.papers.find_one({"_id": object_id})
        if not paper:
            raise HTTPException(status_code=404, detail="paper not found")

        file_hash = paper.get("file_hash")

        # Count other records sharing this hash *before* deleting anything,
        # so a failure here aborts cleanly.
        other_refs = 0
        if file_hash:
            other_refs = await database.papers.count_documents(
                {"file_hash": file_hash, "_id": {"$ne": object_id}}, limit=1
            )

        await database.papers.delete_one({"_id": object_id})

        # Best-effort removal of the report (if unshared) and the chat history.
        redis = None
        try:
            redis = await get_redis()
            if file_hash and other_refs == 0:
                await redis.select(190)
                await redis.delete(f"paper_report:{file_hash}")

            await redis.select(191)
            await redis.delete(f"chat_history:{paper_id}")
        except Exception as redis_error:
            print(f"Warning: Failed to delete Redis keys: {str(redis_error)}")
        finally:
            if redis is not None:
                try:
                    await redis.aclose()
                except Exception as e:
                    print(f"Error closing Redis connection: {e}")

        return {"message": "paper successfully deleted"}

    except HTTPException:
        # Keep intentional HTTP errors (e.g. 404) instead of rewrapping as 500.
        raise
    except Exception as e:
        print(f"Error deleting paper: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to delete paper: {str(e)}")
|
||
|
||
@app.get("/paper/papers")
async def get_papers():
    """List every paper stored in the Mongo ``papers`` collection.

    Each entry has ``_id`` (stringified), ``paper_link``, ``paper_title``,
    ``upload_time`` and ``file_hash`` (``None`` when absent). Any failure is
    printed and returned as HTTP 500.
    """
    database = await get_database()

    try:
        return [
            {
                "_id": str(doc["_id"]),
                "paper_link": doc["paper_link"],
                "paper_title": doc["paper_title"],
                "upload_time": doc["upload_time"],
                "file_hash": doc.get("file_hash"),
            }
            async for doc in database.papers.find()
        ]
    except Exception as e:
        print(f"Error getting papers: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
@app.get("/paper/report/{file_hash}")
async def get_report(
    file_hash: str
):
    """Return the saved analysis report for ``file_hash`` from Redis db 190.

    The report is stored as JSON under ``paper_report:{file_hash}``. It may be
    a ``processing``/``failed`` placeholder rather than a completed report.

    Raises:
        HTTPException 404: nothing stored for this hash. Previously the
            generic handler turned this into a 500.
        HTTPException 500: Redis or JSON decoding failure.
    """
    redis = await get_redis()

    try:
        await redis.select(190)
        existing_report = await redis.get(f"paper_report:{file_hash}")

        if not existing_report:
            raise HTTPException(status_code=404, detail="No saved paper report found")

        return json.loads(existing_report)

    except HTTPException:
        raise
    except Exception as e:
        print(f"Error getting paper report: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
class FileUpload(BaseModel):
    """One uploaded file in a ``POST /paper/upload`` request.

    ``batch_upload`` uses ``hash`` both in the file-service URL
    (``https://files.aiot.ml/pdf/content/{hash}``) and in the Redis report key
    ``paper_report:{hash}``. It stores ``filename`` as the paper title.
    """

    filename: str  # becomes paper_title
    hash: str  # content hash identifying the file in storage
|
||
|
||
class BatchUploadRequest(BaseModel):
    """Request body for ``POST /paper/upload``."""

    # Handled one by one; a file that raises is printed and skipped.
    files: List[FileUpload]
|
||
|
||
# Strong references to scheduled analysis tasks. The event loop keeps only weak
# references to tasks, so a fire-and-forget task with no other reference may be
# garbage-collected before it finishes.
_analysis_tasks: set = set()


async def _insert_paper_record(database, file) -> dict:
    """Insert a ``papers`` document for one uploaded file and summarise it.

    Args:
        database: Motor database handle returned by ``get_database``.
        file: A ``FileUpload`` item (``filename`` and content ``hash``).

    Returns:
        A dict with ``paper_id``, ``file_hash`` and ``paper_title``.
    """
    paper = {
        "paper_link": f"https://files.aiot.ml/pdf/content/{file.hash}",
        "paper_title": file.filename,
        "upload_time": datetime.now(timezone.utc),
        "file_hash": file.hash,
    }
    result = await database.papers.insert_one(paper)
    return {
        "paper_id": str(result.inserted_id),
        "file_hash": file.hash,
        "paper_title": paper["paper_title"],
    }


@app.post("/paper/upload")
async def batch_upload(
    request: BatchUploadRequest
):
    """Register a batch of uploaded papers and schedule any missing analyses.

    A new Mongo ``papers`` record is inserted for every file, even when a
    report already exists. If Redis db 190 holds no ``completed`` report for
    the file hash, the key ``paper_report:{hash}`` is set to a ``processing``
    placeholder and the paper is queued. Within one request each hash is
    queued once. Queued papers are handed to ``batch_analysis`` as a background
    task. Errors for a single file are printed and that file is skipped.

    Returns:
        A dict with ``message``, ``uploaded_files`` (one summary per inserted
        record) and ``files_to_analyze`` (number of queued papers).

    Raises:
        HTTPException 500: unexpected failure outside per-file handling.
    """
    database = await get_database()
    redis = await get_redis()

    try:
        uploaded_papers = []
        papers_to_analyze = []
        queued_hashes = set()

        for file in request.files:
            try:
                # Check whether an analysis report already exists for this hash.
                await redis.select(190)
                report_key = f"paper_report:{file.hash}"
                existing_report = await redis.get(report_key)
                print(f"[Redis] existing_report: {existing_report}")
                print(f"[Redis] file.hash: {file.hash}")

                report_completed = False
                if existing_report:
                    report_completed = json.loads(existing_report).get("status") == "completed"
                if report_completed:
                    print(f"[Redis] Report already exists for file {file.filename} with hash {file.hash}")

                # A record is created regardless of the report state.
                paper_info = await _insert_paper_record(database, file)
                uploaded_papers.append(paper_info)

                # Only analyse hashes without a completed report, once per request.
                if report_completed or file.hash in queued_hashes:
                    continue

                initial_status = {
                    "status": "processing",
                    "message": "Analysis in progress"
                }
                await redis.set(report_key, json.dumps(initial_status))
                papers_to_analyze.append(paper_info)
                queued_hashes.add(file.hash)

            except Exception as file_error:
                print(f"处理文件 {file.filename} 时出错: {str(file_error)}")
                continue

        if papers_to_analyze:
            task = asyncio.create_task(batch_analysis(papers_to_analyze))
            _analysis_tasks.add(task)
            task.add_done_callback(_analysis_tasks.discard)
            print(f"[Redis] 报告不存在,开始分析: {papers_to_analyze}")

        return {
            "message": f"Successfully uploaded {len(uploaded_papers)} files",
            "uploaded_files": uploaded_papers,
            "files_to_analyze": len(papers_to_analyze)
        }

    except Exception as e:
        print(f"批量上传错误: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
|
||
async def _fetch_pdf_text(file_hash: str) -> str:
    """Fetch the extracted PDF text for ``file_hash`` from the file service.

    The service answers with JSON whose ``content`` field is either a string or
    a list of strings. A list is joined with newlines.

    Raises:
        Exception: on a non-200 response or an empty ``content`` field.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://files.aiot.ml/pdf/content/{file_hash}') as response:
            if response.status != 200:
                print(f"[Redis] 获取PDF内容失败: {response.status}")
                raise Exception(f"Failed to get PDF content for file hash: {file_hash}")
            pdf_content = await response.json()

    if not pdf_content.get('content'):
        raise Exception("No PDF content returned")

    content = pdf_content['content']
    if isinstance(content, list):
        content = '\n'.join(content)
    return content


async def _analyze_document(content: str) -> dict:
    """Run the content analysis, picking direct or segmented mode by length.

    Text of at most 200000 characters is analysed in one call, truncated to its
    first 180000 characters. Longer text is analysed in segments with
    ``analyze_long_file`` and then combined with ``merge_results``.

    Raises:
        Exception: when the analysis or merge step returns nothing.
    """
    content_length = len(content)
    print("\n=== 步骤2: 内容长度检查 ===")
    print(f"PDF内容总字符数: {content_length}")

    if content_length <= 200000:
        print("\n=== 步骤3A: 使用直接分析方式 ===")
        print(f"文档长度在处理范围内 ({content_length} <= 200000)")
        document_analysis = await analyze_paper(content[:180000])
        if not document_analysis:
            raise Exception("Failed to analyze document")
        print("文档分析完成")
        return document_analysis

    print("\n=== 步骤3B: 使用分段分析方式 ===")
    print(f"文档超过200000字符 ({content_length} > 200000)")
    analysis_results = await analyze_long_file(content)
    if not analysis_results:
        raise Exception("Failed to analyze document in segments")
    print(f"分段分析完成,共分析了 {len(analysis_results)} 个段落")

    document_analysis = await merge_results(analysis_results)
    if not document_analysis:
        raise Exception("Failed to merge analysis results")
    return document_analysis


async def batch_analysis(papers: List[dict]):
    """Background task: analyse queued papers and store their reports in Redis.

    ``papers`` holds the ``paper_info`` dicts built by ``batch_upload``
    (``paper_id``, ``file_hash``, ``paper_title``). At most three papers run
    concurrently. For each paper the following happens:

    1. Fetch the PDF text.
    2. Analyse the content.
    3. Evaluate it with ``paper_value``.
    4. Write the combined result, with ``status: completed``, to
       ``paper_report:{file_hash}`` in Redis db 190.

    On failure the key is set to ``{"status": "failed", "message": ...}``.
    """
    # Bind every pooled connection to db 190. The original shared one client
    # across concurrent coroutines and relied on SELECT, which only switches the
    # single connection that runs it; a connection the pool opened later for
    # another coroutine would read and write database 0 instead.
    # NOTE(review): per redis-py, a db number embedded in REDIS_URL overrides
    # this keyword.
    redis = aioredis.from_url(
        REDIS_URL,
        db=190,
        encoding="utf-8",
        decode_responses=True,
    )

    # Limit concurrency to 3 papers.
    semaphore = asyncio.Semaphore(3)

    async def single_paper(ref: dict):
        file_hash = ref.get("file_hash")
        report_key = f"paper_report:{file_hash}"

        async with semaphore:
            try:
                paper_id = ref["paper_id"]
                if not file_hash:
                    raise Exception("Missing file_hash")

                # Re-check: another run may have completed this hash meanwhile.
                existing_report = await redis.get(report_key)
                if existing_report and json.loads(existing_report).get("status") == "completed":
                    print(f"Report already exists for file hash: {file_hash}, skipping analysis")
                    return

                content = await _fetch_pdf_text(file_hash)

                print(f"\n开始处理文献 {ref.get('paper_title', '未知标题')}")
                print(f"文献ID: {paper_id}")
                print(f"文件哈希: {file_hash}")

                document_analysis = await _analyze_document(content)

                # Short pause to stay under the API rate limit.
                await asyncio.sleep(1)

                value_evaluation = await paper_value(document_analysis)
                if not value_evaluation:
                    raise Exception("Failed to evaluate value")
                print("文献价值分析完成")

                print("\n=== 步骤5: 保存最终结果 ===")
                analysis_result = {
                    **document_analysis,
                    **value_evaluation,
                    "status": "completed"
                }
                await redis.set(report_key, json.dumps(analysis_result))

            except Exception as e:
                print(f"Error analysing paper {ref.get('paper_id')}: {e}")
                if not file_hash:
                    return
                try:
                    error_status = {
                        "status": "failed",
                        "message": str(e)
                    }
                    await redis.set(report_key, json.dumps(error_status))
                except Exception as redis_error:
                    print(f"Error updating Redis status: {redis_error}")

    try:
        # Process all queued papers concurrently (bounded by the semaphore).
        await asyncio.gather(*(single_paper(ref) for ref in papers))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
# Module-wide thread pool for blocking PDF parsing; caps concurrent PDF work at 3.
pdf_thread_pool = ThreadPoolExecutor(max_workers=3)

# DeepSeek API configuration (OpenAI-compatible endpoint).
# NOTE(review): a live API key was committed in source. It remains only as a
# fallback so existing deployments keep working; set DEEPSEEK_API_KEY, rotate
# the key, and remove the fallback.
client = OpenAI(
    base_url="https://api.deepseek.com/v1",
    api_key=os.environ.get("DEEPSEEK_API_KEY", "sk-3027fb3c810b4e17985fa397d41250b9"),
)
|
||
# Factory for async HTTP sessions against the DeepSeek API.
async def get_aiohttp_session():
    """Create a new aiohttp session pre-configured for the DeepSeek API.

    The bearer token comes from the module-level OpenAI ``client``, so the API
    key is configured in one place instead of being duplicated as a literal.
    The caller owns the session and must close it (``async with`` does so).
    """
    return aiohttp.ClientSession(
        # The trailing slash on the base URL is deliberate (kept from the original).
        base_url="https://api.deepseek.com/v1/",
        headers={"Authorization": f"Bearer {client.api_key}"},
    )
|
||
|
||
async def read_pdf(file_path: str) -> str:
    """Extract text from a local PDF file in ``pdf_thread_pool``.

    The result depends on the length of the extracted text:

    * up to 180000 characters: the full text is returned;
    * 180001 to 200000 characters: only the first 180000 are returned;
    * more than 200000 characters: the full text is returned, and the caller
      is expected to segment it.

    Returns:
        The text, or ``""`` (after printing the error) when the file cannot be
        opened or parsed.
    """
    def _extract_text() -> str:
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # Defensively treat a None page result as empty; join avoids
                # quadratic string concatenation on long documents.
                pdf_content = "".join(page.extract_text() or "" for page in pdf_reader.pages)

            content_length = len(pdf_content)
            print(f"\nPDF原始内容字符数: {content_length}")

            if content_length <= 180000:
                print("文档长度 ≤ 180000,返回完整内容")
                return pdf_content
            if content_length <= 200000:
                print("文档长度在 180000-200000 之间,截取前 180000 个字符")
                return pdf_content[:180000]
            print("文档长度 > 200000,返回完整内容供分段处理")
            return pdf_content

        except Exception as e:
            print(f"Error reading PDF: {e}")
            return ""

    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pdf_thread_pool, _extract_text)
|
||
|
||
async def call_deepseek(messages: list) -> dict:
    """Send ``messages`` to DeepSeek chat completions in JSON-object mode.

    Returns:
        The assistant message content, parsed with ``json.loads``.

    Raises:
        Exception: for any non-200 response, carrying the response body text.
    """
    payload = {
        "model": "deepseek-chat",
        "messages": messages,
        "response_format": {"type": "json_object"},
    }

    async with await get_aiohttp_session() as session:
        async with session.post("/chat/completions", json=payload) as response:
            if response.status != 200:
                raise Exception(f"API调用失败: {await response.text()}")
            body = await response.json()

    return json.loads(body["choices"][0]["message"]["content"])
|
||
|
||
async def analyze_long_file(content: str, segment_size: int = 60000) -> List[dict]:
    """Analyse a long document segment by segment.

    ``content`` is cut into consecutive slices of ``segment_size`` characters.
    The default of 60000 corresponds, by the original estimate, to about 50000
    tokens. Slices are sent to ``call_deepseek`` one after another. A segment
    that fails is printed and skipped.

    Args:
        content: Full document text.
        segment_size: Characters per segment; must be positive.

    Returns:
        The successful per-segment analysis dicts in document order. The list
        is empty for empty ``content``.

    Raises:
        ValueError: if ``segment_size`` is not positive.
    """
    if segment_size <= 0:
        raise ValueError("segment_size must be a positive number of characters")

    content_length = len(content)
    print(f"\n开始分段处理,总字符数: {content_length}")
    print(f"每段大小: {segment_size} 字符")

    segments = [content[start:start + segment_size] for start in range(0, content_length, segment_size)]
    total = len(segments)
    print(f"文档已分段,共 {total} 个段落")

    analysis_results = []
    for i, segment in enumerate(segments):
        print(f"\n开始分析第 {i+1}/{total} 个段落...")

        system_prompt = f"""
You are an AI assistant tasked with analyzing part {i+1} of {total} of an academic paper.
Generate a comprehensive analysis in JSON format covering both basic information and content analysis.
Note that this is part {i+1} of a longer document, so focus on the content provided.
The JSON structure must strictly follow the provided template.
Also generate a Mermaid flowchart code to visualize the research methodology if this segment contains methodology information.
Create a detailed and comprehensive flowchart that accurately represents the paper's research methodology.
"""

        # Rule 4 previously read "proper node paper": a leftover from a global
        # "reference" -> "paper" rename; restored to "node reference".
        user_prompt = f"""Analyze the following paper segment and extract all relevant information:
Content: {segment}

Generate a JSON response with the following structure:

{{
"1. Basic Information": {{
"author": "[Author name(s) and affiliations]",
"publication_date": "[Publication date in YYYY-MM format]",
"title": "[Full title of the document]",
"journal_publisher": "[Journal name or publisher details]",
"document_type": "[Type: journal article/book/conference paper etc.]"
}},
"2. Content Analysis": {{
"abstract": "[Paper abstract]",
"research_purpose": "[Main objectives and research questions]",
"methodology": "[Research methods, data collection and analysis approaches]",
"main_arguments": "[Key theoretical frameworks and arguments]",
"conclusions": "[Complete findings and conclusions]",
"innovations": "[Novel contributions and original aspects]"
}},
"flowchart": "[If this segment contains methodology information, generate a Mermaid flowchart code that visualizes the research methodology. Follow these rules:
1. Use 'graph TD' for top-down flow
2. Each node should be in format: id[text] where:
- id is a unique identifier (A, B1, B2, etc.)
- text should be simple and clear, using only letters, numbers, and spaces
- DO NOT use any special characters including parentheses, colons, commas
- abbreviations should be written without parentheses, e.g., 'DNN' not '(DNN)'
- use space instead of special characters, e.g., 'Deep Learning Model' not 'Deep-Learning/Model'
3. Connections use '-->' between nodes
4. Ensure each line ends with a proper node reference

Create a detailed flowchart that shows:
- Research objectives and questions
- All major research methods used
- Data collection and analysis processes
- Key experimental or analytical steps
- Result synthesis and conclusion formation

Make the flowchart as detailed as possible while maintaining clarity.
If this segment does not contain methodology information, set this field to null.]"
}}
"""

        try:
            result = await call_deepseek([
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ])

            if result:
                print(f"第 {i+1} 个段落分析完成")
                analysis_results.append(result)
                # Short pause to stay under the API rate limit.
                await asyncio.sleep(1)
            else:
                print(f"第 {i+1} 个段落分析失败")
        except Exception as e:
            print(f"分析段落时出错: {str(e)}")
            continue

    return analysis_results
|
||
|
||
async def merge_results(results: List[dict]) -> dict:
    """Merge per-segment analyses of one paper into a single analysis.

    Args:
        results: Segment analysis dicts, as produced by ``analyze_long_file``.

    Returns:
        The merged analysis parsed from the model's JSON reply. Returns ``{}``
        without calling the API when ``results`` is empty.
    """
    if not results:
        return {}

    print(f"开始合并 {len(results)} 个分析结果...")

    system_prompt = """
You are an AI assistant tasked with merging multiple analysis results of different parts of the same academic paper.
Generate a comprehensive merged analysis in JSON format.
The JSON structure must strictly follow the provided template.
Ensure the merged result is coherent and eliminates redundancy.
For the flowchart, select the most comprehensive one from the input results, or combine multiple flowcharts if they contain complementary information.
When combining flowcharts, ensure the result follows Mermaid syntax rules and avoids special characters.
Create a detailed and comprehensive flowchart that accurately represents the paper's complete research methodology.
"""

    # Rule 4 previously read "proper node paper": a leftover from a global
    # "reference" -> "paper" rename; restored to "node reference".
    user_prompt = f"""Merge the following analysis results into a single coherent analysis:
Analysis Results: {json.dumps(results, ensure_ascii=False)}

Generate a JSON response with the following structure:

{{
"1. Basic Information": {{
"author": "[Merged author information]",
"publication_date": "[Publication date]",
"title": "[Complete title]",
"journal_publisher": "[Journal/publisher information]",
"document_type": "[Document type]"
}},
"2. Content Analysis": {{
"abstract": "[Complete abstract]",
"research_purpose": "[Comprehensive research objectives]",
"methodology": "[Complete methodology description]",
"main_arguments": "[Comprehensive theoretical frameworks and arguments]",
"conclusions": "[Complete findings and conclusions]",
"innovations": "[Complete list of innovations]"
}},
"flowchart": "[Select or combine the flowcharts following these rules:
1. Use 'graph TD' for top-down flow
2. Each node should be in format: id[text] where:
- id is a unique identifier (A, B1, B2, etc.)
- text should be simple and clear, using only letters, numbers, and spaces
- DO NOT use any special characters including parentheses, colons, commas
- abbreviations should be written without parentheses, e.g., 'DNN' not '(DNN)'
- use space instead of special characters, e.g., 'Deep Learning Model' not 'Deep-Learning/Model'
3. Connections use '-->' between nodes
4. Ensure each line ends with a proper node reference

Create a detailed flowchart that shows:
- Research objectives and questions
- All major research methods used
- Data collection and analysis processes
- Key experimental or analytical steps
- Result synthesis and conclusion formation

Make the flowchart as detailed as possible while maintaining clarity.
If no valid flowchart is found in any segment, set this field to null.]"
}}
"""

    result = await call_deepseek([
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ])

    if result:
        print("分析结果合并完成")
    else:
        print("分析结果合并失败")

    return result
|
||
async def analyze_paper(content: str):
    """Analyse a whole paper in one DeepSeek call.

    Args:
        content: Paper text. ``batch_analysis`` passes at most 180000 characters.

    Returns:
        The parsed JSON reply from ``call_deepseek``. The prompt requests
        ``"1. Basic Information"``, ``"2. Content Analysis"`` and a Mermaid
        ``"flowchart"`` (or null).
    """
    system_prompt = """
You are an AI assistant tasked with analyzing an academic paper.
Generate a comprehensive analysis in JSON format covering both basic information and content analysis.
The JSON structure must strictly follow the provided template.
Also generate a Mermaid flowchart code to visualize the research methodology.
Ensure the flowchart follows strict formatting rules to avoid parsing errors.
Create a detailed and comprehensive flowchart that accurately represents the paper's research methodology.
"""

    # Rule 4 previously read "proper node paper": a leftover from a global
    # "reference" -> "paper" rename; restored to "node reference".
    user_prompt = f"""Analyze the following paper and extract all relevant information:
Content: {content}

Generate a JSON response with the following structure:

{{
"1. Basic Information": {{
"author": "[Author name(s) and affiliations]",
"publication_date": "[Publication date in YYYY-MM format]",
"title": "[Full title of the document]",
"journal_publisher": "[Journal name or publisher details]",
"document_type": "[Type: journal article/book/conference paper etc.]"
}},
"2. Content Analysis": {{
"abstract": "[Paper abstract]",
"research_purpose": "[Main objectives and research questions]",
"methodology": "[Research methods, data collection and analysis approaches]",
"main_arguments": "[Key theoretical frameworks and arguments]",
"conclusions": "[Primary findings and conclusions]",
"innovations": "[Novel contributions and original aspects]"
}},
"flowchart": "[Generate a Mermaid flowchart code that visualizes the research methodology. Follow these rules:
1. Use 'graph TD' for top-down flow
2. Each node should be in format: id[text] where:
- id is a unique identifier (A, B1, B2, etc.)
- text should be simple and clear, using only letters, numbers, and spaces
- DO NOT use any special characters including parentheses, colons, commas
- abbreviations should be written without parentheses, e.g., 'DNN' not '(DNN)'
- use space instead of special characters, e.g., 'Deep Learning Model' not 'Deep-Learning/Model'
3. Connections use '-->' between nodes
4. Ensure each line ends with a proper node reference

Create a detailed flowchart that shows:
- Research objectives and questions
- All major research methods used
- Data collection and analysis processes
- Key experimental or analytical steps
- Result synthesis and conclusion formation

Make the flowchart as detailed as possible while maintaining clarity.
If the paper does not contain clear methodology information, set this field to null.]"
}}
"""

    return await call_deepseek([
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ])
|
||
|
||
async def paper_value(content_analysis: dict):
    """Ask DeepSeek to evaluate a paper's value from its content analysis.

    Args:
        content_analysis: The analysis dict, embedded in the prompt as JSON
            (non-ASCII kept as-is).

    Returns:
        The parsed JSON reply from ``call_deepseek``. The prompt requests a
        single ``"3. Value Evaluation"`` object.
    """
    analysis_json = json.dumps(content_analysis, ensure_ascii=False)

    instructions = """
You are an AI assistant tasked with evaluating the value of academic paper based on its content analysis.
Generate a comprehensive value evaluation in JSON format.
The JSON structure must strictly follow the provided template.
"""

    request_text = f"""Based on the following content analysis, evaluate the paper's value:
Content Analysis: {analysis_json}

Generate a JSON response with the following structure:

{{
"3. Value Evaluation": {{
"academic_contribution": "[Significance to the field of study]",
"practical_significance": "[Real-world applications and implications]",
"limitations": "[Research constraints and weaknesses]",
"implications": "[Suggestions for future research and practice]"
}}
}}
"""

    conversation = [
        {"role": "system", "content": instructions},
        {"role": "user", "content": request_text},
    ]
    return await call_deepseek(conversation)
|
||
|
||
# Models for the paper Q&A endpoints.
class QuestionModel(BaseModel):
    """Request body for ``POST /paper/{paper_id}/qa``."""

    question: str  # free-text question about the paper
|
||
|
||
class TaskStatus:
    """String constants for the lifecycle of a Q&A task.

    ``paper_question`` writes PROCESSING, COMPLETED and FAILED to the
    ``task:{task_id}`` hash in Redis db 192. ``get_task_status`` falls back to
    PENDING when no status field is present.
    """

    PENDING = "pending"        # submitted, not yet picked up
    PROCESSING = "processing"  # background worker is running
    COMPLETED = "completed"    # answer stored alongside the status
    FAILED = "failed"          # error message stored alongside the status
|
||
|
||
# Strong references to in-flight Q&A tasks; the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_qa_tasks: set = set()


@app.post("/paper/{paper_id}/qa")
async def ask_reference_question(
    paper_id: str,
    question: QuestionModel
):
    """Submit a question about a paper; the answer is produced asynchronously.

    The task is first recorded as ``pending`` in Redis db 192
    (``task:{task_id}``), so polling ``/paper/task/{task_id}`` right away finds
    it. ``paper_question`` is then scheduled in the background.

    Returns:
        A dict with ``task_id``, ``status`` (pending) and ``message``.

    Raises:
        HTTPException 400: ``paper_id`` is not a valid ObjectId.
        HTTPException 404: the paper does not exist. Previously this was
            rewrapped as a 500.
        HTTPException 500: any other failure.
    """
    if not ObjectId.is_valid(paper_id):
        raise HTTPException(status_code=400, detail="Invalid paper id")

    database = await get_database()
    redis = await get_redis()

    try:
        paper = await database.papers.find_one({"_id": ObjectId(paper_id)})
        if not paper:
            raise HTTPException(status_code=404, detail="Paper not found")

        task_id = str(ObjectId())

        # Record the task before scheduling it so an immediate status poll
        # does not get "Task not found".
        await redis.select(192)
        await redis.hset(
            f"task:{task_id}",
            mapping={
                "status": TaskStatus.PENDING,
                "paper_id": paper_id,
                "question": question.question
            }
        )

        task = asyncio.create_task(paper_question(
            task_id=task_id,
            paper_id=paper_id,
            question=question.question,
            paper=paper
        ))
        _qa_tasks.add(task)
        task.add_done_callback(_qa_tasks.discard)

        return {
            "task_id": task_id,
            "status": TaskStatus.PENDING,
            "message": "Question submitted, processing in progress"
        }

    except HTTPException:
        raise
    except Exception as e:
        print(f"Error in ask_reference_question: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
async def _download_paper_text(paper_link: Optional[str]) -> str:
    """Download a paper's extracted text from the file service.

    Args:
        paper_link: The ``paper_link`` stored on the paper document.
            ``batch_upload`` sets it to ``https://files.aiot.ml/pdf/content/{hash}``.

    Returns:
        The text. A list-valued ``content`` field is joined with newlines.

    Raises:
        Exception: on a missing link, a non-200 response, or empty content.
    """
    if not paper_link:
        raise Exception("Paper file does not exist")

    async with aiohttp.ClientSession() as session:
        async with session.get(paper_link) as response:
            if response.status != 200:
                raise Exception(f"Failed to get PDF content: HTTP {response.status}")
            payload = await response.json()

    content = payload.get("content")
    if not content:
        raise Exception("No PDF content returned")
    if isinstance(content, list):
        content = "\n".join(content)
    return content


async def _append_chat_history(redis, paper_id: str, question: str, answer: str) -> None:
    """Append one Q&A pair to the JSON list at ``chat_history:{paper_id}`` (db 191).

    NOTE(review): this is a read-modify-write. Two answers for the same paper
    finishing at the same time can overwrite each other's entry.
    """
    await redis.select(191)
    chat_history_key = f"chat_history:{paper_id}"

    existing_history = await redis.get(chat_history_key)
    history = json.loads(existing_history) if existing_history else []
    history.append({
        "question": question,
        "answer": answer,
        "timestamp": datetime.now(timezone.utc).isoformat()
    })
    await redis.set(chat_history_key, json.dumps(history))


async def paper_question(task_id: str, paper_id: str, question: str, paper: dict):
    """Background task: answer ``question`` about ``paper`` and record the outcome.

    Progress is written to the ``task:{task_id}`` hash in Redis db 192: first
    ``processing``, then ``completed`` with the answer or ``failed`` with the
    error. The answer is generated from the paper text (first 180000
    characters) plus the completed analysis report at
    ``paper_report:{file_hash}`` in db 190. The Q&A pair is then appended to
    the paper's chat history in db 191.
    """
    redis = await get_redis()
    task_key = f"task:{task_id}"

    try:
        await redis.select(192)
        await redis.hset(
            task_key,
            mapping={
                "status": TaskStatus.PROCESSING,
                "paper_id": paper_id,
                "question": question
            }
        )

        # Reports are stored per file hash (see batch_upload/batch_analysis);
        # the previous lookup by paper_id could never match.
        file_hash = paper.get("file_hash")
        report_data = None
        if file_hash:
            await redis.select(190)
            report_data = await redis.get(f"paper_report:{file_hash}")
        if not report_data:
            raise Exception("Paper analysis report does not exist, please analyze first")

        report = json.loads(report_data)
        if report.get("status") != "completed":
            raise Exception("Paper analysis is not completed yet, please try again later")

        # paper_link is an HTTP URL to the file service, not a local path, so
        # the previous os.path.exists()/PyPDF2 read always failed.
        MAX_CHARS = 180000
        pdf_content = (await _download_paper_text(paper.get("paper_link")))[:MAX_CHARS]

        system_prompt = """
You are a professional academic assistant responsible for answering questions about academic papers.
You should provide accurate and professional answers based on the paper content and analysis report.
Answers should be concise and clear, citing specific content from the paper whenever possible.
"""

        context = {
            "Paper Content": pdf_content,
            "Analysis Report": report
        }

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Answer the question based on the following paper content and analysis report:\n\nPaper Information:{json.dumps(context, ensure_ascii=False)}\n\nQuestion:{question}"}
        ]

        # The OpenAI SDK client is synchronous; run it in the default executor
        # so the event loop keeps serving other requests during the call.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(model="deepseek-chat", messages=messages),
        )
        answer = response.choices[0].message.content or ""

        await _append_chat_history(redis, paper_id, question, answer)

        await redis.select(192)
        await redis.hset(
            task_key,
            mapping={
                "status": TaskStatus.COMPLETED,
                "answer": answer,
                # redis-py rejects None values, so fall back to an empty title.
                "paper_title": paper.get("paper_title") or ""
            }
        )

    except Exception as e:
        print(f"Error processing question: {e}")
        # Guarded so a Redis failure here does not escape the background task.
        try:
            await redis.select(192)
            await redis.hset(
                task_key,
                mapping={
                    "status": TaskStatus.FAILED,
                    "error": str(e)
                }
            )
        except Exception as redis_error:
            print(f"Error updating task status: {redis_error}")
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
@app.get("/paper/task/{task_id}")
async def get_task_status(task_id: str):
    """Return the status, and the result or error, of a Q&A task.

    Reads the ``task:{task_id}`` hash from Redis db 192.

    Returns:
        A dict with ``task_id`` and ``status`` (``pending`` when the field is
        absent). It also includes ``answer`` and ``paper_title`` when the task
        completed, or ``error`` when it failed.

    Raises:
        HTTPException 404: unknown task id. Previously this was rewrapped as
            a 500.
        HTTPException 500: Redis failure.
    """
    redis = await get_redis()

    try:
        await redis.select(192)
        task_data = await redis.hgetall(f"task:{task_id}")

        if not task_data:
            raise HTTPException(status_code=404, detail="Task not found")

        status = task_data.get("status", TaskStatus.PENDING)
        response = {
            "task_id": task_id,
            "status": status
        }

        if status == TaskStatus.COMPLETED:
            response.update({
                "answer": task_data.get("answer"),
                "paper_title": task_data.get("paper_title")
            })
        elif status == TaskStatus.FAILED:
            response.update({
                "error": task_data.get("error")
            })

        return response

    except HTTPException:
        raise
    except Exception as e:
        print(f"Error getting task status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
@app.get("/paper/{paper_id}/qa/history")
async def get_paper_qa_history(
    paper_id: str
):
    """Return the stored Q&A history for a paper, or ``[]`` if there is none.

    The history is a JSON list kept in Redis db 191 under
    ``chat_history:{paper_id}``. Failures are printed and returned as HTTP 500.
    """
    redis = await get_redis()

    try:
        await redis.select(191)
        raw_history = await redis.get(f"chat_history:{paper_id}")
        return json.loads(raw_history) if raw_history else []
    except Exception as e:
        print(f"Error getting QA history: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
# Route that looks up a report by file hash and returns a status marker instead of 404.
@app.get("/paper/check/{file_hash}")
async def check_report(
    file_hash: str
):
    """Return the saved report for ``file_hash`` from Redis db 190.

    When nothing is stored, returns ``{"status": "not_found"}`` rather than
    raising. Any failure is printed and returned as HTTP 500.
    """
    redis = await get_redis()

    try:
        await redis.select(190)
        saved_report = await redis.get(f"paper_report:{file_hash}")
        if saved_report:
            return json.loads(saved_report)
        return {"status": "not_found"}
    except Exception as e:
        print(f"Error getting paper report by hash: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        try:
            await redis.aclose()
        except Exception as e:
            print(f"Error closing Redis connection: {e}")
|
||
|
||
if __name__ == "__main__":
    import uvicorn

    # Serve the app directly on all interfaces, port 9005.
    server_options = {"host": "0.0.0.0", "port": 9005}
    uvicorn.run(app, **server_options)
|