classes.py 12 KB


  1. from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile
  2. from fastapi.responses import FileResponse
  3. import os
  4. from random import randint
  5. import uuid
  6. from fastapi.security import OAuth2PasswordRequestForm
  7. from app.models.models import User
  8. from app.models.models import Class_list
  9. from app.api import deps
  10. from sqlalchemy.orm import Session
  11. from typing import Any, Dict
  12. import secrets
  13. from fastapi_login.exceptions import InvalidCredentialsException
  14. from fastapi_login import LoginManager
  15. from datetime import timedelta,datetime
  16. from app.config import settings
  17. from pathlib import Path
  18. from jose import jwt
  19. import emails
  20. from emails.template import JinjaTemplate
  21. import logging
  22. from tortoise.queryset import Q
  23. classes = APIRouter()
  24. # SECRET: str = secrets.token_urlsafe(32)
  25. # manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
  26. # @manager.user_loader()
  27. # async def query_user(user_id: str):
  28. # """
  29. # Get a user from the db
  30. # :param user_id: E-Mail of the user
  31. # :return: None or the user object
  32. # """
  33. # result = await User.filter(username=user_id).first()
  34. # if not result:
  35. # print('[]')
  36. # return []
  37. # return result
  38. # # return DB['users'].get(user_id)
  39. @classes.post("/insert_class")
  40. async def insert_class(
  41. id: int = Form(default=0),
  42. name: str = Form(default=''),
  43. start_time: datetime = Form(default=datetime.now()),
  44. end_time: datetime = Form(default=datetime.now()),
  45. location: str = Form(default=''),
  46. lecturer: str = Form(default=''),
  47. organizer: str = Form(default=''),
  48. contact: str = Form(default=''),
  49. introduction: str = Form(default=''),
  50. content: str = Form(default=''),
  51. cover_img: str = Form(default='')
  52. ):
  53. try:
  54. new_class = await Class_list.create(
  55. id=id,
  56. name=name,
  57. start_time=start_time,
  58. end_time=end_time,
  59. location=location,
  60. lecturer=lecturer,
  61. organizer=organizer,
  62. contact=contact,
  63. introduction=introduction,
  64. content=content,
  65. cover_img=cover_img
  66. )
  67. return {"msg": "success", "code": 200, "class_id": new_class.id}
  68. except Exception as e:
  69. return {"msg": str(e), "code": 500}
  70. # @classes.post("/update_class")
  71. # async def update_class(
  72. # id: int = Form(default=0),
  73. # name: str = Form(default=''),
  74. # start_time: datetime = Form(default=datetime.now()),
  75. # end_time: datetime = Form(default=datetime.now()),
  76. # location: str = Form(default=''),
  77. # lecturer: str = Form(default=''),
  78. # organizer: str = Form(default=''),
  79. # contact: str = Form(default=''),
  80. # introduction: str = Form(default=''),
  81. # content: str = Form(default='')
  82. # ):
  83. # try:
  84. # await Class_list.filter(id=id).update(
  85. # name=name,
  86. # start_time=start_time,
  87. # end_time=end_time,
  88. # location=location,
  89. # lecturer=lecturer,
  90. # organizer=organizer,
  91. # contact=contact,
  92. # introduction=introduction,
  93. # content=content
  94. # )
  95. # return {"msg": "success", "code": 200}
  96. # except Exception as e:
  97. # return {"msg": str(e), "code": 500}
  98. @classes.post("/update_class")
  99. async def update_class(
  100. id: int = Form(default=0),
  101. name: str = Form(default=''),
  102. start_time: datetime = Form(default=datetime.now()),
  103. end_time: datetime = Form(default=datetime.now()),
  104. location: str = Form(default=''),
  105. lecturer: str = Form(default=''),
  106. organizer: str = Form(default=''),
  107. contact: str = Form(default=''),
  108. introduction: str = Form(default=''),
  109. content: str = Form(default=''),
  110. cover_img: str = Form(default=''),
  111. ):
  112. try:
  113. class_obj = await Class_list.get(id=id)
  114. if name.strip() != '':
  115. class_obj.name = name
  116. if start_time:
  117. class_obj.start_time = start_time
  118. if end_time:
  119. class_obj.end_time = end_time
  120. if location.strip() != '':
  121. class_obj.location = location
  122. if lecturer.strip() != '':
  123. class_obj.lecturer = lecturer
  124. if organizer.strip() != '':
  125. class_obj.organizer = organizer
  126. if contact.strip() != '':
  127. class_obj.contact = contact
  128. if introduction.strip() != '':
  129. class_obj.introduction = introduction
  130. if content.strip() != '':
  131. class_obj.content = content
  132. if cover_img.strip() != '':
  133. class_obj.cover_img = cover_img
  134. await class_obj.save()
  135. return {"msg": "success", "code": 200}
  136. except Exception as e:
  137. return {"msg": str(e), "code": 500}
  138. @classes.post("/delete_class")
  139. async def delete(id: int):
  140. if id:
  141. await Class_list.filter(id=id).delete()
  142. return {"msg": "success", "code": 200}
  143. @classes.get("/search_class")
  144. async def search_class(id: int):
  145. try:
  146. class_obj = await Class_list.get(id=id)
  147. return {
  148. "msg": "success",
  149. "code": 200,
  150. "class_id": class_obj.id,
  151. "name": class_obj.name,
  152. "start_time": class_obj.start_time,
  153. "end_time": class_obj.end_time,
  154. "location": class_obj.location,
  155. "lecturer": class_obj.lecturer,
  156. "organizer": class_obj.organizer,
  157. "contact": class_obj.contact,
  158. "introduction": class_obj.introduction,
  159. "content": class_obj.content,
  160. "cover_img": class_obj.cover_img,
  161. }
  162. except Exception as e:
  163. return {"msg": str(e), "code": 500}
  164. @classes.get("/get_class")
  165. async def get_class():
  166. try:
  167. class_list = await Class_list.all()
  168. classes = []
  169. for class_obj in class_list:
  170. class_data = {
  171. "class_id": class_obj.id,
  172. "name": class_obj.name,
  173. "start_time": class_obj.start_time,
  174. "end_time": class_obj.end_time,
  175. "location": class_obj.location,
  176. "lecturer": class_obj.lecturer,
  177. "organizer": class_obj.organizer,
  178. "contact": class_obj.contact,
  179. "introduction": class_obj.introduction,
  180. "content": class_obj.content,
  181. "cover_img": class_obj.cover_img
  182. }
  183. classes.append(class_data)
  184. return {"msg": "success", "code": 200, "classes": classes}
  185. except Exception as e:
  186. return {"msg": str(e), "code": 500}
  187. @classes.get("/search_class_like")
  188. async def search_class_like(keyword: str):
  189. try:
  190. class_list = await Class_list.filter(
  191. Q(name__icontains=keyword) | Q(lecturer__icontains=keyword)
  192. ).all()
  193. classes = []
  194. for class_obj in class_list:
  195. class_data = {
  196. "class_id": class_obj.id,
  197. "name": class_obj.name,
  198. "start_time": class_obj.start_time,
  199. "end_time": class_obj.end_time,
  200. "location": class_obj.location,
  201. "lecturer": class_obj.lecturer,
  202. "organizer": class_obj.organizer,
  203. "contact": class_obj.contact,
  204. "introduction": class_obj.introduction,
  205. "content": class_obj.content,
  206. "cover_img": class_obj.cover_img
  207. }
  208. classes.append(class_data)
  209. return {"msg": "success", "code": 200, "classes": classes}
  210. except Exception as e:
  211. return {"msg": str(e), "code": 500}
  212. IMAGEDIR = "images/"
  213. @classes.post("/upload/")
  214. async def create_upload_file(file: UploadFile = File(...)):
  215. file.filename = f"{uuid.uuid4()}.jpg"
  216. contents = await file.read()
  217. #save the file
  218. with open(f"{IMAGEDIR}{file.filename}", "wb") as f:
  219. f.write(contents)
  220. return {"filename": file.filename}
  221. @classes.get("/show/")
  222. async def read_random_file():
  223. # get random file from the image directory
  224. files = os.listdir(IMAGEDIR)
  225. random_index = randint(0, len(files) - 1)
  226. path = f"{IMAGEDIR}{files[random_index]}"
  227. return FileResponse(path)
  228. # @classes.post("/login")
  229. # async def login(data: OAuth2PasswordRequestForm = Depends()):
  230. # username = data.username
  231. # password = data.password
  232. # user = await query_user(username)
  233. # print(user)
  234. # if not user:
  235. # # you can return any response or error of your choice
  236. # raise InvalidCredentialsException
  237. # elif password != user.password:
  238. # raise InvalidCredentialsException
  239. # access_token = manager.create_access_token(
  240. # data={'sub': username}
  241. # )
  242. # return {'access_token': access_token}
  243. # @classes.post("/logout")
  244. # async def logout():
  245. # return {"msg":"logout success","code":200}
  246. # @classes.post("/add")
  247. # async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default='')):
  248. # if username and password and email:
  249. # u = await User.create(username=username, password=password, email=email)
  250. # if u:
  251. # send_email()
  252. # return {"msg": "已寄送認證信", "code": 200}
  253. # return {"msg": "create user failed", "code": 403}
  254. # def generate_password_reset_token(email: str) -> str:
  255. # delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
  256. # now = datetime.utcnow()
  257. # expires = now + delta
  258. # exp = expires.timestamp()
  259. # encoded_jwt = jwt.encode(
  260. # {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
  261. # )
  262. # return encoded_jwt
  263. # def send_email(
  264. # email_to: str,
  265. # subject_template: str = "",
  266. # html_template: str = "",
  267. # environment: Dict[str, Any] = {},
  268. # ) -> None:
  269. # # assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
  270. # message = emails.Message(
  271. # subject=JinjaTemplate(subject_template),
  272. # html=JinjaTemplate(html_template),
  273. # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
  274. # )
  275. # smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
  276. # if settings.SMTP_TLS:
  277. # smtp_options["tls"] = True
  278. # if settings.SMTP_USER:
  279. # smtp_options["user"] = settings.SMTP_USER
  280. # if settings.SMTP_PASSWORD:
  281. # smtp_options["password"] = settings.SMTP_PASSWORD
  282. # response = message.send(to=email_to, render=environment, smtp=smtp_options)
  283. # logging.info(f"send email result: {response}")
  284. # def send_reset_password_email(email_to: str, email: str, token: str) -> None:
  285. # subject = f"Password recovery for user {email}"
  286. # with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
  287. # template_str = f.read()
  288. # server_host = settings.SERVER_HOST
  289. # link = f"{server_host}/reset-password?token={token}"
  290. # send_email(
  291. # email_to=email_to,
  292. # subject_template=subject,
  293. # html_template=template_str,
  294. # environment={
  295. # # "project_name": settings.PROJECT_NAME,
  296. # "username": email,
  297. # "email": email_to,
  298. # "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
  299. # "link": link,
  300. # },
  301. # )
  302. # @users.post("/password-recovery/{email}")
  303. # async def recover_password(email:str):
  304. # user = await User.filter(email=email).first()
  305. # if not user:
  306. # raise HTTPException(
  307. # status_code=404,
  308. # detail="The user with this username does not exist in the system.",
  309. # )
  310. # password_reset_token = generate_password_reset_token(email=email)
  311. # send_reset_password_email(
  312. # email_to=user.email, email=email, token=password_reset_token
  313. # )
  314. # return {"msg": "Password recovery email sent"}
  315. # @users.get("/delete_user/{id}")
  316. # async def delete(id: int):
  317. # if id:
  318. # await User.filter(id=id).delete()
  319. # return {"msg": "success", "code": 200}
  320. # return {"msg": "failed", "code": 400}