crud_user.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. from audioop import add
  2. from typing import Any, Dict, Optional, Union
  3. from sqlalchemy.orm import Session
  4. from app.core.security import get_password_hash, verify_password
  5. from app.crud.base import CRUDBase
  6. from app.models.user import users
  7. from app.schemas.user import UserBase, UserCreate, UserUpdate
  8. class CRUDUser(CRUDBase[users, UserBase, UserCreate]):
  9. def get_by_email(self, db: Session, *, email: str) -> Optional[users]:
  10. return db.query(users).filter(users.email == email).first()
  11. def get_by_address(selft, db: Session, *, address: str) -> Optional[str]:
  12. return db.query(users).filter(users.useraddress == address).first().__dict__['userid']
  13. def get_by_account(self, db: Session, *, account: str) -> Optional[users]:
  14. return db.query(users).filter(users.account == account).first()
  15. def create(self, db: Session, *, obj_in: UserCreate) -> users:
  16. db_obj = users(
  17. email=obj_in.email,
  18. userid=obj_in.userid,
  19. useraddress=obj_in.useraddress,
  20. is_superuser=obj_in.is_superuser,
  21. account=obj_in.account,
  22. is_active=obj_in.is_active,
  23. hashed_password=get_password_hash(obj_in.hashed_password)
  24. )
  25. db.add(db_obj)
  26. db.commit()
  27. db.refresh(db_obj)
  28. return db_obj
  29. def update(
  30. self, db: Session, *, db_obj: users,
  31. obj_in: Union[UserUpdate, Dict[str, Any]]
  32. ) -> users:
  33. if isinstance(obj_in, dict):
  34. update_data = obj_in
  35. else:
  36. update_data = obj_in.dict(exclude_unset=True)
  37. if update_data["password"]:
  38. hashed_password = get_password_hash(update_data["password"])
  39. del update_data["password"]
  40. update_data["password"] = hashed_password
  41. return super().update(db, db_obj=db_obj, obj_in=update_data)
  42. def authenticate(
  43. self, db: Session, *, account: str, password: str
  44. ) -> Optional[users]:
  45. user = self.get_by_account(db, account=account)
  46. if not user:
  47. return None
  48. if not verify_password(password, user.hashed_password):
  49. return None
  50. return user
  51. def is_active(self, user: users) -> bool:
  52. return user.is_active
  53. def is_superuser(self, user: users) -> bool:
  54. return user.is_superuser
# Module-level CRUDUser instance, constructed with the ``users`` model.
user = CRUDUser(users)