123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- from fastapi import APIRouter, Form, Depends, HTTPException, Body
- from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
- from app.models.models import User, UserPydantic
- from app.api import deps
- from sqlalchemy.orm import Session
- from typing import Any, Dict, Optional
- import secrets
- from fastapi_login.exceptions import InvalidCredentialsException
- from fastapi_login import LoginManager
- from datetime import timedelta,datetime
- from app.config import settings
- from pathlib import Path
- from jose import jwt
- import emails
- from emails.template import JinjaTemplate
- import logging
- import bcrypt
- from app.crud import crud_users
- import smtplib
- from email.mime.text import MIMEText
- from google.oauth2 import id_token
- from google.auth.transport import requests
- from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
- from fastapi.security.utils import get_authorization_scheme_param
- import rpyc
# Router that collects the user/authentication endpoints of this module.
users = APIRouter()

# Signing secret handed to the login manager; a new random value is drawn
# every time this module is imported.
# NOTE(review): tokens presumably stop validating after a restart or across
# worker processes because of that -- confirm this is intended.
SECRET: str = secrets.token_urlsafe(32)

# Login manager: token URL is '/login', default token lifetime is 72 hours.
manager = LoginManager(
    SECRET,
    '/login',
    default_expiry=timedelta(hours=72),
)
async def check_token(access_token: str):
    """Resolve a stored access token to the id of the user holding it.

    :param access_token: value compared against the ``token`` field of User
    :return: ``id`` of the first matching user, or ``None`` when nobody
        holds this token
    """
    owner = await User.filter(token=access_token).first()
    if owner:
        return owner.id

    print("no access")
    return None
@manager.user_loader()
async def query_user(user_id: str):
    """
    Fetch a user by e-mail address; registered as the login manager's loader.

    The lookup filters on the e-mail only, so inactive accounts match too.

    :param user_id: e-mail address of the user
    :return: the first matching user object, or None when there is none
    """
    found = await User.filter(email=user_id).first()
    if found:
        return found

    print('無此筆資料')
    return None
# Plain helper: unlike query_user, this one is not registered as a user loader.
async def query_user_username(user_id: str):
    """
    Fetch a user by username.

    :param user_id: username to look up (despite the parameter name, the
        value is compared against the ``username`` field)
    :return: the first matching user object, or None when there is none
    """
    found = await User.filter(username=user_id).first()
    if found:
        return found

    print('無此筆資料')
    return None
@users.post("/login")
async def login(
    data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(deps.get_db),
    position: str = Form(default=''),
):
    """Authenticate with e-mail + password and issue an access token.

    The form's ``username`` field carries the e-mail address.

    :param data: OAuth2 password form (``username`` = e-mail, ``password``)
    :param db: injected session; unused here, kept for interface stability
    :param position: optional form field; unused here
    :return: ``{"message": ...}`` when the user is unknown or the password
        is wrong, otherwise the success payload including ``access_token``
    """
    email = data.username

    user = await query_user(email)
    if not user:
        # Must be checked before touching any attribute of ``user``.
        return {"message":"查無此人"}

    try:
        password_ok = bcrypt.checkpw(
            data.password.encode('utf-8'),
            user.password.encode('utf-8'),
        )
    except ValueError:
        # The stored value is not a valid bcrypt hash; treat as a mismatch
        # instead of failing the request.
        password_ok = False

    if not password_ok:
        return {"message": "Invalid username or password"}

    # Only issue and persist a token once the credentials are verified, so a
    # failed attempt can no longer overwrite the user's stored token.
    access_token = manager.create_access_token(data={'sub': email})
    user.update_from_dict({"token": access_token})
    await user.save()

    return {
        'msg': '登入成功',
        'code': '200',
        'access_token': access_token,
        'username': user.username,
        'email': user.email,
        'points': user.points,
    }
# Bearer-token scheme whose token URL is Google's OAuth2 token endpoint.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="https://oauth2.googleapis.com/token",
)

# Client id taken from the application settings.
CLIENT_ID = settings.CLIENT_ID
@users.post("/login/google/access-token")
async def login(
    username: str = Form(default=''),
    password: str = Form(default=''),
    email: str = Form(default=''),
    position: str = Form(default=''),
) -> Any:
    """
    OAuth2 compatible token login, get an access token for future requests.

    Looks the account up by e-mail; creates it (already active) when it does
    not exist yet, otherwise refreshes its stored token.

    NOTE(review): nothing in this handler verifies a Google ID token, so the
    submitted e-mail is trusted as-is -- confirm verification happens
    upstream or add it.

    :param username: username used when a new account is created
    :param password: password stored (bcrypt-hashed) for a new account
    :param email: e-mail address identifying the account
    :param position: optional form field; unused here
    :return: dict with ``access_token``, ``token_type`` and ``code``
    """
    # The user loader resolves the token subject by e-mail, so the subject
    # has to be the e-mail address for the token to be loadable later.
    access_token = manager.create_access_token(data={'sub': email})

    user = await User.filter(email=email).first()  # does the e-mail already exist?
    if user:
        user.update_from_dict({"token": access_token})
        await user.save()
    else:
        # Hash like the regular sign-up does; the password login compares
        # against a bcrypt hash.
        hashed_password = bcrypt.hashpw(
            password.encode('utf-8'), bcrypt.gensalt()
        ).decode('utf-8')
        await User.create(
            username=username,
            password=hashed_password,
            email=email,
            token=access_token,
            is_superuser=0,
            points=0,
            is_active=1,
        )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        'code': '200',
    }
@users.post("/logout")
async def logout():
    """Acknowledge a logout request; no server-side state is changed here."""
    acknowledgement = {"msg":"logout success","code":200}
    return acknowledgement
@users.post("/add")  # sends the verification e-mail
async def add(
    username: str = Form(default=''),
    password: str = Form(default=''),
    email: str = Form(default=''),
    re_password: str = Form(default=''),
):
    """Register an inactive account and e-mail its verification link.

    :param username: desired username (must be unused)
    :param password: desired password
    :param email: e-mail address (must be unused)
    :param re_password: password confirmation; must equal ``password``
    :return: ``{"msg": ..., "code": ...}`` -- code 200 once the mail was
        handed off, 403 for every rejection or failure
    """
    if not (username and password and email):
        return {"msg": "create user failed", "code": 403}

    if await query_user(email) or await query_user_username(username):
        return {"msg":"該信箱或使用者名稱已存在","code":403}

    if password != re_password:
        return {"msg":"確認密碼錯誤","code":403}

    # The verification token is valid for 30 minutes.
    expiration_time = datetime.utcnow() + timedelta(minutes=30)
    access_token = jwt.encode(
        {'email': email, 'exp': expiration_time},
        settings.SECRET_KEY,
        algorithm="HS256",
    )
    hashed_password = bcrypt.hashpw(
        password.encode('utf-8'), bcrypt.gensalt()
    ).decode('utf-8')

    # Created inactive; the /verify endpoint activates the account.
    await User.create(
        username=username,
        password=hashed_password,
        email=email,
        is_superuser=0,
        token=access_token,
        points=0,
        is_active=0,
    )

    message = f"請點擊以下連結完成註冊流程:\n\nhttps://cmm.ai:8088/api/verify?token={access_token}"
    subject = '註冊信'
    try:
        send_email(email, access_token, subject, message)
    except Exception:
        # Report the failure to the caller instead of a bare server error;
        # neither the secret key nor the token is written to the log.
        logging.exception("failed to send the registration e-mail")
        return {"msg": "未寄出註冊信", "code":403}

    return {"msg": "已寄送註冊信", "code": 200}
@users.get("/verify")  # confirms a sign-up verification link
async def verify_email(token:str):
    """Activate the account named in a sign-up verification token.

    :param token: JWT produced at sign-up, carrying an ``email`` claim
    :return: ``{"message": ...}`` telling whether an inactive account with
        that e-mail was found and activated
    :raises HTTPException: 400 when the token is invalid, expired, or lacks
        the ``email`` claim
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        email = payload["email"]
    except (jwt.JWTError, KeyError) as exc:
        # Only token problems map to 400; database errors are no longer
        # masked as a bad request.
        raise HTTPException(status_code=400) from exc

    user = await User.filter(email=email,is_active=0).first()
    if not user:
        return {"message": '信箱驗證失敗'}

    user.update_from_dict({"is_active": 1})
    await user.save()
    return {"message": '信箱驗證成功'}
def generate_password_reset_token(email: str) -> str:
    """Create a signed password-reset JWT for ``email``.

    The token carries ``sub`` (the e-mail), ``nbf`` (now) and ``exp`` (now +
    ``settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS``).

    :param email: e-mail address placed in the ``sub`` claim
    :return: the encoded JWT
    """
    now = datetime.utcnow()
    expires = now + timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
    # Hand the naive-UTC datetimes to the encoder directly. Calling
    # ``.timestamp()`` on a naive value interprets it as *local* time and
    # skews the expiry by the server's UTC offset.
    return jwt.encode(
        {"exp": expires, "nbf": now, "sub": email},
        settings.SECRET_KEY,
        algorithm="HS256",
    )
def verify_password_reset_token(token: str):
    """Validate a password-reset JWT and return the e-mail it was issued for.

    :param token: encoded JWT to check
    :return: the ``sub`` claim, or ``None`` when the token fails validation
        or has no ``sub`` claim
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except jwt.JWTError:
        return None
    # A token without a subject is treated like an invalid one.
    return claims.get("sub")
def send_email(
    email_to: str,
    token: str,
    subject_template: str = "",
    html_template: str = "",
    environment: Optional[Dict[str, Any]] = None,
):
    """Hand one e-mail to the remote mail service over rpyc.

    :param email_to: recipient address
    :param token: accepted for interface compatibility; not used here
    :param subject_template: subject line, sent as given
    :param html_template: message body, sent as given (no rendering is done
        in this function)
    :param environment: accepted for interface compatibility; not used here
    :return: ``{"message": "send email"}`` once the remote call returned
    """
    mail = {
        'toaddr': email_to,
        'title': subject_template,
        'totext': html_template,
    }
    conn = rpyc.connect("192.168.192.80", 12345)
    try:
        conn.root.mailto(mail)
    finally:
        # Release the connection even when the remote call fails.
        conn.close()
    return {"message": "send email"}
def create_singup_url():
    """Return the sign-up URL; at present this is always the empty string."""
    return ''
def send_reset_password_email(email_to: str, email: str, token: str) -> None:
    """Send the password-reset e-mail containing the reset link.

    Reads ``reset_password.html`` from ``settings.EMAIL_TEMPLATES_DIR``,
    renders it with the values below, and passes the result to
    ``send_email``.

    :param email_to: recipient address
    :param email: account e-mail shown in the subject and the template
    :param token: password-reset token embedded in the link
    """
    subject = f"Password recovery for user {email}"
    template_path = Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html"
    template_str = template_path.read_text(encoding="utf-8")
    link = f"{settings.SERVER_HOST}/reset-password?token={token}"
    environment = {
        "project_name": settings.PROJECT_NAME,
        "username": email,
        "email": email_to,
        "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
        "link": link,
    }
    # send_email forwards the body untouched, so the template has to be
    # rendered here or the recipient never sees the actual link.
    html = JinjaTemplate(template_str).render(**environment)
    send_email(
        email_to=email_to,
        token=token,  # required by send_email's signature
        subject_template=subject,
        html_template=html,
        environment=environment,
    )
@users.post("/password-recovery/{email}")
async def recover_password(email:str):
    """Start password recovery for the account registered under ``email``.

    :param email: e-mail address taken from the URL path
    :return: confirmation message once the recovery mail has been handed off
    :raises HTTPException: 404 when no user has this e-mail
    """
    account = await User.filter(email=email).first()
    if account:
        reset_token = generate_password_reset_token(email=email)
        send_reset_password_email(
            email_to=account.email,
            email=email,
            token=reset_token,
        )
        return {"msg": "Password recovery email sent"}

    raise HTTPException(
        status_code=404,
        detail="The user with this username does not exist in the system.",
    )
- from pydantic import BaseModel
class Msg(BaseModel):
    # Response schema carrying a single status message; used as the
    # response_model of the reset-password endpoint.
    msg: str
@users.post("/reset-password/", response_model=Msg)
async def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(deps.get_db),
) -> Any:
    """
    Reset password.

    :param token: password-reset JWT previously e-mailed to the user
    :param new_password: new plain-text password; stored bcrypt-hashed
    :param db: injected session; unused here, kept for interface stability
    :return: ``{"msg": "Password updated successfully"}``
    :raises HTTPException: 400 for an invalid token, 404 when the user named
        in the token does not exist
    """
    email = verify_password_reset_token(token)
    if not email:
        raise HTTPException(status_code=400, detail="Invalid token")

    user = await User.filter(email=email).first()
    if not user:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )

    user.password = bcrypt.hashpw(
        new_password.encode('utf-8'), bcrypt.gensalt()
    ).decode('utf-8')
    # Persist through the model itself; committing the injected session does
    # not write this object, so the new password was never stored.
    await user.save()
    return {"msg": "Password updated successfully"}
@users.get("/delete_user/{id}")
async def delete(
    id: int,
    user_id = Depends(check_token)
):
    """Delete the user with the given id on behalf of a privileged caller.

    :param id: id of the user to delete, taken from the URL path
    :param user_id: id of the calling user as resolved by ``check_token``
    :return: ``{"msg": ..., "code": ...}`` describing the outcome; only a
        caller whose ``is_superuser`` equals 2 may delete
    """
    if not user_id:
        return {"msg": "no exit", "code": 200}

    requester = await User.get(id=user_id)
    if requester.is_superuser != 2:
        return {"msg": "no access", "code": 200}

    if not id:
        return {"msg": "failed", "code": 400}

    await User.filter(id=id).delete()
    return {"msg": "success", "code": 200}
@users.get("/information")
async def get_information(token:str):
    """Return the user record that holds the given token.

    NOTE(review): the whole user object is returned as-is -- confirm that no
    sensitive field (e.g. the password hash) gets serialised.

    :param token: access token to look up
    :return: dict with the first matching user (or None) under ``msg`` and
        ``code`` 200
    """
    holder = await User.filter(token=token).first()
    payload = {"msg": holder, "code": 200}
    return payload
@users.get("/protect")
def protected_route(user=Depends(manager)):
    """Return the user resolved by the login manager for this request.

    :param user: value injected by the login manager dependency
    :return: ``{'user': user}``, or a "no access" message when it is None
    """
    if user is not None:
        return {'user': user}
    return {'message': "no access"}
@users.get("/check_user")
async def check_user(
    user_id = Depends(check_token)
):
    """Report the privilege level of the user owning the supplied token.

    :param user_id: id of the calling user as resolved by ``check_token``
        (``None`` when the token is unknown)
    :return: dict with ``msg``, ``code`` and the user's ``is_superuser``
        value under ``is_super``
    :raises HTTPException: 401 when the token matches no user
    """
    if not user_id:
        # check_token yields None for unknown tokens; without this guard the
        # lookup below fails with an unhandled error.
        raise HTTPException(status_code=401, detail="no access")

    user = await User.get(id=user_id)
    return {"msg": "success", "code": 200, "is_super": user.is_superuser}
|