crud_user.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from audioop import add
  2. from typing import Any, Dict, Optional, Union
  3. from sqlalchemy.orm import Session
  4. from app.core.security import get_password_hash, verify_password
  5. from app.crud.base import CRUDBase
  6. from app.models.user import users
  7. from app.schemas.user import UserBase, UserCreate, UserUpdate
  class CRUDUser(CRUDBase[users, UserBase, UserCreate]):
      """CRUD helpers for the ``users`` model, built on ``CRUDBase``.

      Adds lookups by e-mail, address, user id and account, hashes the
      password on create/update, and offers account/password authentication.

      NOTE(review): ``create`` accepts ``UserCreate`` and ``update`` accepts
      ``UserUpdate``, while the generic parameters are
      ``[users, UserBase, UserCreate]`` -- confirm this matches the parameter
      order ``CRUDBase`` expects.
      """

      def get_by_email(self, db: Session, *, email: str) -> Optional[users]:
          """Return the first user whose ``email`` matches, or ``None``."""
          return db.query(users).filter(users.email == email).first()

      def get_by_address(self, db: Session, *, address: str) -> Optional[str]:
          """Return the ``userid`` of the first user with this ``useraddress``.

          Returns ``None`` when no user matches, rather than failing on the
          missing row.
          """
          row = db.query(users).filter(users.useraddress == address).first()
          if row is None:
              return None
          return row.userid

      def get_by_lineid(self, db: Session, *, lineid: str) -> Optional[users]:
          """Return the first user whose ``userid`` equals ``lineid``, or ``None``.

          NOTE(review): presumably ``userid`` stores the LINE id -- confirm
          against the model definition.
          """
          return db.query(users).filter(users.userid == lineid).first()

      def get_by_account(self, db: Session, *, account: str) -> Optional[users]:
          """Return the first user whose ``account`` matches, or ``None``."""
          return db.query(users).filter(users.account == account).first()

      def create(self, db: Session, *, obj_in: UserCreate) -> users:
          """Insert a new user built from ``obj_in`` and return the stored row.

          ``obj_in.hashed_password`` is passed through ``get_password_hash``
          before being stored (presumably it carries the plain-text password
          at this point -- verify against the schema). The session is
          committed and the new object refreshed before it is returned.
          """
          db_obj = users(
              email=obj_in.email,
              userid=obj_in.userid,
              useraddress=obj_in.useraddress,
              is_superuser=obj_in.is_superuser,
              account=obj_in.account,
              is_active=obj_in.is_active,
              hashed_password=get_password_hash(obj_in.hashed_password),
          )
          db.add(db_obj)
          db.commit()
          db.refresh(db_obj)
          return db_obj

      def update(
          self,
          db: Session,
          *,
          db_obj: users,
          obj_in: Union[UserUpdate, Dict[str, Any]],
      ) -> users:
          """Apply ``obj_in`` to ``db_obj`` via ``CRUDBase.update``.

          ``obj_in`` is either a plain dict of field values or a ``UserUpdate``
          schema, of which only the explicitly set fields are used. If a
          ``"hashed_password"`` value is supplied, it is replaced by the result
          of ``get_password_hash`` before delegating to the base class.
          """
          if isinstance(obj_in, dict):
              # Work on a copy so the caller's dict is not rewritten in place.
              update_data = dict(obj_in)
          else:
              update_data = obj_in.dict(exclude_unset=True)
          if "hashed_password" in update_data:
              update_data["hashed_password"] = get_password_hash(
                  update_data["hashed_password"]
              )
          return super().update(db, db_obj=db_obj, obj_in=update_data)

      def authenticate(
          self, db: Session, *, account: str, password: str
      ) -> Optional[users]:
          """Return the user for ``account`` if ``password`` verifies, else ``None``.

          ``None`` is returned both when the account does not exist and when
          ``verify_password`` rejects the password.
          """
          user = self.get_by_account(db, account=account)
          if not user or not verify_password(password, user.hashed_password):
              return None
          return user

      def is_active(self, user: users) -> bool:
          """Return the ``is_active`` flag of ``user``."""
          return user.is_active

      def is_superuser(self, user: users) -> bool:
          """Return the ``is_superuser`` flag of ``user``."""
          return user.is_superuser
  # Module-level CRUDUser instance bound to the ``users`` model.
  user = CRUDUser(users)