users.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
import logging
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional

import bcrypt
import emails
from emails.template import JinjaTemplate
from fastapi import APIRouter, Depends, Form, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login import LoginManager
from fastapi_login.exceptions import InvalidCredentialsException
from jose import jwt
from sqlalchemy.orm import Session

from app.api import deps
from app.config import settings
from app.models.models import User
  18. users = APIRouter()
  19. SECRET: str = secrets.token_urlsafe(32)
  20. manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
  21. @manager.user_loader()
  22. async def query_user(user_id: str):
  23. """
  24. Get a user from the db
  25. :param user_id: E-Mail of the user
  26. :return: None or the user object
  27. """
  28. result = await User.filter(username=user_id).first()
  29. if not result:
  30. print('[]')
  31. return []
  32. return result
  33. # return DB['users'].get(user_id)
  34. @users.post("/login")
  35. async def login(data: OAuth2PasswordRequestForm = Depends()):
  36. username = data.username
  37. password = data.password
  38. password_bytes = password.encode('utf-8') # 輸入的密碼
  39. print(password_bytes)
  40. user = await query_user(username)
  41. stored_hashed_password_bytes = user.password.encode('utf-8')
  42. print(user)
  43. access_token = manager.create_access_token(
  44. data={'sub': username}
  45. )
  46. if not user:
  47. # you can return any response or error of your choice
  48. raise InvalidCredentialsException
  49. # elif password != user.password:
  50. # raise InvalidCredentialsException
  51. else:
  52. if bcrypt.checkpw(password_bytes, stored_hashed_password_bytes):
  53. return {'access_token': access_token}
  54. else:
  55. return {"message": "Invalid username or password"}
  56. @users.post("/logout")
  57. async def logout():
  58. return {"msg":"logout success","code":200}
  59. @users.post("/add")
  60. async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), re_password: str = Form(default='')):
  61. if username and password and email:
  62. if password == re_password:
  63. hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
  64. u = await User.create(username=username, password=hashed_password, email=email,point='1000')
  65. if u:
  66. # send_email()
  67. return {"msg": "已寄送認證信", "code": 200}
  68. else:
  69. return {"msg":"確認密碼錯誤","code":403}
  70. return {"msg": "create user failed", "code": 403}
  71. def generate_password_reset_token(email: str) -> str:
  72. delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
  73. now = datetime.utcnow()
  74. expires = now + delta
  75. exp = expires.timestamp()
  76. encoded_jwt = jwt.encode(
  77. {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
  78. )
  79. return encoded_jwt
  80. def send_email(
  81. email_to: str,
  82. subject_template: str = "",
  83. html_template: str = "",
  84. environment: Dict[str, Any] = {},
  85. ) -> None:
  86. # assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
  87. message = emails.Message(
  88. subject=JinjaTemplate(subject_template),
  89. html=JinjaTemplate(html_template),
  90. mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
  91. )
  92. smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
  93. if settings.SMTP_TLS:
  94. smtp_options["tls"] = True
  95. if settings.SMTP_USER:
  96. smtp_options["user"] = settings.SMTP_USER
  97. if settings.SMTP_PASSWORD:
  98. smtp_options["password"] = settings.SMTP_PASSWORD
  99. response = message.send(to=email_to, render=environment, smtp=smtp_options)
  100. logging.info(f"send email result: {response}")
  101. def send_reset_password_email(email_to: str, email: str, token: str) -> None:
  102. subject = f"Password recovery for user {email}"
  103. with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
  104. template_str = f.read()
  105. server_host = settings.SERVER_HOST
  106. link = f"{server_host}/reset-password?token={token}"
  107. send_email(
  108. email_to=email_to,
  109. subject_template=subject,
  110. html_template=template_str,
  111. environment={
  112. # "project_name": settings.PROJECT_NAME,
  113. "username": email,
  114. "email": email_to,
  115. "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
  116. "link": link,
  117. },
  118. )
  119. @users.post("/password-recovery/{email}")
  120. async def recover_password(email:str):
  121. user = await User.filter(email=email).first()
  122. if not user:
  123. raise HTTPException(
  124. status_code=404,
  125. detail="The user with this username does not exist in the system.",
  126. )
  127. password_reset_token = generate_password_reset_token(email=email)
  128. send_reset_password_email(
  129. email_to=user.email, email=email, token=password_reset_token
  130. )
  131. return {"msg": "Password recovery email sent"}
  132. @users.get("/delete_user/{id}")
  133. async def delete(id: int):
  134. if id:
  135. await User.filter(id=id).delete()
  136. return {"msg": "success", "code": 200}
  137. return {"msg": "failed", "code": 400}