|
- from fastapi import APIRouter, Form, Depends, HTTPException, Body
- from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
- from app.models.models import User
- from app.api import deps
- from sqlalchemy.orm import Session
- from typing import Any, Dict, Optional
- import secrets
- from fastapi_login.exceptions import InvalidCredentialsException
- from fastapi_login import LoginManager
- from datetime import timedelta,datetime
- from app.config import settings
- from pathlib import Path
- from jose import jwt
- import emails
- from emails.template import JinjaTemplate
- import logging
- import bcrypt
- import smtplib
- from email.mime.text import MIMEText
- from google.oauth2 import id_token
- from google.auth.transport import requests
- from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
- from fastapi.security.utils import get_authorization_scheme_param
- users = APIRouter()
- SECRET: str = secrets.token_urlsafe(32)
- manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
- @manager.user_loader()
- async def query_user(user_id: str):
- """
- Get a user from the db
- :param user_id: E-Mail of the user
- :return: None or the user object
- """
- result = await User.filter(email=user_id,is_gmail=0).first()
- if not result:
- print('無此筆資料')
- return None
- return result
- @manager.user_loader()
- async def query_user_username(user_id: str):
- """
- Get a user from the db
- :param user_id: E-Mail of the user
- :return: None or the user object
- """
- result = await User.filter(username=user_id).first()
- if not result:
- print('無此筆資料')
- return None
- return result
- @users.post("/login")
- async def login(data: OAuth2PasswordRequestForm = Depends()):
- email = data.username
- password = data.password
- user = await query_user(email)
- print(user)
- access_token = manager.create_access_token(
- data={'sub': email}
- )
- if not user:
- # you can return any response or error of your choice
- return {"message":"查無此人"}
- # elif password != user.password:
- # raise InvalidCredentialsException
- else:
- stored_hashed_password = user.password.encode('utf-8')
- if bcrypt.checkpw(password.encode('utf-8'),stored_hashed_password):
- return {'msg':'登入成功','code':'200','access_token': access_token,'username':user.username,'email':user.email,'points':user.points}
- else:
- return {"message": "Invalid username or password"}
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="https://oauth2.googleapis.com/token")
- CLIENT_ID = settings.CLIENT_ID
- @users.post("/login/google/access-token")
- async def login(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default='')
- ) -> Any:
- """
- OAuth2 compatible token login, get an access token for future requests
- """
- user = await User.filter(email=email,is_gmail=1).first() # 確認信箱是否已存在
- if not user:
- u = await User.create(username=username, password=password, email=email, is_gmail=1)
- # if user:
- # print('已用相同信箱註冊過,再開一個GMAIL帳號')
- # u = await User.create(username=username, password=password,email=email,is_gmail=1)
- access_token = manager.create_access_token(
- data={'sub': username}
- )
- return_msg = {
- "access_token": access_token,
- "token_type": "bearer",
- }
- return return_msg
- # if add_time_code:
- # available_ser_no = crud.serial_number.available(db, ser_no=add_time_code)
- # print(available_ser_no)
- # if available_ser_no:
- # user_in = schemas.UserUpdate(available_time=user.available_time + available_ser_no.time)
- # crud.user.update(db, db_obj=user, obj_in=user_in)
- #
- # ser_no_in = schemas.SerialNumberUpdate(code=available_ser_no.code, is_used=True,
- # used_datetime=str(datetime.now()), owner_id=user.id)
- # crud.serial_number.update(db, db_obj=available_ser_no, obj_in=ser_no_in)
- # print(available_ser_no.time, type(available_ser_no.time))
- # return_msg['time_added'] = available_ser_no.time
- # else:
- # return_msg['time_added'] = -1
- # return return_msg
- @users.post("/logout")
- async def logout():
- return {"msg":"logout success","code":200}
- @users.post("/add")
- async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), re_password: str = Form(default='')):
- if username and password and email:
- user_email = await query_user(email)
- user_username = await query_user_username(username)
- if user_email and user_username:
- return {"msg":"該信箱或使用者名稱已存在","code":403}
- # elif user_username:
- # return {"msg":"該使用者名稱已存在","code":403}
- else:
- if password == re_password:
- hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
- u = await User.create(username=username, password=hashed_password, email=email,is_superuser=0,is_gmail=0)
- if u:
- message = '註冊成功'
- subject = '註冊信'
- print(message)
- send_email(email,message,subject)
- return {"msg": "已寄送註冊信", "code": 200}
- else:
- return {"msg":"確認密碼錯誤","code":403}
- return {"msg": "create user failed", "code": 403}
- def generate_password_reset_token(email: str) -> str:
- delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
- now = datetime.utcnow()
- expires = now + delta
- exp = expires.timestamp()
- encoded_jwt = jwt.encode(
- {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",)
- print(encoded_jwt)
- return encoded_jwt
- def verify_password_reset_token(token: str):
- try:
- decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
- print(decoded_token)
- return decoded_token["sub"]
- except jwt.JWTError:
- return None
- def send_email(
- email_to: str,
- subject_template: str = "",
- html_template: str = "",
- environment: Dict[str, Any] = {},
- ):
- message = emails.Message(
- subject=JinjaTemplate(subject_template),
- html=JinjaTemplate(html_template),
- mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
- )
- smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
- if settings.SMTP_TLS:
- smtp_options["tls"] = True
- if settings.SMTP_USER:
- smtp_options["user"] = settings.SMTP_USER
- if settings.SMTP_PASSWORD:
- smtp_options["password"] = settings.SMTP_PASSWORD
- response = message.send(to=email_to, render=environment, smtp=smtp_options)
- return {"message":f"send email result: {response}"}
- # logging.info(f"send email result: {response}")
- # email_content = MIMEText(message)
- # email_content["Subject"] = subject
- # email_content["From"] = 'zooey@choozmo.com'
- # email_content["To"] = email_to
- # try:
- # print('測試成功')
- # # Connect to the SMTP server
- # smtp_server = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT)
- # smtp_server.starttls()
- #
- # # Login to the email account
- # smtp_server.login(settings.SMTP_HOST, settings.SMTP_PASSWORD)
- #
- # # Send the email
- # smtp_server.sendmail(settings.SMTP_HOST, email_to, email_content.as_string())
- #
- # # Close the connection
- # smtp_server.quit()
- #
- # return {"message": "Email sent successfully."}
- # except Exception as e:
- # print('測試失敗')
- def send_reset_password_email(email_to: str, email: str, token: str) -> None:
- subject = f"Password recovery for user {email}"
- with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
- template_str = f.read()
- server_host = settings.SERVER_HOST
- link = f"{server_host}/reset-password?token={token}"
- message = '重新設定密碼'
- send_email(
- email_to=email_to,
- subject_template=subject,
- html_template=template_str,
- environment={
- "project_name": settings.PROJECT_NAME,
- "username": email,
- "email": email_to,
- "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
- "link": link,
- },
- )
- @users.post("/password-recovery/{email}")
- async def recover_password(email:str):
- user = await User.filter(email=email).first()
- if not user:
- raise HTTPException(
- status_code=404,
- detail="The user with this username does not exist in the system.",
- )
- password_reset_token = generate_password_reset_token(email=email)
- send_reset_password_email(
- email_to=user.email, email=email, token=password_reset_token
- )
- return {"msg": "Password recovery email sent"}
- from pydantic import BaseModel
- class Msg(BaseModel):
- msg: str
- @users.post("/reset-password/", response_model=Msg)
- async def reset_password(
- token: str = Body(...),
- new_password: str = Body(...),
- db: Session = Depends(deps.get_db),
- ) -> Any:
- """
- Reset password
- """
- email = verify_password_reset_token(token)
- print(email)
- if not email:
- raise HTTPException(status_code=400, detail="Invalid token")
- # user = await query_user(email)
- user = await User.filter(email=email).first()
- if not user:
- raise HTTPException(
- status_code=404,
- detail="The user with this username does not exist in the system.",
- )
- # elif not crud.user.is_active(user):
- # raise HTTPException(status_code=400, detail="Inactive user")
- hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
- user.password = hashed_password
- # db.add(user)
- db.commit()
- print(user.password)
- db.close()
- return {"msg": "Password updated successfully"}
- @users.get("/delete_user/{id}")
- async def delete(id: int):
- if id:
- await User.filter(id=id).delete()
- return {"msg": "success", "code": 200}
- return {"msg": "failed", "code": 400}
- @users.get("/information")
- async def get_information(email:str,is_gmail:int):
- result = await User.filter(email=email, is_gmail=is_gmail).first()
- return {"msg":result, "code":200}
|