Build Your Own ChatGPT for PDFs: A Comprehensive Guide to AI-Powered Document Intelligence
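Introduction
Want to build a system that can intelligently answer questions about your PDF documents? In this comprehensive guide, we'll create a secure, scalable PDF question-answering system that combines vector search with the power of large language models. This system isn't just about searching text; it's about understanding documents and providing relevant, accurate answers while maintaining strict security boundaries between teams and organizations.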
What We're Building
Our system provides:
- Secure, team-isolated document processing and storage
- Intelligent question answering using OpenAI's language models
- Efficient vector search with Qdrant
- Authorization-scoped document and query access
- A RESTful API for easy integration
Prerequisites
Before starting, make sure you have:
- Python 3.8 or later installed
- Access to the OpenAI API (Azure or regular)
- A Qdrant vector database (running locally or in the cloud)
- A basic understanding of Flask and REST APIs
- An understanding of authentication and authorization principles
- Familiarity with async/await patterns in Python
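System Architecture Overview
Authorization Design
Our system implements a multi-tenant architecture, using team_id as the primary authorization scope:
- Each team has its own isolated document space
- All operations (upload, query) are scoped to a specific team
- Cross-team access is prevented by design
- Rate limiting and access control are applied at the team level
- Data is completely isolated between organizations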
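To make the scoping idea concrete, here is a minimal in-memory sketch. It is not the Qdrant-backed implementation described later; `TeamScopedStore` and its methods are hypothetical names introduced only to illustrate that every read and write takes a team_id, so one team's data is never reachable through another team's key.

```python
from collections import defaultdict
from typing import Dict, List


class TeamScopedStore:
    """Toy document store in which every read and write is keyed by team_id."""

    def __init__(self) -> None:
        # team_id -> names of the documents owned by that team
        self._docs: Dict[str, List[str]] = defaultdict(list)

    def add(self, team_id: str, doc_name: str) -> None:
        if not team_id:
            raise ValueError("team_id is required")
        self._docs[team_id].append(doc_name)

    def list_docs(self, team_id: str) -> List[str]:
        # Return a copy so callers cannot mutate the stored list
        return list(self._docs.get(team_id, []))


store = TeamScopedStore()
store.add("team-a", "handbook.pdf")
store.add("team-b", "roadmap.pdf")
print(store.list_docs("team-a"))  # ['handbook.pdf']
print(store.list_docs("team-c"))  # []
```

There is no method that lists documents without a team_id, which is the property the rest of the guide tries to preserve at the database and API layers.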
Key Components
PDF processing layer:
- Text extraction and chunking
- Metadata preservation
- Team-scoped document storage
Vector search layer:
- Semantic embedding generation
- Efficient similarity search
- Team-isolated vector spaces
Answer generation layer:
- Team-scoped context retrieval
- AI-powered answer generation
- Source attribution
API layer:
- Secure endpoints
- Authorization middleware
- Rate limiting and monitoring
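The vector search layer boils down to two steps: keep only the points that belong to the requesting team, then rank them by similarity to the query vector. The sketch below shows that idea in plain Python with hand-written two-dimensional vectors. The `search` function and the sample data are illustrative assumptions; the real system delegates both steps to Qdrant and uses 384-dimensional embeddings.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def search(points: List[Dict], team_id: str, query: List[float], limit: int = 3) -> List[Tuple[float, str]]:
    """Filter by team first, then rank the remaining points by similarity."""
    scored = [
        (cosine_similarity(p["vector"], query), p["text"])
        for p in points
        if p["team_id"] == team_id
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:limit]


points = [
    {"team_id": "team-a", "text": "refund policy", "vector": [1.0, 0.0]},
    {"team_id": "team-a", "text": "office hours", "vector": [0.0, 1.0]},
    {"team_id": "team-b", "text": "secret roadmap", "vector": [1.0, 0.0]},
]
results = search(points, "team-a", [0.9, 0.1])
print([text for _, text in results])  # ['refund policy', 'office hours']
```

The team-b point is an exact directional match for the query, yet it never appears because filtering happens before ranking.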
Detailed Implementation
1. OpenAI and Embedding Setup
First, let's set up our AI components with the right configuration:
```python
import os

from openai import AzureOpenAI
from sentence_transformers import SentenceTransformer


class AIConfig:
    """
    Configuration manager for AI services with security considerations
    """

    def __init__(self):
        # Load configuration from environment variables for security
        self.openai_client = AzureOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION", "2024-02-01"),
            azure_deployment=os.getenv("AZURE_DEPLOYMENT_NAME")
        )
        # Initialize embedding model (runs locally, independent of OpenAI)
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        # Warm up the model to prevent cold starts
        self._warmup()

    def _warmup(self):
        """Warm up the embedding model"""
        _ = self.embedding_model.encode("Warm up text")

    def get_embedding(self, text: str) -> list:
        """Generate embeddings with error handling"""
        try:
            return self.embedding_model.encode(text).tolist()
        except Exception as e:
            print(f"Error generating embedding: {str(e)}")
            raise


# Initialize global AI configuration
ai_config = AIConfig()
```
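Because `os.getenv` returns `None` for an unset variable, a missing setting only surfaces when the client is constructed or first used. One option is to check the environment up front and fail with a clear message. The helper below is a sketch under that assumption; `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, and the list of required variables is a guess based on the configuration shown above.

```python
import os
from typing import Iterable, List, Mapping

# Assumed to be mandatory for the Azure client configured above
REQUIRED_VARS = (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_DEPLOYMENT_NAME",
)


def missing_env_vars(required: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]


# Demonstration with an explicit mapping instead of the real environment
example_env = {
    "AZURE_OPENAI_ENDPOINT": "https://example.invalid",
    "AZURE_OPENAI_API_KEY": "",
}
print(missing_env_vars(REQUIRED_VARS, example_env))
# ['AZURE_OPENAI_API_KEY', 'AZURE_DEPLOYMENT_NAME']
```

Calling `missing_env_vars(REQUIRED_VARS)` before creating `AIConfig` and raising if the list is non-empty turns a late, obscure failure into an immediate one.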
2. Vector Database Setup
Next, let's configure our vector database with proper security measures:
```python
# src/vector_store.py
import logging
from typing import List

from qdrant_client import QdrantClient, models
from qdrant_client.models import Distance, PointStruct, VectorParams

from src.config import config

logger = logging.getLogger(__name__)


class VectorStore:
    """Secure vector storage management with team isolation"""

    def __init__(self):
        self.client = QdrantClient(
            host=config.QDRANT_HOST,
            port=config.QDRANT_PORT,
            api_key=config.QDRANT_API_KEY,
            https=config.QDRANT_HTTPS
        )
        self.collection_name = "pdf_embeddings"
        self.embedding_dim = 384  # Dimension for all-MiniLM-L6-v2

    def setup_collection(self) -> bool:
        """Create or recreate the vector collection.

        Note: an existing collection is deleted, along with every vector in it.
        """
        try:
            # Remove existing collection if it exists
            collections = self.client.get_collections().collections
            if any(collection.name == self.collection_name for collection in collections):
                self.client.delete_collection(self.collection_name)
                logger.info(f"Deleted existing collection '{self.collection_name}'")
            # Create new collection with a single named vector
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config={
                    "custom_vector": VectorParams(
                        size=self.embedding_dim,
                        distance=Distance.COSINE
                    )
                }
            )
            logger.info(f"Created collection '{self.collection_name}'")
            return True
        except Exception as e:
            logger.error(f"Error setting up collection: {str(e)}")
            raise

    def search_vectors(
        self,
        team_id: str,
        query_vector: List[float],
        limit: int = 10
    ) -> List[models.ScoredPoint]:
        """Search vectors within team's authorization scope"""
        try:
            # Restrict results to points whose payload carries this team_id
            team_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="team_id",
                        match=models.MatchValue(value=team_id)
                    )
                ]
            )
            # Use NamedVector because the collection stores a named vector
            return self.client.search(
                collection_name=self.collection_name,
                query_vector=models.NamedVector(
                    name="custom_vector",
                    vector=query_vector
                ),
                query_filter=team_filter,
                limit=limit
            )
        except Exception as e:
            logger.error(f"Error searching vectors: {str(e)}")
            raise

    def upsert_points(self, points: List[PointStruct]) -> bool:
        """Insert or update points in the Qdrant collection"""
        try:
            self.client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            logger.info(f"Successfully upserted {len(points)} points")
            return True
        except Exception as e:
            logger.error(f"Error upserting points: {str(e)}")
            raise


# Initialize global vector store
vector_store = VectorStore()
```
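Note that `setup_collection` deletes the collection whenever it already exists, so calling it at startup discards previously uploaded documents. If that is not what you want, a non-destructive variant only creates the collection when it is missing. The sketch below assumes a qdrant-client version that provides `collection_exists`; `ensure_collection` is a hypothetical helper, and `FakeClient` is a stand-in used purely so the example runs without a Qdrant server.

```python
from typing import Any, List


def ensure_collection(client: Any, name: str, vectors_config: Any) -> bool:
    """Create the collection only when it is missing; return True if it was created."""
    if client.collection_exists(name):
        return False
    client.create_collection(collection_name=name, vectors_config=vectors_config)
    return True


class FakeClient:
    """Stand-in exposing the two client methods used above, for demonstration only."""

    def __init__(self) -> None:
        self.names: List[str] = []

    def collection_exists(self, collection_name: str) -> bool:
        return collection_name in self.names

    def create_collection(self, collection_name: str, vectors_config: Any) -> None:
        self.names.append(collection_name)


fake = FakeClient()
first = ensure_collection(fake, "pdf_embeddings", {"custom_vector": "params"})
second = ensure_collection(fake, "pdf_embeddings", {"custom_vector": "params"})
print(first, second)  # True False
```

With a real `QdrantClient`, the `vectors_config` argument would be the same `{"custom_vector": VectorParams(...)}` mapping used in `setup_collection`.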
3. Secure PDF Processing Pipeline
Let's implement our PDF processing with proper team isolation:
```python
import logging
import uuid
from typing import Any, List

import pdfplumber
from qdrant_client.models import PointStruct
from werkzeug.utils import secure_filename

# ai_service is assumed to be the AIConfig instance from step 1 (named ai_config there)
from src.ai_service import ai_service
from src.vector_store import vector_store

logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Secure document processing with team isolation"""

    def process_pdf(
        self,
        pdf_file: Any,
        team_id: str,
        doc_name: str,
        document_id: str,
        chunk_size: int = 500
    ) -> List[PointStruct]:
        """Process PDF with team-scoped authorization"""
        try:
            points = []
            doc_name = secure_filename(doc_name)
            with pdfplumber.open(pdf_file) as pdf:
                for page_num, page in enumerate(pdf.pages, start=1):
                    text = page.extract_text()
                    if not text:
                        # Skip pages with no extractable text (e.g. scanned images)
                        continue
                    # Create chunks
                    chunks = self._create_chunks(text, chunk_size)
                    # Generate embeddings
                    embeddings = [ai_service.get_embedding(chunk) for chunk in chunks]
                    # Create points
                    points.extend(self._create_points(
                        chunks, embeddings, team_id, doc_name,
                        document_id, page_num
                    ))
            # Store vectors
            if points:
                vector_store.upsert_points(points)
            return points
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _create_chunks(self, text: str, chunk_size: int) -> List[str]:
        """Create overlapping chunks from text (50 characters of overlap)"""
        chunks = []
        for i in range(0, len(text), chunk_size):
            # Each chunk after the first reaches back 50 characters into the previous one
            chunk = text[max(0, i - 50):i + chunk_size]
            chunks.append(chunk)
        return chunks

    def _create_points(
        self,
        chunks: List[str],
        embeddings: List[List[float]],
        team_id: str,
        doc_name: str,
        document_id: str,
        page_num: int
    ) -> List[PointStruct]:
        """Create points for vector storage"""
        points = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point = PointStruct(
                id=str(uuid.uuid4()),
                vector={"custom_vector": embedding},
                payload={
                    # team_id in the payload is what search filters on
                    "team_id": team_id,
                    "doc_name": doc_name,
                    "document_id": document_id,
                    "page_number": page_num,
                    "chunk_index": i,
                    "text": chunk,
                    "embedding_model": "all-MiniLM-L6-v2"
                }
            )
            points.append(point)
        return points


# Initialize global document processor
document_processor = DocumentProcessor()
```
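The chunking logic is easier to follow on a tiny input. The standalone function below mirrors `_create_chunks`, with the overlap exposed as a parameter (an addition made here for illustration; the original hard-codes 50 characters).

```python
from typing import List


def create_chunks(text: str, chunk_size: int, overlap: int = 50) -> List[str]:
    """Split text into character-based chunks that overlap with their predecessor."""
    chunks = []
    for start in range(0, len(text), chunk_size):
        # Reach back `overlap` characters, but never before the start of the text
        chunks.append(text[max(0, start - overlap):start + chunk_size])
    return chunks


chunks = create_chunks("abcdefghijklmnopqrstuvwxyz", chunk_size=10, overlap=3)
print(chunks)  # ['abcdefghij', 'hijklmnopqrst', 'rstuvwxyz']
```

Two consequences are worth knowing: every chunk after the first can be up to `chunk_size + overlap` characters long, and because splitting is by character count, a chunk may begin or end in the middle of a word. Chunks are also built per page, so no chunk spans a page boundary.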
4. Answer Generation with Authorization
Implement secure answer generation with team isolation:
```python
import os
from typing import Any, Dict

# AIConfig and VectorStore are the classes defined in steps 1 and 2


class AnswerGenerator:
    """
    Generate answers within team authorization scope
    """

    def __init__(self, ai_config: AIConfig, vector_store: VectorStore):
        self.ai_config = ai_config
        self.vector_store = vector_store

    def generate_answer(self, team_id: str, question: str) -> Dict[str, Any]:
        """
        Generate answers using only team-authorized documents
        """
        # Validate authorization
        if not self._validate_team_id(team_id):
            return {
                "answer": "Unauthorized access",
                "sources": [],
                "status": "error"
            }
        try:
            # Generate question embedding
            query_vector = self.ai_config.get_embedding(question)
            # Get relevant documents within team scope
            points = self.vector_store.search_vectors(team_id, query_vector, limit=15)
            if not points:
                return {
                    "answer": "No relevant documents found",
                    "sources": [],
                    "status": "no_context"
                }
            # Prepare context with source tracking
            context_parts = []
            sources = set()
            seen_text = set()
            for point in points:
                if point.payload:
                    text = point.payload.get('text', '').strip()
                    # Skip chunks whose text is an exact duplicate of one already used
                    if text in seen_text:
                        continue
                    doc_info = f"[Document: {point.payload.get('doc_name')}, Page: {point.payload.get('page_number')}]"
                    context_parts.append(f"{doc_info}\n{text}")
                    sources.add((
                        point.payload.get('doc_name'),
                        point.payload.get('page_number')
                    ))
                    seen_text.add(text)
            # Generate answer using AI
            messages = [
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant that provides accurate, "
                        "comprehensive answers based on the given context. "
                        "Always cite your sources using [Document: X, Page: Y] format."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        f"Answer this question using only the context provided. "
                        f"If you cannot answer based on the context, say so.\n\n"
                        f"Context:\n{' '.join(context_parts)}\n\n"
                        f"Question: {question}"
                    )
                }
            ]
            response = self.ai_config.openai_client.chat.completions.create(
                # Same deployment variable as in step 1 and the Docker Compose file
                model=os.getenv("AZURE_DEPLOYMENT_NAME"),
                messages=messages,
                max_tokens=1000,
                temperature=0.2
            )
            return {
                "answer": response.choices[0].message.content.strip(),
                "sources": list(sources),
                "status": "success"
            }
        except Exception as e:
            return {
                "answer": "Error generating answer",
                "sources": [],
                "status": "error",
                "error": str(e)
            }

    def _validate_team_id(self, team_id: str) -> bool:
        """
        Validate team_id authorization
        Implementation depends on your authentication system
        """
        # Add your team validation logic here.
        # As written, this only checks that team_id is a non-empty string.
        return bool(team_id and isinstance(team_id, str))


# Initialize global answer generator
answer_generator = AnswerGenerator(ai_config, vector_store)
```
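The context-building step can be pulled out into a small pure function, which makes it easy to test and to cap the prompt size. The sketch below is one possible shape, not the author's implementation: `build_context` and the `max_chars` budget are assumptions introduced here. It keeps the exact-duplicate check from `generate_answer`, and it collects sources in a list so they stay in ranked order.

```python
from typing import Any, Dict, List, Tuple


def build_context(payloads: List[Dict[str, Any]], max_chars: int = 4000) -> Tuple[str, List[Tuple[Any, Any]]]:
    """Join unique chunk texts, labelled with their source, up to a character budget."""
    parts: List[str] = []
    sources: List[Tuple[Any, Any]] = []
    seen = set()
    used = 0
    for payload in payloads:
        text = (payload.get("text") or "").strip()
        if not text or text in seen:
            continue  # skip empty chunks and exact duplicates
        entry = f"[Document: {payload.get('doc_name')}, Page: {payload.get('page_number')}]\n{text}"
        if used + len(entry) > max_chars:
            break  # stop before exceeding the budget
        parts.append(entry)
        used += len(entry)
        seen.add(text)
        source = (payload.get("doc_name"), payload.get("page_number"))
        if source not in sources:
            sources.append(source)
    return "\n\n".join(parts), sources


payloads = [
    {"doc_name": "a.pdf", "page_number": 1, "text": "Refunds take 5 days."},
    {"doc_name": "a.pdf", "page_number": 1, "text": "Refunds take 5 days."},
    {"doc_name": "b.pdf", "page_number": 3, "text": "Support is open 9 to 5."},
]
context, sources = build_context(payloads)
print(sources)  # [('a.pdf', 1), ('b.pdf', 3)]
```

The input here is a list of payload dictionaries, which corresponds to `point.payload` for each search result.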
5. Secure API Layer
Implement the API with proper authorization and security:
```python
from flask import Flask, request, jsonify
from functools import wraps
import time

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Rate limiting configuration
RATE_LIMIT = {
    "window": 60,  # seconds
    "max_requests": 100  # requests per window
}


class RateLimiter:
    """Simple in-memory rate limiter"""

    def __init__(self):
        self.requests = {}

    def is_allowed(self, team_id: str) -> bool:
        now = time.time()
        team_requests = self.requests.get(team_id, [])
        # Clean old requests
        team_requests = [req_time for req_time in team_requests
                         if now - req_time < RATE_LIMIT["window"]]
        if len(team_requests) >= RATE_LIMIT["max_requests"]:
            return False
        team_requests.append(now)
        self.requests[team_id] = team_requests
        return True


rate_limiter = RateLimiter()


def require_team_auth(f):
    """Authorization middleware"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Get team_id from form data, the query string, or a JSON body.
        # get_json(silent=True) returns None instead of raising on non-JSON requests.
        json_body = request.get_json(silent=True)
        if not isinstance(json_body, dict):
            json_body = {}
        team_id = (
            request.form.get('team_id')
            or request.args.get('team_id')
            or json_body.get('team_id')
        )
        if not team_id:
            return jsonify({"error": "team_id is required"}), 401
        # Check rate limit
        if not rate_limiter.is_allowed(team_id):
            return jsonify({"error": "Rate limit exceeded"}), 429
        # Add your additional authorization checks here
        # For example, validating JWT tokens, checking team membership, etc.
        return f(*args, **kwargs)
    return decorated_function


@app.post('/answer')
@require_team_auth
def get_answer():
    """
    Generate answer for question within team scope
    """
    try:
        data = request.get_json(silent=True) or {}
        team_id = data.get('team_id')
        question = data.get('question')
        if not question:
            return jsonify({"error": "Question is required"}), 400
        response = answer_generator.generate_answer(team_id, question)
        return jsonify(response)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
```
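The rate limiter above reads the wall clock directly, which makes it awkward to test. A variant that accepts the clock as a parameter behaves the same way in production and can be driven deterministically in a test. The class below is a sketch of that idea; `SlidingWindowLimiter` and its `clock` parameter are names introduced for this example.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Per-team sliding-window limiter with an injectable clock."""

    def __init__(self, max_requests: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_requests = max_requests
        self.window = window
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, team_id: str) -> bool:
        now = self.clock()
        hits = self._hits[team_id]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


# Drive the limiter with a fake clock: two requests allowed per 60 seconds
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, window=60, clock=lambda: fake_now[0])
decisions = [limiter.is_allowed("team-a") for _ in range(3)]
fake_now[0] = 61.0  # move past the window
decisions.append(limiter.is_allowed("team-a"))
print(decisions)  # [True, True, False, True]
```

Like the original, this keeps its counters in process memory, so each worker process enforces its own limit. A deployment with several workers would need a shared store to enforce one limit per team.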
```python
# Additional imports used by the upload and listing endpoints
import os

from qdrant_client.models import FieldCondition, Filter, MatchValue
from werkzeug.utils import secure_filename


@app.route("/upload", methods=['POST'])
@require_team_auth
def upload_file():
    """
    Upload and process PDF within team scope
    """
    try:
        # Validate request
        if 'file' not in request.files:
            return jsonify({"error": "No file part"}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "No selected file"}), 400
        if not file.filename.lower().endswith('.pdf'):
            return jsonify({"error": "Only PDF files are allowed"}), 400
        team_id = request.form.get('team_id')
        document_id = request.form.get('document_id')
        if not team_id or not document_id:
            return jsonify({"error": "team_id and document_id are required form fields"}), 400
        # Process file with team scope (document_processor is created in step 3)
        chunks = document_processor.process_pdf(
            pdf_file=file,
            team_id=team_id,
            doc_name=secure_filename(file.filename),
            document_id=document_id
        )
        return jsonify({
            "status": "success",
            "chunks_processed": len(chunks),
            "document_id": document_id
        })
    except Exception as e:
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500


@app.route("/documents", methods=['GET'])
@require_team_auth
def list_documents():
    """
    List documents available for a team
    """
    team_id = request.args.get('team_id')
    try:
        # Query vector store for team's documents
        filter_query = Filter(
            must=[
                FieldCondition(
                    key="team_id",
                    match=MatchValue(value=team_id)
                )
            ]
        )
        # scroll returns (points, next_page_offset); only the first page is read here
        points, _next_offset = vector_store.client.scroll(
            collection_name=vector_store.collection_name,
            scroll_filter=filter_query,
            limit=1000  # Adjust based on your needs
        )
        # Extract unique document information
        documents = set()
        for point in points:
            if point.payload:
                documents.add((
                    point.payload.get('document_id'),
                    point.payload.get('doc_name')
                ))
        return jsonify({
            "status": "success",
            "documents": [
                {"id": doc_id, "name": doc_name}
                for doc_id, doc_name in documents
            ]
        })
    except Exception as e:
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500


# Environmental configuration
if __name__ == "__main__":
    # Initialize vector store collection (this recreates it; see setup_collection)
    vector_store.setup_collection()
    # Configure server
    app.run(
        host='0.0.0.0',
        port=int(os.getenv('PORT', 8000)),
        debug=os.getenv('DEBUG', 'False').lower() == 'true',
        ssl_context='adhoc' if os.getenv('ENABLE_HTTPS', 'False').lower() == 'true' else None
    )
```
System Usage Examples
1. Uploading a Document
```python
import requests


def upload_document(file_path: str, team_id: str, document_id: str):
    """Example: Upload a PDF document"""
    with open(file_path, 'rb') as file:
        response = requests.post(
            'http://localhost:8000/upload',
            files={'file': file},
            data={
                'team_id': team_id,
                'document_id': document_id
            }
        )
    return response.json()
```
2. Asking a Question
```python
import requests


def ask_question(team_id: str, question: str):
    """Example: Ask a question about uploaded documents"""
    response = requests.post(
        'http://localhost:8000/answer',
        json={
            'team_id': team_id,
            'question': question
        }
    )
    return response.json()
```
Deployment Guide
1. Docker Setup
Create a Dockerfile for the application:
```dockerfile
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Run the application
CMD ["python", "app.py"]
```
Docker Compose Configuration
```yaml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_API_VERSION=${AZURE_API_VERSION}
      - AZURE_DEPLOYMENT_NAME=${AZURE_DEPLOYMENT_NAME}
      - QDRANT_HOST=qdrant
      - QDRANT_PORT=6333
      - ENABLE_HTTPS=false
    depends_on:
      - qdrant
    networks:
      - app-network

  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage
    networks:
      - app-network

networks:
  app-network:
    driver: bridge

volumes:
  qdrant_data:
```
2. Security Headers
Implement security headers middleware:
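```python
from flask_talisman import Talisman

# Initialize Talisman with security headers.
# force_https=True redirects plain HTTP requests, so TLS must be available
# (for example, terminated by a reverse proxy) when this is enabled.
Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    session_cookie_secure=True,
    content_security_policy={
        'default-src': "'self'",
        'img-src': '*',
        'script-src': "'self'"
    }
)
```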
Monitoring and Maintenance
1. Health Check Implementation
```python
@app.route("/health", methods=['GET'])
def health_check():
    """System health check endpoint"""
    try:
        # Status reported when every probe below succeeds
        health_status = {
            "vector_store": "healthy",
            "openai": "healthy",
            "timestamp": time.time()
        }
        # Test vector store
        vector_store.client.get_collections()
        # Test the local embedding model.
        # Note: this does not call the OpenAI API, so "openai" is not actually probed.
        ai_config.get_embedding("test")
        return jsonify(health_status)
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e)
        }), 500
```
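The endpoint above reports a single overall result, so a failure does not say which component broke. One way to get per-component status is to run each probe separately and record its outcome. The function below is a sketch of that pattern; `run_health_checks` and the probe names are hypothetical, and the probes here are stand-ins rather than real calls to Qdrant or the embedding model.

```python
from typing import Callable, Dict, Tuple


def run_health_checks(checks: Dict[str, Callable[[], object]]) -> Tuple[Dict[str, str], int]:
    """Run each probe independently and return (per-component status, HTTP status code)."""
    statuses: Dict[str, str] = {}
    for name, check in checks.items():
        try:
            check()
            statuses[name] = "healthy"
        except Exception as exc:
            statuses[name] = f"unhealthy: {exc}"
    all_ok = all(status == "healthy" for status in statuses.values())
    return statuses, 200 if all_ok else 503


def failing_check() -> None:
    # Stand-in for a probe that cannot reach its dependency
    raise ConnectionError("connection refused")


statuses, code = run_health_checks({
    "vector_store": lambda: None,
    "embedding_model": failing_check,
})
print(statuses, code)
# {'vector_store': 'healthy', 'embedding_model': 'unhealthy: connection refused'} 503
```

In the Flask route, the probes would be `vector_store.client.get_collections` and a lambda around `ai_config.get_embedding("test")`, and the function's two return values map directly onto `jsonify(statuses), code`.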
2. Logging Configuration
```python
import logging.config

# Configure logging
logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
        },
        'file': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.FileHandler',
            'filename': 'app.log',
            'mode': 'a',
        },
    },
    'loggers': {
        # Root logger: send everything to both the console and app.log
        '': {
            'handlers': ['default', 'file'],
            'level': 'INFO',
            'propagate': True
        }
    }
})

logger = logging.getLogger(__name__)
```
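In a multi-tenant system it helps to know which team a log line belongs to. The standard library's `logging.LoggerAdapter` can prefix every message with a team_id without changing the call sites much. The adapter below is a sketch; `TeamLoggerAdapter` and the logger name `pdf_qa_example` are illustrative, and the output is captured in a string buffer only so the example is self-contained.

```python
import io
import logging


class TeamLoggerAdapter(logging.LoggerAdapter):
    """Prefix every message with the team_id supplied at construction time."""

    def process(self, msg, kwargs):
        return f"[team={self.extra['team_id']}] {msg}", kwargs


# Capture output in memory for the demonstration
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

base_logger = logging.getLogger("pdf_qa_example")
base_logger.setLevel(logging.INFO)
base_logger.addHandler(handler)
base_logger.propagate = False  # keep the demo output out of the root logger

team_logger = TeamLoggerAdapter(base_logger, {"team_id": "team-a"})
team_logger.info("Processed %d chunks", 12)
print(stream.getvalue().strip())
# INFO pdf_qa_example: [team=team-a] Processed 12 chunks
```

In a request handler, the adapter would be created once per request from the validated team_id and used in place of the module-level logger.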
Conclusion
This implementation provides a robust, secure foundation for building a PDF question-answering system. Key features include:
Security
- Team-based isolation
- Rate limiting
- Secure file handling
- Authorization middleware
Scalability
- Docker containerization
- Efficient vector search
- Modular design
Maintainability
- Comprehensive logging
- Health monitoring
- Clear documentation
Remember to:
- Keep dependencies updated
- Monitor system performance
- Back up vector data regularly
- Audit security configurations
- Test thoroughly before deploying
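This guide provides a foundation for building intelligent document systems. Ready to implement it in your organization, or need help with a custom solution? I'm available for select consulting projects and technical advisory roles focused on production-grade AI systems. Let's discuss your implementation: me@arif.sh
Full source code: github.com/doganarif/pdf-gpt-vectordb-qa
If you found this guide helpful, please give the repository a ⭐️!
⚡ Happy building!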