commit a5357af69475c4d3869bfb1e54f8c10d16a630a1 Author: zydi Date: Thu Jan 16 05:14:29 2025 +0000 Initial commit diff --git a/paper.html b/paper.html new file mode 100644 index 0000000..99094e9 --- /dev/null +++ b/paper.html @@ -0,0 +1,1131 @@ + + + + + + Paper Analysis + + + + + + +
+
+ +
+ +
+ + +
+
+ + + + + +
+
+
+
+ + + + + \ No newline at end of file diff --git a/paper.py b/paper.py new file mode 100644 index 0000000..2c8a00f --- /dev/null +++ b/paper.py @@ -0,0 +1,740 @@ +from fastapi import FastAPI, UploadFile, File, HTTPException +from fastapi.responses import JSONResponse +from fastapi.middleware.cors import CORSMiddleware +from typing import List, Optional, Dict, Any +from datetime import datetime, timezone +import os +import json +import PyPDF2 +import asyncio +from concurrent.futures import ThreadPoolExecutor +import aiohttp +from pydantic import BaseModel +from redis import asyncio as aioredis + +# 配置 +UPLOAD_PATH = "/obscura/task/paper" +DEEPSEEK_API_KEY = "sk-3027fb3c810b4e17985fa397d41250b9" +REDIS_URL = "redis://:Obscura@2024@222.186.10.253:6379" + +# Redis数据库编号 +ANALYSIS_DB = 190 # 文献分析结果 +QA_HISTORY_DB = 191 # 问答记录 +TASK_STATUS_DB = 192 # 任务状态 +REFERENCE_DB = 193 # 文献信息 + +app = FastAPI() +# CORS configuration +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["*"] +) +# 创建线程池 +pdf_thread_pool = ThreadPoolExecutor(max_workers=3) + +# Redis连接 +async def get_redis(): + redis = aioredis.from_url( + REDIS_URL, + encoding="utf-8", + decode_responses=True, + ) + return redis + +# 添加问答相关的模型 +class QuestionModel(BaseModel): + """问题模型""" + question: str + +class TaskStatus: + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + +@app.post("/upload") +async def upload_references( + files: List[UploadFile] = File(...) +): + """批量上传文献""" + redis = await get_redis() + + try: + uploaded_references = [] + + # 批量上传文件 + for file in files: + # 验证文件类型 + allowed_types = ["application/pdf", "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document"] + if file.content_type not in allowed_types: + continue # 跳过不支持的文件类型 + + # 确保上传目录存在 + os.makedirs(UPLOAD_PATH, exist_ok=True) + + # 生成安全的文件名 + file_extension = os.path.splitext(file.filename)[1] + safe_filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{os.urandom(4).hex()}{file_extension}" + file_path = os.path.join(UPLOAD_PATH, safe_filename) + + # 保存文件 + with open(file_path, "wb") as buffer: + content = await file.read() + buffer.write(content) + + # 生成引用ID + reference_id = f"{int(datetime.now().timestamp())}_{os.urandom(4).hex()}" + + # 创建引用记录 + await redis.select(REFERENCE_DB) + reference = { + "reference_link": file_path, + "reference_title": file.filename, + "upload_time": datetime.now(timezone.utc).isoformat() + } + await redis.hmset(f"reference:{reference_id}", reference) + + reference_info = { + "reference_id": reference_id, + "file_path": file_path, + "reference_title": reference["reference_title"] + } + uploaded_references.append(reference_info) + + # 为每个文献创建初始状态 + await redis.select(ANALYSIS_DB) + report_key = f"reference_report:{reference_id}" + initial_status = { + "status": "processing", + "message": "Analysis in progress" + } + await redis.set(report_key, json.dumps(initial_status)) + + # 在后台启动分析任务 + if uploaded_references: + asyncio.create_task(process_batch_analysis(uploaded_references)) + + return { + "message": f"Successfully uploaded {len(uploaded_references)} files", + "uploaded_files": uploaded_references + } + + except Exception as e: + print(f"Upload error: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +@app.post("/{reference_id}/qa") +async def ask_reference_question( + reference_id: str, + question: QuestionModel +): + """向文献提问(异步)""" + redis = await get_redis() + + try: + # 验证文献是否存在 + await redis.select(REFERENCE_DB) + reference = await redis.hgetall(f"reference:{reference_id}") + if not reference: + raise HTTPException(status_code=404, detail="文献不存在") + + # 生成任务ID + task_id = f"{int(datetime.now().timestamp())}_{os.urandom(4).hex()}" + + # 创建后台任务 + asyncio.create_task(process_reference_question( + task_id=task_id, + reference_id=reference_id, + question=question.question, + reference=reference + )) + + return { + "task_id": task_id, + "status": TaskStatus.PENDING, + "message": "问题已提交,正在处理中" + } + + except Exception as e: + print(f"Error in ask_reference_question: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +@app.get("/{task_id}") +async def get_task_status(task_id: str): + """获取任务状态和结果""" + redis = await get_redis() + + try: + await redis.select(TASK_STATUS_DB) + task_data = await redis.hgetall(f"task:{task_id}") + + if not task_data: + raise HTTPException(status_code=404, detail="任务不存在") + + response = { + "task_id": task_id, + "status": task_data.get("status", TaskStatus.PENDING) + } + + # 如果任务完成,添加结果 + if task_data.get("status") == TaskStatus.COMPLETED: + response.update({ + "answer": task_data.get("answer"), + "reference_title": task_data.get("reference_title") + }) + # 如果任务失败,添加错误信息 + elif task_data.get("status") == TaskStatus.FAILED: + response.update({ + "error": task_data.get("error") + }) + + return response + + except Exception as e: + print(f"Error getting task status: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +@app.get("/{reference_id}/qa/history") +async def get_reference_qa_history(reference_id: str): + """获取文献问答历史记录""" + redis = await get_redis() + + try: + # 从Redis获取对话历史 + await redis.select(QA_HISTORY_DB) + chat_history_key = f"chat_history:{reference_id}" + + history_data = await redis.get(chat_history_key) + if not history_data: + return [] + + return json.loads(history_data) + + except Exception as e: + print(f"Error getting QA history: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +@app.get("/{reference_id}/analysis") +async def get_reference_analysis(reference_id: str): + """获取文献分析结果""" + redis = await get_redis() + + try: + # 从Redis获取分析报告 + await redis.select(ANALYSIS_DB) + report_key = f"reference_report:{reference_id}" + report_data = await redis.get(report_key) + + if not report_data: + raise HTTPException(status_code=404, detail="文献分析报告不存在") + + return json.loads(report_data) + + except Exception as e: + print(f"Error getting analysis report: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +async def process_batch_analysis(references: List[dict]): + """批量处理文献分析的后台任务""" + redis = await get_redis() + + # 限制并发数量 + semaphore = asyncio.Semaphore(3) + + async def process_single_reference(ref: dict): + async with semaphore: + try: + reference_id = ref["reference_id"] + file_path = ref["file_path"] + + if not os.path.exists(file_path): + print(f"Reference file not found: {file_path}") + return + + print(f"\n开始处理文献 {ref.get('reference_title', '未知标题')}") + print(f"文献ID: {reference_id}") + print(f"文件路径: {file_path}") + + # 异步读取PDF + print("\n=== 步骤1: 读取PDF文件 ===") + pdf_content = await read_pdf_async(file_path) + if not pdf_content: + raise Exception("Failed to read PDF content") + + # 打印字符数 + content_length = len(pdf_content) + print(f"\n=== 步骤2: 内容长度检查 ===") + print(f"PDF内容总字符数: {content_length}") + + # 根据内容长度选择不同的处理方式 + if content_length <= 200000: + print(f"\n=== 步骤3A: 使用直接分析方式 ===") + print(f"文档长度在处理范围内 ({content_length} <= 200000)") + # 直接分析文档内容 + print("开始分析文档内容...") + document_analysis = await analyze_reference_document_async(pdf_content[:180000]) + if not document_analysis: + raise Exception("Failed to analyze document") + print("文档分析完成") + else: + print(f"\n=== 步骤3B: 使用分段分析方式 ===") + print(f"文档超过200000字符 ({content_length} > 200000)") + # 分段分析长文档 + print("\n--- 开始分段分析 ---") + print("正在调用 analyze_long_document_async...") + try: + analysis_results = await analyze_long_document_async(pdf_content) + except Exception as e: + print(f"分段分析过程中出错: {str(e)}") + raise + + if not analysis_results: + raise Exception("Failed to analyze document in segments") + print(f"分段分析完成,共分析了 {len(analysis_results)} 个段落") + + # 合并分析结果 + try: + document_analysis = await merge_analysis_results(analysis_results) + except Exception as e: + raise + + if not document_analysis: + raise Exception("Failed to merge analysis results") + + # 等待一小段时间避免API限制 + await asyncio.sleep(1) + + # 异步分析文献价值 + try: + value_evaluation = await analyze_reference_value_async(document_analysis) + print(f"analyze_reference_value_async 返回结果类型: {type(value_evaluation)}") + except Exception as e: + raise + + if not value_evaluation: + raise Exception("Failed to evaluate value") + print("文献价值分析完成") + + # 合并结果 + print("\n=== 步骤5: 保存最终结果 ===") + analysis_result = { + **document_analysis, + **value_evaluation, + "status": "completed", + "completion_time": datetime.now(timezone.utc).isoformat() + } + + # 保存结果 + await redis.select(ANALYSIS_DB) + report_key = f"reference_report:{reference_id}" + await redis.set(report_key, json.dumps(analysis_result)) + + except Exception as e: + try: + await redis.select(ANALYSIS_DB) + report_key = f"reference_report:{ref['reference_id']}" + error_status = { + "status": "failed", + "message": str(e), + "error_time": datetime.now(timezone.utc).isoformat() + } + await redis.set(report_key, json.dumps(error_status)) + except Exception as redis_error: + print(f"Error updating Redis status: {redis_error}") + + try: + # 并发处理所有引用 + await asyncio.gather( + *(process_single_reference(ref) for ref in references) + ) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +# 在全局范围创建线程池 +pdf_thread_pool = ThreadPoolExecutor(max_workers=3) # 限制并发PDF处理数量 + +# 创建异步HTTP客户端会话 +async def get_aiohttp_session(): + return aiohttp.ClientSession( + base_url="https://api.deepseek.com/v1/", + headers={"Authorization": f"Bearer {DEEPSEEK_API_KEY}"} + ) + +async def read_pdf_async(file_path: str) -> str: + """在线程池中异步读取PDF""" + def read_pdf(): + try: + pdf_content = "" + with open(file_path, 'rb') as file: + pdf_reader = PyPDF2.PdfReader(file) + for page in pdf_reader.pages: + content = page.extract_text() + pdf_content += content + + # 根据内容长度返回不同的结果 + content_length = len(pdf_content) + print(f"\nPDF原始内容字符数: {content_length}") + + if content_length <= 180000: + print("文档长度 ≤ 180000,返回完整内容") + return pdf_content + elif content_length <= 200000: + print("文档长度在 180000-200000 之间,截取前 180000 个字符") + return pdf_content[:180000] + else: + print("文档长度 > 200000,返回完整内容供分段处理") + return pdf_content # 返回完整内容,由调用者处理分段 + + except Exception as e: + print(f"Error reading PDF: {e}") + return "" + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(pdf_thread_pool, read_pdf) + +async def call_deepseek_api_async(messages: list) -> dict: + """异步调用DeepSeek API""" + async with await get_aiohttp_session() as session: + async with session.post("/chat/completions", json={ + "model": "deepseek-chat", + "messages": messages, + "response_format": {"type": "json_object"} + }) as response: + if response.status == 200: + data = await response.json() + return json.loads(data["choices"][0]["message"]["content"]) + else: + raise Exception(f"API调用失败: {await response.text()}") + +async def analyze_long_document_async(content: str) -> List[dict]: + """分段分析长文档""" + # 将内容分成多个段落,每段约60000个字符(估算后约50000 tokens) + segments = [] + content_length = len(content) + segment_size = 60000 # 减小段落大小 + + print(f"\n开始分段处理,总字符数: {content_length}") + print(f"每段大小: {segment_size} 字符") + + for i in range(0, content_length, segment_size): + segment = content[i:i + segment_size] + segments.append(segment) + + print(f"文档已分段,共 {len(segments)} 个段落") + + # 对每个段落进行分析 + analysis_results = [] + for i, segment in enumerate(segments): + print(f"\n开始分析第 {i+1}/{len(segments)} 个段落...") + + system_prompt = f""" + You are an AI assistant tasked with analyzing part {i+1} of {len(segments)} of an academic paper. + Generate a comprehensive analysis in JSON format covering both basic information and content analysis. + Note that this is part {i+1} of a longer document, so focus on the content provided. + The JSON structure must strictly follow the provided template. + """ + + user_prompt = f"""Analyze the following paper segment and extract all relevant information: + Content: {segment} + + Generate a JSON response with the following structure: + + {{ + "1. Basic Information": {{ + "author": "[Author name(s) and affiliations]", + "publication_date": "[Publication date in YYYY-MM format]", + "title": "[Full title of the document]", + "journal_publisher": "[Journal name or publisher details]", + "document_type": "[Type: journal article/book/conference paper etc.]" + }}, + "2. Content Analysis": {{ + "abstract": "[Paper abstract]", + "research_purpose": "[Main objectives and research questions]", + "methodology": "[Research methods, data collection and analysis approaches]", + "main_arguments": "[Key theoretical frameworks and arguments]", + "conclusions": "[Primary findings and conclusions]", + "innovations": "[Novel contributions and original aspects]" + }} + }} + """ + + try: + result = await call_deepseek_api_async([ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ]) + + if result: + print(f"第 {i+1} 个段落分析完成") + analysis_results.append(result) + # 等待一小段时间避免API限制 + await asyncio.sleep(1) + else: + print(f"第 {i+1} 个段落分析失败") + except Exception as e: + print(f"分析段落时出错: {str(e)}") + continue + + return analysis_results + +async def merge_analysis_results(results: List[dict]) -> dict: + """合并多个分析结果""" + if not results: + return {} + + print(f"开始合并 {len(results)} 个分析结果...") + + system_prompt = """ + You are an AI assistant tasked with merging multiple analysis results of different parts of the same academic paper. + Generate a comprehensive merged analysis in JSON format. + The JSON structure must strictly follow the provided template. + Ensure the merged result is coherent and eliminates redundancy. + """ + + user_prompt = f"""Merge the following analysis results into a single coherent analysis: + Analysis Results: {json.dumps(results, ensure_ascii=False)} + + Generate a JSON response with the following structure: + + {{ + "1. Basic Information": {{ + "author": "[Merged author information]", + "publication_date": "[Publication date]", + "title": "[Complete title]", + "journal_publisher": "[Journal/publisher information]", + "document_type": "[Document type]" + }}, + "2. Content Analysis": {{ + "abstract": "[Complete abstract]", + "research_purpose": "[Comprehensive research objectives]", + "methodology": "[Complete methodology description]", + "main_arguments": "[Comprehensive theoretical frameworks and arguments]", + "conclusions": "[Complete findings and conclusions]", + "innovations": "[Complete list of innovations]" + }} + }} + """ + + result = await call_deepseek_api_async([ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ]) + + if result: + print("分析结果合并完成") + else: + print("分析结果合并失败") + + return result + +async def analyze_reference_document_async(content: str): + """分析文献的基本信息和内容""" + system_prompt = """ + You are an AI assistant tasked with analyzing academic paper. + Generate a comprehensive analysis in JSON format covering both basic information and content analysis. + The JSON structure must strictly follow the provided template. + """ + + user_prompt = f"""Analyze the following paper and extract all relevant information: + Content: {content} + + Generate a JSON response with the following structure: + + {{ + "1. Basic Information": {{ + "author": "[Author name(s) and affiliations]", + "publication_date": "[Publication date in YYYY-MM format]", + "title": "[Full title of the document]", + "journal_publisher": "[Journal name or publisher details]", + "document_type": "[Type: journal article/book/conference paper etc.]" + }}, + "2. Content Analysis": {{ + "abstract": "[Paper abstract]", + "research_purpose": "[Main objectives and research questions]", + "methodology": "[Research methods, data collection and analysis approaches]", + "main_arguments": "[Key theoretical frameworks and arguments]", + "conclusions": "[Primary findings and conclusions]", + "innovations": "[Novel contributions and original aspects]" + }} + }} + """ + + return await call_deepseek_api_async([ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ]) + +async def analyze_reference_value_async(content_analysis: dict): + """基于内容分析结果评估文献的价值""" + system_prompt = """ + You are an AI assistant tasked with evaluating the value of academic paper based on its content analysis. + Generate a comprehensive value evaluation in JSON format. + The JSON structure must strictly follow the provided template. + """ + + user_prompt = f"""Based on the following content analysis, evaluate the paper's value: + Content Analysis: {json.dumps(content_analysis, ensure_ascii=False)} + + Generate a JSON response with the following structure: + + {{ + "3. Value Evaluation": {{ + "academic_contribution": "[Significance to the field of study]", + "practical_significance": "[Real-world applications and implications]", + "limitations": "[Research constraints and weaknesses]", + "implications": "[Suggestions for future research and practice]" + }} + }} + """ + + return await call_deepseek_api_async([ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ]) + + +async def process_reference_question(task_id: str, reference_id: str, question: str, reference: dict): + """处理文献问答的后台任务""" + redis = await get_redis() + + try: + # 更新任务状态为处理中 + await redis.select(TASK_STATUS_DB) + await redis.hset( + f"task:{task_id}", + mapping={ + "status": TaskStatus.PROCESSING, + "reference_id": reference_id, + "question": question + } + ) + + # 从Redis获取分析报告 + await redis.select(ANALYSIS_DB) + report_key = f"reference_report:{reference_id}" + report_data = await redis.get(report_key) + + if not report_data: + raise Exception("文献分析报告不存在,请先进行分析") + + # 读取PDF文件内容 + file_path = reference.get("reference_link") + if not file_path or not os.path.exists(file_path): + raise Exception("文献文件不存在") + + # 读取PDF内容 + pdf_content = "" + with open(file_path, 'rb') as file: + pdf_reader = PyPDF2.PdfReader(file) + for page in pdf_reader.pages: + content = page.extract_text() + pdf_content += content + + # 限制内容长度 + MAX_CHARS = 180000 + pdf_content = pdf_content[:MAX_CHARS] + + # 构建系统提示和用户提示 + system_prompt = """ + 你是一个专业的学术助手,负责回答关于学术文献的问题。 + 你应该基于文献内容和分析报告提供准确、专业的回答。 + 回答应当简洁明了,并尽可能引用文献中的具体内容。 + """ + + # 构建上下文 + context = { + "文献内容": pdf_content, + "分析报告": json.loads(report_data) + } + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"基于以下文献内容和分析报告回答问题:\n\n文献信息:{json.dumps(context, ensure_ascii=False)}\n\n问题:{question}"} + ] + + # 调用DeepSeek API获取回答 + response = await call_deepseek_api_async(messages) + answer = response.get("answer", "") + + # 保存对话历史到Redis + await redis.select(QA_HISTORY_DB) + chat_history_key = f"chat_history:{reference_id}" + + # 获取现有历史记录 + existing_history = await redis.get(chat_history_key) + history = json.loads(existing_history) if existing_history else [] + + # 添加新的对话 + history.append({ + "question": question, + "answer": answer, + "timestamp": datetime.now(timezone.utc).isoformat() + }) + + # 保存更新后的历史记录 + await redis.set(chat_history_key, json.dumps(history)) + + # 更新任务状态为完成 + await redis.select(TASK_STATUS_DB) + await redis.hset( + f"task:{task_id}", + mapping={ + "status": TaskStatus.COMPLETED, + "answer": answer, + "reference_title": reference.get("reference_title") + } + ) + + except Exception as e: + print(f"Error processing question: {e}") + # 更新任务状态为失败 + await redis.select(TASK_STATUS_DB) + await redis.hset( + f"task:{task_id}", + mapping={ + "status": TaskStatus.FAILED, + "error": str(e) + } + ) + finally: + try: + await redis.aclose() + except Exception as e: + print(f"Error closing Redis connection: {e}") + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=9005) \ No newline at end of file