"""User account routes: login, logout, registration, password recovery, deletion."""

import logging
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional

import bcrypt
import emails
from emails.template import JinjaTemplate
from fastapi import APIRouter, Depends, Form, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login import LoginManager
from fastapi_login.exceptions import InvalidCredentialsException
from jose import jwt
from sqlalchemy.orm import Session

from app.api import deps
from app.config import settings
from app.models.models import User

users = APIRouter()

# NOTE(review): the signing secret is regenerated on every process start, so
# previously issued access tokens stop validating after a restart (and differ
# between worker processes). Confirm whether this is intended.
SECRET: str = secrets.token_urlsafe(32)
manager = LoginManager(SECRET, '/login', default_expiry=timedelta(hours=72))


@manager.user_loader()
async def query_user(user_id: str):
    """
    Get a user from the db.

    :param user_id: value matched against the ``username`` column
    :return: None or the user object
    """
    # ``first()`` already yields None when nothing matches; returning None
    # (rather than an empty list) is what the login manager expects for
    # "no such user".
    return await User.filter(username=user_id).first()


@users.post("/login")
async def login(data: OAuth2PasswordRequestForm = Depends()):
    """
    Verify the submitted credentials and issue an access token.

    :param data: OAuth2 password form (``username`` and ``password`` fields)
    :return: ``{'access_token': <token>}`` on success
    :raises InvalidCredentialsException: unknown user or wrong password
    """
    # OAuth2PasswordRequestForm exposes ``username``; it has no ``email`` field.
    identifier = data.username
    user = await query_user(identifier)
    if not user:
        raise InvalidCredentialsException

    # bcrypt works on bytes; accept a hash stored either as text or as bytes.
    stored_hash = user.password
    if isinstance(stored_hash, str):
        stored_hash = stored_hash.encode('utf-8')

    try:
        # Argument order is (plaintext, hash).
        password_ok = bcrypt.checkpw(data.password.encode('utf-8'), stored_hash)
    except ValueError:
        # The stored value is not a well-formed bcrypt hash.
        password_ok = False

    if not password_ok:
        # Same error as for an unknown user, so the response does not reveal
        # which of the two checks failed.
        raise InvalidCredentialsException

    # Only mint a token once the credentials have been verified.
    access_token = manager.create_access_token(data={'sub': identifier})
    return {'access_token': access_token}


@users.post("/logout")
async def logout():
    """Return a static success payload; no server-side state is touched here."""
    return {"msg": "logout success", "code": 200}


@users.post("/add")
async def add(username: str = Form(default=''),
              password: str = Form(default=''),
              email: str = Form(default=''),
              re_password: str = Form(default='')):
    """
    Create a user from the submitted form fields.

    :param username: login name of the new user
    :param password: plaintext password (stored as a bcrypt hash)
    :param email: e-mail address of the new user
    :param re_password: password confirmation; must equal ``password``
    :return: dict with ``msg`` and ``code`` describing the outcome
    """
    if not (username and password and email):
        return {"msg": "create user failed", "code": 403}

    if password != re_password:
        return {"msg": "確認密碼錯誤", "code": 403}

    # Store the hash as text so that it round-trips through the model and can
    # be re-encoded for bcrypt.checkpw at login.
    # NOTE(review): assumes User.password is a text column - confirm in the model.
    hashed_password = bcrypt.hashpw(
        password.encode('utf-8'), bcrypt.gensalt()
    ).decode('utf-8')

    u = await User.create(username=username,
                          password=hashed_password,
                          email=email)
    if u:
        # TODO: no verification e-mail is actually sent from here yet.
        return {"msg": "已寄送認證信", "code": 200}
    return {"msg": "create user failed", "code": 403}


def generate_password_reset_token(email: str) -> str:
    """
    Build a signed HS256 JWT used as a password reset token.

    :param email: address placed in the ``sub`` claim
    :return: the encoded token, valid for
        ``settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS`` hours
    """
    # Timezone-aware "now": calling .timestamp() on a naive utcnow() value
    # would be interpreted as local time and shift the expiry by the host's
    # UTC offset.
    now = datetime.now(timezone.utc)
    expires = now + timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
    claims = {
        "exp": int(expires.timestamp()),
        "nbf": int(now.timestamp()),
        "sub": email,
    }
    return jwt.encode(claims, settings.SECRET_KEY, algorithm="HS256")


def send_email(
    email_to: str,
    subject_template: str = "",
    html_template: str = "",
    environment: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Render and send one HTML e-mail using the SMTP settings.

    :param email_to: recipient address
    :param subject_template: Jinja template source for the subject
    :param html_template: Jinja template source for the HTML body
    :param environment: variables made available to both templates
    """
    message = emails.Message(
        subject=JinjaTemplate(subject_template),
        html=JinjaTemplate(html_template),
        mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
    )

    smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
    if settings.SMTP_TLS:
        smtp_options["tls"] = True
    if settings.SMTP_USER:
        smtp_options["user"] = settings.SMTP_USER
    if settings.SMTP_PASSWORD:
        smtp_options["password"] = settings.SMTP_PASSWORD

    # A fresh dict per call replaces the former shared mutable default.
    render_vars = environment if environment is not None else {}
    response = message.send(to=email_to, render=render_vars, smtp=smtp_options)
    logging.info("send email result: %s", response)


def send_reset_password_email(email_to: str, email: str, token: str) -> None:
    """
    Send the password reset e-mail containing the reset link.

    :param email_to: recipient address
    :param email: identifier shown in the subject and passed as ``username``
    :param token: reset token appended to the link
    """
    subject = f"Password recovery for user {email}"
    template_path = Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html"
    # Explicit encoding so the template reads the same on every host locale.
    template_str = template_path.read_text(encoding="utf-8")
    link = f"{settings.SERVER_HOST}/reset-password?token={token}"
    send_email(
        email_to=email_to,
        subject_template=subject,
        html_template=template_str,
        environment={
            "username": email,
            "email": email_to,
            "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
            "link": link,
        },
    )


@users.post("/password-recovery/{email}")
async def recover_password(email: str):
    """
    E-mail a password reset link to the user registered under ``email``.

    :param email: address taken from the URL path
    :return: confirmation message
    :raises HTTPException: 404 when no user has this e-mail address
    """
    user = await User.filter(email=email).first()
    if not user:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )

    password_reset_token = generate_password_reset_token(email=email)
    # NOTE(review): send_reset_password_email is a synchronous call made from
    # an async handler; consider moving it off the event loop.
    send_reset_password_email(
        email_to=user.email, email=email, token=password_reset_token
    )
    return {"msg": "Password recovery email sent"}


# NOTE(review): this is a state-changing GET route with no authentication
# dependency; consider requiring a logged-in user and using the DELETE method.
@users.get("/delete_user/{id}")
async def delete(id: int):  # noqa: A002 - name is fixed by the path parameter
    """
    Delete the user with the given primary key.

    :param id: primary key taken from the URL path
    :return: dict with ``msg`` and ``code``; a falsy id (0) is reported as failed
    """
    if not id:
        return {"msg": "failed", "code": 400}

    await User.filter(id=id).delete()
    return {"msg": "success", "code": 200}