Introduction
Ever wanted to build a system that can answer questions about your PDF documents? In this guide, we'll create a PDF question-answering system that combines vector search with large language models. The system does more than search text: it retrieves the passages most relevant to a question and uses them to generate an answer, while keeping each team's documents isolated from every other team's.
What We’re Building
Our system provides:
- Secure, team-isolated document processing and storage
- Intelligent question answering using OpenAI’s language models
- Efficient vector search with Qdrant
- Authorization-scoped access to documents and queries
- RESTful API interface for easy integration
Prerequisites
Before starting, ensure you have:
- Python 3.8 or higher installed
- Access to the OpenAI API (the examples below use Azure OpenAI)
- Qdrant vector database (running locally or in cloud)
- Basic understanding of Flask and REST APIs
- Understanding of authentication and authorization principles
- Familiarity with async/await patterns in Python
System Architecture Overview
Authorization Design
Our system implements a multi-tenant architecture using team_id as the primary authorization scope:
- Each team has an isolated document space
- All operations (uploads, queries) are scoped to specific teams
- Cross-team access is prevented by design
- Team-level rate limiting and access control
- Complete data isolation between different organizations
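The scoping rules above can be illustrated with a toy in-memory store. This is only a sketch of the idea: `TeamScopedStore` and its methods are hypothetical names invented for the illustration, not part of the system built below, which relies on Qdrant payload filters instead.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TeamScopedStore:
    """Toy store (hypothetical): every read and write requires a team_id."""
    _docs: Dict[str, List[str]] = field(default_factory=dict)

    def add(self, team_id: str, text: str) -> None:
        if not team_id:
            raise ValueError("team_id is required")
        self._docs.setdefault(team_id, []).append(text)

    def search(self, team_id: str, keyword: str) -> List[str]:
        # Only the caller's own partition is ever consulted.
        own_docs = self._docs.get(team_id, [])
        return [t for t in own_docs if keyword.lower() in t.lower()]


store = TeamScopedStore()
store.add("team-a", "Quarterly revenue report")
store.add("team-b", "Revenue forecast for next year")

team_a_hits = store.search("team-a", "revenue")  # sees only team-a's document
team_c_hits = store.search("team-c", "revenue")  # unknown team sees nothing
```

Because the team identifier is a required argument of every operation, there is no code path that reads across partitions by accident.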
Key Components
PDF Processing Layer:
- Text extraction and chunking
- Metadata preservation
- Team-scoped document storage
Vector Search Layer:
- Semantic embedding generation
- Efficient similarity search
- Team-isolated vector spaces
Answer Generation Layer:
- Context retrieval within team scope
- AI-powered answer generation
- Source attribution
API Layer:
- Secure endpoints
- Authorization middleware
- Rate limiting and monitoring
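To make the similarity search in the Vector Search Layer concrete, here is a minimal pure-Python sketch of cosine-similarity ranking. The two-dimensional vectors and the `top_k` helper are made up for illustration; in the real system the vectors are 384-dimensional embeddings and Qdrant performs the ranking.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float],
    items: Sequence[Tuple[str, Sequence[float]]],
    k: int,
) -> List[Tuple[str, float]]:
    """Return the k labels whose vectors are most similar to the query."""
    scored = [(label, cosine_similarity(query, vec)) for label, vec in items]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy "embeddings" for three chunks.
chunks = [
    ("refund policy", [1.0, 0.0]),
    ("shipping times", [0.0, 1.0]),
    ("returns and refunds", [0.8, 0.6]),
]
results = top_k([1.0, 0.0], chunks, k=2)
```

The query vector points in the same direction as "refund policy", so that chunk ranks first, followed by the partially aligned "returns and refunds".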
Detailed Implementation
1. OpenAI and Embedding Setup
First, let’s set up our AI components with proper configuration:
import os
from typing import List

from openai import AzureOpenAI
from sentence_transformers import SentenceTransformer


class AIConfig:
    """Configuration manager for AI services with security considerations"""

    def __init__(self):
        # Load credentials from environment variables rather than hard-coding them
        self.openai_client = AzureOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION", "2024-02-01"),
            azure_deployment=os.getenv("AZURE_DEPLOYMENT_NAME")
        )
        # Initialize the local embedding model
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        # Warm up the model to prevent cold starts
        self._warmup()

    def _warmup(self):
        """Warm up the embedding model"""
        _ = self.embedding_model.encode("Warm up text")

    def get_embedding(self, text: str) -> List[float]:
        """Generate embeddings with error handling"""
        try:
            return self.embedding_model.encode(text).tolist()
        except Exception as e:
            print(f"Error generating embedding: {str(e)}")
            raise


# Initialize global AI configuration
ai_config = AIConfig()
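Since `os.getenv` silently returns `None` for an unset variable, a misconfigured deployment may only fail on the first API call. One option is to validate the settings at startup. The helper below is a hedged sketch: `load_azure_settings` and `REQUIRED_VARS` are hypothetical names, and only the environment variable names come from the code above.

```python
import os
from typing import Dict, Mapping

# Variables read by AIConfig that have no default value.
REQUIRED_VARS = (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_DEPLOYMENT_NAME",
)


def load_azure_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the Azure settings, raising early if any required one is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED_VARS}
    # Same default as in AIConfig above.
    settings["AZURE_API_VERSION"] = env.get("AZURE_API_VERSION", "2024-02-01")
    return settings


# Example with an explicit mapping instead of the real environment.
example_env = {
    "AZURE_OPENAI_ENDPOINT": "https://example.invalid",
    "AZURE_OPENAI_API_KEY": "not-a-real-key",
    "AZURE_DEPLOYMENT_NAME": "my-deployment",
}
settings = load_azure_settings(example_env)
```

Passing the mapping as a parameter keeps the function easy to test without touching the process environment.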
2. Vector Database Setup
Next, let’s configure our vector database with proper security measures:
# src/vector_store.py
import logging
from typing import List

from qdrant_client import QdrantClient, models
from qdrant_client.models import (
    Distance,
    NamedVector,
    PointStruct,
    ScoredPoint,
    VectorParams,
)

from src.config import config

logger = logging.getLogger(__name__)


class VectorStore:
    """Secure vector storage management with team isolation"""

    def __init__(self):
        self.client = QdrantClient(
            host=config.QDRANT_HOST,
            port=config.QDRANT_PORT,
            api_key=config.QDRANT_API_KEY,
            https=config.QDRANT_HTTPS
        )
        self.collection_name = "pdf_embeddings"
        self.embedding_dim = 384  # Dimension for all-MiniLM-L6-v2

    def setup_collection(self) -> bool:
        """Drop the collection if it exists, then create it from scratch"""
        try:
            # WARNING: deleting the collection discards every stored vector
            collections = self.client.get_collections().collections
            if any(collection.name == self.collection_name for collection in collections):
                self.client.delete_collection(self.collection_name)
                logger.info(f"Deleted existing collection '{self.collection_name}'")
            # Create new collection with one named vector
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config={
                    "custom_vector": VectorParams(
                        size=self.embedding_dim,
                        distance=Distance.COSINE
                    )
                }
            )
            logger.info(f"Created collection '{self.collection_name}'")
            return True
        except Exception as e:
            logger.error(f"Error setting up collection: {str(e)}")
            raise

    def search_vectors(
        self,
        team_id: str,
        query_vector: List[float],
        limit: int = 10
    ) -> List[ScoredPoint]:
        """Search vectors within team's authorization scope"""
        try:
            # Restrict results to points whose payload carries this team_id
            team_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="team_id",
                        match=models.MatchValue(value=team_id)
                    )
                ]
            )
            # Use NamedVector because the collection stores a named vector
            return self.client.search(
                collection_name=self.collection_name,
                query_vector=NamedVector(
                    name="custom_vector",
                    vector=query_vector
                ),
                query_filter=team_filter,
                limit=limit
            )
        except Exception as e:
            logger.error(f"Error searching vectors: {str(e)}")
            raise

    def upsert_points(self, points: List[PointStruct]) -> bool:
        """Insert or update points in the Qdrant collection"""
        try:
            self.client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            logger.info(f"Successfully upserted {len(points)} points")
            return True
        except Exception as e:
            logger.error(f"Error upserting points: {str(e)}")
            raise


# Initialize global vector store
vector_store = VectorStore()
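Note that `setup_collection` recreates the collection each time it runs, which removes previously indexed documents. If you would rather keep existing data across restarts, a create-if-missing variant is one alternative. The sketch below is an assumption about how that could look: `ensure_collection` is a hypothetical helper, and `FakeClient` is a stand-in used only so the example runs without a Qdrant server. The two client calls it relies on are the same ones used above.

```python
from types import SimpleNamespace
from typing import Any


def ensure_collection(client: Any, name: str, vectors_config: Any) -> bool:
    """Create the collection only if it is missing. Returns True when created."""
    existing = {c.name for c in client.get_collections().collections}
    if name in existing:
        return False  # keep the stored vectors untouched
    client.create_collection(collection_name=name, vectors_config=vectors_config)
    return True


class FakeClient:
    """Stand-in for QdrantClient, for demonstration only."""

    def __init__(self) -> None:
        self._names = []

    def get_collections(self) -> SimpleNamespace:
        return SimpleNamespace(
            collections=[SimpleNamespace(name=n) for n in self._names]
        )

    def create_collection(self, collection_name: str, vectors_config: Any) -> None:
        self._names.append(collection_name)


client = FakeClient()
created_first = ensure_collection(client, "pdf_embeddings", {"custom_vector": "params"})
created_second = ensure_collection(client, "pdf_embeddings", {"custom_vector": "params"})
```

The second call is a no-op, so running it on every application start is safe.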
3. Secure PDF Processing Pipeline
Let’s implement our PDF processing with proper team isolation:
import logging
import uuid
from typing import Any, List

import pdfplumber
from qdrant_client.models import PointStruct
from werkzeug.utils import secure_filename

# ai_service is expected to expose the same get_embedding() method as the
# AIConfig instance from step 1 (named ai_config there)
from src.ai_service import ai_service
from src.vector_store import vector_store

logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Secure document processing with team isolation"""

    def process_pdf(
        self,
        pdf_file: Any,
        team_id: str,
        doc_name: str,
        document_id: str,
        chunk_size: int = 500
    ) -> List[PointStruct]:
        """Process PDF with team-scoped authorization"""
        try:
            points = []
            # Strip path separators and other unsafe characters from the name
            doc_name = secure_filename(doc_name)
            with pdfplumber.open(pdf_file) as pdf:
                for page_num, page in enumerate(pdf.pages, start=1):
                    text = page.extract_text()
                    if not text:
                        # Skip pages without extractable text (e.g. scanned images)
                        continue
                    # Create chunks
                    chunks = self._create_chunks(text, chunk_size)
                    # Generate embeddings
                    embeddings = [ai_service.get_embedding(chunk) for chunk in chunks]
                    # Create points
                    points.extend(self._create_points(
                        chunks, embeddings, team_id, doc_name,
                        document_id, page_num
                    ))
            # Store vectors
            if points:
                vector_store.upsert_points(points)
            return points
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _create_chunks(self, text: str, chunk_size: int) -> List[str]:
        """Split text into chunks; each chunk after the first also includes
        the last 50 characters of the previous one"""
        chunks = []
        for i in range(0, len(text), chunk_size):
            chunk = text[max(0, i - 50):i + chunk_size]
            chunks.append(chunk)
        return chunks

    def _create_points(
        self,
        chunks: List[str],
        embeddings: List[List[float]],
        team_id: str,
        doc_name: str,
        document_id: str,
        page_num: int
    ) -> List[PointStruct]:
        """Create points for vector storage"""
        points = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point = PointStruct(
                id=str(uuid.uuid4()),
                vector={"custom_vector": embedding},
                payload={
                    # team_id in the payload is what search filters rely on
                    "team_id": team_id,
                    "doc_name": doc_name,
                    "document_id": document_id,
                    "page_number": page_num,
                    "chunk_index": i,
                    "text": chunk,
                    "embedding_model": "all-MiniLM-L6-v2"
                }
            )
            points.append(point)
        return points


# Initialize global document processor
document_processor = DocumentProcessor()
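To see what the overlapping chunking produces, here is a standalone version of the same slicing logic applied to a short string. The function name `create_chunks` and the configurable `overlap` parameter are illustrative assumptions; the method above hard-codes an overlap of 50 characters.

```python
from typing import List


def create_chunks(text: str, chunk_size: int, overlap: int = 50) -> List[str]:
    """Split text every chunk_size characters, prepending up to `overlap`
    characters from the end of the previous chunk."""
    chunks = []
    for start in range(0, len(text), chunk_size):
        chunks.append(text[max(0, start - overlap):start + chunk_size])
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = create_chunks(sample, chunk_size=10, overlap=3)
# chunks == ["abcdefghij", "hijklmnopqrst", "rstuvwxyz"]
```

Every chunk after the first starts with the last three characters of its predecessor, so those chunks can be up to `chunk_size + overlap` characters long. The overlap reduces the chance that a sentence relevant to a question is cut in half at a chunk boundary.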
4. Answer Generation with Authorization
Implement secure answer generation with team isolation:
import os
from typing import Any, Dict

# AIConfig / ai_config come from step 1, VectorStore / vector_store from step 2


class AnswerGenerator:
    """Generate answers within team authorization scope"""

    def __init__(self, ai_config: AIConfig, vector_store: VectorStore):
        self.ai_config = ai_config
        self.vector_store = vector_store

    def generate_answer(self, team_id: str, question: str) -> Dict[str, Any]:
        """Generate answers using only team-authorized documents"""
        # Validate authorization
        if not self._validate_team_id(team_id):
            return {
                "answer": "Unauthorized access",
                "sources": [],
                "status": "error"
            }
        try:
            # Generate question embedding
            query_vector = self.ai_config.get_embedding(question)
            # Get relevant documents within team scope
            points = self.vector_store.search_vectors(team_id, query_vector, limit=15)
            if not points:
                return {
                    "answer": "No relevant documents found",
                    "sources": [],
                    "status": "no_context"
                }
            # Prepare context with source tracking
            context_parts = []
            sources = set()
            seen_text = set()
            for point in points:
                if point.payload:
                    text = point.payload.get('text', '').strip()
                    # Skip chunks whose text has already been included
                    if text in seen_text:
                        continue
                    doc_info = f"[Document: {point.payload.get('doc_name')}, Page: {point.payload.get('page_number')}]"
                    context_parts.append(f"{doc_info}\n{text}")
                    sources.add((
                        point.payload.get('doc_name'),
                        point.payload.get('page_number')
                    ))
                    seen_text.add(text)
            # Separate chunks with blank lines so each source label stays
            # attached to its own text
            context = "\n\n".join(context_parts)
            # Generate answer using AI
            messages = [
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant that provides accurate, "
                        "comprehensive answers based on the given context. "
                        "Always cite your sources using [Document: X, Page: Y] format."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        "Answer this question using only the context provided. "
                        "If you cannot answer based on the context, say so.\n\n"
                        f"Context:\n{context}\n\n"
                        f"Question: {question}"
                    )
                }
            ]
            response = self.ai_config.openai_client.chat.completions.create(
                # Same deployment variable as in step 1 and the Compose file
                model=os.getenv("AZURE_DEPLOYMENT_NAME"),
                messages=messages,
                max_tokens=1000,
                temperature=0.2
            )
            return {
                "answer": response.choices[0].message.content.strip(),
                "sources": list(sources),
                "status": "success"
            }
        except Exception as e:
            return {
                "answer": "Error generating answer",
                "sources": [],
                "status": "error",
                "error": str(e)
            }

    def _validate_team_id(self, team_id: str) -> bool:
        """
        Validate team_id authorization
        Implementation depends on your authentication system
        """
        # Placeholder: only checks that team_id is a non-empty string.
        # Add your team validation logic here
        return bool(team_id and isinstance(team_id, str))


# Initialize global answer generator
answer_generator = AnswerGenerator(ai_config, vector_store)
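The context-preparation step is easier to reason about and test when pulled out into a pure function. The following is a sketch under that assumption: `build_context` is a hypothetical helper, it works on plain payload dictionaries rather than Qdrant result objects, and unlike the code above it keeps sources in retrieval order and skips empty chunks.

```python
from typing import Any, Dict, List, Tuple


def build_context(payloads: List[Dict[str, Any]]) -> Tuple[str, List[Tuple[Any, Any]]]:
    """Turn retrieved payloads into a prompt context plus an ordered source list."""
    parts = []
    sources = []
    seen_text = set()
    for payload in payloads:
        text = (payload.get("text") or "").strip()
        if not text or text in seen_text:
            continue  # drop empty chunks and exact duplicates
        seen_text.add(text)
        source = (payload.get("doc_name"), payload.get("page_number"))
        parts.append(f"[Document: {source[0]}, Page: {source[1]}]\n{text}")
        if source not in sources:
            sources.append(source)
    return "\n\n".join(parts), sources


payloads = [
    {"doc_name": "handbook.pdf", "page_number": 2, "text": "Refunds take 5 days."},
    {"doc_name": "handbook.pdf", "page_number": 2, "text": "Refunds take 5 days. "},
    {"doc_name": "faq.pdf", "page_number": 1, "text": "Shipping is free."},
]
context, sources = build_context(payloads)
```

The second payload differs from the first only by trailing whitespace, so it is treated as a duplicate and left out of the context.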
5. Secure API Layer
Implement the API with proper authorization and security:
from flask import Flask, request, jsonify
from functools import wraps
import time

# answer_generator, document_processor, and vector_store are the module-level
# instances created in the previous steps

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Rate limiting configuration
RATE_LIMIT = {
    "window": 60,         # seconds
    "max_requests": 100   # requests per window
}


class RateLimiter:
    """Simple in-memory rate limiter (per process; state is lost on restart)"""

    def __init__(self):
        self.requests = {}

    def is_allowed(self, team_id: str) -> bool:
        now = time.time()
        team_requests = self.requests.get(team_id, [])
        # Drop timestamps that have fallen out of the window
        team_requests = [req_time for req_time in team_requests
                         if now - req_time < RATE_LIMIT["window"]]
        if len(team_requests) >= RATE_LIMIT["max_requests"]:
            return False
        team_requests.append(now)
        self.requests[team_id] = team_requests
        return True


rate_limiter = RateLimiter()


def require_team_auth(f):
    """Authorization middleware"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Look for team_id in form data, the query string, or a JSON body.
        # get_json(silent=True) returns None instead of raising when the
        # request has no JSON body (multipart uploads, GET requests).
        body = request.get_json(silent=True)
        json_body = body if isinstance(body, dict) else {}
        team_id = (
            request.form.get('team_id')
            or request.args.get('team_id')
            or json_body.get('team_id')
        )
        if not team_id:
            return jsonify({"error": "team_id is required"}), 401
        # Check rate limit
        if not rate_limiter.is_allowed(team_id):
            return jsonify({"error": "Rate limit exceeded"}), 429
        # team_id is supplied by the client at this point. Add your additional
        # authorization checks here, for example validating JWT tokens and
        # checking that the caller belongs to this team.
        return f(*args, **kwargs)
    return decorated_function


@app.post('/answer')
@require_team_auth
def get_answer():
    """Generate answer for question within team scope"""
    try:
        data = request.get_json(silent=True) or {}
        team_id = data.get('team_id')
        question = data.get('question')
        if not question:
            return jsonify({"error": "Question is required"}), 400
        response = answer_generator.generate_answer(team_id, question)
        return jsonify(response)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
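The sliding-window logic of the rate limiter can be checked in isolation if the current time is passed in rather than read from the clock. The class below is a sketch with hypothetical names (`SlidingWindowLimiter`, the `now` parameter); it applies the same rule as `RateLimiter` above, where a request is counted only while it is younger than the window.

```python
from collections import defaultdict, deque
from typing import Deque, Dict


class SlidingWindowLimiter:
    """Per-team sliding-window limiter with an injectable clock."""

    def __init__(self, max_requests: int, window_seconds: float) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, team_id: str, now: float) -> bool:
        hits = self._hits[team_id]
        # Evict timestamps that are no longer inside the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=2, window_seconds=60)
decisions = [limiter.is_allowed("team-a", now=t) for t in (0.0, 1.0, 2.0, 61.0)]
# decisions == [True, True, False, True]
other_team = limiter.is_allowed("team-b", now=2.0)
```

The third request is rejected because two requests already fall inside the 60-second window. By second 61 both earlier requests have expired, so the fourth is accepted. Each team has its own counter, so one team hitting its limit does not affect another.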
```python
from werkzeug.utils import secure_filename


@app.route("/upload", methods=['POST'])
@require_team_auth
def upload_file():
    """Upload and process PDF within team scope"""
    try:
        # Validate request
        if 'file' not in request.files:
            return jsonify({"error": "No file part"}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "No selected file"}), 400
        # Accept .pdf regardless of letter case
        if not file.filename.lower().endswith('.pdf'):
            return jsonify({"error": "Only PDF files are allowed"}), 400
        team_id = request.form['team_id']
        document_id = request.form.get('document_id')
        if not document_id:
            return jsonify({"error": "document_id is required"}), 400
        # Process file with team scope
        points = document_processor.process_pdf(
            pdf_file=file,
            team_id=team_id,
            doc_name=secure_filename(file.filename),
            document_id=document_id
        )
        return jsonify({
            "status": "success",
            "chunks_processed": len(points),
            "document_id": document_id
        })
    except Exception as e:
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500
```
from qdrant_client.models import FieldCondition, Filter, MatchValue


@app.route("/documents", methods=['GET'])
@require_team_auth
def list_documents():
    """List documents available for a team"""
    team_id = request.args.get('team_id')
    try:
        # Query vector store for team's documents
        filter_query = Filter(
            must=[
                FieldCondition(
                    key="team_id",
                    match=MatchValue(value=team_id)
                )
            ]
        )
        # scroll() returns (points, next_page_offset). The limit counts
        # chunks, not documents, so adjust it based on your needs.
        points, _next_offset = vector_store.client.scroll(
            collection_name=vector_store.collection_name,
            scroll_filter=filter_query,
            limit=1000
        )
        # Extract unique document information
        documents = set()
        for point in points:
            if point.payload:
                documents.add((
                    point.payload.get('document_id'),
                    point.payload.get('doc_name')
                ))
        return jsonify({
            "status": "success",
            "documents": [
                {"id": doc_id, "name": doc_name}
                for doc_id, doc_name in documents
            ]
        })
    except Exception as e:
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500
import os

# Application entry point
if __name__ == "__main__":
    # Initialize vector store collection.
    # Note: setup_collection() deletes an existing collection first, so
    # previously uploaded documents are removed on every start.
    vector_store.setup_collection()
    # Configure server from environment variables
    app.run(
        host='0.0.0.0',
        port=int(os.getenv('PORT', 8000)),
        debug=os.getenv('DEBUG', 'False').lower() == 'true',
        ssl_context='adhoc' if os.getenv('ENABLE_HTTPS', 'False').lower() == 'true' else None
    )
System Usage Examples
1. Upload a Document
import requests


def upload_document(file_path: str, team_id: str, document_id: str):
    """Example: Upload a PDF document"""
    with open(file_path, 'rb') as file:
        response = requests.post(
            'http://localhost:8000/upload',
            files={'file': file},
            data={
                'team_id': team_id,
                'document_id': document_id
            }
        )
    return response.json()
2. Ask Questions
def ask_question(team_id: str, question: str):
    """Example: Ask a question about uploaded documents"""
    response = requests.post(
        'http://localhost:8000/answer',
        json={
            'team_id': team_id,
            'question': question
        }
    )
    return response.json()
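The JSON returned by /answer carries a status field ("success", "no_context", or "error"), and rate-limit or validation failures return only an error field. A client may want to branch on these cases. The helper below is a sketch of one way to do that; `summarize_answer` is a hypothetical name, and the output wording is an arbitrary choice.

```python
from typing import Any, Dict


def summarize_answer(payload: Dict[str, Any]) -> str:
    """Convert an /answer response body into a one-line, human-readable result."""
    status = payload.get("status")
    if status == "success":
        # Sources arrive as [doc_name, page_number] pairs after JSON encoding.
        cited = ", ".join(f"{doc} p.{page}" for doc, page in payload.get("sources", []))
        return f"{payload['answer']} (sources: {cited or 'none'})"
    if status == "no_context":
        return "No relevant documents were found for this team."
    detail = payload.get("error", payload.get("answer", "unknown error"))
    return f"Request failed: {detail}"


ok = summarize_answer(
    {"status": "success", "answer": "Five days.", "sources": [["handbook.pdf", 2]]}
)
empty = summarize_answer(
    {"status": "no_context", "answer": "No relevant documents found", "sources": []}
)
limited = summarize_answer({"error": "Rate limit exceeded"})
```

In practice you would pass it the dictionary returned by `ask_question`.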
Deployment Guide
Docker Setup
Create a Dockerfile for the application:
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# Run the application
CMD ["python", "app.py"]
Docker Compose Configuration
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_API_VERSION=${AZURE_API_VERSION}
      - AZURE_DEPLOYMENT_NAME=${AZURE_DEPLOYMENT_NAME}
      - QDRANT_HOST=qdrant
      - QDRANT_PORT=6333
      - ENABLE_HTTPS=false
    depends_on:
      - qdrant
    networks:
      - app-network
  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
volumes:
  qdrant_data:
Security Headers
Implement secure headers middleware:
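from flask_talisman import Talisman

# Initialize Talisman with security headers.
# force_https redirects plain-HTTP requests to HTTPS, so enable it only when
# the app is actually served over TLS (the Compose file above sets
# ENABLE_HTTPS=false).
Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    session_cookie_secure=True,
    content_security_policy={
        'default-src': "'self'",
        'img-src': '*',
        'script-src': "'self'"
    }
)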
Monitoring and Maintenance
1. Health Check Implementation
@app.route("/health", methods=['GET'])
def health_check():
    """System health check endpoint"""
    try:
        # Test vector store connectivity
        vector_store.client.get_collections()
        # Test the local embedding model (this does not call the OpenAI API)
        ai_config.get_embedding("test")
        # Both checks passed
        return jsonify({
            "vector_store": "healthy",
            "embedding_model": "healthy",
            "timestamp": time.time()
        })
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e)
        }), 500
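A single try/except reports the whole system as unhealthy without saying which component failed. If per-component status is useful for your monitoring, the checks can be run independently. This is a sketch only: `run_health_checks` is a hypothetical helper, the check functions are placeholders, and returning 503 for a failed check is a design choice rather than something the endpoint above does.

```python
from typing import Callable, Dict, Tuple


def run_health_checks(
    checks: Dict[str, Callable[[], object]]
) -> Tuple[Dict[str, str], int]:
    """Run every check, record each result, and pick an HTTP status code."""
    report = {}
    for name, check in checks.items():
        try:
            check()
            report[name] = "healthy"
        except Exception as exc:
            report[name] = f"unhealthy: {exc}"
    all_healthy = all(value == "healthy" for value in report.values())
    return report, 200 if all_healthy else 503


def failing_check() -> None:
    # Placeholder that simulates an unreachable vector store.
    raise ConnectionError("connection refused")


report, status_code = run_health_checks({
    "vector_store": failing_check,
    "embedding_model": lambda: [0.0],
})
```

In the Flask route, the placeholders would be replaced by the two calls used above, and the tuple returned as `jsonify(report), status_code`.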
2. Logging Configuration
import logging.config

# Configure logging: INFO and above goes to both the console and app.log
logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
        },
        'file': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.FileHandler',
            'filename': 'app.log',
            'mode': 'a',
        },
    },
    'loggers': {
        # Root logger
        '': {
            'handlers': ['default', 'file'],
            'level': 'INFO',
            'propagate': True
        }
    }
})

logger = logging.getLogger(__name__)
Conclusion
This implementation provides a robust, secure foundation for building a PDF question-answering system. Key features include:
Security
- Team-based isolation
- Rate limiting
- Secure file handling
- Authorization middleware
Scalability
- Docker containerization
- Efficient vector search
- Modular design
Maintainability
- Comprehensive logging
- Health monitoring
- Clear documentation
Remember to:
- Keep dependencies updated
- Monitor system performance
- Regularly backup vector data
- Review security configurations
- Test thoroughly before deployment
This guide provides a foundation for building intelligent document systems. Ready to implement it in your organization or need help with a custom solution? I’m available for select consulting projects and technical advisory roles, focusing on production-grade AI systems. Let’s discuss your implementation: me@arif.sh
Full Source Code: github.com/doganarif/pdf-gpt-vectordb-qa
⚡ Happy Building!