|
@@ -1,383 +1,441 @@
|
|
|
-from fastapi import APIRouter, Form, Depends, HTTPException, Body
|
|
|
-from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
|
|
|
-from app.models.models import User, UserPydantic
|
|
|
-from app.api import deps
|
|
|
-from sqlalchemy.orm import Session
|
|
|
-from typing import Any, Dict, Optional
|
|
|
-import secrets
|
|
|
-from fastapi_login.exceptions import InvalidCredentialsException
|
|
|
-from fastapi_login import LoginManager
|
|
|
-from datetime import timedelta,datetime
|
|
|
-from app.config import settings
|
|
|
-from pathlib import Path
|
|
|
-from jose import jwt
|
|
|
-import emails
|
|
|
-from emails.template import JinjaTemplate
|
|
|
-import logging
|
|
|
-import bcrypt
|
|
|
-
|
|
|
-from app.crud import crud_users
|
|
|
-import smtplib
|
|
|
-from email.mime.text import MIMEText
|
|
|
-from google.oauth2 import id_token
|
|
|
-from google.auth.transport import requests
|
|
|
-from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
|
|
|
-from fastapi.security.utils import get_authorization_scheme_param
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-users = APIRouter()
|
|
|
-
|
|
|
-SECRET: str = secrets.token_urlsafe(32)
|
|
|
-manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
|
|
|
-
|
|
|
-
|
|
|
-@manager.user_loader()
|
|
|
-async def query_user(user_id: str):
|
|
|
- """
|
|
|
- Get a user from the db
|
|
|
- :param user_id: E-Mail of the user
|
|
|
- :return: None or the user object
|
|
|
- """
|
|
|
- # result = await User.filter(email=user_id,is_active=1).first()
|
|
|
- result = await User.filter(email=user_id).first()
|
|
|
-
|
|
|
- if not result:
|
|
|
- print('無此筆資料')
|
|
|
- return None
|
|
|
- return result
|
|
|
-
|
|
|
-
|
|
|
-#@manager.user_loader()
|
|
|
-async def query_user_username(user_id: str):
|
|
|
- """
|
|
|
- Get a user from the db
|
|
|
- :param user_id: E-Mail of the user
|
|
|
- :return: None or the user object
|
|
|
- """
|
|
|
- result = await User.filter(username=user_id).first()
|
|
|
- if not result:
|
|
|
- print('無此筆資料')
|
|
|
- return None
|
|
|
-
|
|
|
- return result
|
|
|
-
|
|
|
-
|
|
|
-@users.post("/login")
|
|
|
-async def login(data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(deps.get_db), position: str=Form(default='')):
|
|
|
-
|
|
|
- email = data.username
|
|
|
- password = data.password
|
|
|
-
|
|
|
- user = await query_user(email)
|
|
|
- user_pydantic = UserPydantic.from_orm(user)
|
|
|
- user_dict = user_pydantic.dict(exclude_unset=True)
|
|
|
- access_token = manager.create_access_token(
|
|
|
- data={'sub': email}
|
|
|
- )
|
|
|
-
|
|
|
- if not user:
|
|
|
- # you can return any response or error of your choice
|
|
|
- return {"message":"查無此人"}
|
|
|
-
|
|
|
- # elif password != user.password:
|
|
|
- # raise InvalidCredentialsException
|
|
|
- else:
|
|
|
- user_dict.update({"token":access_token})
|
|
|
- token_update = user.update_from_dict(user_dict)
|
|
|
- await user.save()
|
|
|
- stored_hashed_password = user.password.encode('utf-8')
|
|
|
- if bcrypt.checkpw(password.encode('utf-8'),stored_hashed_password):
|
|
|
- return {'msg':'登入成功','code':'200','access_token': access_token,'username':user.username,'email':user.email,'points':user.points}
|
|
|
- else:
|
|
|
- return {"message": "Invalid username or password"}
|
|
|
-
|
|
|
-oauth2_scheme = OAuth2PasswordBearer(tokenUrl="https://oauth2.googleapis.com/token")
|
|
|
-
|
|
|
-CLIENT_ID = settings.CLIENT_ID
|
|
|
-@users.post("/login/google/access-token")
|
|
|
-async def login(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), position: str = Form(default=''),
|
|
|
- ) -> Any:
|
|
|
- """
|
|
|
- OAuth2 compatible token login, get an access token for future requests
|
|
|
- """
|
|
|
- access_token = manager.create_access_token(
|
|
|
- data={'sub': username}
|
|
|
- )
|
|
|
- user = await User.filter(email=email).first() # 確認信箱是否已存在
|
|
|
- if not user:
|
|
|
- u = await User.create(username=username, password=password, email=email,token=access_token, is_superuser=0, points=0,is_active =1)
|
|
|
- create_user = u
|
|
|
- # if user:
|
|
|
- # print('已用相同信箱註冊過,再開一個GMAIL帳號')
|
|
|
- # u = await User.create(username=username, password=password,email=email)
|
|
|
- user_pydantic = UserPydantic.from_orm(user)
|
|
|
- user_dict = user_pydantic.dict(exclude_unset=True)
|
|
|
- user_dict.update({"token": access_token})
|
|
|
- token_update = user.update_from_dict(user_dict)
|
|
|
- await user.save()
|
|
|
-
|
|
|
- return_msg = {
|
|
|
- "access_token": access_token,
|
|
|
- "token_type": "bearer",
|
|
|
- 'code':'200',
|
|
|
- }
|
|
|
- return return_msg
|
|
|
- # if add_time_code:
|
|
|
- # available_ser_no = crud.serial_number.available(db, ser_no=add_time_code)
|
|
|
- # print(available_ser_no)
|
|
|
- # if available_ser_no:
|
|
|
- # user_in = schemas.UserUpdate(available_time=user.available_time + available_ser_no.time)
|
|
|
- # crud.user.update(db, db_obj=user, obj_in=user_in)
|
|
|
- #
|
|
|
- # ser_no_in = schemas.SerialNumberUpdate(code=available_ser_no.code, is_used=True,
|
|
|
- # used_datetime=str(datetime.now()), owner_id=user.id)
|
|
|
- # crud.serial_number.update(db, db_obj=available_ser_no, obj_in=ser_no_in)
|
|
|
- # print(available_ser_no.time, type(available_ser_no.time))
|
|
|
- # return_msg['time_added'] = available_ser_no.time
|
|
|
- # else:
|
|
|
- # return_msg['time_added'] = -1
|
|
|
- # return return_msg
|
|
|
-
|
|
|
-@users.post("/logout")
|
|
|
-async def logout():
|
|
|
-
|
|
|
- return {"msg":"logout success","code":200}
|
|
|
-
|
|
|
-@users.post("/add") # 寄認證信
|
|
|
-async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), re_password: str = Form(default='')):
|
|
|
- if username and password and email:
|
|
|
- user_email = await query_user(email)
|
|
|
- user_username = await query_user_username(username)
|
|
|
-
|
|
|
- if user_email or user_username:
|
|
|
- return {"msg":"該信箱或使用者名稱已存在","code":403}
|
|
|
- # elif user_username:
|
|
|
- # return {"msg":"該使用者名稱已存在","code":403}
|
|
|
- else:
|
|
|
- if password == re_password:
|
|
|
- # access_token = manager.create_access_token(
|
|
|
- # data={'sub': email}
|
|
|
- # )
|
|
|
- print('前',settings.SECRET_KEY)
|
|
|
- expiration_time = datetime.utcnow() + timedelta(hours=0.5)
|
|
|
- access_token = jwt.encode({'email':email,'exp':expiration_time}, settings.SECRET_KEY, algorithm="HS256")
|
|
|
- hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
- u = await User.create(username=username, password=hashed_password, email=email,is_superuser=0,token=access_token,points=0,is_active=1)
|
|
|
- if u:
|
|
|
- # message = '註冊認證'
|
|
|
- message = f"請點擊以下連結完成註冊流程:\n\nhttps://cmm.ai:8088/api/verify?token={access_token}"
|
|
|
- subject = '註冊信'
|
|
|
- print(message)
|
|
|
- send_email(email,access_token,subject,message)
|
|
|
-
|
|
|
- return {"msg": "已寄送註冊信", "code": 200}
|
|
|
- else:
|
|
|
- return {"msg": "未寄出註冊信", "code":403}
|
|
|
- else:
|
|
|
- return {"msg":"確認密碼錯誤","code":403}
|
|
|
-
|
|
|
- return {"msg": "create user failed", "code": 403}
|
|
|
-
|
|
|
-
|
|
|
-@users.get("/verify") # 註冊認證確認
|
|
|
-async def verify_email(token:str):
|
|
|
- try:
|
|
|
- print(token)
|
|
|
- print('後',settings.SECRET_KEY)
|
|
|
- payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
|
- print('解密結果',payload)
|
|
|
- email = payload["email"]
|
|
|
- # is_active True
|
|
|
- user = await User.filter(email=email,is_active=0).first()
|
|
|
- if user:
|
|
|
- user_pydantic = UserPydantic.from_orm(user)
|
|
|
- user_dict = user_pydantic.dict(exclude_unset=True)
|
|
|
- user_dict.update({"is_active": 1})
|
|
|
- is_active_update = user.update_from_dict(user_dict)
|
|
|
- await user.save()
|
|
|
- result = '信箱驗證成功'
|
|
|
- else:
|
|
|
- result = '信箱驗證失敗'
|
|
|
-
|
|
|
- return {"message": result}
|
|
|
- except:
|
|
|
- raise HTTPException(status_code=400)
|
|
|
- # except jwt.ExpiredSignatureError:
|
|
|
- # raise HTTPException(status_code=400, detail="token已失效")
|
|
|
- # except jwt.DecodeError:
|
|
|
- # raise HTTPException(status_code=400, detail="無效token")
|
|
|
-
|
|
|
-
|
|
|
-def generate_password_reset_token(email: str) -> str:
|
|
|
- delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
|
|
|
- now = datetime.utcnow()
|
|
|
- expires = now + delta
|
|
|
- exp = expires.timestamp()
|
|
|
- encoded_jwt = jwt.encode(
|
|
|
- {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",)
|
|
|
- print(encoded_jwt)
|
|
|
- return encoded_jwt
|
|
|
-
|
|
|
-def verify_password_reset_token(token: str):
|
|
|
- try:
|
|
|
- decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
|
- print(decoded_token)
|
|
|
- return decoded_token["sub"]
|
|
|
- except jwt.JWTError:
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
-def send_email(
|
|
|
- email_to: str,
|
|
|
- token: str,
|
|
|
- subject_template: str = "",
|
|
|
- html_template: str = "",
|
|
|
- environment: Dict[str, Any] = {},
|
|
|
-):
|
|
|
- # message = emails.Message(
|
|
|
- # subject=JinjaTemplate(subject_template),
|
|
|
- # html=JinjaTemplate(html_template),
|
|
|
- # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
|
|
|
- # )
|
|
|
-
|
|
|
- message = emails.Message(
|
|
|
- subject=JinjaTemplate(subject_template),
|
|
|
- html=JinjaTemplate(html_template),
|
|
|
- mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
|
|
|
- )
|
|
|
- smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
|
|
|
- if settings.SMTP_TLS:
|
|
|
- smtp_options["tls"] = True
|
|
|
- if settings.SMTP_USER:
|
|
|
- smtp_options["user"] = settings.SMTP_USER
|
|
|
- if settings.SMTP_PASSWORD:
|
|
|
- smtp_options["password"] = settings.SMTP_PASSWORD
|
|
|
- response = message.send(to=email_to, render=environment, smtp=smtp_options)
|
|
|
- print('RResponse',response)
|
|
|
- return {"message":f"send email result: {response}"}
|
|
|
- # logging.info(f"send email result: {response}")
|
|
|
- # email_content = MIMEText(message)
|
|
|
- # email_content["Subject"] = subject
|
|
|
- # email_content["From"] = 'zooey@choozmo.com'
|
|
|
- # email_content["To"] = email_to
|
|
|
- # try:
|
|
|
- # print('測試成功')
|
|
|
- # # Connect to the SMTP server
|
|
|
- # smtp_server = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT)
|
|
|
- # smtp_server.starttls()
|
|
|
- #
|
|
|
- # # Login to the email account
|
|
|
- # smtp_server.login(settings.SMTP_HOST, settings.SMTP_PASSWORD)
|
|
|
- #
|
|
|
- # # Send the email
|
|
|
- # smtp_server.sendmail(settings.SMTP_HOST, email_to, email_content.as_string())
|
|
|
- #
|
|
|
- # # Close the connection
|
|
|
- # smtp_server.quit()
|
|
|
- #
|
|
|
- # return {"message": "Email sent successfully."}
|
|
|
- # except Exception as e:
|
|
|
- # print('測試失敗')
|
|
|
-
|
|
|
-
|
|
|
-def create_singup_url():
|
|
|
- url=''
|
|
|
- return url
|
|
|
-
|
|
|
-def send_reset_password_email(email_to: str, email: str, token: str) -> None:
|
|
|
- subject = f"Password recovery for user {email}"
|
|
|
- with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
|
|
|
- template_str = f.read()
|
|
|
- server_host = settings.SERVER_HOST
|
|
|
- link = f"{server_host}/reset-password?token={token}"
|
|
|
- message = '重新設定密碼'
|
|
|
- send_email(
|
|
|
- email_to=email_to,
|
|
|
- subject_template=subject,
|
|
|
- html_template=template_str,
|
|
|
- environment={
|
|
|
- "project_name": settings.PROJECT_NAME,
|
|
|
- "username": email,
|
|
|
- "email": email_to,
|
|
|
- "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
|
|
|
- "link": link,
|
|
|
- },
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
-@users.post("/password-recovery/{email}")
|
|
|
-async def recover_password(email:str):
|
|
|
- user = await User.filter(email=email).first()
|
|
|
- if not user:
|
|
|
- raise HTTPException(
|
|
|
- status_code=404,
|
|
|
- detail="The user with this username does not exist in the system.",
|
|
|
- )
|
|
|
- password_reset_token = generate_password_reset_token(email=email)
|
|
|
- send_reset_password_email(
|
|
|
- email_to=user.email, email=email, token=password_reset_token
|
|
|
- )
|
|
|
- return {"msg": "Password recovery email sent"}
|
|
|
-
|
|
|
-
|
|
|
-from pydantic import BaseModel
|
|
|
-
|
|
|
-
|
|
|
-class Msg(BaseModel):
|
|
|
- msg: str
|
|
|
-@users.post("/reset-password/", response_model=Msg)
|
|
|
-async def reset_password(
|
|
|
- token: str = Body(...),
|
|
|
- new_password: str = Body(...),
|
|
|
- db: Session = Depends(deps.get_db),
|
|
|
-) -> Any:
|
|
|
- """
|
|
|
- Reset password
|
|
|
- """
|
|
|
- email = verify_password_reset_token(token)
|
|
|
- print(email)
|
|
|
- if not email:
|
|
|
- raise HTTPException(status_code=400, detail="Invalid token")
|
|
|
- # user = await query_user(email)
|
|
|
- user = await User.filter(email=email).first()
|
|
|
-
|
|
|
- if not user:
|
|
|
- raise HTTPException(
|
|
|
- status_code=404,
|
|
|
- detail="The user with this username does not exist in the system.",
|
|
|
- )
|
|
|
- # elif not crud.user.is_active(user):
|
|
|
- # raise HTTPException(status_code=400, detail="Inactive user")
|
|
|
- hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
- user.password = hashed_password
|
|
|
- # db.add(user)
|
|
|
- db.commit()
|
|
|
- print(user.password)
|
|
|
- db.close()
|
|
|
- return {"msg": "Password updated successfully"}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-@users.get("/delete_user/{id}")
|
|
|
-async def delete(id: int):
|
|
|
- if id:
|
|
|
- await User.filter(id=id).delete()
|
|
|
- return {"msg": "success", "code": 200}
|
|
|
-
|
|
|
- return {"msg": "failed", "code": 400}
|
|
|
-
|
|
|
-
|
|
|
-@users.get("/information")
|
|
|
-async def get_information(token:str):
|
|
|
- result = await User.filter(token=token).first()
|
|
|
- return {"msg":result, "code":200}
|
|
|
-
|
|
|
-@users.get("/protect")
|
|
|
-def protected_route(user=Depends(manager)):
|
|
|
- if user is None:
|
|
|
- return {'message': "no access"}
|
|
|
- return {'user': user}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+from fastapi import APIRouter, Form, Depends, HTTPException, Body
|
|
|
+from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
|
|
|
+from app.models.models import User, UserPydantic
|
|
|
+from app.api import deps
|
|
|
+from sqlalchemy.orm import Session
|
|
|
+from typing import Any, Dict, Optional
|
|
|
+import secrets
|
|
|
+from fastapi_login.exceptions import InvalidCredentialsException
|
|
|
+from fastapi_login import LoginManager
|
|
|
+from datetime import timedelta,datetime
|
|
|
+from app.config import settings
|
|
|
+from pathlib import Path
|
|
|
+from jose import jwt
|
|
|
+import emails
|
|
|
+from emails.template import JinjaTemplate
|
|
|
+import logging
|
|
|
+import bcrypt
|
|
|
+
|
|
|
+from app.crud import crud_users
|
|
|
+import smtplib
|
|
|
+from email.mime.text import MIMEText
|
|
|
+from google.oauth2 import id_token
|
|
|
+from google.auth.transport import requests
|
|
|
+from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
|
|
|
+from fastapi.security.utils import get_authorization_scheme_param
|
|
|
+import rpyc
|
|
|
+
|
|
|
+
|
|
|
+users = APIRouter()
|
|
|
+
|
|
|
+SECRET: str = secrets.token_urlsafe(32)
|
|
|
+manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+async def check_token(access_token: str):
|
|
|
+
|
|
|
+ result = await User.filter(token=access_token).first()
|
|
|
+
|
|
|
+ if not result:
|
|
|
+ print("no access")
|
|
|
+ return None
|
|
|
+
|
|
|
+ user_id = result.id
|
|
|
+
|
|
|
+ return user_id
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+@manager.user_loader()
|
|
|
+async def query_user(user_id: str):
|
|
|
+ """
|
|
|
+ Get a user from the db
|
|
|
+ :param user_id: E-Mail of the user
|
|
|
+ :return: None or the user object
|
|
|
+ """
|
|
|
+ # result = await User.filter(email=user_id,is_active=1).first()
|
|
|
+ result = await User.filter(email=user_id).first()
|
|
|
+
|
|
|
+ if not result:
|
|
|
+ print('無此筆資料')
|
|
|
+ return None
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+#@manager.user_loader()
|
|
|
+async def query_user_username(user_id: str):
|
|
|
+ """
|
|
|
+ Get a user from the db
|
|
|
+ :param user_id: E-Mail of the user
|
|
|
+ :return: None or the user object
|
|
|
+ """
|
|
|
+ result = await User.filter(username=user_id).first()
|
|
|
+ if not result:
|
|
|
+ print('無此筆資料')
|
|
|
+ return None
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+@users.post("/login")
|
|
|
+async def login(data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(deps.get_db), position: str=Form(default='')):
|
|
|
+
|
|
|
+ email = data.username
|
|
|
+ password = data.password
|
|
|
+
|
|
|
+ user = await query_user(email)
|
|
|
+ user_pydantic = UserPydantic.from_orm(user)
|
|
|
+ user_dict = user_pydantic.dict(exclude_unset=True)
|
|
|
+ access_token = manager.create_access_token(
|
|
|
+ data={'sub': email}
|
|
|
+ )
|
|
|
+
|
|
|
+ if not user:
|
|
|
+ # you can return any response or error of your choice
|
|
|
+ return {"message":"查無此人"}
|
|
|
+
|
|
|
+ # elif password != user.password:
|
|
|
+ # raise InvalidCredentialsException
|
|
|
+ else:
|
|
|
+ user_dict.update({"token":access_token})
|
|
|
+ token_update = user.update_from_dict(user_dict)
|
|
|
+ await user.save()
|
|
|
+ stored_hashed_password = user.password.encode('utf-8')
|
|
|
+ if bcrypt.checkpw(password.encode('utf-8'),stored_hashed_password):
|
|
|
+ return {'msg':'登入成功','code':'200','access_token': access_token,'username':user.username,'email':user.email,'points':user.points}
|
|
|
+ else:
|
|
|
+ return {"message": "Invalid username or password"}
|
|
|
+
|
|
|
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="https://oauth2.googleapis.com/token")
|
|
|
+
|
|
|
+CLIENT_ID = settings.CLIENT_ID
|
|
|
+@users.post("/login/google/access-token")
|
|
|
+async def login(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), position: str = Form(default=''),
|
|
|
+ ) -> Any:
|
|
|
+ """
|
|
|
+ OAuth2 compatible token login, get an access token for future requests
|
|
|
+ """
|
|
|
+ access_token = manager.create_access_token(
|
|
|
+ data={'sub': username}
|
|
|
+ )
|
|
|
+ user = await User.filter(email=email).first() # 確認信箱是否已存在
|
|
|
+ if not user:
|
|
|
+ u = await User.create(username=username, password=password, email=email,token=access_token, is_superuser=0, points=0,is_active =1)
|
|
|
+ create_user = u
|
|
|
+ # if user:
|
|
|
+ # print('已用相同信箱註冊過,再開一個GMAIL帳號')
|
|
|
+ # u = await User.create(username=username, password=password,email=email)
|
|
|
+ user_pydantic = UserPydantic.from_orm(user)
|
|
|
+ user_dict = user_pydantic.dict(exclude_unset=True)
|
|
|
+ user_dict.update({"token": access_token})
|
|
|
+ token_update = user.update_from_dict(user_dict)
|
|
|
+ await user.save()
|
|
|
+
|
|
|
+ return_msg = {
|
|
|
+ "access_token": access_token,
|
|
|
+ "token_type": "bearer",
|
|
|
+ 'code':'200',
|
|
|
+ }
|
|
|
+ return return_msg
|
|
|
+ # if add_time_code:
|
|
|
+ # available_ser_no = crud.serial_number.available(db, ser_no=add_time_code)
|
|
|
+ # print(available_ser_no)
|
|
|
+ # if available_ser_no:
|
|
|
+ # user_in = schemas.UserUpdate(available_time=user.available_time + available_ser_no.time)
|
|
|
+ # crud.user.update(db, db_obj=user, obj_in=user_in)
|
|
|
+ #
|
|
|
+ # ser_no_in = schemas.SerialNumberUpdate(code=available_ser_no.code, is_used=True,
|
|
|
+ # used_datetime=str(datetime.now()), owner_id=user.id)
|
|
|
+ # crud.serial_number.update(db, db_obj=available_ser_no, obj_in=ser_no_in)
|
|
|
+ # print(available_ser_no.time, type(available_ser_no.time))
|
|
|
+ # return_msg['time_added'] = available_ser_no.time
|
|
|
+ # else:
|
|
|
+ # return_msg['time_added'] = -1
|
|
|
+ # return return_msg
|
|
|
+
|
|
|
+@users.post("/logout")
|
|
|
+async def logout():
|
|
|
+
|
|
|
+ return {"msg":"logout success","code":200}
|
|
|
+
|
|
|
+@users.post("/add") # 寄認證信
|
|
|
+async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default=''), re_password: str = Form(default='')):
|
|
|
+ if username and password and email:
|
|
|
+ user_email = await query_user(email)
|
|
|
+ user_username = await query_user_username(username)
|
|
|
+
|
|
|
+ if user_email or user_username:
|
|
|
+ return {"msg":"該信箱或使用者名稱已存在","code":403}
|
|
|
+ # elif user_username:
|
|
|
+ # return {"msg":"該使用者名稱已存在","code":403}
|
|
|
+ else:
|
|
|
+ if password == re_password:
|
|
|
+ # access_token = manager.create_access_token(
|
|
|
+ # data={'sub': email}
|
|
|
+ # )
|
|
|
+ print('前',settings.SECRET_KEY)
|
|
|
+ expiration_time = datetime.utcnow() + timedelta(hours=0.5)
|
|
|
+ access_token = jwt.encode({'email':email,'exp':expiration_time}, settings.SECRET_KEY, algorithm="HS256")
|
|
|
+ hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
+ u = await User.create(username=username, password=hashed_password, email=email,is_superuser=0,token=access_token,points=0,is_active=0)
|
|
|
+ if u:
|
|
|
+ # message = '註冊認證'
|
|
|
+ message = f"請點擊以下連結完成註冊流程:\n\nhttps://cmm.ai:8088/api/verify?token={access_token}"
|
|
|
+ subject = '註冊信'
|
|
|
+ print(message)
|
|
|
+ send_email(email,access_token,subject,message)
|
|
|
+
|
|
|
+ return {"msg": "已寄送註冊信", "code": 200}
|
|
|
+ else:
|
|
|
+ return {"msg": "未寄出註冊信", "code":403}
|
|
|
+ else:
|
|
|
+ return {"msg":"確認密碼錯誤","code":403}
|
|
|
+
|
|
|
+ return {"msg": "create user failed", "code": 403}
|
|
|
+
|
|
|
+
|
|
|
+@users.get("/verify") # 註冊認證確認
|
|
|
+async def verify_email(token:str):
|
|
|
+ try:
|
|
|
+ print(token)
|
|
|
+ print('後',settings.SECRET_KEY)
|
|
|
+ payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
|
+ print('解密結果',payload)
|
|
|
+ email = payload["email"]
|
|
|
+ # is_active True
|
|
|
+ user = await User.filter(email=email,is_active=0).first()
|
|
|
+ if user:
|
|
|
+ user_pydantic = UserPydantic.from_orm(user)
|
|
|
+ user_dict = user_pydantic.dict(exclude_unset=True)
|
|
|
+ user_dict.update({"is_active": 1})
|
|
|
+ is_active_update = user.update_from_dict(user_dict)
|
|
|
+ await user.save()
|
|
|
+ result = '信箱驗證成功'
|
|
|
+ else:
|
|
|
+ result = '信箱驗證失敗'
|
|
|
+
|
|
|
+ return {"message": result}
|
|
|
+ except:
|
|
|
+ raise HTTPException(status_code=400)
|
|
|
+ # except jwt.ExpiredSignatureError:
|
|
|
+ # raise HTTPException(status_code=400, detail="token已失效")
|
|
|
+ # except jwt.DecodeError:
|
|
|
+ # raise HTTPException(status_code=400, detail="無效token")
|
|
|
+
|
|
|
+
|
|
|
+def generate_password_reset_token(email: str) -> str:
|
|
|
+ delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
|
|
|
+ now = datetime.utcnow()
|
|
|
+ expires = now + delta
|
|
|
+ exp = expires.timestamp()
|
|
|
+ encoded_jwt = jwt.encode(
|
|
|
+ {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",)
|
|
|
+ print(encoded_jwt)
|
|
|
+ return encoded_jwt
|
|
|
+
|
|
|
+def verify_password_reset_token(token: str):
|
|
|
+ try:
|
|
|
+ decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
|
+ print(decoded_token)
|
|
|
+ return decoded_token["sub"]
|
|
|
+ except jwt.JWTError:
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def send_email(
|
|
|
+ email_to: str,
|
|
|
+ token: str,
|
|
|
+ subject_template: str = "",
|
|
|
+ html_template: str = "",
|
|
|
+ environment: Dict[str, Any] = {},
|
|
|
+):
|
|
|
+ # message = emails.Message(
|
|
|
+ # subject=JinjaTemplate(subject_template),
|
|
|
+ # html=JinjaTemplate(html_template),
|
|
|
+ # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
|
|
|
+ # )
|
|
|
+
|
|
|
+
|
|
|
+ subject=subject_template
|
|
|
+ html=html_template
|
|
|
+ mailobj={}
|
|
|
+ mailobj['toaddr']=email_to
|
|
|
+ mailobj['title']=subject
|
|
|
+ mailobj['totext']=html
|
|
|
+
|
|
|
+ conn = rpyc.connect("192.168.192.80", 12345)
|
|
|
+ conn.root.mailto(mailobj)
|
|
|
+ return {"message":f"send email"}
|
|
|
+
|
|
|
+
|
|
|
+ #message = emails.Message(
|
|
|
+ # subject=JinjaTemplate(subject_template),
|
|
|
+ # html=JinjaTemplate(html_template),
|
|
|
+ # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
|
|
|
+ #)
|
|
|
+ #smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
|
|
|
+ #if settings.SMTP_TLS:
|
|
|
+ # smtp_options["tls"] = True
|
|
|
+ #if settings.SMTP_USER:
|
|
|
+ # smtp_options["user"] = settings.SMTP_USER
|
|
|
+ #if settings.SMTP_PASSWORD:
|
|
|
+ # smtp_options["password"] = settings.SMTP_PASSWORD
|
|
|
+ #response = message.send(to=email_to, render=environment, smtp=smtp_options)
|
|
|
+ #print('RResponse',response)
|
|
|
+ #return {"message":f"send email result: {response}"}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ # logging.info(f"send email result: {response}")
|
|
|
+ # email_content = MIMEText(message)
|
|
|
+ # email_content["Subject"] = subject
|
|
|
+ # email_content["From"] = 'zooey@choozmo.com'
|
|
|
+ # email_content["To"] = email_to
|
|
|
+ # try:
|
|
|
+ # print('測試成功')
|
|
|
+ # # Connect to the SMTP server
|
|
|
+ # smtp_server = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT)
|
|
|
+ # smtp_server.starttls()
|
|
|
+ #
|
|
|
+ # # Login to the email account
|
|
|
+ # smtp_server.login(settings.SMTP_HOST, settings.SMTP_PASSWORD)
|
|
|
+ #
|
|
|
+ # # Send the email
|
|
|
+ # smtp_server.sendmail(settings.SMTP_HOST, email_to, email_content.as_string())
|
|
|
+ #
|
|
|
+ # # Close the connection
|
|
|
+ # smtp_server.quit()
|
|
|
+ #
|
|
|
+ # return {"message": "Email sent successfully."}
|
|
|
+ # except Exception as e:
|
|
|
+ # print('測試失敗')
|
|
|
+
|
|
|
+
|
|
|
+def create_singup_url():
|
|
|
+ url=''
|
|
|
+ return url
|
|
|
+
|
|
|
+def send_reset_password_email(email_to: str, email: str, token: str) -> None:
|
|
|
+ subject = f"Password recovery for user {email}"
|
|
|
+ with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
|
|
|
+ template_str = f.read()
|
|
|
+ server_host = settings.SERVER_HOST
|
|
|
+ link = f"{server_host}/reset-password?token={token}"
|
|
|
+ message = '重新設定密碼'
|
|
|
+ send_email(
|
|
|
+ email_to=email_to,
|
|
|
+ subject_template=subject,
|
|
|
+ html_template=template_str,
|
|
|
+ environment={
|
|
|
+ "project_name": settings.PROJECT_NAME,
|
|
|
+ "username": email,
|
|
|
+ "email": email_to,
|
|
|
+ "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
|
|
|
+ "link": link,
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+@users.post("/password-recovery/{email}")
|
|
|
+async def recover_password(email:str):
|
|
|
+ user = await User.filter(email=email).first()
|
|
|
+ if not user:
|
|
|
+ raise HTTPException(
|
|
|
+ status_code=404,
|
|
|
+ detail="The user with this username does not exist in the system.",
|
|
|
+ )
|
|
|
+ password_reset_token = generate_password_reset_token(email=email)
|
|
|
+ send_reset_password_email(
|
|
|
+ email_to=user.email, email=email, token=password_reset_token
|
|
|
+ )
|
|
|
+ return {"msg": "Password recovery email sent"}
|
|
|
+
|
|
|
+
|
|
|
+from pydantic import BaseModel
|
|
|
+
|
|
|
+
|
|
|
+class Msg(BaseModel):
|
|
|
+ msg: str
|
|
|
+@users.post("/reset-password/", response_model=Msg)
|
|
|
+async def reset_password(
|
|
|
+ token: str = Body(...),
|
|
|
+ new_password: str = Body(...),
|
|
|
+ db: Session = Depends(deps.get_db),
|
|
|
+) -> Any:
|
|
|
+ """
|
|
|
+ Reset password
|
|
|
+ """
|
|
|
+ email = verify_password_reset_token(token)
|
|
|
+ print(email)
|
|
|
+ if not email:
|
|
|
+ raise HTTPException(status_code=400, detail="Invalid token")
|
|
|
+ # user = await query_user(email)
|
|
|
+ user = await User.filter(email=email).first()
|
|
|
+
|
|
|
+ if not user:
|
|
|
+ raise HTTPException(
|
|
|
+ status_code=404,
|
|
|
+ detail="The user with this username does not exist in the system.",
|
|
|
+ )
|
|
|
+ # elif not crud.user.is_active(user):
|
|
|
+ # raise HTTPException(status_code=400, detail="Inactive user")
|
|
|
+ hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
+ user.password = hashed_password
|
|
|
+ # db.add(user)
|
|
|
+ db.commit()
|
|
|
+ print(user.password)
|
|
|
+ db.close()
|
|
|
+ return {"msg": "Password updated successfully"}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+@users.get("/delete_user/{id}")
|
|
|
+async def delete(
|
|
|
+id: int,
|
|
|
+user_id = Depends(check_token)
|
|
|
+):
|
|
|
+ if not user_id :
|
|
|
+ return {"msg": "no exit", "code": 200}
|
|
|
+ user = await User.get(id=user_id)
|
|
|
+
|
|
|
+ if user.is_superuser != 2:
|
|
|
+ return {"msg": "no access", "code": 200}
|
|
|
+
|
|
|
+ if id:
|
|
|
+ await User.filter(id=id).delete()
|
|
|
+ return {"msg": "success", "code": 200}
|
|
|
+
|
|
|
+ return {"msg": "failed", "code": 400}
|
|
|
+
|
|
|
+
|
|
|
+@users.get("/information")
|
|
|
+async def get_information(token:str):
|
|
|
+ result = await User.filter(token=token).first()
|
|
|
+ return {"msg":result, "code":200}
|
|
|
+
|
|
|
+@users.get("/protect")
|
|
|
+def protected_route(user=Depends(manager)):
|
|
|
+ if user is None:
|
|
|
+ return {'message': "no access"}
|
|
|
+ return {'user': user}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+@users.get("/check_user")
|
|
|
+async def check_user(
|
|
|
+user_id = Depends(check_token)
|
|
|
+):
|
|
|
+ user = await User.get(id=user_id)
|
|
|
+
|
|
|
+ return {"msg": "success", "code": 200,"is_super":user.is_superuser}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|