123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
import logging
import os
import secrets
import uuid
from datetime import timedelta, datetime
from pathlib import Path
from random import randint
from typing import Any, Dict, Optional

import emails
from emails.template import JinjaTemplate
from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile
from fastapi.responses import FileResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login import LoginManager
from fastapi_login.exceptions import InvalidCredentialsException
from jose import jwt
from sqlalchemy.orm import Session
from tortoise.queryset import Q

from app.api import deps
from app.config import settings
from app.models.models import User
from app.models.models import Class_list
- classes = APIRouter()  # router that the endpoints in this file register on
- IMAGEDIR = "/var/www/ntcri/app/api/images/"  # prefix for image paths; file names are appended directly, so keep the trailing slash
- # SECRET: str = secrets.token_urlsafe(32)
- # manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
- # @manager.user_loader()
- # async def query_user(user_id: str):
- # """
- # Get a user from the db
- # :param user_id: E-Mail of the user
- # :return: None or the user object
- # """
- # result = await User.filter(username=user_id).first()
- # if not result:
- # print('[]')
- # return []
- # return result
- # # return DB['users'].get(user_id)
@classes.post("/insert_class")
async def insert_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default=''),
    cover_img_file: UploadFile = File(default='')
):
    """Create a ``Class_list`` row from form fields, with an optional cover image.

    Args:
        id: Value passed as ``id`` to ``Class_list.create``.
        name, location, lecturer, organizer, contact, introduction, content:
            Text fields stored as submitted.
        start_time, end_time: Class times; each falls back to the time of the
            request when omitted.
        cover_img: Optional file name to store the cover image under. Only its
            final path component is used.
        cover_img_file: Optional uploaded image. When present it is written
            below ``IMAGEDIR`` under ``cover_img`` or, if that is blank, under
            the upload's own file name.

    Returns:
        ``{"msg": "success", "code": 200, "class_id": ...}`` on success, or
        ``{"msg": <error text>, "code": 500}`` when anything raises.
    """
    try:
        # Resolve the defaults per request. A ``datetime.now()`` placed in the
        # signature is evaluated once at import time and would hand every
        # request the same stale timestamp.
        request_time = datetime.now()
        if start_time is None:
            start_time = request_time
        if end_time is None:
            end_time = request_time

        # Without an upload the parameter is the '' default (a plain str),
        # which has neither ``filename`` nor ``read``.
        uploaded_name = getattr(cover_img_file, "filename", "") or ""

        chosen_name = cover_img if cover_img.strip() != '' else uploaded_name
        image_path = ''
        if chosen_name:
            # Both candidate names come from the client: keep only the final
            # component so the path cannot leave IMAGEDIR.
            safe_name = os.path.basename(chosen_name)
            if safe_name in ('', '.', '..'):
                raise ValueError(f"invalid cover image name: {chosen_name!r}")
            image_path = f"{IMAGEDIR}{safe_name}"

        if uploaded_name:
            contents = await cover_img_file.read()
            with open(image_path, "wb") as f:
                f.write(contents)

        new_class = await Class_list.create(
            id=id,
            name=name,
            start_time=start_time,
            end_time=end_time,
            location=location,
            lecturer=lecturer,
            organizer=organizer,
            contact=contact,
            introduction=introduction,
            content=content,
            cover_img=image_path
        )
        return {"msg": "success", "code": 200, "class_id": new_class.id}
    except Exception as e:
        # Errors are reported through the response envelope, as in the other
        # endpoints of this router.
        return {"msg": str(e), "code": 500}
- # @classes.post("/update_class")
- # async def update_class(
- # id: int = Form(default=0),
- # name: str = Form(default=''),
- # start_time: datetime = Form(default=datetime.now()),
- # end_time: datetime = Form(default=datetime.now()),
- # location: str = Form(default=''),
- # lecturer: str = Form(default=''),
- # organizer: str = Form(default=''),
- # contact: str = Form(default=''),
- # introduction: str = Form(default=''),
- # content: str = Form(default='')
- # ):
- # try:
- # await Class_list.filter(id=id).update(
- # name=name,
- # start_time=start_time,
- # end_time=end_time,
- # location=location,
- # lecturer=lecturer,
- # organizer=organizer,
- # contact=contact,
- # introduction=introduction,
- # content=content
- # )
- # return {"msg": "success", "code": 200}
- # except Exception as e:
- # return {"msg": str(e), "code": 500}
@classes.post("/update_class")
async def update_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default=''),
    cover_img_file: UploadFile = File(default='')
):
    """Partially update the ``Class_list`` row fetched with ``id``.

    Only fields that were actually submitted are changed: text fields that are
    blank after stripping and omitted times leave the stored values untouched.

    Cover image handling:
        * upload present: the file is written below ``IMAGEDIR`` under
          ``cover_img`` or, if that is blank, under the upload's own name;
        * ``cover_img`` given without an upload: the currently stored file is
          renamed to the new name;
        * neither given: the cover image is left unchanged.
    Only the final path component of a client-supplied name is used.

    Returns:
        ``{"msg": "success", "code": 200}`` on success, or
        ``{"msg": <error text>, "code": 500}`` when anything raises.
    """
    try:
        class_obj = await Class_list.get(id=id)

        submitted_text = {
            "name": name,
            "location": location,
            "lecturer": lecturer,
            "organizer": organizer,
            "contact": contact,
            "introduction": introduction,
            "content": content,
        }
        for attribute, value in submitted_text.items():
            if value.strip() != '':
                setattr(class_obj, attribute, value)

        # ``None`` means "not submitted". An import-time ``datetime.now()``
        # default is always truthy and would overwrite the stored times on
        # every update.
        if start_time is not None:
            class_obj.start_time = start_time
        if end_time is not None:
            class_obj.end_time = end_time

        # Without an upload the parameter is the '' default (a plain str);
        # with one it is an UploadFile, which has no ``strip`` method.
        uploaded_name = getattr(cover_img_file, "filename", "") or ""

        chosen_name = cover_img if cover_img.strip() != '' else uploaded_name
        if chosen_name:
            # Client-supplied name: keep only the final component so the path
            # cannot leave IMAGEDIR.
            safe_name = os.path.basename(chosen_name)
            if safe_name in ('', '.', '..'):
                raise ValueError(f"invalid cover image name: {chosen_name!r}")
            new_path = f"{IMAGEDIR}{safe_name}"

            if uploaded_name:
                contents = await cover_img_file.read()
                with open(new_path, "wb") as f:
                    f.write(contents)
            else:
                # Name only: move the existing image to the new name.
                os.rename(class_obj.cover_img, new_path)
            class_obj.cover_img = new_path

        await class_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        # Errors are reported through the response envelope, as in the other
        # endpoints of this router.
        return {"msg": str(e), "code": 500}
@classes.post("/delete_class")
async def delete(id: int):
    """Delete the ``Class_list`` rows matching ``id``.

    A falsy ``id`` (0) skips the query. The success payload is returned in
    both cases.
    """
    # NOTE(review): assumes the final return sits at function level, i.e. it
    # is also reached when ``id`` is falsy -- confirm against the real file.
    if not id:
        return {"msg": "success", "code": 200}

    await Class_list.filter(id=id).delete()
    return {"msg": "success", "code": 200}
@classes.get("/search_class")
async def search_class(id: int):
    """Fetch one class with ``Class_list.get(id=id)`` and return its fields.

    Returns:
        The success envelope (``msg``, ``code``) followed by ``class_id`` and
        the record's fields, or ``{"msg": <error text>, "code": 500}`` when
        the lookup or attribute access raises.
    """
    # Attributes copied verbatim into the response, in output order.
    exported = (
        "name",
        "start_time",
        "end_time",
        "location",
        "lecturer",
        "organizer",
        "contact",
        "introduction",
        "content",
        "cover_img",
    )
    try:
        record = await Class_list.get(id=id)
        response = {"msg": "success", "code": 200, "class_id": record.id}
        response.update((column, getattr(record, column)) for column in exported)
        return response
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_class")
async def get_class():
    """Return every ``Class_list`` record as a list of plain dicts.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}``, or
        ``{"msg": <error text>, "code": 500}`` when the query or the
        serialization raises.
    """
    # Attributes copied verbatim after ``class_id``, in output order.
    columns = (
        "name",
        "start_time",
        "end_time",
        "location",
        "lecturer",
        "organizer",
        "contact",
        "introduction",
        "content",
        "cover_img",
    )
    try:
        rows = await Class_list.all()
        listing = [
            {"class_id": row.id, **{column: getattr(row, column) for column in columns}}
            for row in rows
        ]
        return {"msg": "success", "code": 200, "classes": listing}
    except Exception as e:
        return {"msg": str(e), "code": 500}
-
@classes.get("/search_class_like")
async def search_class_like(keyword: str):
    """Return classes whose name or lecturer matches ``keyword``.

    Matching uses the ORM's ``icontains`` lookup on ``name`` and ``lecturer``,
    combined with OR.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}``, or
        ``{"msg": <error text>, "code": 500}`` when the query or the
        serialization raises.
    """
    # Attributes copied verbatim after ``class_id``, in output order.
    columns = (
        "name",
        "start_time",
        "end_time",
        "location",
        "lecturer",
        "organizer",
        "contact",
        "introduction",
        "content",
        "cover_img",
    )
    try:
        name_or_lecturer = Q(name__icontains=keyword) | Q(lecturer__icontains=keyword)
        matches = await Class_list.filter(name_or_lecturer).all()

        found = []
        for match in matches:
            entry = {"class_id": match.id}
            entry.update((column, getattr(match, column)) for column in columns)
            found.append(entry)

        return {"msg": "success", "code": 200, "classes": found}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/upload/")
async def create_upload_file(file: UploadFile = File(...)):
    """Store an uploaded file below ``IMAGEDIR`` under its own file name.

    The client-supplied name is reduced to its final path component before it
    is used, so the write cannot land outside ``IMAGEDIR``.

    Args:
        file: The uploaded file (required).

    Returns:
        ``{"filename": <name the file was stored under>}``.

    Raises:
        HTTPException: 400 when the upload carries no usable file name.
    """
    safe_name = os.path.basename(file.filename or "")
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")

    contents = await file.read()

    # NOTE: names are kept as uploaded, so a later upload with the same name
    # overwrites the earlier file.
    with open(f"{IMAGEDIR}{safe_name}", "wb") as f:
        f.write(contents)

    return {"filename": safe_name}
- #"/var/www/ntcri/app/api/images/test.jpeg"
-
async def read_image_file():
    """Return a ``FileResponse`` for a randomly picked entry of ``IMAGEDIR``.

    Raises:
        ValueError: from ``randint`` when the directory listing is empty.
    """
    entries = os.listdir(IMAGEDIR)
    # Uniform pick over the listing; an empty listing makes randint(0, -1) raise.
    picked = entries[randint(0, len(entries) - 1)]
    return FileResponse(f"{IMAGEDIR}{picked}")
|