123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- from fastapi import APIRouter, Form, Depends, HTTPException
- from fastapi.security import OAuth2PasswordRequestForm
- from app.models.models import User
- from app.api import deps
- from sqlalchemy.orm import Session
- from typing import Any, Dict
- import secrets
- from fastapi_login.exceptions import InvalidCredentialsException
- from fastapi_login import LoginManager
- from datetime import timedelta,datetime
- from app.config import settings
- from pathlib import Path
- from jose import jwt
- import emails
- from emails.template import JinjaTemplate
- import logging
- import bcrypt
# Router that collects every endpoint declared in this module.
users = APIRouter()

# Signing secret handed to the login manager. It is generated at import time,
# so each process start produces a new value.
# NOTE(review): presumably tokens issued before a restart (or by another
# worker process) will no longer validate -- confirm whether a configured,
# persistent secret is intended instead.
SECRET: str = secrets.token_urlsafe(32)

# Login manager built from the secret above; '/login' is its second positional
# argument and access tokens default to a 72-hour expiry.
manager = LoginManager(
    SECRET,
    '/login',
    default_expiry=timedelta(hours=72),
)
@manager.user_loader()
async def query_user(user_id: str):
    """
    Look up a user by username.

    Registered as the login manager's user loader and also awaited directly
    by the ``login`` endpoint.

    :param user_id: Username to look up (matched against ``User.username``)
    :return: The first matching ``User`` row, or ``None`` if there is no match
    """
    user = await User.filter(username=user_id).first()
    if not user:
        # Signal "not found" with None rather than an empty list: None is what
        # the docstring contract promises and what fastapi-login's user-loader
        # convention expects, so a token whose subject no longer exists is
        # rejected instead of resolving to a bogus `[]` user.
        return None
    return user
@users.post("/login")
async def login(data: OAuth2PasswordRequestForm = Depends()):
    """
    Check a username/password pair and issue an access token.

    :param data: OAuth2 password form carrying ``username`` and ``password``
    :return: ``{'access_token': <token>}`` when the password matches the
        stored bcrypt hash, otherwise
        ``{"message": "Invalid username or password"}``
    :raises InvalidCredentialsException: If no user has the given username
    """
    username = data.username
    user = await query_user(username)

    # Reject unknown users before reading any attribute of `user`; touching
    # `user.password` first would fail with AttributeError (HTTP 500) instead
    # of the intended credentials error.
    if not user:
        # you can return any response or error of your choice
        raise InvalidCredentialsException

    # bcrypt works on bytes: encode both the submitted password and the stored
    # hash. Neither value is printed or logged -- the submitted password is
    # plaintext and must not reach stdout.
    password_bytes = data.password.encode('utf-8')
    stored_hashed_password_bytes = user.password.encode('utf-8')

    if not bcrypt.checkpw(password_bytes, stored_hashed_password_bytes):
        # NOTE(review): this answers with a normal (non-error) response while
        # an unknown username raises InvalidCredentialsException; the payload
        # is kept as-is so existing clients are unaffected, but the two
        # failure modes are distinguishable -- confirm whether that is wanted.
        return {"message": "Invalid username or password"}

    # Only mint a token once the credentials have been verified.
    access_token = manager.create_access_token(
        data={'sub': username}
    )
    return {'access_token': access_token}
@users.post("/logout")
async def logout():
    """
    Acknowledge a logout request.

    Nothing is done here beyond returning a fixed payload.

    :return: A dict with a ``msg`` text and a ``code`` of 200
    """
    acknowledgement = {
        "msg": "logout success",
        "code": 200,
    }
    return acknowledgement
@users.post("/add")
async def add(
    username: str = Form(default=''),
    password: str = Form(default=''),
    email: str = Form(default=''),
    re_password: str = Form(default=''),
):
    """
    Register a new user from submitted form fields.

    :param username: Desired username; must be non-empty
    :param password: Desired password; must be non-empty
    :param email: E-mail address; must be non-empty
    :param re_password: Password confirmation; must equal ``password``
    :return: A dict with ``msg`` and ``code``: code 200 once the user row has
        been created, code 403 when a required field is empty, the
        confirmation does not match, or creation yields a falsy result
    """
    generic_failure = {"msg": "create user failed", "code": 403}

    # All three of username, password and email are required.
    if not (username and password and email):
        return generic_failure

    # The confirmation field has to repeat the password exactly.
    if password != re_password:
        return {"msg": "確認密碼錯誤", "code": 403}

    # Store only a salted bcrypt hash, kept as text.
    salt = bcrypt.gensalt()
    hashed_bytes = bcrypt.hashpw(password.encode('utf-8'), salt)
    hashed_password = hashed_bytes.decode('utf-8')

    created = await User.create(
        username=username,
        password=hashed_password,
        email=email,
        point='1000',
    )
    if created:
        # The success text says a verification mail was sent, but this
        # function itself does not send any e-mail.
        return {"msg": "已寄送認證信", "code": 200}
    return generic_failure
def generate_password_reset_token(email: str) -> str:
    """
    Build a signed JWT used to authorise a password reset.

    The token carries ``exp`` (now plus
    ``settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS`` hours), ``nbf`` (now) and
    ``sub`` (the given e-mail address).

    :param email: Value stored in the token's ``sub`` claim
    :return: The encoded JWT, signed with ``settings.SECRET_KEY`` using HS256
    """
    # Function-scope import: the module header only imports `timedelta` and
    # `datetime` from the datetime module.
    from datetime import timezone

    delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
    # Use an aware UTC datetime. A naive `datetime.utcnow()` value is treated
    # as *local* time by `.timestamp()`, which shifts `exp` by the server's
    # UTC offset whenever the host is not running in UTC.
    now = datetime.now(timezone.utc)
    expires = now + delta
    exp = expires.timestamp()
    # Whole seconds since the epoch for the not-before claim.
    nbf = int(now.timestamp())
    encoded_jwt = jwt.encode(
        {"exp": exp, "nbf": nbf, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
    )
    return encoded_jwt
def send_email(
    email_to: str,
    subject_template: str = "",
    html_template: str = "",
    # Quoted so the annotation is never evaluated at runtime, whatever the
    # interpreter version.
    environment: "Dict[str, Any] | None" = None,
) -> None:
    """
    Render and send one e-mail through the SMTP server named in ``settings``.

    :param email_to: Recipient address
    :param subject_template: Jinja template source for the subject line
    :param html_template: Jinja template source for the HTML body
    :param environment: Values made available to both templates when they are
        rendered; ``None`` (the default) means no values
    :return: None. The result of the send call is only logged at INFO level.
    """
    # A fresh dict per call avoids sharing one mutable default object between
    # every invocation of this function.
    if environment is None:
        environment = {}

    message = emails.Message(
        subject=JinjaTemplate(subject_template),
        html=JinjaTemplate(html_template),
        mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
    )

    # Host and port are always passed; TLS and credentials only when configured.
    smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
    if settings.SMTP_TLS:
        smtp_options["tls"] = True
    if settings.SMTP_USER:
        smtp_options["user"] = settings.SMTP_USER
    if settings.SMTP_PASSWORD:
        smtp_options["password"] = settings.SMTP_PASSWORD

    response = message.send(to=email_to, render=environment, smtp=smtp_options)
    # Lazy %-formatting: the message is only built if INFO logging is enabled.
    logging.info("send email result: %s", response)
def send_reset_password_email(email_to: str, email: str, token: str) -> None:
    """
    Send the password-reset e-mail containing a reset link.

    The HTML body comes from ``reset_password.html`` inside
    ``settings.EMAIL_TEMPLATES_DIR``; the link points at
    ``settings.SERVER_HOST`` and carries the token as a query parameter.

    :param email_to: Recipient address; also exposed to the template as ``email``
    :param email: Value used in the subject line and exposed to the template
        as ``username``
    :param token: Reset token appended to the link
    :return: None
    """
    template_path = Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html"
    # Read with the platform's default text encoding, as a plain open() would.
    html_body = template_path.read_text()

    reset_link = f"{settings.SERVER_HOST}/reset-password?token={token}"

    # Values the HTML template can reference when it is rendered.
    template_context = {
        "username": email,
        "email": email_to,
        "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
        "link": reset_link,
    }

    send_email(
        email_to=email_to,
        subject_template=f"Password recovery for user {email}",
        html_template=html_body,
        environment=template_context,
    )
@users.post("/password-recovery/{email}")
async def recover_password(email: str):
    """
    Start password recovery for the account registered under ``email``.

    :param email: E-mail address taken from the URL path
    :return: ``{"msg": "Password recovery email sent"}`` once the reset mail
        has been handed to the mail helper
    :raises HTTPException: 404 when no user has this e-mail address
    """
    account = await User.filter(email=email).first()
    if account:
        # Mint a reset token for this address and mail the link to the
        # address stored on the account.
        reset_token = generate_password_reset_token(email=email)
        send_reset_password_email(
            email_to=account.email,
            email=email,
            token=reset_token,
        )
        return {"msg": "Password recovery email sent"}

    raise HTTPException(
        status_code=404,
        detail="The user with this username does not exist in the system.",
    )
@users.get("/delete_user/{id}")
async def delete(id: int):
    """
    Delete the user row(s) whose primary key equals ``id``.

    :param id: User id taken from the URL path; a falsy value (i.e. 0) is
        answered with the failure payload and nothing is deleted
    :return: ``{"msg": "success", "code": 200}`` after the delete query has
        run (its result is not inspected), otherwise
        ``{"msg": "failed", "code": 400}``
    """
    # Guard first: without a usable id there is nothing to delete.
    if not id:
        return {"msg": "failed", "code": 400}

    await User.filter(id=id).delete()
    return {"msg": "success", "code": 200}
|