123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
import logging
import secrets
from datetime import timedelta, datetime
from pathlib import Path
from typing import Any, Dict, Optional

import emails
from emails.template import JinjaTemplate
from fastapi import APIRouter, Form, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login import LoginManager
from fastapi_login.exceptions import InvalidCredentialsException
from jose import jwt
from sqlalchemy.orm import Session
from tortoise.queryset import Q

from app.api import deps
from app.config import settings
from app.models.models import User
from app.models.models import Class_list
# Router on which the class endpoints below are registered through the
# @classes.get / @classes.post decorators.
- classes = APIRouter()
- # SECRET: str = secrets.token_urlsafe(32)
- # manager = LoginManager(SECRET, '/login',default_expiry=timedelta(hours=72))
- # @manager.user_loader()
- # async def query_user(user_id: str):
- # """
- # Get a user from the db
- # :param user_id: E-Mail of the user
- # :return: None or the user object
- # """
- # result = await User.filter(username=user_id).first()
- # if not result:
- # print('[]')
- # return []
- # return result
- # # return DB['users'].get(user_id)
@classes.post("/insert_class")
async def insert_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default=''),
):
    """Create a ``Class_list`` row from the submitted form fields.

    Args:
        id: Value stored as the new row's ``id``.
        name: Class name.
        start_time: Start of the class; the current time is used when the
            field is omitted.
        end_time: End of the class; the current time is used when the field
            is omitted.
        location: Where the class takes place.
        lecturer: Lecturer name.
        organizer: Organizer name.
        contact: Contact information.
        introduction: Introduction text.
        content: Content text.
        cover_img: Cover image reference, stored as a string.

    Returns:
        ``{"msg": "success", "code": 200, "class_id": <id>}`` when the row is
        created, otherwise ``{"msg": <error text>, "code": 500}``.
    """
    # Resolve the fallback timestamp per request. A ``datetime.now()`` default
    # in the signature is evaluated only once, when the function is defined,
    # so every request that omitted the field would get the same stale value.
    now = datetime.now()
    if start_time is None:
        start_time = now
    if end_time is None:
        end_time = now

    try:
        new_class = await Class_list.create(
            id=id,
            name=name,
            start_time=start_time,
            end_time=end_time,
            location=location,
            lecturer=lecturer,
            organizer=organizer,
            contact=contact,
            introduction=introduction,
            content=content,
            cover_img=cover_img,
        )
        return {"msg": "success", "code": 200, "class_id": new_class.id}
    except Exception as e:
        # Errors are reported in the response body, not raised to the client.
        return {"msg": str(e), "code": 500}
- # @classes.post("/update_class")
- # async def update_class(
- # id: int = Form(default=0),
- # name: str = Form(default=''),
- # start_time: datetime = Form(default=datetime.now()),
- # end_time: datetime = Form(default=datetime.now()),
- # location: str = Form(default=''),
- # lecturer: str = Form(default=''),
- # organizer: str = Form(default=''),
- # contact: str = Form(default=''),
- # introduction: str = Form(default=''),
- # content: str = Form(default='')
- # ):
- # try:
- # await Class_list.filter(id=id).update(
- # name=name,
- # start_time=start_time,
- # end_time=end_time,
- # location=location,
- # lecturer=lecturer,
- # organizer=organizer,
- # contact=contact,
- # introduction=introduction,
- # content=content
- # )
- # return {"msg": "success", "code": 200}
- # except Exception as e:
- # return {"msg": str(e), "code": 500}
@classes.post("/update_class")
async def update_class(
    id: int = Form(default=0),
    name: str = Form(default=''),
    start_time: Optional[datetime] = Form(default=None),
    end_time: Optional[datetime] = Form(default=None),
    location: str = Form(default=''),
    lecturer: str = Form(default=''),
    organizer: str = Form(default=''),
    contact: str = Form(default=''),
    introduction: str = Form(default=''),
    content: str = Form(default=''),
    cover_img: str = Form(default=''),
):
    """Partially update the ``Class_list`` row whose ``id`` matches.

    Only fields that were actually supplied are written: string fields are
    skipped when blank (empty or whitespace only) and the two timestamps are
    skipped when omitted.

    Args:
        id: ``id`` of the row to update.
        name: New class name, or blank to keep the stored one.
        start_time: New start time, or omitted to keep the stored one.
        end_time: New end time, or omitted to keep the stored one.
        location: New location, or blank to keep the stored one.
        lecturer: New lecturer, or blank to keep the stored one.
        organizer: New organizer, or blank to keep the stored one.
        contact: New contact information, or blank to keep the stored one.
        introduction: New introduction, or blank to keep the stored one.
        content: New content, or blank to keep the stored one.
        cover_img: New cover image reference, or blank to keep the stored one.

    Returns:
        ``{"msg": "success", "code": 200}`` after saving, otherwise
        ``{"msg": <error text>, "code": 500}`` (including when the lookup
        raises because no row matches).
    """
    text_updates = {
        "name": name,
        "location": location,
        "lecturer": lecturer,
        "organizer": organizer,
        "contact": contact,
        "introduction": introduction,
        "content": content,
        "cover_img": cover_img,
    }
    try:
        class_obj = await Class_list.get(id=id)

        for attribute, value in text_updates.items():
            if value.strip():
                setattr(class_obj, attribute, value)

        # The timestamps default to None rather than ``datetime.now()``: that
        # default is evaluated once at definition time and is always truthy,
        # so an update that omitted the times used to overwrite them with the
        # process start time.
        if start_time is not None:
            class_obj.start_time = start_time
        if end_time is not None:
            class_obj.end_time = end_time

        await class_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        # Errors are reported in the response body, not raised to the client.
        return {"msg": str(e), "code": 500}
@classes.post("/delete_class")
async def delete(id: int):
    """Delete the ``Class_list`` rows whose ``id`` matches.

    Args:
        id: ``id`` of the class to delete. A falsy value (``0``) is rejected
            without touching the database.

    Returns:
        ``{"msg": "success", "code": 200}`` once the delete query has run,
        ``{"msg": "failed", "code": 400}`` when ``id`` is falsy, or
        ``{"msg": <error text>, "code": 500}`` if the query raises.
    """
    if not id:
        # Without this branch the handler fell through and answered ``null``.
        return {"msg": "failed", "code": 400}
    try:
        await Class_list.filter(id=id).delete()
        return {"msg": "success", "code": 200}
    except Exception as e:
        # Same in-body error reporting as the other class endpoints.
        return {"msg": str(e), "code": 500}
@classes.get("/search_class")
async def search_class(id: int):
    """Fetch a single class by ``id`` and return its fields.

    Args:
        id: ``id`` of the class to look up.

    Returns:
        A dict with ``msg``/``code`` set to ``"success"``/``200`` plus the
        class fields (``class_id``, ``name``, ``start_time``, ``end_time``,
        ``location``, ``lecturer``, ``organizer``, ``contact``,
        ``introduction``, ``content``, ``cover_img``), or
        ``{"msg": <error text>, "code": 500}`` if anything raises.
    """
    # Attributes copied into the response under their own names, in order.
    columns = (
        "name",
        "start_time",
        "end_time",
        "location",
        "lecturer",
        "organizer",
        "contact",
        "introduction",
        "content",
        "cover_img",
    )
    try:
        record = await Class_list.get(id=id)
        payload = {"msg": "success", "code": 200, "class_id": record.id}
        for column in columns:
            payload[column] = getattr(record, column)
        return payload
    except Exception as err:
        return {"msg": str(err), "code": 500}
@classes.get("/get_class")
async def get_class():
    """Return every ``Class_list`` row as a list of plain dictionaries.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}`` where each entry
        holds ``class_id``, ``name``, ``start_time``, ``end_time``,
        ``location``, ``lecturer``, ``organizer``, ``contact``,
        ``introduction``, ``content`` and ``cover_img``; or
        ``{"msg": <error text>, "code": 500}`` if anything raises.
    """
    try:
        rows = await Class_list.all()
        serialized = [
            {
                "class_id": row.id,
                "name": row.name,
                "start_time": row.start_time,
                "end_time": row.end_time,
                "location": row.location,
                "lecturer": row.lecturer,
                "organizer": row.organizer,
                "contact": row.contact,
                "introduction": row.introduction,
                "content": row.content,
                "cover_img": row.cover_img,
            }
            for row in rows
        ]
    except Exception as err:
        return {"msg": str(err), "code": 500}
    else:
        return {"msg": "success", "code": 200, "classes": serialized}
@classes.get("/search_class_like")
async def search_class_like(keyword: str):
    """Search classes whose name or lecturer matches ``keyword``.

    The match uses the ORM's ``icontains`` lookup on the ``name`` and
    ``lecturer`` columns, combined with OR.

    Args:
        keyword: Text to look for.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}`` with one dict
        per matching row, or ``{"msg": <error text>, "code": 500}`` if
        anything raises.
    """
    # (response key, model attribute) pairs, in response order.
    exported = (
        ("class_id", "id"),
        ("name", "name"),
        ("start_time", "start_time"),
        ("end_time", "end_time"),
        ("location", "location"),
        ("lecturer", "lecturer"),
        ("organizer", "organizer"),
        ("contact", "contact"),
        ("introduction", "introduction"),
        ("content", "content"),
        ("cover_img", "cover_img"),
    )
    try:
        name_or_lecturer = Q(name__icontains=keyword) | Q(lecturer__icontains=keyword)
        hits = await Class_list.filter(name_or_lecturer).all()
        found = []
        for hit in hits:
            found.append({key: getattr(hit, attr) for key, attr in exported})
        return {"msg": "success", "code": 200, "classes": found}
    except Exception as err:
        return {"msg": str(err), "code": 500}
- # @classes.post("/login")
- # async def login(data: OAuth2PasswordRequestForm = Depends()):
- # username = data.username
- # password = data.password
- # user = await query_user(username)
- # print(user)
- # if not user:
- # # you can return any response or error of your choice
- # raise InvalidCredentialsException
- # elif password != user.password:
- # raise InvalidCredentialsException
- # access_token = manager.create_access_token(
- # data={'sub': username}
- # )
- # return {'access_token': access_token}
- # @classes.post("/logout")
- # async def logout():
- # return {"msg":"logout success","code":200}
- # @classes.post("/add")
- # async def add(username: str = Form(default=''), password: str = Form(default=''), email: str = Form(default='')):
- # if username and password and email:
- # u = await User.create(username=username, password=password, email=email)
- # if u:
- # send_email()
- # return {"msg": "已寄送認證信", "code": 200}
- # return {"msg": "create user failed", "code": 403}
- # def generate_password_reset_token(email: str) -> str:
- # delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
- # now = datetime.utcnow()
- # expires = now + delta
- # exp = expires.timestamp()
- # encoded_jwt = jwt.encode(
- # {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
- # )
- # return encoded_jwt
- # def send_email(
- # email_to: str,
- # subject_template: str = "",
- # html_template: str = "",
- # environment: Dict[str, Any] = {},
- # ) -> None:
- # # assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
- # message = emails.Message(
- # subject=JinjaTemplate(subject_template),
- # html=JinjaTemplate(html_template),
- # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
- # )
- # smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
- # if settings.SMTP_TLS:
- # smtp_options["tls"] = True
- # if settings.SMTP_USER:
- # smtp_options["user"] = settings.SMTP_USER
- # if settings.SMTP_PASSWORD:
- # smtp_options["password"] = settings.SMTP_PASSWORD
- # response = message.send(to=email_to, render=environment, smtp=smtp_options)
- # logging.info(f"send email result: {response}")
- # def send_reset_password_email(email_to: str, email: str, token: str) -> None:
- # subject = f"Password recovery for user {email}"
- # with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
- # template_str = f.read()
- # server_host = settings.SERVER_HOST
- # link = f"{server_host}/reset-password?token={token}"
- # send_email(
- # email_to=email_to,
- # subject_template=subject,
- # html_template=template_str,
- # environment={
- # # "project_name": settings.PROJECT_NAME,
- # "username": email,
- # "email": email_to,
- # "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
- # "link": link,
- # },
- # )
- # @users.post("/password-recovery/{email}")
- # async def recover_password(email:str):
- # user = await User.filter(email=email).first()
- # if not user:
- # raise HTTPException(
- # status_code=404,
- # detail="The user with this username does not exist in the system.",
- # )
- # password_reset_token = generate_password_reset_token(email=email)
- # send_reset_password_email(
- # email_to=user.email, email=email, token=password_reset_token
- # )
- # return {"msg": "Password recovery email sent"}
- # @users.get("/delete_user/{id}")
- # async def delete(id: int):
- # if id:
- # await User.filter(id=id).delete()
- # return {"msg": "success", "code": 200}
- # return {"msg": "failed", "code": 400}
|