from ast import Or from email.policy import default from sre_constants import ANY from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile,Request from typing import List,Optional from fastapi.responses import FileResponse from random import randint from fastapi.security import OAuth2PasswordRequestForm from app.models.models import User,User_information,Favorite_course,Article_list,Class_date,Attend_record,One_day_class,Outter_class_list,Proposal from app.models.models import Class_list,Schools,Class_detail,Class_name,Registration,Group_name,Online_course,User_resume,User_point,Point_exchange_record from app.api import deps from sqlalchemy.orm import Session from typing import Any, Dict import secrets from fastapi_login.exceptions import InvalidCredentialsException from fastapi_login import LoginManager from datetime import timedelta,datetime from app.config import settings from pathlib import Path from jose import jwt from tortoise.queryset import Q from fastapi.responses import HTMLResponse import requests import json from itertools import chain from tortoise import fields import random from app.log import my_log import rpyc import emails from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart import re classes = APIRouter() # logging.basicConfig( # filename='app-basic.log', # level=logging.INFO, # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # datefmt='%Y-%m-%d %H:%M:%S' # ) # logger = logging.getLogger(__name__) def send_email( email_to: str, token: str, subject_template: str = "", html_template: str = "", environment: Dict[str, Any] = {}, ): # message = emails.Message( # subject=JinjaTemplate(subject_template), # html=JinjaTemplate(html_template), # mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL), # ) try: subject=subject_template html=html_template mailobj={} mailobj['toaddr']=email_to mailobj['title']=subject mailobj['totext']=html conn = rpyc.connect("192.168.192.80", 12345) conn.root.mailto(mailobj) return {"message":f"send email"} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/test_email") async def test_email( email_to: str, subject_template: str = "", html_template: str = "" ): email = email_to subject = subject_template message = html_template result = send_email(email,"",subject,message) return result IMAGEDIR = "/var/www/ntcri/assets/" IMAGEDIR_short = "assets/" encode_set = { "樹藝" : "ARB", "漆藝" : "LAC", "藍染" : "AIZ", "蠟雕" : "WAX", "竹工藝籃" :"BAM", "金工/飾品" :"MWT", "蠟燭/香氛/調香" : "CAN" , "編織/羊毛氈/縫紉" : "KNI" , "植栽/花藝" : "PLA" , "陶藝/玻璃" : "CER" , "皮件/皮革" : "LEA" , "插畫/繪畫/寫字" : "Ill", "木工/竹藝" :"WOO", "安心活動" : "HER" } taiwan_city = ["基隆","台北,臺北","新北","宜蘭","花蓮","台東,臺東","桃園","新竹","苗栗","台中,臺中","彰化","南投","雲林","嘉義","台南,臺南","高雄","屏東"] async def create_upload_files(files:Optional[List[UploadFile]] = File(None)): files_url = {} if files : file_num = 1 print(file_num) for file in files: contents = await file.read() words = file.filename.split('.') # if words[len(words)-1] != ("png" or "jpg" or "pdf" or "jpeg"): # return {"msg":"wrong filetype"} #save the file with open(f"{IMAGEDIR}{file.filename}", "wb") as f: f.write(contents) file_name = "file" + str(file_num) print(file_name) files_url[file_name]=f"{IMAGEDIR_short}{file.filename}" file_num=file_num+1 return files_url async def check_token(access_token: str): result = await User.get_or_none(token=access_token) if not result: print("no access") return None user_id = result.id return user_id async def update_location_time(location_id: int): if location_id: school = await Schools.get(id=location_id) school.update_time = datetime.now() await school.save() return {"msg": "success", "code": 200} @classes.post("/change_class_img") async def change_class_img( category: str , files:Optional[List[UploadFile]] = File(None) ): try: files_url = {} file_num = 0 for file in files: contents = await file.read() #save the file with open(f"{IMAGEDIR}{file.filename}", "wb") as f: f.write(contents) file_name = "file" + str(file_num) files_url[file_name]=f"{IMAGEDIR_short}{file.filename}" file_num=file_num+1 class_name_list = await Class_name.filter(Q(category__icontains=category)& Q(is_inner = 0)).all() for class_name_obj in class_name_list: x = random.randrange(4) file_name = "file" + str(x) class_name_obj.cover_img = files_url[file_name] await class_name_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} class resume(): imgs = UploadFile teacher_name : str work_type : str experience : str expertise : str license : str media : str introduction: str # @classes.post("/check_city") # async def check_city( # Lng: str = None, # Lat: str = None, # ): # try: # country : Any # city : Any # if Lng and Lat: # url = 'https://maps.googleapis.com/maps/api/geocode/json?' # params = {'key':'AIzaSyDETbkTAmZNOECx2u4rmNudHvyoQTgDbc4', 'language':'zh-TW','latlng':''} # params['latlng'] = Lat + ',' + Lng # #print(params['address']) # res = requests.get(url, params=params) # mes = json.loads(res.text) # list = mes['results'][0]['address_components'] # for tmp in list : # if "administrative_area_level_1" in tmp['types']: # city = tmp["long_name"] # if "administrative_area_level_2" in tmp['types']: # country = tmp["long_name"] # else : # return {"msg": "please input Lng and Lat", "code": 500} # return {"msg": "success", "code": 200, "result": city+','+country} # except Exception as e: # return {"msg": str(e), "code": 500} @classes.post("/insert_school") async def insert_school( request: Request, location_name: str = Form(default=''), Lng: str = Form(default=None), Lat: str = Form(default=None), address : str = Form(default=''), school_introduction : str = Form(default=''), email : str = Form(default=''), phone : str = Form(default=''), access_token:str = Form(default=None), teachers_list :str = Form(default=None), is_pass_proposal : int = Form(default=0), principal_user_email : str = Form(default=None), ): try: user_id = None if access_token: user_id = await check_token(access_token) if user_id == None: return {"msg": "沒有權限", "code": 500} # teachers = [] # for teacher in teachers_list: # try: # teacher_info = await User.get(email=teacher,is_avtive=1) # teachers.append(teacher_info.id) # except: # my_log("info",__name__,f"no this teacher") try: # 這裡response採用json格式,可以自由選擇為json?或是xml? if Lat == None or Lng == None: url = 'https://maps.googleapis.com/maps/api/geocode/json?' params = {'key':'AIzaSyDETbkTAmZNOECx2u4rmNudHvyoQTgDbc4', 'address':''} params['address'] = address #print(params['address']) res = requests.get(url, params=params) mes = json.loads(res.text) Lat = mes['results'][0]['geometry']['location']['lat'] Lng = mes['results'][0]['geometry']['location']['lng'] print(Lat,Lng) except : my_log("error",__name__,f"not get lat and lng") principal_user_id =None if principal_user_email: try: principal_user = await User.get(email=principal_user_email,is_avtive=1) principal_user_id = principal_user.id except: my_log("info",__name__,f"no this teacher") new_school = await Schools.create( name=location_name, longitude=Lng, latitude=Lat, address = address, update_time = datetime.now(), introduction = school_introduction , email= email, phone = phone, create_user_id = user_id, is_delete = 0, is_pass_proposal = is_pass_proposal, principal_user_id = principal_user_id, teachers = teachers_list ) client_ip = request.client.host my_log("info",__name__,f"Client IP: {client_ip} - new school:{new_school.id}") return {"msg": "success", "code": 200, "location_id": new_school.id} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/insert_class_name") async def insert_class_name( name: str = Form(default=''), location_id: int = Form(default=1), category: str = Form(default=''), introduction: str = Form(default=''), organizer: str = Form(default=''), cover_img_file:UploadFile = File(default=''), group_id : int = Form(default=1), group_sort :str = Form(default=''), recommend: int = Form(default=0), special_class_list_name : str = Form(default=None), is_inner : int = Form(default=0), is_check : int = Form(default=0), access_token:str = Form(default=None), teachers_resume : str = Form(default=None), syllabus : str = Form(default=None) ): try: cover_img = '' if cover_img_file != '': contents = await cover_img_file.read() #save the file with open(f"{IMAGEDIR}{cover_img_file.filename}", "wb") as f: f.write(contents) cover_img = f"{IMAGEDIR_short}{cover_img_file.filename}" user_id = None if access_token: user_id = await check_token(access_token) if user_id == None: return {"msg": "沒有權限", "code": 500} # 課程編碼 encode = "" if encode_set.__contains__(category) : encode = encode_set[category] else: encode = "OTH" now = datetime.now() encode += now.strftime("%Y%m%d") new_class_name = await Class_name.create( name=name, school_id=location_id, category=category, introduction=introduction, organizer=organizer, cover_img=cover_img, group_id=group_id, group_sort=group_sort, recommend = recommend, special_class_list_name = special_class_list_name, is_inner = is_inner, is_check = is_check, create_user_id = user_id, create_time = datetime.now(), is_delete = 0, is_record_point =0, teachers_resume = teachers_resume, syllabus = syllabus ) encode += str(new_class_name.id) new_class_name.encode = encode await new_class_name.save() update_location_time(location_id= location_id) return {"msg": "success", "code": 200, "new_class_name_id": new_class_name.id} except Exception as e: return {"msg": str(e), "code": 500} # @classes.get("/insert_encode") # async def insert_encode(): # try: # class_name = await Class_name.all() # for classes in class_name: # encode = "" # if encode_set.__contains__(classes.category) : # encode = encode_set[classes.category] # else: # encode = "OTH" # now = classes.create_time # encode += now.strftime("%Y%m%d") # encode += str(classes.id) # classes.encode = encode # await classes.save() # except Exception as e: # return {"msg": str(e), "code": 500} # 創建場次 @classes.post("/insert_event") async def insert_event( request : Request, name_id: int = Form(default=0), event: str = Form(default=''), start_time: datetime = Form(default=None), end_time: datetime = Form(default=None), contact: str = Form(default=''), lecturer: str = Form(default=''), location: str = Form(default=''), content: str = Form(default=''), URL: str = Form(default=''), people : str = Form(default=''), fee_method: str = Form(default=''), registration_way: str = Form(default=''), registration_start: datetime = Form(default=None), registration_end: datetime = Form(default=None), number_limit: int = Form(default=None), remark : str = Form(default=''), ATM_address: str = Form(default=''), access_token:str = Form(default=None), fee_payment: str = Form(default=''), number_minimum : int = Form(default=None), files_url = Depends(create_upload_files) ): try: user_id = None if access_token: user_id = await check_token(access_token) if user_id == None: return {"msg": "沒有權限", "code": 500} # 檢查是否有該課程 class_name_list = await Class_name.filter(id=name_id).all() if class_name_list == []: return {"msg": "沒有此課程", "code": 500} if files_url: if files_url.__contains__("msg"): return {"msg": files_url["msg"], "code": 500} # 判斷時間合法 if start_time and end_time : if start_time >= end_time : return {"msg": "時間輸入錯誤", "code": 500} new_class = await Class_list.create( name_id=name_id, event =event, start_time=start_time, end_time=end_time, contact=contact, lecturer=lecturer, location=location, content=content, URL=URL, people=people, fee_method=fee_method, registration_way=registration_way, remark=remark, ATM_address=ATM_address, create_user_id = user_id, fee_payment = fee_payment, event_create_time = datetime.now(), files=files_url ) client_ip = request.client.host try: if (registration_start and registration_end and number_limit) or number_minimum: await Class_date.create( class_list_id = new_class.id, registration_start = registration_start, registration_end = registration_end, number_limit = number_limit, amount_left = number_limit, number_minimum = number_minimum ) except: print("input class_date fail") # 寄開課信 try : if user_id: try: user = await User.get_or_none(id = user_id) user_info = await User_information.get_or_none(user_id=user_id) if user is None or user_info is None: my_log("info",__name__,f"Client IP: {client_ip} - {user_id} has no user resume") return {"msg": "請去建立工藝家履歷", "code": 500} email = user.email subject = "創建課程通知" # message = f"親愛的工藝家{user_info.name}您好,
\ # 感謝您埋下工藝種子!
\ # 您創建的課程由管理員進行審核中,將於3-7個工作天內e-mail通知您審核結果。
\ # 若7天後尚未收到課程審核通知,請將後台畫面、聯絡資訊等截圖寄送至客服信箱:
\ # craftology@ntcri.gov.tw,以便客服為您查詢。
\ # 註:此封信件為系統自動發送,請勿回信,謝謝。" with open("/var/www/ntcri/assets/edm/success_open_class/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() message = html_template.replace("{username}",user_info.name) send_email(email,"",subject,message) except Exception as e: return {"msg": str(e), "code": 500} except: print("no this user") return {"msg": "success", "code": 200, "class_id": new_class.id} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/auto_create_session") async def auto_create_session( class_event_id : int = Form(default=0), week_day_str : str = Form(default="[]"), start_week_time : str = Form(default="[]"), end_week_time : str = Form(default="[]"), ): try: week_day = eval(week_day_str) start_week_time= eval(start_week_time) end_week_time= eval(end_week_time) #print(week_day) class_obj = await Class_list.get_or_none(id = class_event_id) if class_obj is None : return {"msg": "no this class", "code": 500} time_stemp = class_obj.start_time session = 1 session_list = await Class_detail.filter(class_list_id=class_event_id).delete() while time_stemp <= class_obj.end_time: if week_day[time_stemp.weekday()]: s_start_time = time_stemp.date().strftime("%Y-%m-%d")+" "+start_week_time[time_stemp.weekday()] s_end_time = time_stemp.date().strftime("%Y-%m-%d")+" "+end_week_time[time_stemp.weekday()] #time_del = class_obj.end_time.hour - time_stemp.hour #time_del_minutes = class_obj.end_time.minute - time_stemp.minute #time_end = time_stemp + timedelta(hours=time_del,minutes=time_del_minutes) #print(time_end,class_obj.end_time.hour) day1 = datetime.strptime(s_start_time, '%Y-%m-%d %H:%M:%S') day2 = datetime.strptime(s_end_time, '%Y-%m-%d %H:%M:%S') time_difference = day2 - day1 total_minutes = int(time_difference.total_seconds() // 60) hour=total_minutes//60 + round((total_minutes%60)/60,1) new_session = await Class_detail.create( class_list_id=class_event_id, #start_time=time_stemp, #end_time=time_end, start_time=s_start_time, end_time=s_end_time, hour = hour, sessions=session, content = "" ) session+=1 time_stemp = time_stemp + timedelta(days=1) return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/insert_session") async def insert_session( class_event_id : int = Form(default=0), start_time: datetime = Form(default=datetime.now()), end_time: datetime = Form(default=datetime.now()), content : str = Form(default='') ): try: session_list = await Class_detail.filter(class_list_id=class_event_id).all() session = 0 if session_list != []: for session_obj in session_list: if session < session_obj.sessions: session = session_obj.sessions time_difference = end_time - start_time session_list = await Class_detail.filter(class_list_id=class_event_id).delete() total_minutes = int(time_difference.total_seconds() / 60) hour=total_minutes//60 + round((total_minutes%60)/60,1) new_session = await Class_detail.create( class_list_id=class_event_id, start_time=start_time, end_time=end_time, sessions=session +1, content = content, hour = hour ) return {"msg": "success", "code": 200, "new_session_id": new_session.id} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/update_school") async def update_school( location_id: int = Form(default=0), location_name: str = Form(default=''), Lng: str = Form(default=''), Lat: str = Form(default=''), address : str = Form(default=''), school_introduction : str = Form(default=None), email : str = Form(default=None), phone : str = Form(default=None), teachers_list : List[str] = Form(default=[]), is_pass_proposal : int = Form(default=None), principal_user_email : str = Form(default=None) ): try: school = await Schools.get(id=location_id) if location_name.strip() != '': school.name = location_name if Lng != '': school.longitude = Lng if Lat.strip() != '': school.latitude = Lat if address.strip() != '': school.address = address if school_introduction: school.introduction = school_introduction if email: school.email = email if phone: school.phone = phone if is_pass_proposal is not None: if is_pass_proposal == 0 and school.is_pass_proposal == 2: # 寄開課信 try : if school.create_user_id: try: try : user = await User.get(id = school.create_user_id) user_info = await User_information.get(user_id = school.create_user_id) except: return {"msg": "請去建立工藝家履歷", "code": 500} email = user.email subject = "重新提案通知" # message = f"親愛的工藝家{user_info.name}您好,
\ # 感謝您埋下工藝種子!
\ # 您的提案由管理員進行審核中,將於3-7個工作天內e-mail通知您審核結果。
\ # 若7天後尚未收到課程審核通知,請將後台畫面、聯絡資訊等截圖寄送至客服信箱:
\ # craftology@ntcri.gov.tw,以便客服為您查詢。
\ # 註:此封信件為系統自動發送,請勿回信,謝謝。" with open("/var/www/ntcri/assets/edm/porposal/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() message = html_template.replace("{username}",user_info.name) send_email(email,"",subject,message) except: print("have no email") except: print("no this user") if is_pass_proposal != 0 and is_pass_proposal ==1 : if school.create_user_id: try: user = await User.get_or_none(id = school.create_user_id) user_info = await User_information.get(user_id = school.create_user_id) if user is None : return {"msg": "請去建立工藝家履歷", "code": 500} email = user.email subject = "提案審核通過通知" # message = f"親愛的工藝家{user_info.name}您好,
\ # 恭喜您埋下工藝種子已成功發芽,讓我們一起期待它的成長茁壯!
\ # 您的提案已由管理員審核通過!
\ # 您可透過「會員專區」→「我的提案」檢視提案內容,並開始開課。" with open("/var/www/ntcri/assets/edm/pass_porposal/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() message = html_template.replace("{username}",user_info.name) send_email(email,"",subject,message) except: print("have no email") school.is_pass_proposal = is_pass_proposal if principal_user_email: try: principal_user = await User.get(email=principal_user_email,is_avtive=1) school.principal_user_id = principal_user.id except: my_log("info",__name__,f"no this teacher") if teachers_list != []: teachers = [] for teacher in teachers_list: try: teacher_info = await User.get(email=teacher,is_avtive=1) teachers.append(teacher_info.id) except: my_log("info",__name__,f"no this teacher") school.teachers = str(teachers) await school.save() update_location_time(location_id= location_id) return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/update_class_name") async def update_class_name( request : Request, class_name_id: int = Form(default=0), name: str = Form(default=None), location_id: int = Form(default=None), category: str = Form(default=None), introduction: str = Form(default=None), organizer: str = Form(default=None), cover_img_file:UploadFile = File(default=""), group_id : int = Form(default=None), group_sort : str = Form(default=None), recommend: int = Form(default=None), special_class_list_name : str = Form(default=None), is_inner : int = Form(default=None), is_check : int = Form(default=None), teachers_resume : str = Form(default=None), syllabus : str = Form(default=None) ): try: class_name = await Class_name.get(id=class_name_id) if name is not None: class_name.name = name if location_id is not None: class_name.school_id = location_id update_location_time(location_id= location_id) if category is not None: class_name.category = category if introduction is not None: class_name.introduction = introduction if organizer is not None: class_name.organizer = organizer if group_id is not None : class_name.group_id = group_id if is_inner is not None: class_name.is_inner = is_inner if syllabus is not None: class_name.syllabus = syllabus if teachers_resume is not None: class_name.teachers_resume = teachers_resume if cover_img_file != "": contents = await cover_img_file.read() with open(f"{IMAGEDIR}{cover_img_file.filename}", "wb") as f: f.write(contents) class_name.cover_img = f"{IMAGEDIR_short}{cover_img_file.filename}" if group_sort is not None: class_name.group_sort = group_sort if recommend is not None: class_name.recommend = recommend client_ip = request.client.host if is_check is not None: if class_name.is_check == 0 and is_check ==1: user_id = class_name.create_user_id if user_id: try: user = await User.get(id = user_id) user_info = await User_information.get(user_id = user_id) except: # my_log(type,name,msg) my_log("info",__name__,f"Client IP: {client_ip} - {user_id} has no user resume") return {"msg": "請去建立工藝家履歷", "code": 500} email = user.email subject = "課程核准通知" # message = f"親愛的工藝家{user_info.name}您好,
\ # 恭喜您埋下工藝種子已成功發芽,讓我們一起期待它的成長茁壯!
\ # 您創建的課程已由管理員審核通過並上架成功!
\ # 您可透過「會員專區」→「我的開課」檢視跟管理學員報名狀況。
\ # 註:此封信件為系統自動發送,請勿回信,謝謝。" with open("/var/www/ntcri/assets/edm/pass_check/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() message = html_template.replace("{username}",user_info.name) try: send_email(email,"",subject,message) except: my_log("error",__name__,f"Client IP: {client_ip} - send email fail") class_name.is_check = is_check if special_class_list_name: class_name.special_class_list_name = special_class_list_name await class_name.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/cancel_proposal") async def cancel_proposal( request: Request, location_id: int = Form(default=0), access_token:str = Form(default=None), message:str = Form(default=None) ): try: user_id = None if access_token: user_id = await check_token(access_token) if user_id: user = await User.get(id=user_id) else : return {"msg": "no access", "code": 200} if user.is_superuser != 2: return {"msg": "no access", "code": 200} # 檢查是否有該課程 class_name_list = await Schools.filter(id=location_id).all() if class_name_list == []: my_log("info",__name__,f"Client IP: {client_ip} - no this class_name_id:{location_id}") return {"msg": "沒有此課程", "code": 200} class_name = await Schools.get(id=location_id) class_name.is_pass_proposal = 2 await class_name.save() if class_name.create_user_id: creater = await User.get(id=class_name.create_user_id) user_info = await User_information.get_or_none(user_id=user_id) email = creater.email subject = '提案駁回通知' # email_message = f'親愛的工藝家{creater.username}您好,
\ # 很遺憾您埋下的工藝種子未能成功發芽。
\ # 您的提案經管理員審核,因{message}未能通過審核,
\ # 請別灰心!只要修正完善內容再次提交即有機會讓您的工藝種子發芽。
\ # 若對於審核結果有疑慮,請將後台畫面、聯絡資訊等截圖寄送至客服信箱:
\ # craftology@ntcri.gov.tw,以便客服為您查詢。
\ # 註:此封信件為系統自動發送,請勿回信,謝謝。' with open("/var/www/ntcri/assets/edm/porposal_refuse/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() email_message = html_template.replace("{username}",user_info.name) email_message = email_message.replace("{msg}",message) send_email(email,"",subject,email_message) else : print("no creater") client_ip = request.client.host my_log("info",__name__,f"{client_ip} - school_id:{location_id} change is_pass_proposal to 2") return {"msg": "success", "code": 200} except Exception as e: client_ip = request.client.host my_log("error",__name__,f"{client_ip} - An exception occurred: {e} \n Request : {request}") return {"msg": str(e), "code": 500} @classes.post("/cancel_class_check") async def cancel_class_check( request: Request, class_name_id: int = Form(default=0), access_token:str = Form(default=None), message:str = Form(default=None) ): try: user_id = None if access_token: user_id = await check_token(access_token) if user_id: user = await User.get(id=user_id) else : return {"msg": "no access", "code": 200} if user.is_superuser != 2: return {"msg": "no access", "code": 200} # 檢查是否有該課程 class_name_list = await Class_name.filter(id=class_name_id).all() if class_name_list == []: my_log("info",__name__,f"Client IP: {client_ip} - no this class_name_id:{class_name_id}") return {"msg": "沒有此課程", "code": 200} class_name = await Class_name.get(id=class_name_id) class_name.is_check = 2 await class_name.save() if class_name.create_user_id: creater = await User.get(id=class_name.create_user_id) user_info = await User_information.get_or_none(user_id=user_id) email = creater.email subject = '開課駁回通知' # email_message = f'親愛的工藝家{creater.username}您好,
\ # 很遺憾您埋下的工藝種子未能成功發芽。
\ # 您創建的課程經管理員審核,因{message}未能通過審核,
\ # 請別灰心!只要修正完善內容再次提交即有機會讓您的工藝種子發芽。
\ # 若對於審核結果有疑慮,請將後台畫面、聯絡資訊等截圖寄送至客服信箱:
\ # craftology@ntcri.gov.tw,以便客服為您查詢。
\ # 註:此封信件為系統自動發送,請勿回信,謝謝。' with open("/var/www/ntcri/assets/edm/class_refuse/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() email_message = html_template.replace("{username}",user_info.name) email_message = email_message.replace("{msg}",message) send_email(email,"",subject,email_message) else : print("no creater") client_ip = request.client.host my_log("info",__name__,f"{client_ip} - class_name_id:{class_name_id} change is_check to 2") return {"msg": "success", "code": 200} except Exception as e: client_ip = request.client.host my_log("error",__name__,f"{client_ip} - An exception occurred: {e} \n Request : {request}") return {"msg": str(e), "code": 500} @classes.post("/update_event") async def update_event( id: int = Form(default=0), name_id: int = Form(default=None), event: str = Form(default=None), start_time: datetime = Form(default=None), end_time: datetime = Form(default=None), contact: str = Form(default=None), lecturer: str = Form(default=None), location: str = Form(default=None), content: str = Form(default=None), URL: str = Form(default=None), people : str = Form(default=None), fee_method: str = Form(default=None), registration_way: str = Form(default=None), registration_start: datetime = Form(default=None), registration_end: datetime = Form(default=None), number_limit: int = Form(default=None), remark : str = Form(default=None), ATM_address :str = Form(default=None), fee_payment : str = Form(default=None), number_minimum: int = Form(default=None), files_url = Depends(create_upload_files) ): try: class_obj = await Class_list.get_or_none(id=id) if class_obj is None : return {"msg": "no this event", "code": 500} if name_id != None: class_obj.name_id = name_id if event != None: class_obj.event = event if start_time != None: class_obj.start_time = start_time if end_time != None: class_obj.end_time = end_time if lecturer != None: class_obj.lecturer = lecturer if location != None: class_obj.location = location if contact != None: class_obj.contact = contact if content!= None: class_obj.content = content if URL!= None: class_obj.URL = URL if people!= None: class_obj.people = people if fee_method!= None: class_obj.fee_method = fee_method if registration_way!= None: class_obj.registration_way = registration_way if remark!= None: class_obj.remark = remark if ATM_address!= None: class_obj.ATM_address = ATM_address if fee_payment!= None: class_obj.fee_payment = fee_payment if files_url: if files_url.__contains__("msg"): return {"msg": files_url["msg"], "code": 500} class_obj.files_url = files_url class_date_obj,created = await Class_date.get_or_create( class_list_id=id, defaults={ "registration_start" : registration_start, "registration_end" : registration_end, "number_limit" : number_limit, "amount_left" : number_limit, "number_minimum" : number_minimum } ) if not created : if registration_start : class_date_obj.registration_start = registration_start if registration_end : class_date_obj.registration_end = registration_end if number_limit : class_date_obj.number_limit = number_limit if number_minimum != None: class_date_obj.number_minimum = number_minimum await class_date_obj.save() await class_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/update_session") async def update_session( session_id : int = Form(default=0), class_event_id : int = Form(default=0), start_time: datetime = Form(default=datetime.now()), end_time: datetime = Form(default=datetime.now()), sessions: str = Form(default=0), content : str = Form(default='') ): try: class_session_obj = await Class_detail.get(id=session_id) if class_event_id != 0: class_session_obj.class_list_id = class_event_id if start_time != '': class_session_obj.start_time = start_time if end_time != '': class_session_obj.end_time = end_time if sessions.strip() != '': class_session_obj.sessions = sessions if content.strip() != '': class_session_obj.content = content await class_session_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/delete_school") async def delete_school(location_id: int): if location_id: try: school = await Schools.get(id=location_id) except: return {"msg": "無法找到據點", "code": 500} school.is_delete = (school.is_delete+1)%2 await school.save() return {"msg": "success", "code": 200} @classes.post("/delete_session") async def delete_session(id: int): if id: await Class_detail.filter(id=id).delete() return {"msg": "success", "code": 200} @classes.post("/delete_event") async def delete(id: int): if id: await Class_detail.filter(class_list_id=id).delete() await Class_list.filter(id=id).delete() await Class_date.filter(class_list_id=id).delete() return {"msg": "success", "code": 200} @classes.post("/delete_class_name") async def delete(id: int): if id: try: class_name = await Class_name.get(id=id) except: return {"msg": "無法找到課程", "code": 500} class_name.is_delete = (class_name.is_delete+1)%2 # 相反狀態(如果本來被刪除就恢復) await class_name.save() # class_event_list = await Class_list.filter(name_id=id).all() # for class_event_obj in class_event_list: # await Class_detail.filter(class_list_id=class_event_obj.id).delete() # await Class_date.filter(class_list_id=class_event_obj.id).delete() # await Class_list.filter(name_id=id).delete() # await Class_name.filter(id=id).delete() return {"msg": "success", "code": 200} @classes.get("/get_class_state") async def check_date_state( class_name_id : Optional[int] = None, class_event_id : Optional[int] = None, special_class_list_name : Optional[str] = None, ): if class_name_id: Q_word = Q(name_id=class_name_id) elif class_event_id: Q_word = Q(id=class_event_id) else: return {"msg": 'please input ID', "code": 500} if special_class_list_name == "one_day_class": class_list = await One_day_class.filter(Q_word).all() elif special_class_list_name == "outter_class_list": class_list = await Outter_class_list.filter(Q_word).all() else : class_list = await Class_list.filter(Q_word).all() result = {"state": "尚未開放報名"} try: if class_list == [] : result["state"] = "尚未創建場次" for class_obj in class_list: try: class_name = await Class_name.get_or_none(id = class_obj.name_id) if class_name.is_check == 0 : result["state"] = "課程等待審核" break if class_name.is_check == 2 : result["state"] = "課程審核未通過" break if class_name: print(class_name.is_record_point) print(class_name.name) school = await Schools.get_or_none(id = class_name.school_id) print(class_name.school_id) if school : print(school.is_pass_proposal) if school.is_pass_proposal == 0 : result["state"] = "正在提案階段" break if school.is_pass_proposal == 2 : result["state"] = "提案已被駁回" break if class_name.is_record_point == 1 : result["state"] = "課程已關閉" break if class_obj.start_time and class_obj.end_time: if class_obj.start_time.replace(tzinfo=None) <= datetime.now() and class_obj.end_time.replace(tzinfo=None) >= datetime.now(): result["state"] = "開課中" break elif class_obj.end_time.replace(tzinfo=None) < datetime.now(): result["state"] = "課程已結束" else: pass else : result["state"] = "尚未創建課程" except Exception as e: pass if special_class_list_name == "one_day_class": if class_obj.reg_deadline.replace(tzinfo=None) >= datetime.now() : result["state"] = "報名中" break elif class_obj.reg_deadline.replace(tzinfo=None) < datetime.now(): result["state"] = "報名截止(報名額滿)" break else : try: class_date_obj = await Class_date.get(class_list_id=class_obj.id) if class_date_obj.registration_start.replace(tzinfo=None) <= datetime.now() and class_date_obj.registration_end.replace(tzinfo=None) >= datetime.now() : if class_date_obj.amount_left == 0: result["state"] = "報名截止(報名額滿)" break else: result["state"] = "報名中" break elif class_date_obj.registration_end.replace(tzinfo=None) < datetime.now(): result["state"] = "報名截止(人數已滿)" break except Exception as e: pass return {"msg": "success", "code": 200, "result": result} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_event") async def search_event( class_name_id: Optional[int] = None, event_id : Optional[int] = None, access_token: Optional[str] = None ): try: try : class_name_obj = await Class_name.get(id=class_name_id) except: return {"msg": "no this class id", "code": 500} special_class_list_name = class_name_obj.special_class_list_name Q_word = Q() if class_name_id: Q_word = Q_word | Q(name_id = class_name_id) elif event_id: Q_word = Q_word | Q(id = event_id) else : return {"msg": "please input class_name_id or event_id", "code": 500} user_id = None if access_token: user_id = await check_token(access_token) if user_id: Q_word = Q_word & Q(create_user_id = user_id) model_fields =[] state = {} if special_class_list_name==None: class_list = Class_list.filter(Q_word).all().order_by("-start_time") elif special_class_list_name=='one_day_class': class_list = One_day_class.filter(Q_word).all().order_by("-start_time") elif special_class_list_name=='outter_class_list': class_list = Outter_class_list.filter(Q_word).all().order_by("-start_time") else: class_list = Class_list.filter(Q_word).all().order_by("-start_time") class_list = await class_list.all() classes = [] for class_obj in class_list: class_name_obj = await Class_name.get(id=class_obj.name_id) class_name = class_name_obj.name try : class_data = class_obj.show_data() class_data["class_name"] = class_name state = await check_date_state(class_event_id=class_obj.id,special_class_list_name=special_class_list_name) class_data["state"] = state["result"]["state"] try: class_date_obj = await Class_date.get(class_list_id=class_obj.id) class_data["registration_start"] = class_date_obj.registration_start class_data["registration_end"] = class_date_obj.registration_end class_data["number_limit"] = class_date_obj.number_limit class_data["amount_left"] = class_date_obj.amount_left class_data["number_minimum"] = class_date_obj.number_minimum model_fields.append("registration_start","registration_end","number_limit","amount_left") except: print("沒有報名時間&人數限制的資料") except: class_data = { "msg" : "fail to get data" } classes.append(class_data) return {"msg": "success", "code": 200, "classes": classes} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_school") async def get_school( location_id : Optional[int] = None, keyword : Optional[str] = None, location_keyword : Optional[str] = None, page_num : Optional[int] = None, page_amount : Optional[int] = None, category : Optional[str] = None, access_token: Optional[str] = None, is_delete: Optional[int] = None, is_pass_proposal : Optional[int] = None, principal_user_email : Optional[str] = None ): try: await copy() my_log("info",__name__,"get_school") Q_word = Q() if location_id : Q_word = Q_word & Q(id = location_id) if keyword : Q_word = Q_word & Q(name__icontains=keyword) if location_keyword : Q_word = Q_word & Q(address__icontains=location_keyword) if is_pass_proposal is not None : Q_word = Q_word &Q(is_pass_proposal=is_pass_proposal) if principal_user_email : try: principal_user = await User.get(email=principal_user_email,is_avtive=1) except: my_log("info",__name__,f"no this teacher") Q_word = Q_word & Q(principal_user_id=principal_user.id) if is_delete is not None: Q_word = Q_word &Q(is_delete=is_delete) else : Q_word = Q_word &Q(is_delete=0) user_id = None if access_token: user_id = await check_token(access_token) if user_id: Q_word = Q_word &Q(create_user_id = user_id) Q_word2 = Q() if category: for tmp_word in category.split(",") : Q_word2 = Q_word2 | Q(category__icontains=tmp_word) Q_word = Q_word & Q_word2 school_list = Schools.filter(Q_word) # for school_obj in school_list_tmp: # class_list = await Class_name.filter(Q(school_id = school_obj.id) & Q_word) # if class_list == []: # school_list = school_list.exclude(id = school_obj.id) count = await school_list.all().count() if page_num and page_amount: school_list = school_list.offset((page_num-1)*page_amount).limit(page_amount) school_list = await school_list.all().order_by("-update_time") schools = [] for school_obj in school_list: try : school_data = school_obj.show_data() if school_data["Lng"] == (None or '') or school_data["Lat"] == (None or ''): try: # 這裡response採用json格式,可以自由選擇為json?或是xml? url = 'https://maps.googleapis.com/maps/api/geocode/json?' params = {'key':'AIzaSyDETbkTAmZNOECx2u4rmNudHvyoQTgDbc4', 'address':''} params['address'] = school_data["address"] #print(params['address']) res = requests.get(url, params=params) mes = json.loads(res.text) school_data["Lat"] = mes['results'][0]['geometry']['location']['lat'] school_data["Lng"] = mes['results'][0]['geometry']['location']['lng'] print(school_data["Lat"],school_data["Lng"]) school_obj.longitude = school_data["Lng"] school_obj.latitude = school_data["Lng"] await school_obj.save() except : my_log("error",__name__,f"not get lat and lng") schools.append(school_data) except: schools.append({"msg : data wrong"}) return {"msg": "success", "code": 200, "total_num" : count,"schools": schools} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_group_name") async def get_school_group( id : Optional[int] = 0 ): try: if id==0: school_group_list = await Group_name.all() print(school_group_list) else: school_group_list = [await Group_name.get(id=id)] print(school_group_list) school_groups = [] for school_obj in school_group_list: school_data = { "group_id": school_obj.id, "group_name": school_obj.group_name, "describe": school_obj.describe } school_groups.append(school_data) return {"msg": "success", "code": 200, "school_groups": school_groups} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/update_group_name") async def update_school_group( id : int = Form(default=0), group_name : str = Form(default=''), describe : str = Form(default=''), ): try: group_name_obj = await Group_name.get(id=id) if group_name.strip() != '': group_name_obj.group_name = group_name if describe.strip() != '': group_name_obj.describe = describe await group_name_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_class_name") async def get_class_name( location_id : str = None , class_name_id : Optional[int] = None, group_id : Optional[int] = None, group_sort :Optional[str] = None, exclude_word: Optional[str] = None, category :Optional[str] = None, page_num : Optional[int] = None, page_amount : Optional[int] = None, recommend : Optional[int] = None, is_inner : Optional[int] = None, is_check : Optional[int] = None, access_token: Optional[str] = None, is_delete: Optional[int] = None, keyword: Optional[str] = None, has_user : Optional[int] = None, is_record_point : Optional[int] = None, location_keyword : Optional[str] = None, encode : Optional[str] = None, syllabus : Optional[str] = None ): try: await copy() class_name_list = Class_name.all() Q_word = Q() if location_keyword: school_list = await Schools.filter(Q(address__icontains=location_keyword)).all() schools = [] for school in school_list: schools.append(school.id) Q_word = Q_word & Q(school_id__in = schools) if group_id: Q_word = Q_word & Q(group_id = group_id) if group_sort: Q_word = Q_word & Q(group_sort = group_sort) if encode: Q_word = Q_word & Q(encode = encode) if syllabus: Q_word = Q_word & Q(syllabus__icontains = syllabus) if category: Q_word2 = Q() for tmp_word in category.split(",") : Q_word2 = Q_word2 | Q(category__icontains=tmp_word) Q_word = Q_word &Q_word2 if location_id : location_id_list = eval(location_id) if not isinstance(location_id_list, list): location_id_list = [location_id_list] Q_word = Q_word & Q(school_id__in = location_id_list) if class_name_id : Q_word = Q_word & Q(id = class_name_id) if recommend : Q_word = Q_word & Q(recommend = recommend) if is_inner!=None: Q_word = Q_word & Q(is_inner = is_inner) if is_check!=None: if is_check == 3 : Q_word = Q_word & Q(is_record_point =1) #已關閉 else: Q_word = Q_word & Q(is_check = is_check,is_record_point__in = [0,None]) if is_delete is not None: Q_word = Q_word & Q(is_delete = is_delete) elif is_delete == 2: pass else : Q_word = Q_word & Q(is_delete = 0) if keyword: Q_word = Q_word & ( Q(name__icontains=keyword)| Q(category__icontains=keyword)| Q(introduction__icontains=keyword)| Q(organizer__icontains=keyword)| Q(group_sort__icontains=keyword) | Q(syllabus__icontains=keyword)) if is_record_point : Q_word = Q_word & Q(is_record_point =is_record_point) user_id = None if access_token: user_id = await check_token(access_token) if user_id: Q_word = Q_word & Q(create_user_id = user_id) else : return {"msg": "no access", "code": 500} if has_user==1: Q_word = Q_word & Q(create_user_id__isnull=False) class_name_list = class_name_list.filter(Q_word).all() if exclude_word: for tmp_word in exclude_word.split(",") : # print(tmp_word) class_name_list = class_name_list.exclude(Q(category__icontains=tmp_word)|Q(group_sort__icontains=tmp_word)).all() count = await class_name_list.all().count() if page_num and page_amount: class_name_list = class_name_list.offset((page_num-1)*page_amount).limit(page_amount) class_name_list = await class_name_list.all().order_by("-id") classes_name = [] state = {} special_class_list_name = None for class_name_obj in class_name_list: try: school_obj = await Schools.get(id=class_name_obj.school_id) school_obj_id = school_obj.id except: school_obj_id = 0 try: special_class_list_name = class_name_obj.special_class_list_name if special_class_list_name: state = await check_date_state(class_name_id=class_name_obj.id,special_class_list_name=special_class_list_name) else: state = await check_date_state(class_name_id=class_name_obj.id) class_data = class_name_obj.show_data() if school_obj_id: for key, item in school_obj.show_data().items(): class_data[key] = item class_data["state"]=state["result"]["state"] if class_data["state"] == "課程已關閉" : class_data["is_check"] = 3 except Exception as e: class_data = { "msg" : str(e) } classes_name.append(class_data) return {"msg": "success", "code": 200,"total_num" : count,"classes": classes_name} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_session") async def get_session( event_id : Optional[int] = None ): try: if not event_id: return {"msg": "please input event_id", "code": 500} class_session_list = await Class_detail.filter(class_list_id=event_id).all().order_by("start_time") classe_sessions = [] week_day = [0] * 7 start_week_time= [None] * 7 end_week_time= [None] * 7 for class_session_obj in class_session_list: class_session_data = { "session_id": class_session_obj.id, "class_event_id": class_session_obj.class_list_id, "start_time": class_session_obj.start_time, "end_time": class_session_obj.end_time, "sessions": class_session_obj.sessions, "content": class_session_obj.content } week_day[class_session_obj.start_time.weekday()] = 1 start_week_time[class_session_obj.start_time.weekday()]=class_session_obj.start_time.time().strftime("%H:%M") end_week_time[class_session_obj.end_time.weekday()]=class_session_obj.end_time.time().strftime("%H:%M") classe_sessions.append(class_session_data) if len(class_session_list)== 1 : data_form = "one day class" else : data_form = { "week_day" : week_day, "start_week_time" : start_week_time, "end_week_time" :end_week_time } return {"msg": "success", "code": 200, "data_form":data_form,"classe_sessions": classe_sessions} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/search_class_like") async def search_class_like( keyword: str, location_id : Optional[int] = None , group_id : Optional[int] = None, group_sort :Optional[str] = None, exclude_word: Optional[str] = None, category :Optional[str] = None, page_num : Optional[int] = None, page_amount : Optional[int] = None, recommend : Optional[int] = None, is_inner : Optional[int] = None, is_check : Optional[int] = None ): try: Q_word = Q() if group_id: Q_word = Q_word & Q(group_id = group_id) if group_sort: Q_word = Q_word & Q(group_sort = group_sort) if category: for tmp_word in category.split(",") : Q_word = Q_word | Q(category__icontains=tmp_word) if location_id : Q_word = Q_word & Q(school_id = location_id) if recommend : Q_word = Q_word & Q(recommend = recommend) if is_inner!=None: Q_word = Q_word & Q(is_inner = is_inner) if is_check!=None: Q_word = Q_word & Q(is_check = is_check) class_name_id = [] for class_list in await Class_list.filter(Q(lecturer__icontains=keyword)): class_name_id.append(class_list.name_id) for class_list in await One_day_class.filter(Q(teacher__icontains=keyword)): class_name_id.append(class_list.name_id) if keyword : Q_word = Q_word & ( Q(name__icontains=keyword)| Q(category__icontains=keyword)| Q(introduction__icontains=keyword)| Q(organizer__icontains=keyword)| Q(group_sort__icontains=keyword) | Q(id__in=class_name_id)| Q(syllabus__icontains=keyword)) class_name_list = Class_name.filter(Q_word).all() count = await class_name_list.all().count() if page_num and page_amount: class_name_list = class_name_list.offset((page_num-1)*page_amount).limit(page_amount) if exclude_word: for tmp_word in exclude_word.split(",") : # print(tmp_word) class_name_list = class_name_list.exclude(Q(category__icontains=tmp_word)|Q(group_sort__icontains=tmp_word)).all() class_name_list = await class_name_list.all().order_by("-id") classes_name = [] state = {} special_class_list_name = None for class_name_obj in class_name_list: try: school_obj = await Schools.get(id=class_name_obj.school_id) school_obj_id = school_obj.id except: school_obj_id = 0 try: special_class_list_name = class_name_obj.special_class_list_name if special_class_list_name: state = await check_date_state(class_name_id=class_name_obj.id,special_class_list_name=special_class_list_name) else: state = await check_date_state(class_name_id=class_name_obj.id) class_data = class_name_obj.show_data() if school_obj_id: for key, item in school_obj.show_data().items(): class_data[key] = item class_data["state"]=state["result"]["state"] except Exception as e: class_data = { "msg" : str(e) } classes_name.append(class_data) return {"msg": "success", "code": 200, "total_num" : count,"classes": classes_name} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/add_favorite_class") async def add_favorite_class( class_name_id: int, user_id = Depends(check_token), time_stemp: datetime = datetime.now() ): try: if not user_id: return {"msg": "no access", "code": 500} new_favorite_class = await Favorite_course.get_or_create( class_name_id=class_name_id, user_id=user_id, defaults={'time_stemp': time_stemp} ) return {"msg": "success", "code": 200,"is exist": not new_favorite_class[1],"id":new_favorite_class[0].id} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_favorite_class") async def get_favorite_class( user_id = Depends(check_token), no_details : Optional[int] = None ): try: class_list = await Favorite_course.filter(user_id = user_id).all() favorite_courses = [] for class_obj in class_list: if no_details: class_data = { "id": class_obj.id, "user_id": class_obj.user_id, "class_name_id" : class_obj.class_name_id, "time_stemp":class_obj.time_stemp } favorite_courses.append(class_data) else: class_data = { "id": class_obj.id, "user_id": class_obj.user_id, "class_name_id" : class_obj.class_name_id, "time_stemp":class_obj.time_stemp } result = await get_class_name(class_name_id = class_obj.class_name_id) try: class_detail = result["classes"][0] for key,item in class_detail.items(): class_data[key] = item favorite_courses.append(class_data) except: class_data["msg"] = "this class doesn't exit" favorite_courses.append(class_data) return {"msg": "success", "code": 200, "favorite_courses": favorite_courses} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/delete_favorite_class") async def delete_favorite_class( class_name_id: int, user_id = Depends(check_token) ): try: await Favorite_course.filter(class_name_id=class_name_id,user_id=user_id).delete() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/insert_online_course") async def insert_online_course( title : str = Form(default=''), category : str = Form(default=''), create_time :str = Form(default=datetime.now()), content : str = Form(default=''), video_url :str = Form(default='') ): try: new_online_course = await Online_course.create( title=title, create_time=create_time, category=category, content=content, video_url=video_url, group_id = 8 ) return {"msg": "success", "code": 200, "online_course_obj": new_online_course.id} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/update_online_course") async def update_online_course( id : int = Form(default=0), title : str = Form(default=''), category : str = Form(default=''), create_time :str = Form(default=datetime.now()), content : str = Form(default=''), video_url :str = Form(default='') ): try: online_course_obj = await Online_course.get(id=id) if title.strip() != '': online_course_obj.title = title if category.strip() != '': online_course_obj.category = category if create_time.strip() != '': online_course_obj.create_time = create_time if content.strip() != '': online_course_obj.content = content if video_url.strip() != '': online_course_obj.video_url = video_url await online_course_obj.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_online_courese") async def get_online_courese( online_courese_id : Optional[int] = None, category:Optional[str] = None, group_id : Optional[int] = None, page_num : Optional[int] = None, page_amount : Optional[int] = None, org :Optional[str] = None, no_org:Optional[str] = None, key_word:Optional[str] = None, ): try: online_courese_list = Online_course.all() Q_word = Q() if group_id: Q_word = Q_word & Q(group_id = group_id) if online_courese_id : Q_word = Q_word & Q(id = online_courese_id) if org : Q_word = Q_word & Q(org = org) if no_org: no_org_condition = Q(org__isnull=True) | ~Q(org=no_org) Q_word = Q_word & no_org_condition if category: category_conditions = Q() for tmp_word in category.split(",") : category_conditions |= Q(category__icontains=tmp_word) Q_word = Q_word & category_conditions #Q_word = Q_word | Q(category__icontains=tmp_word) if key_word: key_word_condition = Q(title__icontains=key_word) | Q(category__icontains=key_word) | Q(content__icontains=key_word) Q_word = Q_word & key_word_condition count = await online_courese_list.all().filter(Q_word).count() if page_num and page_amount: online_courese_list = online_courese_list.offset((page_num-1)*page_amount).limit(page_amount) online_courese_list = await online_courese_list.all().filter(Q_word).order_by("id") online_coureses = [] for online_coures_obj in online_courese_list: online_coures_data = online_coures_obj.show_data() online_coureses.append(online_coures_data) return {"msg": "success", "code": 200, "total_num" : count,"online_coures": online_coureses} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/delete_online_course") async def delete_online_course( online_course_id : int ): try: await Online_course.filter(id=online_course_id).delete() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_group_classes_and_articles") async def get_group_classes_and_articles( group_id : int, page_num : Optional[int] = None, page_amount : Optional[int] = None ): try: class_name_list = Class_name.filter(group_id = group_id,is_delete=0).all().order_by("-id") article_list = Article_list.filter(group_id = group_id,is_del=0).all().order_by("-id") class_count = await class_name_list.all().count() article_count = await article_list.all().count() if page_num and page_amount: class_name_list = class_name_list.offset((page_num-1)*page_amount).limit(page_amount) article_list = article_list.offset((page_num-1)*page_amount).limit(page_amount) class_name_list = await class_name_list.all() article_list = await article_list.all() article_objs = [] for article_obj in article_list: try : article_tmp = { "article_id": article_obj.id, "title": article_obj.title, "school_id" :article_obj.school_id, "group_sort" :article_obj.group_sort, "group_id" :article_obj.group_id, "category": article_obj.category, "create_time" : article_obj.create_time, "click_time" : article_obj.click_time, "depiction" : article_obj.depiction, "content" : article_obj.content, "files" : article_obj.files, "vedio_url" : article_obj.vedio_url, "tags" : article_obj.tags, "cover_img": article_obj.cover_img } except: article_tmp = { "msg" : "fail to get data" } article_objs.append(article_tmp) classes_name = [] for class_name_obj in class_name_list: try: school_obj = await Schools.get(id=class_name_obj.school_id) school_obj_id = school_obj.id except: school_obj_id = 0 try: special_class_list_name = class_name_obj.special_class_list_name if special_class_list_name: state = await check_date_state(class_name_id=class_name_obj.id,special_class_list_name=special_class_list_name) else: state = await check_date_state(class_name_id=class_name_obj.id) class_data = class_name_obj.show_data() if school_obj_id: for key, item in school_obj.show_data().items(): class_data[key] = item class_data["state"]=state["result"]["state"] except Exception as e: class_data = { "msg" : str(e) } classes_name.append(class_data) return {"msg": "success", "code": 200,"class_num" : class_count,"classes": classes_name,"article_num":article_count,"articles": article_objs} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/add_attend_record_by_event") async def add_attend_record( class_id : int = Form(default=0), ): try: if not class_id: return {"msg": "Please input right ","code":500} try: class_detail_obj = await Class_detail.filter(class_list_id = class_id).all() if not isinstance(class_detail_obj, list): class_detail_obj = [class_detail_obj] registration_obj = await Registration.filter(event_id = class_id,is_del = 0,).all() if not isinstance(registration_obj, list): registration_obj = [registration_obj] if not class_detail_obj: return {"msg": "no this class_detail_id session","code":500} except: return {"msg": "get class_detail_id error","code":500} for obj in class_detail_obj: class_event = await Class_list.get_or_none(id = class_id) if class_event is None: return {"msg": "no this class","code":500} for obj1 in registration_obj: new_record = await Attend_record.get_or_create( class_detail_id = obj.id, user_id = obj1.user_id, defaults={'class_name_id':class_event.name_id,'class_event_id':class_id,'is_attend': 0} ) return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/add_attend_record") async def add_attend_record( class_detail_id : int = Form(default=0), user_id : int = Form(default=0), is_attend : int = Form(default=0) ): try: if not class_detail_id or not user_id: return {"msg": "Please input right ","code":500} try: class_detail_obj = await Class_detail.get_or_none(id = class_detail_id) class_list = await Class_list.get_or_none(id = class_detail_obj.class_list_id) if not class_detail_obj or not class_list: return {"msg": "no this class_detail_id","code":500} except: return {"msg": "get class_detail_id error","code":500} msg = "" new_record,created = await Attend_record.get_or_create( class_detail_id = class_detail_id, user_id = user_id, defaults = { "class_name_id" : class_list.name_id, "class_list_id" : class_detail_obj.class_list_id, "is_attend" : is_attend } ) if not created: try: new_record.is_attend = is_attend await new_record.save() msg = "update success" except: msg = "update fail" else: msg = "created success" return {"msg": msg, "code": 200,"new_record_id":new_record.id} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/close_class") async def close_class(user_id:int = Depends(check_token),class_name_id : int = None,cancel:int=None): try: if user_id : user = await User.get(id=user_id) class_name = await Class_name.get_or_none(id = class_name_id) if not class_name : return {"msg": "no this class", "code": 500} elif class_name.is_record_point == 1: if cancel==1: user_attend_record = await Attend_record.filter(class_name_id=class_name_id,is_attend = 1) for tmp in user_attend_record: point = await User_point.get_or_none(user_id = tmp.user_id) class_detail = await Class_detail.get_or_none(id = tmp.class_detail_id) point.hours -= float(class_detail.hour) await point.save() class_name.is_record_point = 0 await class_name.save() return {"msg": "class closed has cancel", "code": 500} else: return {"msg": "class has closed", "code": 500} else : if class_name.create_user_id: if class_name.create_user_id != user_id: print(class_name.create_user_id,user_id) return {"msg": "no access", "code": 200} user_attend_record = await Attend_record.filter(class_name_id=class_name_id,is_attend = 1) for tmp in user_attend_record: point,created = await User_point.get_or_create( user_id = tmp.user_id, defaults = { "hours" : 0.0, "points" : 0.0 } ) class_detail = await Class_detail.get_or_none(id = tmp.class_detail_id) point.hours += float(class_detail.hour) await point.save() result = await count_point(tmp.user_id) class_name.is_record_point = 1 await class_name.save() return {"msg":"success close class and record point", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} async def count_point(user_id) : point = await User_point.get_or_none(user_id = user_id) records = await Point_exchange_record.filter(user_id = user_id) if point: point.points = int(point.hours/3) for record in records: point.points -= record.point_exchange await point.save() return {"msg":"success", "code": 200} else: return {"msg":"沒有點數資料", "code": 500} @classes.post("/delete_attend_record") async def delete_attend_record( id : int = 0 ): try : record = await Attend_record.get(id=id) await Attend_record.filter(id=id).delete() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/update_attend_record") async def update_attend_record( id : int = 0, class_detail_id : Optional[int] = None, user_id : Optional[int] = None, is_attend : Optional[int] = None ): if not id : return {"msg":"please input id"} try : tmp = await Attend_record.get(id=id) if class_detail_id!=None: tmp.class_detail_id = class_detail_id if user_id!=None: tmp.user_id = user_id if is_attend!=None: tmp.is_attend = is_attend await tmp.save() return {"msg": "success", "code": 200} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_attend_record") async def get_attend_record( class_event_id : Optional[int] = None, class_detail_id : Optional[int] = None, user_id : Optional[int] = None, is_attend : Optional[int] = None, page_num : Optional[int] = None, page_amount : Optional[int] = None ): try: attend_record_list = Attend_record.all() if class_event_id: id_list = [] class_detail_list = await Class_detail.filter(class_list_id = class_event_id) for tmp in class_detail_list : id_list.append(tmp.id) attend_record_list = Attend_record.filter(class_detail_id__in=id_list) if class_detail_id: attend_record_list = attend_record_list.filter(class_detail_id = class_detail_id) if user_id : attend_record_list = attend_record_list.filter(user_id = user_id) if is_attend : attend_record_list = attend_record_list.filter(is_attend = is_attend) count = await attend_record_list.all().count() if page_num and page_amount: attend_record_list = attend_record_list.offset((page_num-1)*page_amount).limit(page_amount) attend_record_list = await attend_record_list.all() attend_records = [] for obj in attend_record_list: class_detail_list = await Class_detail.get_or_none(id = obj.class_detail_id) if class_detail_list == None: return {"msg": f"class detail number error id={obj.class_detail_id}", "code": 500} #registration_list = await Registration.filter(event_id = class_detail_list[0].class_list_id,user_id =obj.user_id) user_list = await User.filter(id = obj.user_id) user_information_list = await User_information.filter(user_id = obj.user_id) class_list = await Class_list.get_or_none(id = obj.class_event_id) if class_list != None : class_name = await Class_name.get_or_none(id = class_list.name_id) #if len(registration_list)==0: # continue data = { "id" :obj.id, "class_name_id" : obj.class_name_id, "class_event_id":class_detail_list.class_list_id, "class_detail_id" :obj.class_detail_id, "user_id" :obj.user_id, "is_attend" :obj.is_attend, #"reg_confirm" : registration_list[0].reg_confirm, #"payment_status": registration_list[0].payment_status, #"five_digits": registration_list[0].five_digits, "real_name": user_information_list[0].name, "phone": user_information_list[0].phone, "email": user_list[0].email } if class_list and class_name : data["class_name"] = class_name.name data["start_time"] = class_detail_list.start_time data["end_time"]= class_detail_list.end_time attend_records.append(data) return {"msg": "success", "code": 200,"total_num" : count,"attend_record_list":attend_records} except Exception as e: return {"msg": str(e), "code": 500} import pymysql @classes.get("/copy") async def copy( from_id : Optional[int] = None ): try: connection = pymysql.connect( host='db.ptt.cx', user='choozmo', password='pAssw0rd', database='test_copy' ) class_list_ip = [] with connection.cursor() as cursor: sql = "SELECT * FROM class_name" cursor.execute(sql) connection.commit() rows = cursor.fetchall() encode = "HOP" now = datetime.now() encode += now.strftime("%Y%m%d") class_id_list = [] for row in rows: # print(row) class_name,created = await Class_name.update_or_create( name = row[1], defaults = { "school_id":1, "category" : row[3], "introduction" : row[4], "organizer" : row[5], "cover_img" : row[6], "group_id" :9, "group_sort" : "希望工程", "special_class_list_name" :None, "recommend" : 0, "is_inner" :1, "is_check" :1, "create_user_id" :None, "create_time" : datetime.now(), "is_delete" : row[10], "is_record_point" :0, "encode" :encode } ) #print(class_name.id) sql = f"SELECT * FROM schools where id = {row[2]}" cursor.execute(sql) connection.commit() school_row = cursor.fetchone() # print(school_row) try: school,school_created = await Schools.update_or_create( name = school_row[1], defaults = { "longitude" : school_row[2], "latitude" : school_row[3], "address" :school_row[4], "update_time" :school_row[5], "school_introduction":None, "email":None, "phone":None, "school_create_user_id" :None, "teachers" : None, "is_delete" : 0, "is_pass_proposal" : 1 } ) class_name.school_id = school.id await class_name.save() #print("step 1 complete") sql = f"SELECT * FROM class_list where name_id = {row[0]}" cursor.execute(sql) connection.commit() list_row = cursor.fetchone() # print(list_row) event,event_created = await Class_list.update_or_create( name_id = class_name.id, defaults = { "event": list_row[2], "start_time": list_row[3], "end_time": list_row[4], "location": list_row[10], "lecturer": list_row[9], "contact": list_row[8], "content": list_row[11], "URL": list_row[12], "people": None, "fee_method": list_row[19], "registration_way": list_row[15], "remark": list_row[16], "ATM_address":list_row[17], "fee_payment" :list_row[18], "create_user_id" :list_row[19], "event_create_time" : datetime.now(), "files" : list_row[20] } ) #print("step 2 complete") date,date_create = await Class_date.update_or_create( class_list_id = event.id, defaults = { "registration_start": list_row[5], "registration_end": list_row[6], "number_limit": list_row[13], "amount_left": 0 } ) #print("step 3 complete") class_id_list.append(class_name.id) except : await Class_name.filter(id = class_name.id).delete() my_log("error",__name__,f"{row[0]}copy error") class_delete_list = await Class_name.filter(group_id = 9,group_sort = "希望工程").exclude(id__in = class_id_list).delete() for class_ in class_delete_list : print("class_delete:",class_.id) cursor.close() connection.close() return {"msg": "success", "code": 500} except Exception as e: return {"msg": str(e), "code": 500} @classes.post("/insert_proposal") async def insert_class_name( class_name: str = Form(default=None), school_id : int = Form(default=None), category : str = Form(default=None), introduction: str = Form(default=None), organizer: str = Form(default=None), cover_img_file : UploadFile = File(default=None), fee_method: str = Form(default=None), number_limit: int = Form(default=None), number_minimum : int = Form(default=None), people : str = Form(default=None), files = Depends(create_upload_files) ): try: cover_img = '' if cover_img_file != None: contents = await cover_img_file.read() #save the file with open(f"{IMAGEDIR}{cover_img_file.filename}", "wb") as f: f.write(contents) cover_img = f"{IMAGEDIR_short}{cover_img_file.filename}" if files: if files.__contains__("msg"): return {"msg": files["msg"], "code": 500} school = await Schools.get_or_none(id = school_id) if school is None : return {"msg": f"no this school id {school_id}", "code": 500} new_proposal,created = await Proposal.get_or_create( school_id =school_id, defaults = { "class_name":class_name, "category":category, "introduction":introduction, "organizer":organizer, "cover_img":cover_img, "fee_method":fee_method, "number_limit":number_limit, "number_minimum":number_minimum, "people":people, "files":files, "create_time":datetime.now() } ) if not created: if class_name: new_proposal.class_name = class_name if category: new_proposal.category = category if introduction: new_proposal.introduction = introduction if cover_img_file: new_proposal.cover_img = cover_img if fee_method: new_proposal.fee_method = fee_method if number_limit: new_proposal.number_limit = number_limit if number_minimum: new_proposal.number_minimum = number_minimum if people: new_proposal.people = people if files != "{}": new_proposal.files = files await new_proposal.save() else: user_id = school.create_user_id if user_id: try: try : user = await User.get(id = user_id) user_info = await User_information.get(user_id = user_id) except: return {"msg": "請去建立工藝家履歷", "code": 500} email = user.email subject = "課程提案通知" # message = f"親愛的工藝家{user_info.name}您好,
\ # 感謝您埋下工藝種子!
\ # 您的提案將由管理員進行審核中,將於3-7個工作天內e-mail通知您審核結果。
\ # 若7天後尚未收到課程審核通知,請將後台畫面、聯絡資訊等截圖寄送至客服信箱:
\ # craftology@ntcri.gov.tw,以便客服為您查詢。
\ # 註:此封信件為系統自動發送,請勿回信,謝謝。" with open("/var/www/ntcri/assets/edm/porposal/index.html", 'r', encoding='utf-8') as html_file: html_template = html_file.read() message = html_template.replace("{username}",user_info.name) send_email(email,"",subject,message) except: print("have no email") return {"msg": "success", "code": 200, "new_proposal_id": new_proposal.id} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_proposal") async def insert_class_name( id : Optional[int] = None, school_id : Optional[int] = None ): try: proposal_list = Proposal.all() Q_word = Q() if id : Q_word = Q(id=id) if school_id : Q_word = Q_word & Q(school_id=school_id) proposal_list = await proposal_list.filter(Q_word) result = [] for proposal in proposal_list: school = await Schools.get_or_none(id=proposal.school_id) if school is None : return {"msg": f"no this school,id = {proposal.school_id}", "code": 500} info = proposal.show_data() info["school"]=school.show_data() result.append(info) return {"msg": "success", "code": 200, "proposal_list": result} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_favorite_course_count") async def insert_class_name( class_name_id_list : str = None ): try: result = [] if class_name_id_list : class_name_id = eval(class_name_id_list) distinct_class_name_ids = list(map(int,class_name_id)) for entry in distinct_class_name_ids: class_num = await Favorite_course.filter(class_name_id = entry).count() result.append({"class_name_id":entry,"count":class_num}) else: # 使用 Tortoise ORM 的 count 方法進行統計 distinct_class_name_ids = await Favorite_course.all().values_list('class_name_id') # print(distinct_class_name_ids) for entry in distinct_class_name_ids: class_num = await Favorite_course.filter(class_name_id = entry[0]).count() result.append({"class_name_id":entry[0],"count":class_num}) return {"msg": "success", "code": 200, "result": result} except Exception as e: return {"msg": str(e), "code": 500} @classes.get("/get_school_city_count") async def insert_class_name( school_city_list : str = None ): try: result = [] distinct_class_name_ids = [] if school_city_list : distinct_class_name_ids = eval(school_city_list) else: # 使用 Tortoise ORM 的 count 方法進行統計 distinct_class_name_ids = taiwan_city # print(distinct_class_name_ids) for entry in distinct_class_name_ids: count = 0 if "," in entry: for tmp in entry.split(","): count += await Schools.filter(address__icontains=tmp).count() else: count = await Schools.filter(address__icontains=entry).count() result.append({"city":entry,"count":count}) return {"msg": "success", "code": 200, "result": result} except Exception as e: return {"msg": str(e), "code": 500}