from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile from typing import List,Optional,Union from fastapi.responses import FileResponse from random import randint from fastapi.security import OAuth2PasswordRequestForm from app.models.models import User,Article_list from app.api import deps from sqlalchemy.orm import Session from typing import Any, Dict import secrets from fastapi_login.exceptions import InvalidCredentialsException from fastapi_login import LoginManager from datetime import timedelta,datetime from jose import jwt from emails.template import JinjaTemplate from tortoise.queryset import Q from fastapi.responses import HTMLResponse article = APIRouter() IMAGEDIR = "/var/www/ntcri/assets/article_files/" IMAGEDIR_short = "assets/article_files/" async def create_upload_files(files:Optional[List[UploadFile]] = File(None)): files_url = {} if files : file_num = 1 print(file_num) for file in files: contents = await file.read() #save the file with open(f"{IMAGEDIR}{file.filename}", "wb") as f: f.write(contents) file_name = "file" + str(file_num) print(file_name) files_url[file_name]=f"{IMAGEDIR_short}{file.filename}" file_num=file_num+1 return files_url @article.get("/get_article") async def get_article( article_id : Optional[int] = None, group_id : Optional[int] = None, group_sort : Optional[str] = None, category : Optional[str] = None, tags : Optional[str] = None, recommend: Optional[int] = None, page_num : Optional[int] = None, page_amount : Optional[int] = None ): try: article_list = Article_list.filter(is_del = 0).all() if group_sort: article_list = article_list.filter(group_sort = group_sort).all() if category: article_list = article_list.filter(Q(category__icontains=category)).all() if tags: article_list = article_list.filter(Q(tags__icontains=category)).all() if recommend: article_list = article_list.filter(Q(recommend=recommend)).all() if article_id : article_list = article_list.filter(id = article_id).all() if group_id : article_list = article_list.filter(group_id = group_id).all() count = await article_list.all().count() if page_num and page_amount: article_list = article_list.offset((page_num-1)*page_amount).limit(page_amount) article_list = await article_list.all().order_by("-create_time") article_objs = [] for article_obj in article_list: try: article_tmp = { "article_id": article_obj.id, "title": article_obj.title, "school_id" :article_obj.school_id, "group_sort" :article_obj.group_sort, "group_id" :article_obj.group_id, "category": article_obj.category, "create_time" : article_obj.create_time, "click_time" : article_obj.click_time, "depiction" : article_obj.depiction, "content" : article_obj.content, "files" : article_obj.files, "video_url" : article_obj.vedio_url, "tags" : article_obj.tags, "cover_img": article_obj.cover_img, "recommend" :article_obj.recommend, "url" : article_obj.url } except: article_tmp = {"msg":"data wrong"} article_objs.append(article_tmp) return {"msg": "success", "code": 200, "total_num":count,"articles": article_objs} except Exception as e: return {"msg": str(e), "code": 500} @article.post("/insert_article_imgs") async def insert_imgs( files_url = Depends(create_upload_files) ): try: files = str(files_url), return {"msg": "success", "code": 200, "url": files} except Exception as e: return {"msg": str(e), "code": 500} @article.post("/insert_article") async def insert_article( title : str = Form(default=''), school_id : int = Form(default=1), group_id : int = Form(default=1), group_sort : str = Form(default=''), tags : str = Form(default='[]'), category : str = Form(default=''), content : str = Form(default=''), depiction :str = Form(default=''), vedio_url :str = Form(default=''), files : str = Form(default='{}'), cover_img_file:UploadFile = File(default=''), url : str = Form(default=''), recommend: int = Form(default=0) ): try: cover_img = '' if cover_img_file != '': contents = await cover_img_file.read() #save the file with open(f"{IMAGEDIR}{cover_img_file.filename}", "wb") as f: f.write(contents) cover_img = f"{IMAGEDIR_short}{cover_img_file.filename}" new_article = await Article_list.create( title = title, school_id = school_id, group_id = group_id, group_sort = group_sort, create_user_id = 0, create_time = datetime.now(), latest_update_user_id = 0, latest_update_time = datetime.now(), tags = tags, category = category, content = content, depiction = depiction, vedio_url =vedio_url, cover_img = cover_img, click_time = 0, is_del = 0, url = url , files = files, recommend = recommend ) return {"msg": "success", "code": 200, "new_article": new_article.id} except Exception as e: return {"msg": str(e), "code": 500} @article.post("/update_article") async def update_article( article_id : int = Form(default=0), title : str = Form(default=''), school_id : int = Form(default=0), group_id : int = Form(default=0), group_sort : str = Form(default=''), tags : str = Form(default='[]'), category : str = Form(default=''), content : str = Form(default=''), depiction :str = Form(default=''), vedio_url :str = Form(default=''), url :str = Form(default=''), cover_img_file:UploadFile = File(default=''), recommend: int = Form(default=0) ): try: if article_id == 0 : return {"msg": "no ID"} article_obj = await Article_list.get(id=article_id) if title.strip() != '': article_obj.title = title if school_id != 0: article_obj.school_id = school_id if group_id != 0: article_obj.group_id = group_id if group_sort.strip() != '': article_obj.group_sort = group_sort print(group_sort +"change") if category.strip() != '': article_obj.category = category if content.strip() != '': article_obj.content = content if depiction.strip() != '': article_obj.depiction = depiction if vedio_url.strip() != '': article_obj.vedio_url = vedio_url if tags != '[]': article_obj.tags = tags if url != '': article_obj.url = url if recommend: article_obj.recommend = recommend if cover_img_file != '': contents = await cover_img_file.read() with open(f"{IMAGEDIR}{cover_img_file.filename}", "wb") as f: f.write(contents) article_obj.cover_img = f"{IMAGEDIR_short}{cover_img_file.filename}" await article_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @article.post("/delete_article") async def delete_article(id: int = 0): if id != 0: article_obj = await Article_list.get(id=id) article_obj.is_del = 1 await article_obj.save() return {"msg": "success", "code": 200} else : return {"msg": "please input ID", "code": 200} @article.get("/search_article_like") async def search_article_like(keyword: str,page_num : Optional[int] = None, page_amount : Optional[int] = None): try: article_list = Article_list.filter( Q(title__icontains=keyword)| Q(category__icontains=keyword)| Q(content__icontains=keyword)| Q(depiction__icontains=keyword)| Q(tags__icontains=keyword) ).all() count = await article_list.all().count() if page_num and page_amount: article_list = article_list.offset((page_num-1)*page_amount).limit(page_amount) article_list = await article_list.all().order_by("-create_time") article_objs = [] for article_obj in article_list: try: article_tmp = { "article_id": article_obj.id, "title": article_obj.title, "school_id" :article_obj.school_id, "group_sort" :article_obj.group_sort, "group_id" :article_obj.group_id, "category": article_obj.category, "create_time" : article_obj.create_time, "click_time" : article_obj.click_time, "depiction" : article_obj.depiction, "content" : article_obj.content, "files" : article_obj.files, "video_url" : article_obj.vedio_url, "tags" : article_obj.tags, "cover_img": article_obj.cover_img, "url" : article_obj.url } except: article_tmp = {"msg":"data wrong"} article_objs.append(article_tmp) return {"msg": "success", "code": 200, "total_num":count,"article": article_objs} except Exception as e: return {"msg": str(e), "code": 500}