from fastapi import APIRouter, Form, Depends, HTTPException, Body from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer from app.models.models import User, UserPydantic from app.api import deps from sqlalchemy.orm import Session from typing import Any, Dict, Optional import secrets from fastapi_login.exceptions import InvalidCredentialsException from fastapi_login import LoginManager from datetime import timedelta,datetime from app.config import settings from pathlib import Path from jose import jwt import emails from emails.template import JinjaTemplate import logging import bcrypt from app.crud import crud_users import smtplib from email.mime.text import MIMEText from google.oauth2 import id_token from google.auth.transport import requests from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel from fastapi.security.utils import get_authorization_scheme_param import rpyc users = APIRouter() SECRET: str = secrets.token_urlsafe(32) manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72)) async def check_token(access_token: str): result = await User.filter(token=access_token).first() if not result: print("no access") return None user_id = result.id return user_id @manager.user_loader() async def query_user(user_id: str): """ Get a user from the db :param user_id: E-Mail of the user :return: None or the user object """ # result = await User.filter(email=user_id,is_active=1).first() result = await User.filter(email=user_id).first() if not result: print('無此筆資料') return None return result #@manager.user_loader() async def query_user_username(user_id: str): """ Get a user from the db :param user_id: E-Mail of the user :return: None or the user object """ result = await User.filter(username=user_id).first() if not result: print('無此筆資料') return None return result @users.post("/login") async def login(data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(deps.get_db), position: str=Form(default='')): email = data.username password = data.password user = await query_user(email) user_pydantic = UserPydantic.from_orm(user) user_dict = user_pydantic.dict(exclude_unset=True) access_token = manager.create_access_token( data={'sub': email} ) if not user: # you can return any response or error of your choice return {"message":"查無此人"} # elif password != user.password: # raise InvalidCredentialsException else: user_dict.update({"token":access_token}) token_update = user.update_from_dict(user_dict) await user.save() stored_hashed_password = user.password.encode('utf-8') if bcrypt.checkpw(password.encode('utf-8'),stored_hashed_password): return {'msg':'登入成功','code':'200','access_token': access_token,'username':user.username,'email':user.email,'points':user.points} else: return {"message": "Invalid username or password"} oauth2_scheme = OAuth2PasswordBearer(tokenUrl="https://oauth2.googleapis.com/token") CLIENT_ID = settings.CLIENT_ID @users.post("/login/google/access-token") async def login(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), position: str = Form(default=''), ) -> Any: """ OAuth2 compatible token login, get an access token for future requests """ access_token = manager.create_access_token( data={'sub': username} ) user = await User.filter(email=email).first() # 確認信箱是否已存在 if not user: u = await User.create(username=username, password=password, email=email,token=access_token, is_superuser=0, points=0,is_active =1) create_user = u # if user: # print('已用相同信箱註冊過,再開一個GMAIL帳號') # u = await User.create(username=username, password=password,email=email) user_pydantic = UserPydantic.from_orm(user) user_dict = user_pydantic.dict(exclude_unset=True) user_dict.update({"token": access_token}) token_update = user.update_from_dict(user_dict) await user.save() return_msg = { "access_token": access_token, "token_type": "bearer", 'code':'200', } return return_msg # if add_time_code: # available_ser_no = crud.serial_number.available(db, ser_no=add_time_code) # print(available_ser_no) # if available_ser_no: # user_in = schemas.UserUpdate(available_time=user.available_time + available_ser_no.time) # crud.user.update(db, db_obj=user, obj_in=user_in) # # ser_no_in = schemas.SerialNumberUpdate(code=available_ser_no.code, is_used=True, # used_datetime=str(datetime.now()), owner_id=user.id) # crud.serial_number.update(db, db_obj=available_ser_no, obj_in=ser_no_in) # print(available_ser_no.time, type(available_ser_no.time)) # return_msg['time_added'] = available_ser_no.time # else: # return_msg['time_added'] = -1 # return return_msg @users.post("/logout") async def logout(): return {"msg":"logout success","code":200} @users.post("/add") # 寄認證信 async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), re_password: str = Form(default='')): if username and password and email: user_email = await query_user(email) user_username = await query_user_username(username) if user_email or user_username: return {"msg":"該信箱或使用者名稱已存在","code":403} # elif user_username: # return {"msg":"該使用者名稱已存在","code":403} else: if password == re_password: # access_token = manager.create_access_token( # data={'sub': email} # ) print('前',settings.SECRET_KEY) expiration_time = datetime.utcnow() + timedelta(hours=0.5) access_token = jwt.encode({'email':email,'exp':expiration_time}, settings.SECRET_KEY, algorithm="HS256") hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') u = await User.create(username=username, password=hashed_password, email=email,is_superuser=0,token=access_token,points=0,is_active=0) if u: # message = '註冊認證' message = f"請點擊以下連結完成註冊流程:\n\nhttps://cmm.ai:8088/api/verify?token={access_token}" subject = '註冊信' print(message) send_email(email,access_token,subject,message) return {"msg": "已寄送註冊信", "code": 200} else: return {"msg": "未寄出註冊信", "code":403} else: return {"msg":"確認密碼錯誤","code":403} return {"msg": "create user failed", "code": 403} @users.get("/verify") # 註冊認證確認 async def verify_email(token:str): try: print(token) print('後',settings.SECRET_KEY) payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) print('解密結果',payload) email = payload["email"] # is_active True user = await User.filter(email=email,is_active=0).first() if user: user_pydantic = UserPydantic.from_orm(user) user_dict = user_pydantic.dict(exclude_unset=True) user_dict.update({"is_active": 1}) is_active_update = user.update_from_dict(user_dict) await user.save() result = '信箱驗證成功' else: result = '信箱驗證失敗' return {"message": result} except: raise HTTPException(status_code=400) # except jwt.ExpiredSignatureError: # raise HTTPException(status_code=400, detail="token已失效") # except jwt.DecodeError: # raise HTTPException(status_code=400, detail="無效token") def generate_password_reset_token(email: str) -> str: delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS) now = datetime.utcnow() expires = now + delta exp = expires.timestamp() encoded_jwt = jwt.encode( {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",) print(encoded_jwt) return encoded_jwt def verify_password_reset_token(token: str): try: decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) print(decoded_token) return decoded_token["sub"] except jwt.JWTError: return None def send_email( email_to: str, token: str, subject_template: str = "", html_template: str = "", environment: Dict[str, Any] = {}, ): # message = emails.Message( # subject=JinjaTemplate(subject_template), # html=JinjaTemplate(html_template), # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL), # ) subject=subject_template html=html_template mailobj={} mailobj['toaddr']=email_to mailobj['title']=subject mailobj['totext']=html conn = rpyc.connect("192.168.192.80", 12345) conn.root.mailto(mailobj) return {"message":f"send email"} #message = emails.Message( # subject=JinjaTemplate(subject_template), # html=JinjaTemplate(html_template), # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL), #) #smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT} #if settings.SMTP_TLS: # smtp_options["tls"] = True #if settings.SMTP_USER: # smtp_options["user"] = settings.SMTP_USER #if settings.SMTP_PASSWORD: # smtp_options["password"] = settings.SMTP_PASSWORD #response = message.send(to=email_to, render=environment, smtp=smtp_options) #print('RResponse',response) #return {"message":f"send email result: {response}"} # logging.info(f"send email result: {response}") # email_content = MIMEText(message) # email_content["Subject"] = subject # email_content["From"] = 'zooey@choozmo.com' # email_content["To"] = email_to # try: # print('測試成功') # # Connect to the SMTP server # smtp_server = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT) # smtp_server.starttls() # # # Login to the email account # smtp_server.login(settings.SMTP_HOST, settings.SMTP_PASSWORD) # # # Send the email # smtp_server.sendmail(settings.SMTP_HOST, email_to, email_content.as_string()) # # # Close the connection # smtp_server.quit() # # return {"message": "Email sent successfully."} # except Exception as e: # print('測試失敗') def create_singup_url(): url='' return url def send_reset_password_email(email_to: str, email: str, token: str) -> None: subject = f"Password recovery for user {email}" with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f: template_str = f.read() server_host = settings.SERVER_HOST link = f"{server_host}/reset-password?token={token}" message = '重新設定密碼' send_email( email_to=email_to, subject_template=subject, html_template=template_str, environment={ "project_name": settings.PROJECT_NAME, "username": email, "email": email_to, "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS, "link": link, }, ) @users.post("/password-recovery/{email}") async def recover_password(email:str): user = await User.filter(email=email).first() if not user: raise HTTPException( status_code=404, detail="The user with this username does not exist in the system.", ) password_reset_token = generate_password_reset_token(email=email) send_reset_password_email( email_to=user.email, email=email, token=password_reset_token ) return {"msg": "Password recovery email sent"} from pydantic import BaseModel class Msg(BaseModel): msg: str @users.post("/reset-password/", response_model=Msg) async def reset_password( token: str = Body(...), new_password: str = Body(...), db: Session = Depends(deps.get_db), ) -> Any: """ Reset password """ email = verify_password_reset_token(token) print(email) if not email: raise HTTPException(status_code=400, detail="Invalid token") # user = await query_user(email) user = await User.filter(email=email).first() if not user: raise HTTPException( status_code=404, detail="The user with this username does not exist in the system.", ) # elif not crud.user.is_active(user): # raise HTTPException(status_code=400, detail="Inactive user") hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') user.password = hashed_password # db.add(user) db.commit() print(user.password) db.close() return {"msg": "Password updated successfully"} @users.get("/delete_user/{id}") async def delete( id: int, user_id = Depends(check_token) ): if not user_id : return {"msg": "no exit", "code": 200} user = await User.get(id=user_id) if user.is_superuser != 2: return {"msg": "no access", "code": 200} if id: await User.filter(id=id).delete() return {"msg": "success", "code": 200} return {"msg": "failed", "code": 400} @users.get("/information") async def get_information(token:str): result = await User.filter(token=token).first() return {"msg":result, "code":200} @users.get("/protect") def protected_route(user=Depends(manager)): if user is None: return {'message': "no access"} return {'user': user} @users.get("/check_user") async def check_user( user_id = Depends(check_token) ): user = await User.get(id=user_id) return {"msg": "success", "code": 200,"is_super":user.is_superuser}