Build Your Own ChatGPT for PDFs: A Comprehensive Guide to AI-Powered Document Intelligence

Photo by Humberto Santos on Unsplash

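Introduction

Want to build a system that can intelligently answer questions about your PDF documents? In this comprehensive guide, we'll create a secure, scalable PDF question-answering system that combines vector search with the power of large language models. This system isn't just about searching text; it's about understanding documents and providing relevant, accurate answers while maintaining strict security boundaries between teams and organizations.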

What We're Building

Our system provides:

  • Secure, team-isolated document processing and storage
  • Intelligent question answering using OpenAI's language models
  • Efficient vector search with Qdrant
  • Authorization-scoped access to documents and queries
  • A RESTful API interface for easy integration

Prerequisites

Before starting, ensure you have:

  • Python 3.8 or later installed
  • Access to the OpenAI API (Azure or standard)
  • A Qdrant vector database (running locally or in the cloud)
  • A basic understanding of Flask and REST APIs
  • An understanding of authentication and authorization principles
  • Familiarity with async/await patterns in Python

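System Architecture Overview

Authorization Design

Our system implements a multi-tenant architecture that uses team_id as the primary authorization scope:

  • Each team has an isolated document space
  • All operations (upload, query) are scoped to a specific team
  • Cross-team access is prevented by design
  • Rate limiting and access controls are applied per team
  • Data is fully isolated between organizations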
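To make the scoping idea concrete, here is a minimal in-memory sketch. It is not the Qdrant-backed implementation used later in this guide; the class name `TeamScopedStore` and its methods are hypothetical and exist only to illustrate that every read and write carries a team_id.

```python
from collections import defaultdict
from typing import Dict, List


class TeamScopedStore:
    """Hypothetical in-memory store that keeps one document space per team."""

    def __init__(self) -> None:
        self._docs: Dict[str, List[str]] = defaultdict(list)

    def add(self, team_id: str, text: str) -> None:
        # Every write is tagged with the team that owns it.
        self._docs[team_id].append(text)

    def query(self, team_id: str, keyword: str) -> List[str]:
        # Reads only ever look inside the caller's own team space.
        needle = keyword.lower()
        return [t for t in self._docs.get(team_id, []) if needle in t.lower()]


store = TeamScopedStore()
store.add("team-a", "Quarterly revenue report")
store.add("team-b", "Revenue forecast for next year")

team_a_hits = store.query("team-a", "revenue")  # only team-a's document
team_c_hits = store.query("team-c", "revenue")  # unknown team sees nothing
```

Because the team_id is a required argument rather than an optional filter, there is no code path in this sketch that searches across teams.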

Key Components

PDF Processing Layer:

  • Text extraction and chunking
  • Metadata preservation
  • Team-scoped document storage

Vector Search Layer:

  • Semantic embedding generation
  • Efficient similarity search
  • Team-isolated vector spaces

Answer Generation Layer:

  • Team-scoped context retrieval
  • AI-powered answer generation
  • Source attribution

API Layer:

  • Secure endpoints
  • Authorization middleware
  • Rate limiting and monitoring

Detailed Implementation

1. OpenAI and Embedding Setup

First, let's set up our AI components with the proper configuration:

import logging
import os
from typing import List

from openai import AzureOpenAI
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)


class AIConfig:
    """
    Configuration manager for AI services with security considerations
    """
    def __init__(self):
        # Load configuration from environment variables for security
        self.openai_client = AzureOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION", "2024-02-01"),
            azure_deployment=os.getenv("AZURE_DEPLOYMENT_NAME")
        )

        # Initialize the local embedding model (produces 384-dimensional vectors)
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Warm up the model to prevent cold starts
        self._warmup()

    def _warmup(self):
        """Warm up the embedding model"""
        _ = self.embedding_model.encode("Warm up text")

    def get_embedding(self, text: str) -> List[float]:
        """Generate embeddings with error handling"""
        try:
            return self.embedding_model.encode(text).tolist()
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise


# Initialize global AI configuration
ai_config = AIConfig()
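The embeddings returned by get_embedding are compared by cosine similarity in the next section. As a rough illustration of what that comparison computes, here is a small pure-Python sketch; the function name `cosine_similarity` and the toy vectors are assumptions made for this example, and real embeddings from all-MiniLM-L6-v2 have 384 dimensions.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # convention chosen for this sketch
    return dot / (norm_a * norm_b)


# Vectors pointing the same way score close to 1, unrelated ones close to 0.
same_direction = cosine_similarity([1.0, 2.0], [2.0, 4.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
```

The score depends only on direction, not on vector length, which is why it is a common choice for comparing text embeddings.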

2. Vector Database Setup

Next, let's configure our vector database with proper security measures:

# src/vector_store.py
import logging
from typing import List

from qdrant_client import QdrantClient, models
from qdrant_client.models import (
    Distance,
    NamedVector,
    PointStruct,
    ScoredPoint,
    VectorParams,
)
from src.config import config

logger = logging.getLogger(__name__)


class VectorStore:
    """Secure vector storage management with team isolation"""

    def __init__(self):
        self.client = QdrantClient(
            host=config.QDRANT_HOST,
            port=config.QDRANT_PORT,
            api_key=config.QDRANT_API_KEY,
            https=config.QDRANT_HTTPS
        )
        self.collection_name = "pdf_embeddings"
        self.embedding_dim = 384  # Dimension for all-MiniLM-L6-v2

    def setup_collection(self) -> bool:
        """Create or recreate the vector collection"""
        try:
            # Remove existing collection if it exists.
            # Note: this drops every stored vector, so call it only when a reset is intended.
            collections = self.client.get_collections().collections
            if any(collection.name == self.collection_name for collection in collections):
                self.client.delete_collection(self.collection_name)
                logger.info(f"Deleted existing collection '{self.collection_name}'")

            # Create new collection with a single named vector
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config={
                    "custom_vector": VectorParams(
                        size=self.embedding_dim,
                        distance=Distance.COSINE
                    )
                }
            )
            logger.info(f"Created collection '{self.collection_name}'")
            return True

        except Exception as e:
            logger.error(f"Error setting up collection: {str(e)}")
            raise

    def search_vectors(
        self,
        team_id: str,
        query_vector: List[float],
        limit: int = 10
    ) -> List[ScoredPoint]:
        """Search vectors within team's authorization scope"""
        try:
            # Only points whose payload carries this team_id can match
            team_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="team_id",
                        match=models.MatchValue(value=team_id)
                    )
                ]
            )

            # Use NamedVector for the query
            return self.client.search(
                collection_name=self.collection_name,
                query_vector=NamedVector(
                    name="custom_vector",
                    vector=query_vector
                ),
                query_filter=team_filter,
                limit=limit
            )

        except Exception as e:
            logger.error(f"Error searching vectors: {str(e)}")
            raise

    def upsert_points(self, points: List[PointStruct]) -> bool:
        """Insert or update points in the Qdrant collection"""
        try:
            self.client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            logger.info(f"Successfully upserted {len(points)} points")
            return True

        except Exception as e:
            logger.error(f"Error upserting points: {str(e)}")
            raise


# Initialize global vector store
vector_store = VectorStore()
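The search_vectors method above combines two steps: restrict the candidates to one team, then rank them by similarity. The sketch below mimics that order of operations in plain Python so the behaviour is easy to inspect without a running Qdrant instance. The point layout and the `filtered_search` helper are hypothetical, and the vectors are assumed to be unit-length so that the dot product equals cosine similarity.

```python
from typing import Dict, List, Tuple

# Hypothetical stored points; vectors are already unit-length.
POINTS: List[Dict] = [
    {"team_id": "team-a", "text": "refund policy", "vector": [1.0, 0.0]},
    {"team_id": "team-a", "text": "shipping times", "vector": [0.6, 0.8]},
    {"team_id": "team-b", "text": "refund policy (other team)", "vector": [1.0, 0.0]},
]


def filtered_search(
    points: List[Dict], team_id: str, query: List[float], limit: int = 10
) -> List[Tuple[float, str]]:
    """Filter by team first, then rank the survivors by dot product."""
    candidates = [p for p in points if p["team_id"] == team_id]
    scored = [
        (sum(q * v for q, v in zip(query, p["vector"])), p["text"])
        for p in candidates
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:limit]


results = filtered_search(POINTS, "team-a", [1.0, 0.0], limit=2)
result_texts = [text for _, text in results]
```

Even though team-b holds a vector identical to the query, it never appears in the results, because the team filter runs before any scoring.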

3. Secure PDF Processing Pipeline

Let's implement our PDF processing with proper team isolation:

import logging
import uuid
from typing import Any, List

import pdfplumber
from qdrant_client.models import PointStruct
from werkzeug.utils import secure_filename

# ai_service is expected to expose get_embedding(), like the AIConfig instance in section 1
from src.ai_service import ai_service
from src.vector_store import vector_store

logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Secure document processing with team isolation"""

    def process_pdf(
        self,
        pdf_file: Any,
        team_id: str,
        doc_name: str,
        document_id: str,
        chunk_size: int = 500
    ) -> List[PointStruct]:
        """Process PDF with team-scoped authorization"""
        try:
            points = []
            doc_name = secure_filename(doc_name)

            with pdfplumber.open(pdf_file) as pdf:
                for page_num, page in enumerate(pdf.pages, start=1):
                    text = page.extract_text()
                    if not text:
                        # Skip pages with no extractable text (e.g. scanned images)
                        continue

                    # Create chunks
                    chunks = self._create_chunks(text, chunk_size)

                    # Generate embeddings
                    embeddings = [ai_service.get_embedding(chunk) for chunk in chunks]

                    # Create points
                    points.extend(self._create_points(
                        chunks, embeddings, team_id, doc_name,
                        document_id, page_num
                    ))

            # Store vectors
            if points:
                vector_store.upsert_points(points)

            return points

        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _create_chunks(self, text: str, chunk_size: int) -> List[str]:
        """Create overlapping chunks from text"""
        chunks = []
        for i in range(0, len(text), chunk_size):
            # Each chunk after the first repeats the previous 50 characters
            chunk = text[max(0, i - 50):i + chunk_size]
            chunks.append(chunk)
        return chunks

    def _create_points(
        self,
        chunks: List[str],
        embeddings: List[List[float]],
        team_id: str,
        doc_name: str,
        document_id: str,
        page_num: int
    ) -> List[PointStruct]:
        """Create points for vector storage"""
        points = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point = PointStruct(
                id=str(uuid.uuid4()),
                vector={"custom_vector": embedding},
                payload={
                    "team_id": team_id,
                    "doc_name": doc_name,
                    "document_id": document_id,
                    "page_number": page_num,
                    "chunk_index": i,
                    "text": chunk,
                    "embedding_model": "all-MiniLM-L6-v2"
                }
            )
            points.append(point)
        return points


# Initialize global document processor
document_processor = DocumentProcessor()
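To see what the overlapping chunker actually produces, here is a standalone version of the same slicing logic with the overlap exposed as a parameter. The `overlap` argument and the tiny sample string are assumptions for illustration; the class above hard-codes an overlap of 50 characters.

```python
from typing import List


def create_chunks(text: str, chunk_size: int, overlap: int = 50) -> List[str]:
    """Split text into windows; each window after the first repeats the
    last `overlap` characters of the text that came before it."""
    if chunk_size <= 0 or overlap < 0:
        raise ValueError("chunk_size must be positive and overlap non-negative")
    chunks = []
    for start in range(0, len(text), chunk_size):
        chunks.append(text[max(0, start - overlap):start + chunk_size])
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = create_chunks(sample, chunk_size=10, overlap=3)
# chunks == ["abcdefghij", "hijklmnopqrst", "rstuvwxyz"]
```

Note that this character-based split can cut words and sentences in half. Splitting on sentence or paragraph boundaries is a common refinement, though it is not part of the code above.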

4. Answer Generation with Authorization

Implement secure answer generation with team isolation:

import os
from typing import Any, Dict

# AIConfig / ai_config and VectorStore / vector_store are the objects defined in sections 1 and 2.


class AnswerGenerator:
    """
    Generate answers within team authorization scope
    """
    def __init__(self, ai_config: AIConfig, vector_store: VectorStore):
        self.ai_config = ai_config
        self.vector_store = vector_store

    def generate_answer(self, team_id: str, question: str) -> Dict[str, Any]:
        """
        Generate answers using only team-authorized documents
        """
        # Validate authorization
        if not self._validate_team_id(team_id):
            return {
                "answer": "Unauthorized access",
                "sources": [],
                "status": "error"
            }

        try:
            # Generate question embedding
            query_vector = self.ai_config.get_embedding(question)

            # Get relevant documents within team scope
            points = self.vector_store.search_vectors(team_id, query_vector, limit=15)

            if not points:
                return {
                    "answer": "No relevant documents found",
                    "sources": [],
                    "status": "no_context"
                }

            # Prepare context with source tracking
            context_parts = []
            sources = set()
            seen_text = set()

            for point in points:
                if point.payload:
                    text = point.payload.get('text', '').strip()
                    # Skip chunks whose text is an exact duplicate
                    if text in seen_text:
                        continue

                    doc_info = f"[Document: {point.payload.get('doc_name')}, Page: {point.payload.get('page_number')}]"
                    context_parts.append(f"{doc_info}\n{text}")
                    sources.add((
                        point.payload.get('doc_name'),
                        point.payload.get('page_number')
                    ))
                    seen_text.add(text)

            # Separate chunks with blank lines so citations stay attached to their text
            context = "\n\n".join(context_parts)

            # Generate answer using AI
            messages = [
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant that provides accurate, "
                        "comprehensive answers based on the given context. "
                        "Always cite your sources using [Document: X, Page: Y] format."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        "Answer this question using only the context provided. "
                        "If you cannot answer based on the context, say so.\n\n"
                        f"Context:\n{context}\n\n"
                        f"Question: {question}"
                    )
                }
            ]

            response = self.ai_config.openai_client.chat.completions.create(
                # Same deployment variable that AIConfig and the Docker Compose file use
                model=os.getenv("AZURE_DEPLOYMENT_NAME"),
                messages=messages,
                max_tokens=1000,
                temperature=0.2
            )

            return {
                "answer": response.choices[0].message.content.strip(),
                "sources": list(sources),
                "status": "success"
            }

        except Exception as e:
            return {
                "answer": "Error generating answer",
                "sources": [],
                "status": "error",
                "error": str(e)
            }

    def _validate_team_id(self, team_id: str) -> bool:
        """
        Validate team_id authorization
        Implementation depends on your authentication system
        """
        # Add your team validation logic here
        return bool(team_id and isinstance(team_id, str))


# Initialize global answer generator
answer_generator = AnswerGenerator(ai_config, vector_store)
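The context-assembly step in generate_answer can be tested on its own if it is pulled into a small function. The sketch below does that and adds a character budget so a large result set cannot overflow the prompt. The `build_context` name and the `max_chars` limit are assumptions introduced here, not part of the original class.

```python
from typing import Dict, List, Optional, Tuple

Source = Tuple[Optional[str], Optional[int]]


def build_context(payloads: List[Dict], max_chars: int = 4000) -> Tuple[str, List[Source]]:
    """Deduplicate chunk texts, label each with its source, and stop
    adding chunks once the character budget would be exceeded."""
    parts: List[str] = []
    sources: List[Source] = []
    seen = set()
    used = 0
    for payload in payloads:
        text = (payload.get("text") or "").strip()
        if not text or text in seen:
            continue
        entry = f"[Document: {payload.get('doc_name')}, Page: {payload.get('page_number')}]\n{text}"
        if used + len(entry) > max_chars:
            break
        parts.append(entry)
        used += len(entry)
        seen.add(text)
        source = (payload.get("doc_name"), payload.get("page_number"))
        if source not in sources:
            sources.append(source)
    return "\n\n".join(parts), sources


payloads = [
    {"doc_name": "a.pdf", "page_number": 1, "text": "Alpha"},
    {"doc_name": "a.pdf", "page_number": 2, "text": "Alpha"},  # exact duplicate text
    {"doc_name": "b.pdf", "page_number": 3, "text": "Beta"},
]
context, sources = build_context(payloads)
```

Keeping sources in a list rather than a set preserves the ranking order of the search results, which makes the response deterministic.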

5. Secure API Layer

Implement the API with proper authorization and security:

import time
from functools import wraps

from flask import Flask, request, jsonify

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Rate limiting configuration
RATE_LIMIT = {
    "window": 60,  # seconds
    "max_requests": 100  # requests per window
}


class RateLimiter:
    """Simple in-memory rate limiter"""
    def __init__(self):
        self.requests = {}

    def is_allowed(self, team_id: str) -> bool:
        now = time.time()
        team_requests = self.requests.get(team_id, [])

        # Clean old requests
        team_requests = [req_time for req_time in team_requests
                         if now - req_time < RATE_LIMIT["window"]]

        if len(team_requests) >= RATE_LIMIT["max_requests"]:
            return False

        team_requests.append(now)
        self.requests[team_id] = team_requests
        return True


rate_limiter = RateLimiter()


def require_team_auth(f):
    """Authorization middleware"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Get team_id from form data, the query string, or a JSON body.
        # get_json(silent=True) returns None instead of raising on non-JSON requests.
        body = request.get_json(silent=True)
        json_body = body if isinstance(body, dict) else {}
        team_id = (
            request.form.get('team_id')
            or request.args.get('team_id')
            or json_body.get('team_id')
        )

        if not team_id:
            return jsonify({"error": "team_id is required"}), 401

        # Check rate limit
        if not rate_limiter.is_allowed(team_id):
            return jsonify({"error": "Rate limit exceeded"}), 429

        # Add your additional authorization checks here
        # For example, validating JWT tokens, checking team membership, etc.

        return f(*args, **kwargs)
    return decorated_function


@app.post('/answer')
@require_team_auth
def get_answer():
    """
    Generate answer for question within team scope
    """
    try:
        data = request.get_json(silent=True) or {}
        team_id = data.get('team_id')
        question = data.get('question')

        if not question:
            return jsonify({"error": "Question is required"}), 400

        response = answer_generator.generate_answer(team_id, question)
        return jsonify(response)

    except Exception as e:
        return jsonify({"error": str(e)}), 500
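The RateLimiter above reads the wall clock directly, which makes it awkward to unit test. One possible variation, sketched here under the assumption that an injectable clock is acceptable, keeps the same sliding-window idea but lets a test control time. The class name `SlidingWindowLimiter` and its constructor parameters are hypothetical.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `max_requests` per team within the last `window` seconds."""

    def __init__(
        self,
        max_requests: int,
        window: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window = window
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, team_id: str) -> bool:
        now = self.clock()
        hits = self._hits[team_id]
        # Drop timestamps that have aged out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


# Drive the limiter with a fake clock instead of sleeping.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, window=60, clock=lambda: fake_now[0])
first = limiter.is_allowed("team-a")        # True
second = limiter.is_allowed("team-a")       # True
third = limiter.is_allowed("team-a")        # False, limit reached
other_team = limiter.is_allowed("team-b")   # True, separate counter
fake_now[0] = 61.0
after_window = limiter.is_allowed("team-a")  # True, old hits expired
```

Like the original, this keeps state in process memory, so it only limits requests handled by a single worker process.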

import os

from qdrant_client.models import FieldCondition, Filter, MatchValue
from werkzeug.utils import secure_filename

# document_processor is the DocumentProcessor instance created in section 3


@app.route("/upload", methods=['POST'])
@require_team_auth
def upload_file():
    """
    Upload and process PDF within team scope
    """
    try:
        # Validate request
        if 'file' not in request.files:
            return jsonify({"error": "No file part"}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "No selected file"}), 400

        if not file.filename.lower().endswith('.pdf'):
            return jsonify({"error": "Only PDF files are allowed"}), 400

        team_id = request.form.get('team_id')
        document_id = request.form.get('document_id')
        if not team_id or not document_id:
            return jsonify({"error": "team_id and document_id are required"}), 400

        # Process file with team scope
        chunks = document_processor.process_pdf(
            pdf_file=file,
            team_id=team_id,
            doc_name=secure_filename(file.filename),
            document_id=document_id
        )

        return jsonify({
            "status": "success",
            "chunks_processed": len(chunks),
            "document_id": document_id
        })

    except Exception as e:
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500


@app.route("/documents", methods=['GET'])
@require_team_auth
def list_documents():
    """
    List documents available for a team
    """
    team_id = request.args.get('team_id')

    try:
        # Query vector store for team's documents
        filter_query = Filter(
            must=[
                FieldCondition(
                    key="team_id",
                    match=MatchValue(value=team_id)
                )
            ]
        )

        # scroll() returns a (points, next_page_offset) tuple
        points, _next_offset = vector_store.client.scroll(
            collection_name=vector_store.collection_name,
            scroll_filter=filter_query,
            limit=1000  # Adjust based on your needs
        )

        # Extract unique document information
        documents = set()
        for point in points:
            if point.payload:
                documents.add((
                    point.payload.get('document_id'),
                    point.payload.get('doc_name')
                ))

        return jsonify({
            "status": "success",
            "documents": [
                {"id": doc_id, "name": doc_name}
                for doc_id, doc_name in documents
            ]
        })

    except Exception as e:
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500


# Environment configuration
if __name__ == "__main__":
    # Initialize vector store collection
    vector_store.setup_collection()

    # Configure server
    app.run(
        host='0.0.0.0',
        port=int(os.getenv('PORT', 8000)),
        debug=os.getenv('DEBUG', 'False').lower() == 'true',
        ssl_context='adhoc' if os.getenv('ENABLE_HTTPS', 'False').lower() == 'true' else None
    )
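The require_team_auth decorator trusts whatever team_id the client sends and leaves the real check as a placeholder. As one possible way to fill that placeholder, the sketch below ties each team_id to an API key and compares hashes in constant time. The `TEAM_KEY_HASHES` registry, the example key, and the `authorize_team` function are all hypothetical; a production system would more likely verify a signed token issued by its identity provider.

```python
import hashlib
import hmac
from typing import Dict, Optional

# Hypothetical registry: team_id -> SHA-256 hex digest of that team's API key.
# In practice this would live in a database or secrets manager.
TEAM_KEY_HASHES: Dict[str, str] = {
    "team-a": hashlib.sha256(b"example-key-for-team-a").hexdigest(),
}


def authorize_team(team_id: Optional[str], api_key: Optional[str]) -> bool:
    """Return True only if api_key is the key registered for team_id."""
    if not team_id or not api_key:
        return False
    expected = TEAM_KEY_HASHES.get(team_id)
    if expected is None:
        return False
    presented = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
    # compare_digest avoids leaking how many characters matched.
    return hmac.compare_digest(presented, expected)


valid = authorize_team("team-a", "example-key-for-team-a")
wrong_key = authorize_team("team-a", "guess")
unknown_team = authorize_team("team-z", "example-key-for-team-a")
```

Inside the decorator, a check like this would run before the rate limiter result is returned, with the key read from a request header rather than the body.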

System Usage Examples

1. Uploading Documents

import requests

def upload_document(file_path: str, team_id: str, document_id: str):
    """Example: Upload a PDF document"""
    with open(file_path, 'rb') as file:
        response = requests.post(
            'http://localhost:8000/upload',
            files={'file': file},
            data={
                'team_id': team_id,
                'document_id': document_id
            }
        )
    return response.json()

2. Asking Questions

def ask_question(team_id: str, question: str):
    """Example: Ask a question about uploaded documents"""
    response = requests.post(
        'http://localhost:8000/answer',
        json={
            'team_id': team_id,
            'question': question
        }
    )
    return response.json()

Deployment Guide

Docker Setup

Create a Dockerfile for the application:

FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Run the application
CMD ["python", "app.py"]

Docker Compose Configuration

version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_API_VERSION=${AZURE_API_VERSION}
      - AZURE_DEPLOYMENT_NAME=${AZURE_DEPLOYMENT_NAME}
      - QDRANT_HOST=qdrant
      - QDRANT_PORT=6333
      - ENABLE_HTTPS=false
    depends_on:
      - qdrant
    networks:
      - app-network

  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage
    networks:
      - app-network

networks:
  app-network:
    driver: bridge

volumes:
  qdrant_data:
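The Compose file passes several settings through from the host environment, and a missing one only surfaces later as a confusing client error. A small startup check can fail fast instead. This is a sketch, not part of the original application: the `REQUIRED_VARS` tuple and `missing_settings` helper are assumptions, with the variable names taken from the Compose file above.

```python
import os
from typing import Iterable, List, Mapping

# Names taken from the Compose file; AZURE_API_VERSION is omitted because
# the application code supplies a default for it.
REQUIRED_VARS = (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_DEPLOYMENT_NAME",
    "QDRANT_HOST",
    "QDRANT_PORT",
)


def missing_settings(
    env: Mapping[str, str], required: Iterable[str] = REQUIRED_VARS
) -> List[str]:
    """Return the required variable names that are unset or blank in env."""
    return [name for name in required if not env.get(name, "").strip()]


# Example with an explicit mapping; at startup you would pass os.environ.
example_env = {
    "AZURE_OPENAI_ENDPOINT": "https://example.invalid",
    "AZURE_OPENAI_API_KEY": "",
    "QDRANT_HOST": "qdrant",
    "QDRANT_PORT": "6333",
}
missing = missing_settings(example_env)
```

Calling `missing_settings(os.environ)` before creating the clients and exiting with a clear message when the list is non-empty keeps configuration errors out of request handling.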

2. Security Headers

Implement security headers middleware:

from flask_talisman import Talisman

# Initialize Talisman with security headers
Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    session_cookie_secure=True,
    content_security_policy={
        'default-src': "'self'",
        'img-src': '*',
        'script-src': "'self'"
    }
)

Monitoring and Maintenance

1. Health Check Implementation

@app.route("/health", methods=['GET'])
def health_check():
    """System health check endpoint"""
    try:
        # Test vector store connectivity
        vector_store.client.get_collections()

        # Test the local embedding model (this does not call the OpenAI API)
        ai_config.get_embedding("test")

        # Only report components that were actually exercised above
        return jsonify({
            "vector_store": "healthy",
            "embedding_model": "healthy",
            "timestamp": time.time()
        })

    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e)
        }), 500

2. Logging Configuration

import logging.config

# Configure logging
logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
        },
        'file': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.FileHandler',
            'filename': 'app.log',
            'mode': 'a',
        },
    },
    'loggers': {
        '': {
            'handlers': ['default', 'file'],
            'level': 'INFO',
            'propagate': True
        }
    }
})

logger = logging.getLogger(__name__)

Conclusion

This implementation provides a robust, secure foundation for building a PDF question-answering system. Key features include:

Security

  • Team-based isolation
  • Rate limiting
  • Secure file handling
  • Authorization middleware

Scalability

  • Docker containerization
  • Efficient vector search
  • Modular design

Maintainability

  • Comprehensive logging
  • Health monitoring
  • Clear documentation

Remember to:

  • Keep dependencies updated
  • Monitor system performance
  • Back up vector data regularly
  • Audit security configurations
  • Test thoroughly before deployment

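This guide provides a foundation for building intelligent document systems. Ready to implement it in your organization, or need help with a custom solution? I'm available for select consulting projects and technical advisory roles focused on production-grade AI systems. Let's discuss your implementation: me@arif.sh

Complete source code: github.com/doganarif/pdf-gpt-vectordb-qa

If you found this guide helpful, please give the repository a ⭐️!

⚡ Happy building!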

2024-11-13 04:49:16 Translated from the original article by AI中文站