| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
# Key used to sign and verify JWTs. It is taken from the SECRET_KEY
# environment variable so the real key does not have to live in source
# control; the literal is only a development fallback (also used when the
# variable is set but empty) and must be overridden in production.
SECRET_KEY = (
    os.environ.get("SECRET_KEY")
    or "your-secret-key-here-change-in-production"
)
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default lifetime of an access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Password hashing context configured with bcrypt as its only scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token security scheme used as a FastAPI dependency below.
security = HTTPBearer()
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns whatever its
    ``verify`` call returns.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash ``password`` with the module-level ``pwd_context``.

    Returns the value produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. A new dict is built from it, so
            the caller's object is never modified.
        expires_delta: How long the token stays valid. When omitted, the
            token lives for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # (Python 3.12+) and yields a naive value that is easy to mistake for
    # local time.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str):
    """Decode ``token`` and return its claims, or ``None`` on failure.

    The token is decoded with ``SECRET_KEY`` and ``ALGORITHM``. Any
    ``JWTError`` raised by ``jwt.decode`` is swallowed and reported to the
    caller as ``None``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    else:
        return claims
# Add this function so routes can use it as a dependency.
async def get_current_user(
    token: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the current user from the request's bearer credentials.

    Args:
        token: Credentials object supplied by the ``security`` dependency;
            its ``credentials`` attribute holds the raw JWT string.

    Returns:
        A dict ``{"sub": <subject>}`` taken from the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = verify_token(token.credentials)
    if payload is None:
        # verify_token returns None for any token it could not decode.
        raise credentials_exception

    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    return {"sub": username}
|