Аутентификация

В этом разделе описывается система аутентификации в Boilerplate, основанная на JWT (JSON Web Tokens) и интегрированная с FastAPI.

Обзор системы аутентификации

Backend часть Boilerplate использует JWT токены для аутентификации пользователей. Процесс включает в себя:

  • Регистрацию пользователей
  • Аутентификацию и получение токенов
  • Обновление токенов
  • Проверку прав доступа
  • Защиту маршрутов с помощью зависимостей

Конфигурация JWT

Настройки JWT определены в файле app/core/config.py:

python
# app/core/config.py
from pydantic_settings import BaseSettings
from datetime import timedelta

class Settings(BaseSettings):
    SECRET_KEY: str
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7

    # ... другие настройки ...

settings = Settings()

Генерация и проверка JWT токенов

Функции для работы с JWT токенами определены в файле app/core/security.py:

python
# app/core/security.py
from datetime import datetime, timedelta
from typing import Any, Optional, Union

from jose import jwt
from passlib.context import CryptContext

from app.core.config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """
    Создает JWT access token
    """
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        
    to_encode = {"exp": expire, "sub": str(subject), "type": "access"}
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt

def create_refresh_token(subject: Union[str, Any]) -> str:
    """
    Создает JWT refresh token
    """
    expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Проверяет соответствие открытого пароля хешированному
    """
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """
    Создает хеш пароля
    """
    return pwd_context.hash(password)

Зависимости для проверки аутентификации

Зависимости FastAPI для проверки аутентификации определены в файле app/api/deps.py:

python
# app/api/deps.py
from typing import Generator, Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import ValidationError

from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import AsyncSessionLocal

# Определение OAuth2 схемы с путем для получения токена
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")

async def get_db() -> Generator:
    """
    Зависимость для получения сессии базы данных
    """
    session = AsyncSessionLocal()
    try:
        yield session
    finally:
        await session.close()

async def get_current_user(
    db: AsyncSession = Depends(get_db),
    token: str = Depends(oauth2_scheme),
) -> models.User:
    """
    Зависимость для получения текущего пользователя по JWT токену
    """
    try:
        # Декодируем токен
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_data = schemas.TokenPayload(**payload)
        
        # Проверяем тип токена и срок действия
        if token_data.type != "access":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Неверный тип токена",
                headers={"WWW-Authenticate": "Bearer"},
            )
            
        if token_data.exp < datetime.utcnow().timestamp():
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Срок действия токена истек",
                headers={"WWW-Authenticate": "Bearer"},
            )
    except (JWTError, ValidationError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Невозможно проверить учетные данные",
            headers={"WWW-Authenticate": "Bearer"},
        )
        
    # Получаем пользователя из базы данных
    user = await crud.user.get(db, id=token_data.sub)
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Пользователь не найден")
    if not user.is_active:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Пользователь неактивен")
        
    return user

async def get_current_active_superuser(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """
    Зависимость для проверки что текущий пользователь является админом
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Недостаточно прав доступа",
        )
    return current_user

API эндпоинты аутентификации

API эндпоинты для регистрации, аутентификации и обновления токенов определены в файле app/api/routes/auth.py:

python
# app/api/routes/auth.py
from datetime import timedelta
from typing import Any

from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession

from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings

router = APIRouter()

@router.post("/register", response_model=schemas.User)
async def register(
    *,
    db: AsyncSession = Depends(deps.get_db),
    user_in: schemas.UserCreate,
) -> Any:
    """
    Регистрация нового пользователя.
    """
    # Проверяем, существует ли пользователь с таким email
    user = await crud.user.get_by_email(db, email=user_in.email)
    if user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Пользователь с таким email уже существует",
        )
        
    # Создаем нового пользователя
    user = await crud.user.create(db, obj_in=user_in)
    return user

@router.post("/login", response_model=schemas.Token)
async def login_access_token(
    db: AsyncSession = Depends(deps.get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    Вход в систему и получение JWT токена.
    """
    # Проверяем учетные данные пользователя
    user = await crud.user.authenticate(
        db, email=form_data.username, password=form_data.password
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Неверный email или пароль",
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Пользователь неактивен",
        )
        
    # Создаем токены
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = security.create_access_token(
        user.id, expires_delta=access_token_expires
    )
    refresh_token = security.create_refresh_token(user.id)
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }

@router.post("/refresh", response_model=schemas.Token)
async def refresh_token(
    db: AsyncSession = Depends(deps.get_db),
    refresh_token: str = Body(...),
) -> Any:
    """
    Обновление JWT токена с помощью refresh токена.
    """
    try:
        # Декодируем refresh токен
        payload = jwt.decode(
            refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_data = schemas.TokenPayload(**payload)
        
        # Проверяем тип токена
        if token_data.type != "refresh":
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Неверный тип токена",
            )
            
        # Получаем пользователя
        user = await crud.user.get(db, id=token_data.sub)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Пользователь не найден",
            )
        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Пользователь неактивен",
            )
            
        # Создаем новые токены
        access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = security.create_access_token(
            user.id, expires_delta=access_token_expires
        )
        new_refresh_token = security.create_refresh_token(user.id)
        
        return {
            "access_token": access_token,
            "refresh_token": new_refresh_token,
            "token_type": "bearer",
        }
    except (JWTError, ValidationError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Невозможно проверить учетные данные",
        )

@router.post("/test-token", response_model=schemas.User)
async def test_token(
    current_user: models.User = Depends(deps.get_current_user),
) -> Any:
    """
    Тестовый эндпоинт для проверки JWT токена.
    """
    return current_user

Модели и схемы для аутентификации

Pydantic схемы для аутентификации определены в файле app/schemas/token.py:

python
# app/schemas/token.py
from typing import Optional
from pydantic import BaseModel, Field

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

class TokenPayload(BaseModel):
    sub: Optional[int] = None
    exp: float  # Время истечения токена
    type: str  # Тип токена: "access" или "refresh"

Использование в клиентской части

Для использования аутентификации в клиентской части приложения, токены должны храниться в безопасном месте и отправляться с каждым запросом к защищенным эндпоинтам.

Пример отправки запроса с токеном аутентификации:

typescript
// frontend/lib/api.ts
async function fetchWithAuth(url: string, options?: RequestInit) {
  // Получаем токен из хранилища
  const token = localStorage.getItem('accessToken');
  
  // Добавляем заголовок авторизации
  const headers = {
    'Content-Type': 'application/json',
    ...(token ? { 'Authorization': `Bearer ${token}` } : {}),
    ...(options?.headers || {})
  };

  try {
    const response = await fetch(url, {
      ...options,
      headers
    });

    // Если токен истек, пытаемся обновить его
    if (response.status === 401) {
      const refreshed = await refreshToken();
      if (refreshed) {
        // Повторяем запрос с новым токеном
        return fetchWithAuth(url, options);
      }
      // Если не удалось обновить токен, перенаправляем на страницу входа
      window.location.href = '/login';
    }

    return response;
  } catch (error) {
    console.error('API request failed:', error);
    throw error;
  }
}

async function refreshToken() {
  const refreshToken = localStorage.getItem('refreshToken');
  if (!refreshToken) return false;

  try {
    const response = await fetch('/api/auth/refresh', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ refresh_token: refreshToken })
    });

    if (response.ok) {
      const data = await response.json();
      localStorage.setItem('accessToken', data.access_token);
      localStorage.setItem('refreshToken', data.refresh_token);
      return true;
    }
    
    return false;
  } catch (error) {
    console.error('Token refresh failed:', error);
    return false;
  }
}

Защита маршрутов

Чтобы защитить маршруты API, используйте зависимость get_current_user:

python
# app/api/routes/some_protected_route.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app import models
from app.api import deps

router = APIRouter()

@router.get("/protected-data")
async def get_protected_data(
    current_user: models.User = Depends(deps.get_current_user),
    db: AsyncSession = Depends(deps.get_db),
):
    """
    Защищенный эндпоинт, требующий аутентификацию.
    """
    # Доступ только для аутентифицированных пользователей
    return {"message": f"Привет, {current_user.email}!", "data": "Секретные данные"}

@router.get("/admin-only")
async def get_admin_data(
    current_user: models.User = Depends(deps.get_current_active_superuser),
    db: AsyncSession = Depends(deps.get_db),
):
    """
    Защищенный эндпоинт, требующий права администратора.
    """
    # Доступ только для администраторов
    return {"message": "Данные для администраторов"}

Советы и лучшие практики

  • Используйте HTTPS для защиты передачи токенов
  • Устанавливайте короткий срок жизни для access токенов (15-30 минут)
  • Используйте refresh токены для обновления access токенов
  • Храните токены в безопасном месте на клиенте (HTTP-only cookies для refresh токенов)
  • Реализуйте механизм отзыва refresh токенов
  • Добавьте ротацию refresh токенов при каждом использовании
  • Включите CORS защиту для API

Ресурсы