"""HTTP endpoints for managing class listings and their cover images.

Exposes the ``classes`` router with create/update/delete/search endpoints
backed by the ``Class_list`` model, plus two helper endpoints that store
and serve files from ``IMAGEDIR``.

Endpoints report their outcome in the response body as
``{"msg": ..., "code": ...}`` rather than through the HTTP status code.
"""

import logging
import os
import secrets
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from random import choice, randint
from typing import Any, Dict, Optional

import emails
from emails.template import JinjaTemplate
from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile
from fastapi.responses import FileResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login import LoginManager
from fastapi_login.exceptions import InvalidCredentialsException
from jose import jwt
from sqlalchemy.orm import Session
from tortoise.queryset import Q

from app.api import deps
from app.config import settings
from app.models.models import User
from app.models.models import Class_list

logger = logging.getLogger(__name__)

classes = APIRouter()

# --- Disabled code (kept for reference): login manager / user loader -------
# SECRET: str = secrets.token_urlsafe(32)
# manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))

# @manager.user_loader()
# async def query_user(user_id: str):
#     """
#     Get a user from the db
#     :param user_id: E-Mail of the user
#     :return: None or the user object
#     """
#     result = await User.filter(username=user_id).first()
#     if not result:
#         print('[]')
#         return []
#     return result
#     # return DB['users'].get(user_id)


def _error_response(exc: Exception) -> Dict[str, Any]:
    """Log *exc* and convert it into the API's failure payload.

    Must be called from inside an ``except`` block so that the traceback is
    attached to the log record.
    """
    logger.exception("class endpoint failed: %s", exc)
    return {"msg": str(exc), "code": 500}


def _serialize_class(class_obj: Any) -> Dict[str, Any]:
    """Return the public fields of a ``Class_list`` row as a plain dict."""
    return {
        "class_id": class_obj.id,
        "name": class_obj.name,
        "start_time": class_obj.start_time,
        "end_time": class_obj.end_time,
        "location": class_obj.location,
        "lecturer": class_obj.lecturer,
        "organizer": class_obj.organizer,
        "contact": class_obj.contact,
        "introduction": class_obj.introduction,
        "content": class_obj.content,
        "cover_img": class_obj.cover_img,
    }


@classes.post("/insert_class")
async def insert_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default='')
) -> Dict[str, Any]:
    """Create a new class row from the submitted form fields.

    ``start_time`` / ``end_time`` fall back to the time of the request when
    omitted. (A ``datetime.now()`` default in the signature would be
    evaluated only once, at import time.)

    Returns:
        ``{"msg": "success", "code": 200, "class_id": <id>}`` on success, or
        ``{"msg": <error text>, "code": 500}`` if creation raises.
    """
    try:
        now = datetime.now()
        # NOTE(review): ``id`` defaults to 0 and is always forwarded to the
        # model; confirm whether the primary key should instead be generated
        # by the database when the client does not send one.
        new_class = await Class_list.create(
            id=id,
            name=name,
            start_time=start_time if start_time is not None else now,
            end_time=end_time if end_time is not None else now,
            location=location,
            lecturer=lecturer,
            organizer=organizer,
            contact=contact,
            introduction=introduction,
            content=content,
            cover_img=cover_img
        )
        return {"msg": "success", "code": 200, "class_id": new_class.id}
    except Exception as e:
        return _error_response(e)


# --- Disabled code (kept for reference): previous update_class -------------
# @classes.post("/update_class")
# async def update_class(
#     id: int = Form(default=0),
#     name: str = Form(default=''),
#     start_time: datetime = Form(default=datetime.now()),
#     end_time: datetime = Form(default=datetime.now()),
#     location: str = Form(default=''),
#     lecturer: str = Form(default=''),
#     organizer: str = Form(default=''),
#     contact: str = Form(default=''),
#     introduction: str = Form(default=''),
#     content: str = Form(default='')
# ):
#     try:
#         await Class_list.filter(id=id).update(
#             name=name,
#             start_time=start_time,
#             end_time=end_time,
#             location=location,
#             lecturer=lecturer,
#             organizer=organizer,
#             contact=contact,
#             introduction=introduction,
#             content=content
#         )
#         return {"msg": "success", "code": 200}
#     except Exception as e:
#         return {"msg": str(e), "code": 500}


@classes.post("/update_class")
async def update_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default=''),
) -> Dict[str, Any]:
    """Partially update the class identified by ``id``.

    Only fields the client actually supplied are written: text fields that
    are empty or whitespace-only are skipped, and the two timestamps are
    skipped when omitted, so an update that does not mention them no longer
    overwrites the stored times.

    Returns:
        ``{"msg": "success", "code": 200}`` on success, or
        ``{"msg": <error text>, "code": 500}`` if the lookup or save raises.
    """
    try:
        class_obj = await Class_list.get(id=id)

        if start_time is not None:
            class_obj.start_time = start_time
        if end_time is not None:
            class_obj.end_time = end_time

        text_fields = {
            "name": name,
            "location": location,
            "lecturer": lecturer,
            "organizer": organizer,
            "contact": contact,
            "introduction": introduction,
            "content": content,
            "cover_img": cover_img,
        }
        for field_name, value in text_fields.items():
            # Blank values mean "leave unchanged"; the stored value keeps the
            # caller's original (unstripped) text.
            if value.strip() != '':
                setattr(class_obj, field_name, value)

        await class_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return _error_response(e)


@classes.post("/delete_class")
async def delete(id: int) -> Dict[str, Any]:
    """Delete the class row(s) matching ``id``.

    Returns:
        ``{"msg": "success", "code": 200}`` after the delete query ran,
        ``{"msg": "failed", "code": 400}`` when ``id`` is falsy (e.g. 0), or
        ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    if not id:
        # Previously this path fell through and returned None.
        return {"msg": "failed", "code": 400}
    try:
        await Class_list.filter(id=id).delete()
    except Exception as e:
        return _error_response(e)
    return {"msg": "success", "code": 200}


@classes.get("/search_class")
async def search_class(id: int) -> Dict[str, Any]:
    """Fetch a single class by ``id``.

    Returns:
        The class fields merged into ``{"msg": "success", "code": 200, ...}``,
        or ``{"msg": <error text>, "code": 500}`` if the lookup raises.
    """
    try:
        class_obj = await Class_list.get(id=id)
        return {"msg": "success", "code": 200, **_serialize_class(class_obj)}
    except Exception as e:
        return _error_response(e)


@classes.get("/get_class")
async def get_class() -> Dict[str, Any]:
    """List every class.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}`` or
        ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    try:
        rows = await Class_list.all()
        # Local is deliberately not named ``classes`` to avoid shadowing the
        # module-level router.
        payload = [_serialize_class(row) for row in rows]
        return {"msg": "success", "code": 200, "classes": payload}
    except Exception as e:
        return _error_response(e)


@classes.get("/search_class_like")
async def search_class_like(keyword: str) -> Dict[str, Any]:
    """Search classes whose name or lecturer contains ``keyword``.

    Matching uses the ``icontains`` lookups on ``name`` and ``lecturer``.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}`` or
        ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    try:
        rows = await Class_list.filter(
            Q(name__icontains=keyword) | Q(lecturer__icontains=keyword)
        ).all()
        payload = [_serialize_class(row) for row in rows]
        return {"msg": "success", "code": 200, "classes": payload}
    except Exception as e:
        return _error_response(e)


IMAGEDIR = "images/"


@classes.post("/upload/")
async def create_upload_file(file: UploadFile = File(...)) -> Dict[str, Any]:
    """Store an uploaded file inside ``IMAGEDIR`` under its own file name.

    The client-supplied name is reduced to its final path component so that
    values such as ``../../etc/passwd`` cannot write outside ``IMAGEDIR``.

    Raises:
        HTTPException: 400 when no usable file name remains.

    Returns:
        ``{"filename": <stored name>}``.
    """
    # file.filename = f"{uuid.uuid4()}.jpeg"
    raw_name = file.filename or ""
    # Normalise Windows separators before taking the basename.
    safe_name = os.path.basename(raw_name.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")

    contents = await file.read()

    # Save the file, creating the target directory on first use.
    os.makedirs(IMAGEDIR, exist_ok=True)
    with open(os.path.join(IMAGEDIR, safe_name), "wb") as f:
        f.write(contents)

    return {"filename": safe_name}


@classes.get("/show/")
async def read_random_file() -> FileResponse:
    """Serve one randomly chosen file from ``IMAGEDIR``.

    Raises:
        HTTPException: 404 when the directory is missing or holds no files
            (previously an empty directory raised ``ValueError``).
    """
    if not os.path.isdir(IMAGEDIR):
        raise HTTPException(status_code=404, detail="No images available")

    # Only regular files can be served; skip any subdirectories.
    candidates = [
        entry for entry in os.listdir(IMAGEDIR)
        if os.path.isfile(os.path.join(IMAGEDIR, entry))
    ]
    if not candidates:
        raise HTTPException(status_code=404, detail="No images available")

    return FileResponse(os.path.join(IMAGEDIR, choice(candidates)))


# --- Disabled code (kept for reference): user/auth endpoints ---------------
# NOTE(review): parts of this section reference a ``users`` router and a
# ``query_user`` loader that are not defined in the active code above.

# @classes.post("/login")
# async def login(data: OAuth2PasswordRequestForm = Depends()):
#     username = data.username
#     password = data.password
#     user = await query_user(username)
#     print(user)
#     if not user:
#         # you can return any response or error of your choice
#         raise InvalidCredentialsException
#     elif password != user.password:
#         raise InvalidCredentialsException
#     access_token = manager.create_access_token(
#         data={'sub': username}
#     )
#     return {'access_token': access_token}

# @classes.post("/logout")
# async def logout():
#     return {"msg":"logout success","code":200}

# @classes.post("/add")
# async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default='')):
#     if username and password and email:
#         u = await User.create(username=username, password=password, email=email)
#         if u:
#             send_email()
#             return {"msg": "已寄送認證信", "code": 200}  # i.e. "verification email sent"
#     return {"msg": "create user failed", "code": 403}

# def generate_password_reset_token(email: str) -> str:
#     delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
#     now = datetime.utcnow()
#     expires = now + delta
#     exp = expires.timestamp()
#     encoded_jwt = jwt.encode(
#         {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
#     )
#     return encoded_jwt

# def send_email(
#     email_to: str,
#     subject_template: str = "",
#     html_template: str = "",
#     environment: Dict[str, Any] = {},
# ) -> None:
#     # assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
#     message = emails.Message(
#         subject=JinjaTemplate(subject_template),
#         html=JinjaTemplate(html_template),
#         mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
#     )
#     smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
#     if settings.SMTP_TLS:
#         smtp_options["tls"] = True
#     if settings.SMTP_USER:
#         smtp_options["user"] = settings.SMTP_USER
#     if settings.SMTP_PASSWORD:
#         smtp_options["password"] = settings.SMTP_PASSWORD
#     response = message.send(to=email_to, render=environment, smtp=smtp_options)
#     logging.info(f"send email result: {response}")

# def send_reset_password_email(email_to: str, email: str, token: str) -> None:
#     subject = f"Password recovery for user {email}"
#     with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
#         template_str = f.read()
#     server_host = settings.SERVER_HOST
#     link = f"{server_host}/reset-password?token={token}"
#     send_email(
#         email_to=email_to,
#         subject_template=subject,
#         html_template=template_str,
#         environment={
#             # "project_name": settings.PROJECT_NAME,
#             "username": email,
#             "email": email_to,
#             "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
#             "link": link,
#         },
#     )

# @users.post("/password-recovery/{email}")
# async def recover_password(email:str):
#     user = await User.filter(email=email).first()
#     if not user:
#         raise HTTPException(
#             status_code=404,
#             detail="The user with this username does not exist in the system.",
#         )
#     password_reset_token = generate_password_reset_token(email=email)
#     send_reset_password_email(
#         email_to=user.email, email=email, token=password_reset_token
#     )
#     return {"msg": "Password recovery email sent"}

# @users.get("/delete_user/{id}")
# async def delete(id: int):
#     if id:
#         await User.filter(id=id).delete()
#         return {"msg": "success", "code": 200}
#     return {"msg": "failed", "code": 400}