from fastapi import APIRouter, Form, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from app.models.models import User
from app.models.models import Class_list
from app.api import deps
from sqlalchemy.orm import Session
from typing import Any, Dict, Optional
import secrets
from fastapi_login.exceptions import InvalidCredentialsException
from fastapi_login import LoginManager
from datetime import timedelta, datetime
from app.config import settings
from pathlib import Path
from jose import jwt
import emails
from emails.template import JinjaTemplate
import logging
from tortoise.queryset import Q

logger = logging.getLogger(__name__)

classes = APIRouter()

# ---------------------------------------------------------------------------
# Disabled legacy code (login manager / user loader), kept for reference.
# ---------------------------------------------------------------------------
# SECRET: str = secrets.token_urlsafe(32)
# manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
# @manager.user_loader()
# async def query_user(user_id: str):
#     """
#     Get a user from the db
#     :param user_id: E-Mail of the user
#     :return: None or the user object
#     """
#     result = await User.filter(username=user_id).first()
#     if not result:
#         print('[]')
#         return []
#     return result
#     # return DB['users'].get(user_id)


def _serialize_class(class_obj: Any) -> Dict[str, Any]:
    """Return the fields of one ``Class_list`` row as a plain dict.

    The row's ``id`` is exposed under the key ``class_id``; every other
    attribute keeps its own name.
    """
    return {
        "class_id": class_obj.id,
        "name": class_obj.name,
        "start_time": class_obj.start_time,
        "end_time": class_obj.end_time,
        "location": class_obj.location,
        "lecturer": class_obj.lecturer,
        "organizer": class_obj.organizer,
        "contact": class_obj.contact,
        "introduction": class_obj.introduction,
        "content": class_obj.content,
        "cover_img": class_obj.cover_img,
    }


def _error_response(exc: Exception) -> Dict[str, Any]:
    """Log *exc* and build the ``{"msg", "code": 500}`` failure payload.

    Must be called from inside an ``except`` block so that the traceback is
    attached to the log record.
    """
    logger.exception("class endpoint failed")
    return {"msg": str(exc), "code": 500}


@classes.post("/insert_class")
async def insert_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default='')
):
    """Create a class record from form fields.

    ``start_time`` / ``end_time`` fall back to the time of the request when
    omitted.

    Returns ``{"msg": "success", "code": 200, "class_id": <id>}`` on success,
    or ``{"msg": <error text>, "code": 500}`` if creation raises.
    """
    # The fallback must be computed per request: a ``datetime.now()`` placed
    # in the signature is evaluated once at import time and then reused.
    now = datetime.now()
    try:
        # NOTE(review): ``id`` is forwarded even when it is the default 0;
        # whether that yields an auto-assigned key depends on the Class_list
        # model and the database -- confirm.
        new_class = await Class_list.create(
            id=id,
            name=name,
            start_time=start_time if start_time is not None else now,
            end_time=end_time if end_time is not None else now,
            location=location,
            lecturer=lecturer,
            organizer=organizer,
            contact=contact,
            introduction=introduction,
            content=content,
            cover_img=cover_img
        )
        return {"msg": "success", "code": 200, "class_id": new_class.id}
    except Exception as e:
        return _error_response(e)


# Disabled earlier version of update_class, kept for reference.
# @classes.post("/update_class")
# async def update_class(
#     id: int = Form(default=0),
#     name: str = Form(default=''),
#     start_time: datetime = Form(default=datetime.now()),
#     end_time: datetime = Form(default=datetime.now()),
#     location: str = Form(default=''),
#     lecturer: str = Form(default=''),
#     organizer: str = Form(default=''),
#     contact: str = Form(default=''),
#     introduction: str = Form(default=''),
#     content: str = Form(default='')
# ):
#     try:
#         await Class_list.filter(id=id).update(
#             name=name,
#             start_time=start_time,
#             end_time=end_time,
#             location=location,
#             lecturer=lecturer,
#             organizer=organizer,
#             contact=contact,
#             introduction=introduction,
#             content=content
#         )
#         return {"msg": "success", "code": 200}
#     except Exception as e:
#         return {"msg": str(e), "code": 500}


@classes.post("/update_class")
async def update_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default=''),
):
    """Partially update the class record identified by ``id``.

    Only supplied fields are written: text fields that are empty or
    whitespace-only are skipped (so a field cannot be blanked through this
    endpoint), and ``start_time`` / ``end_time`` are skipped when omitted.

    Returns ``{"msg": "success", "code": 200}`` on success, or
    ``{"msg": <error text>, "code": 500}`` if the lookup or save raises.
    """
    text_fields = {
        "name": name,
        "location": location,
        "lecturer": lecturer,
        "organizer": organizer,
        "contact": contact,
        "introduction": introduction,
        "content": content,
        "cover_img": cover_img,
    }
    try:
        class_obj = await Class_list.get(id=id)
        for attr, value in text_fields.items():
            if value.strip():
                setattr(class_obj, attr, value)
        # ``None`` means "not supplied"; leaving the stored value alone keeps
        # a partial update from silently rewriting the schedule.
        if start_time is not None:
            class_obj.start_time = start_time
        if end_time is not None:
            class_obj.end_time = end_time
        await class_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return _error_response(e)


@classes.post("/delete_class")
async def delete(id: int):
    """Delete the class record with the given ``id``.

    Returns ``{"msg": "success", "code": 200}`` after the delete query runs,
    ``{"msg": "failed", "code": 400}`` when ``id`` is 0, or
    ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    if not id:
        # Always answer with a payload instead of falling through to ``None``.
        return {"msg": "failed", "code": 400}
    try:
        await Class_list.filter(id=id).delete()
    except Exception as e:
        return _error_response(e)
    return {"msg": "success", "code": 200}


@classes.get("/search_class")
async def search_class(id: int):
    """Fetch a single class record by ``id``.

    Returns the record's fields merged into
    ``{"msg": "success", "code": 200, ...}``, or
    ``{"msg": <error text>, "code": 500}`` if the lookup raises.
    """
    try:
        class_obj = await Class_list.get(id=id)
        return {"msg": "success", "code": 200, **_serialize_class(class_obj)}
    except Exception as e:
        return _error_response(e)


@classes.get("/get_class")
async def get_class():
    """List every class record.

    Returns ``{"msg": "success", "code": 200, "classes": [...]}``, or
    ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    try:
        rows = await Class_list.all()
        return {
            "msg": "success",
            "code": 200,
            "classes": [_serialize_class(row) for row in rows],
        }
    except Exception as e:
        return _error_response(e)


@classes.get("/search_class_like")
async def search_class_like(keyword: str):
    """List class records whose name or lecturer contains ``keyword``.

    Matching uses the ORM's ``icontains`` lookup on both columns.

    Returns ``{"msg": "success", "code": 200, "classes": [...]}``, or
    ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    try:
        rows = await Class_list.filter(
            Q(name__icontains=keyword) | Q(lecturer__icontains=keyword)
        ).all()
        return {
            "msg": "success",
            "code": 200,
            "classes": [_serialize_class(row) for row in rows],
        }
    except Exception as e:
        return _error_response(e)


# ---------------------------------------------------------------------------
# Disabled legacy code (user login / registration / password recovery),
# kept for reference.
# ---------------------------------------------------------------------------
# @classes.post("/login")
# async def login(data: OAuth2PasswordRequestForm = Depends()):
#     username = data.username
#     password = data.password
#     user = await query_user(username)
#     print(user)
#     if not user:
#         # you can return any response or error of your choice
#         raise InvalidCredentialsException
#     elif password != user.password:
#         raise InvalidCredentialsException
#     access_token = manager.create_access_token(
#         data={'sub': username}
#     )
#     return {'access_token': access_token}

# @classes.post("/logout")
# async def logout():
#     return {"msg":"logout success","code":200}

# @classes.post("/add")
# async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default='')):
#     if username and password and email:
#         u = await User.create(username=username, password=password, email=email)
#         if u:
#             send_email()
#             # (the message below reads "verification email sent")
#             return {"msg": "已寄送認證信", "code": 200}
#     return {"msg": "create user failed", "code": 403}

# def generate_password_reset_token(email: str) -> str:
#     delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
#     now = datetime.utcnow()
#     expires = now + delta
#     exp = expires.timestamp()
#     encoded_jwt = jwt.encode(
#         {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
#     )
#     return encoded_jwt

# def send_email(
#     email_to: str,
#     subject_template: str = "",
#     html_template: str = "",
#     environment: Dict[str, Any] = {},
# ) -> None:
#     # assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
#     message = emails.Message(
#         subject=JinjaTemplate(subject_template),
#         html=JinjaTemplate(html_template),
#         mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
#     )
#     smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
#     if settings.SMTP_TLS:
#         smtp_options["tls"] = True
#     if settings.SMTP_USER:
#         smtp_options["user"] = settings.SMTP_USER
#     if settings.SMTP_PASSWORD:
#         smtp_options["password"] = settings.SMTP_PASSWORD
#     response = message.send(to=email_to, render=environment, smtp=smtp_options)
#     logging.info(f"send email result: {response}")

# def send_reset_password_email(email_to: str, email: str, token: str) -> None:
#     subject = f"Password recovery for user {email}"
#     with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
#         template_str = f.read()
#     server_host = settings.SERVER_HOST
#     link = f"{server_host}/reset-password?token={token}"
#     send_email(
#         email_to=email_to,
#         subject_template=subject,
#         html_template=template_str,
#         environment={
#             # "project_name": settings.PROJECT_NAME,
#             "username": email,
#             "email": email_to,
#             "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
#             "link": link,
#         },
#     )

# @users.post("/password-recovery/{email}")
# async def recover_password(email:str):
#     user = await User.filter(email=email).first()
#     if not user:
#         raise HTTPException(
#             status_code=404,
#             detail="The user with this username does not exist in the system.",
#         )
#     password_reset_token = generate_password_reset_token(email=email)
#     send_reset_password_email(
#         email_to=user.email, email=email, token=password_reset_token
#     )
#     return {"msg": "Password recovery email sent"}

# @users.get("/delete_user/{id}")
# async def delete(id: int):
#     if id:
#         await User.filter(id=id).delete()
#         return {"msg": "success", "code": 200}
#     return {"msg": "failed", "code": 400}