from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile from fastapi.responses import FileResponse import os from random import randint import uuid from fastapi.security import OAuth2PasswordRequestForm from app.models.models import User from app.models.models import Class_list from app.api import deps from sqlalchemy.orm import Session from typing import Any, Dict import secrets from fastapi_login.exceptions import InvalidCredentialsException from fastapi_login import LoginManager from datetime import timedelta,datetime from app.config import settings from pathlib import Path from jose import jwt import emails from emails.template import JinjaTemplate import logging from tortoise.queryset import Q classes = APIRouter() IMAGEDIR = "/var/www/ntcri/app/api/images/" # SECRET: str = secrets.token_urlsafe(32) # manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72)) # @manager.user_loader() # async def query_user(user_id: str): # """ # Get a user from the db # :param user_id: E-Mail of the user # :return: None or the user object # """ # result = await User.filter(username=user_id).first() # if not result: # print('[]') # return [] # return result # # return DB['users'].get(user_id) @classes.post("/insert_class") async def insert_class( id: int = Form(default=0), name: str = Form(default=''), start_time: datetime = Form(default=datetime.now()), end_time: datetime = Form(default=datetime.now()), location: str = Form(default=''), lecturer: str = Form(default=''), organizer: str = Form(default=''), contact: str = Form(default=''), introduction: str = Form(default=''), content: str = Form(default=''), cover_img: str = Form(default=''), cover_img_file:UploadFile = File(default='') ): try: contents = await cover_img_file.read() if cover_img.strip() != '': cover_img = f"{IMAGEDIR}{cover_img}" else: cover_img = f"{IMAGEDIR}{cover_img_file.filename}" #save the file with open(cover_img, "wb") as f: f.write(contents) new_class = await Class_list.create( id=id, name=name, start_time=start_time, end_time=end_time, location=location, lecturer=lecturer, organizer=organizer, contact=contact, introduction=introduction, content=content, cover_img=cover_img ) return {"msg": "success", "code": 200, "class_id": new_class.id} except Exception as e: return {"msg": str(e), "code": 500} # @classes.post("/update_class") # async def update_class( # id: int = Form(default=0), # name: str = Form(default=''), # start_time: datetime = Form(default=datetime.now()), # end_time: datetime = Form(default=datetime.now()), # location: str = Form(default=''), # lecturer: str = Form(default=''), # organizer: str = Form(default=''), # contact: str = Form(default=''), # introduction: str = Form(default=''), # content: str = Form(default='') # ): # try: # await Class_list.filter(id=id).update( # name=name, # start_time=start_time, # end_time=end_time, # location=location, # lecturer=lecturer, # organizer=organizer, # contact=contact, # introduction=introduction, # content=content # ) # return {"msg": "success", "code": 200} # except Exception as e: # return {"msg": str(e), "code": 500} @classes.post("/update_class") async def update_class( id: int = Form(default=0), name: str = Form(default=''), start_time: datetime = Form(default=datetime.now()), end_time: datetime = Form(default=datetime.now()), location: str = Form(default=''), lecturer: str = Form(default=''), organizer: str = Form(default=''), contact: str = Form(default=''), introduction: str = Form(default=''), content: str = Form(default=''), cover_img: str = Form(default=''), cover_img_file:UploadFile = File(default='') ): try: class_obj = await Class_list.get(id=id) if name.strip() != '': class_obj.name = name if start_time: class_obj.start_time = start_time if end_time: class_obj.end_time = end_time if location.strip() != '': class_obj.location = location if lecturer.strip() != '': class_obj.lecturer = lecturer if organizer.strip() != '': class_obj.organizer = organizer if contact.strip() != '': class_obj.contact = contact if introduction.strip() != '': class_obj.introduction = introduction if content.strip() != '': class_obj.content = content if cover_img.strip() != '': if cover_img_file.strip() != '': class_obj.cover_img = f"{IMAGEDIR}{cover_img}" contents = await cover_img_file.read() with open(class_obj.cover_img, "wb") as f: f.write(contents) else: os.rename(class_obj.cover_img, f"{IMAGEDIR}{cover_img}") class_obj.cover_img = f"{IMAGEDIR}{cover_img}" else: if cover_img_file.strip() != '': class_obj.cover_img =f"{IMAGEDIR}{cover_img_file.filename}" contents = await cover_img_file.read() with open(class_obj.cover_img, "wb") as f: f.write(contents) await class_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/delete_class") async def delete(id: int): if id: await Class_list.filter(id=id).delete() return {"msg": "success", "code": 200} @classes.get("/search_class") async def search_class(id: int): try: class_obj = await Class_list.get(id=id) return { "msg": "success", "code": 200, "class_id": class_obj.id, "name": class_obj.name, "start_time": class_obj.start_time, "end_time": class_obj.end_time, "location": class_obj.location, "lecturer": class_obj.lecturer, "organizer": class_obj.organizer, "contact": class_obj.contact, "introduction": class_obj.introduction, "content": class_obj.content, "cover_img": class_obj.cover_img, } except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_class") async def get_class(): try: class_list = await Class_list.all() classes = [] for class_obj in class_list: class_data = { "class_id": class_obj.id, "name": class_obj.name, "start_time": class_obj.start_time, "end_time": class_obj.end_time, "location": class_obj.location, "lecturer": class_obj.lecturer, "organizer": class_obj.organizer, "contact": class_obj.contact, "introduction": class_obj.introduction, "content": class_obj.content, "cover_img": class_obj.cover_img } classes.append(class_data) return {"msg": "success", "code": 200, "classes": classes} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/search_class_like") async def search_class_like(keyword: str): try: class_list = await Class_list.filter( Q(name__icontains=keyword) | Q(lecturer__icontains=keyword) ).all() classes = [] for class_obj in class_list: class_data = { "class_id": class_obj.id, "name": class_obj.name, "start_time": class_obj.start_time, "end_time": class_obj.end_time, "location": class_obj.location, "lecturer": class_obj.lecturer, "organizer": class_obj.organizer, "contact": class_obj.contact, "introduction": class_obj.introduction, "content": class_obj.content, "cover_img": class_obj.cover_img } classes.append(class_data) return {"msg": "success", "code": 200, "classes": classes} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/upload/") async def create_upload_file(file: UploadFile = File(...)): #file.filename = f"{uuid.uuid4()}.jpeg" contents = await file.read() #save the file with open(f"{IMAGEDIR}{file.filename}", "wb") as f: f.write(contents) return {"filename": file.filename} #"/var/www/ntcri/app/api/images/test.jpeg" async def read_image_file(): # get random file from the image directory files = os.listdir(IMAGEDIR) random_index = randint(0, len(files) - 1) path = f"{IMAGEDIR}{files[random_index]}" return FileResponse(path)