Аутентификация
В этом разделе описывается система аутентификации в Boilerplate, основанная на JWT (JSON Web Tokens) и интегрированная с FastAPI.
Обзор системы аутентификации
Backend часть Boilerplate использует JWT токены для аутентификации пользователей. Процесс включает в себя:
- Регистрацию пользователей
- Аутентификацию и получение токенов
- Обновление токенов
- Проверку прав доступа
- Защиту маршрутов с помощью зависимостей
Конфигурация JWT
Настройки JWT определены в файле app/core/config.py
:
# app/core/config.py
from pydantic_settings import BaseSettings
from datetime import timedelta
class Settings(BaseSettings):
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# ... другие настройки ...
settings = Settings()
Генерация и проверка JWT токенов
Функции для работы с JWT токенами определены в файле app/core/security.py
:
# app/core/security.py
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""
Создает JWT access token
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject), "type": "access"}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def create_refresh_token(subject: Union[str, Any]) -> str:
"""
Создает JWT refresh token
"""
expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Проверяет соответствие открытого пароля хешированному
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Создает хеш пароля
"""
return pwd_context.hash(password)
Зависимости для проверки аутентификации
Зависимости FastAPI для проверки аутентификации определены в файле app/api/deps.py
:
# app/api/deps.py
from typing import Generator, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import ValidationError
from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import AsyncSessionLocal
# Определение OAuth2 схемы с путем для получения токена
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_db() -> Generator:
"""
Зависимость для получения сессии базы данных
"""
session = AsyncSessionLocal()
try:
yield session
finally:
await session.close()
async def get_current_user(
db: AsyncSession = Depends(get_db),
token: str = Depends(oauth2_scheme),
) -> models.User:
"""
Зависимость для получения текущего пользователя по JWT токену
"""
try:
# Декодируем токен
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
# Проверяем тип токена и срок действия
if token_data.type != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный тип токена",
headers={"WWW-Authenticate": "Bearer"},
)
if token_data.exp < datetime.utcnow().timestamp():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Срок действия токена истек",
headers={"WWW-Authenticate": "Bearer"},
)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Невозможно проверить учетные данные",
headers={"WWW-Authenticate": "Bearer"},
)
# Получаем пользователя из базы данных
user = await crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Пользователь не найден")
if not user.is_active:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Пользователь неактивен")
return user
async def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
"""
Зависимость для проверки что текущий пользователь является админом
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав доступа",
)
return current_user
API эндпоинты аутентификации
API эндпоинты для регистрации, аутентификации и обновления токенов определены в файле app/api/routes/auth.py
:
# app/api/routes/auth.py
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/register", response_model=schemas.User)
async def register(
*,
db: AsyncSession = Depends(deps.get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Регистрация нового пользователя.
"""
# Проверяем, существует ли пользователь с таким email
user = await crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким email уже существует",
)
# Создаем нового пользователя
user = await crud.user.create(db, obj_in=user_in)
return user
@router.post("/login", response_model=schemas.Token)
async def login_access_token(
db: AsyncSession = Depends(deps.get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
Вход в систему и получение JWT токена.
"""
# Проверяем учетные данные пользователя
user = await crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный email или пароль",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь неактивен",
)
# Создаем токены
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = security.create_access_token(
user.id, expires_delta=access_token_expires
)
refresh_token = security.create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
@router.post("/refresh", response_model=schemas.Token)
async def refresh_token(
db: AsyncSession = Depends(deps.get_db),
refresh_token: str = Body(...),
) -> Any:
"""
Обновление JWT токена с помощью refresh токена.
"""
try:
# Декодируем refresh токен
payload = jwt.decode(
refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
# Проверяем тип токена
if token_data.type != "refresh":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный тип токена",
)
# Получаем пользователя
user = await crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Пользователь не найден",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь неактивен",
)
# Создаем новые токены
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = security.create_access_token(
user.id, expires_delta=access_token_expires
)
new_refresh_token = security.create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer",
}
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Невозможно проверить учетные данные",
)
@router.post("/test-token", response_model=schemas.User)
async def test_token(
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Тестовый эндпоинт для проверки JWT токена.
"""
return current_user
Модели и схемы для аутентификации
Pydantic схемы для аутентификации определены в файле app/schemas/token.py
:
# app/schemas/token.py
from typing import Optional
from pydantic import BaseModel, Field
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None
exp: float # Время истечения токена
type: str # Тип токена: "access" или "refresh"
Использование в клиентской части
Для использования аутентификации в клиентской части приложения, токены должны храниться в безопасном месте и отправляться с каждым запросом к защищенным эндпоинтам.
Пример отправки запроса с токеном аутентификации:
// frontend/lib/api.ts
async function fetchWithAuth(url: string, options?: RequestInit) {
// Получаем токен из хранилища
const token = localStorage.getItem('accessToken');
// Добавляем заголовок авторизации
const headers = {
'Content-Type': 'application/json',
...(token ? { 'Authorization': `Bearer ${token}` } : {}),
...(options?.headers || {})
};
try {
const response = await fetch(url, {
...options,
headers
});
// Если токен истек, пытаемся обновить его
if (response.status === 401) {
const refreshed = await refreshToken();
if (refreshed) {
// Повторяем запрос с новым токеном
return fetchWithAuth(url, options);
}
// Если не удалось обновить токен, перенаправляем на страницу входа
window.location.href = '/login';
}
return response;
} catch (error) {
console.error('API request failed:', error);
throw error;
}
}
async function refreshToken() {
const refreshToken = localStorage.getItem('refreshToken');
if (!refreshToken) return false;
try {
const response = await fetch('/api/auth/refresh', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ refresh_token: refreshToken })
});
if (response.ok) {
const data = await response.json();
localStorage.setItem('accessToken', data.access_token);
localStorage.setItem('refreshToken', data.refresh_token);
return true;
}
return false;
} catch (error) {
console.error('Token refresh failed:', error);
return false;
}
}
Защита маршрутов
Чтобы защитить маршруты API, используйте зависимость get_current_user:
# app/api/routes/some_protected_route.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app import models
from app.api import deps
router = APIRouter()
@router.get("/protected-data")
async def get_protected_data(
current_user: models.User = Depends(deps.get_current_user),
db: AsyncSession = Depends(deps.get_db),
):
"""
Защищенный эндпоинт, требующий аутентификацию.
"""
# Доступ только для аутентифицированных пользователей
return {"message": f"Привет, {current_user.email}!", "data": "Секретные данные"}
@router.get("/admin-only")
async def get_admin_data(
current_user: models.User = Depends(deps.get_current_active_superuser),
db: AsyncSession = Depends(deps.get_db),
):
"""
Защищенный эндпоинт, требующий права администратора.
"""
# Доступ только для администраторов
return {"message": "Данные для администраторов"}
Советы и лучшие практики
- Используйте HTTPS для защиты передачи токенов
- Устанавливайте короткий срок жизни для access токенов (15-30 минут)
- Используйте refresh токены для обновления access токенов
- Храните токены в безопасном месте на клиенте (HTTP-only cookies для refresh токенов)
- Реализуйте механизм отзыва refresh токенов
- Добавьте ротацию refresh токенов при каждом использовании
- Включите CORS защиту для API