login.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from datetime import timedelta
  2. from typing import Any
  3. from fastapi import APIRouter, Body, Depends, HTTPException
  4. from fastapi.security import OAuth2PasswordRequestForm
  5. from sqlalchemy.orm import Session
  6. from app import crud, models, schemas
  7. from app.api import deps
  8. from app.core import security
  9. from app.core.config import settings
  10. from app.core.security import get_password_hash
  11. from app.utils import (
  12. generate_password_reset_token,
  13. send_reset_password_email,
  14. verify_password_reset_token,
  15. )
  16. router = APIRouter()
  17. @router.post("/login/access-token", response_model=schemas.Token)
  18. def login_access_token(
  19. db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
  20. ) -> Any:
  21. """
  22. OAuth2 compatible token login, get an access token for future requests
  23. """
  24. user = crud.user.authenticate(
  25. db, account=form_data.username, password=form_data.password
  26. )
  27. if not user:
  28. raise HTTPException(status_code=401, detail="Incorrect email or password")
  29. elif not crud.user.is_active(user):
  30. raise HTTPException(status_code=400, detail="Inactive user")
  31. access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
  32. return {
  33. "access_token": security.create_access_token(
  34. user.id, expires_delta=access_token_expires
  35. ),
  36. "token_type": "bearer",
  37. }
  38. @router.post("/login/test-token", response_model=schemas.UserCreate)
  39. def test_token(
  40. current_user: models.users = Depends(deps.get_current_active_user)) -> Any:
  41. """
  42. Test access token
  43. """
  44. return current_user
  45. @router.post("/reset-password/", response_model=schemas.Msg)
  46. def reset_password(
  47. token: str = Body(...),
  48. new_password: str = Body(...),
  49. db: Session = Depends(deps.get_db),
  50. ) -> Any:
  51. """
  52. Reset password
  53. """
  54. email = verify_password_reset_token(token)
  55. if not email:
  56. raise HTTPException(status_code=400, detail="Invalid token")
  57. user = crud.user.get_by_email(db, email=email)
  58. if not user:
  59. raise HTTPException(
  60. status_code=404,
  61. detail="The user with this username does not exist in the system.",
  62. )
  63. elif not crud.user.is_active(user):
  64. raise HTTPException(status_code=400, detail="Inactive user")
  65. hashed_password = get_password_hash(new_password)
  66. user.hashed_password = hashed_password
  67. db.add(user)
  68. db.commit()
  69. return {"msg": "Password updated successfully"}