from ast import Or
from email.policy import default
from sre_constants import ANY
from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile,Request
from typing import List,Optional
from fastapi.responses import FileResponse
from random import randint
from fastapi.security import OAuth2PasswordRequestForm
from app.models.models import User,User_information,Favorite_course,Article_list,Class_date,Attend_record,One_day_class,Outter_class_list,Proposal
from app.models.models import Class_list,Schools,Class_detail,Class_name,Registration,Group_name,Online_course,User_resume,User_point,Point_exchange_record
from app.api import deps
from sqlalchemy.orm import Session
from typing import Any, Dict
import secrets
from fastapi_login.exceptions import InvalidCredentialsException
from fastapi_login import LoginManager
from datetime import timedelta,datetime
from app.config import settings
from pathlib import Path
from jose import jwt
from tortoise.queryset import Q
from fastapi.responses import HTMLResponse
import requests
import json
from itertools import chain
from tortoise import fields
import random
from app.log import my_log
import rpyc
import emails
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
import re
# Router on which the endpoints defined below are registered.
classes = APIRouter()
# Disabled stdlib logging configuration, kept for reference only;
# the visible code logs through my_log instead.
# logging.basicConfig(
# filename='app-basic.log',
# level=logging.INFO,
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
# logger = logging.getLogger(__name__)
def send_email(
    email_to: str,
    token: str,
    subject_template: str = "",
    html_template: str = "",
    environment: Optional[Dict[str, Any]] = None,
):
    """Send an HTML e-mail through the internal rpyc mail relay.

    Args:
        email_to: Recipient address.
        token: Unused; kept so existing positional callers keep working.
        subject_template: Subject line, sent as-is.
        html_template: HTML body, sent as-is.
        environment: Unused; kept for signature compatibility.

    Returns:
        ``{"message": "send email"}`` on success, or
        ``{"msg": <error text>, "code": 500}`` on any failure. This function
        never raises.
    """
    mail_payload = {
        "toaddr": email_to,
        "title": subject_template,
        "totext": html_template,
    }
    conn = None
    try:
        conn = rpyc.connect("192.168.192.80", 12345)
        conn.root.mailto(mail_payload)
        return {"message": "send email"}
    except Exception as e:
        return {"msg": str(e), "code": 500}
    finally:
        # Release the relay connection; previously it was left open on every call.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Closing is best-effort: callers rely on this function not raising.
                pass
@classes.post("/test_email")
async def test_email(
    email_to: str,
    subject_template: str = "",
    html_template: str = ""
):
    """Send a test e-mail with the given subject and HTML body.

    Returns whatever ``send_email`` returns (a success or error dict).
    """
    # The token argument is passed as an empty string, as it always was.
    return send_email(email_to, "", subject_template, html_template)
# Directory prefix under which uploaded files are written on disk.
IMAGEDIR = "/var/www/ntcri/assets/"
# Prefix used for the path that is stored/returned for an uploaded file.
IMAGEDIR_short = "assets/"
# Maps a course category name to the three-letter prefix of its course code;
# categories missing from this table fall back to "OTH" in insert_class_name.
# NOTE(review): "Ill" is the only mixed-case code — confirm whether "ILL" was intended.
encode_set = {
"樹藝" : "ARB",
"漆藝" : "LAC",
"藍染" : "AIZ",
"蠟雕" : "WAX",
"竹工藝籃" :"BAM",
"金工/飾品" :"MWT",
"蠟燭/香氛/調香" : "CAN" ,
"編織/羊毛氈/縫紉" : "KNI" ,
"植栽/花藝" : "PLA" ,
"陶藝/玻璃" : "CER" ,
"皮件/皮革" : "LEA" ,
"插畫/繪畫/寫字" : "Ill",
"木工/竹藝" :"WOO",
"安心活動" : "HER"
}
# City names; some entries hold two spellings of the same city separated by a comma.
# Its consumer is not visible in this part of the file.
taiwan_city = ["基隆","台北,臺北","新北","宜蘭","花蓮","台東,臺東","桃園","新竹","苗栗","台中,臺中","彰化","南投","雲林","嘉義","台南,臺南","高雄","屏東"]
async def create_upload_files(files:Optional[List[UploadFile]] = File(None)):
    """Save uploaded files under IMAGEDIR and return their stored paths.

    Used as a FastAPI dependency. Each saved file is reported under the key
    ``"file1"``, ``"file2"``, ... with the value ``IMAGEDIR_short + filename``.

    Args:
        files: Uploaded files, or None when the request carries none.

    Returns:
        Dict of ``"file<N>" -> relative path``; empty when nothing was saved.
    """
    files_url = {}
    if not files:
        return files_url
    file_num = 1
    for file in files:
        # Keep only the final path component so a crafted filename such as
        # "../../x" cannot escape the upload directory.
        safe_name = Path(file.filename or "").name
        if not safe_name:
            # Empty upload slot (no filename): nothing to store.
            continue
        contents = await file.read()
        with open(f"{IMAGEDIR}{safe_name}", "wb") as f:
            f.write(contents)
        files_url[f"file{file_num}"] = f"{IMAGEDIR_short}{safe_name}"
        file_num += 1
    return files_url
async def check_token(access_token: str):
    """Return the id of the user owning ``access_token``, or None if no user matches."""
    account = await User.get_or_none(token=access_token)
    if account:
        return account.id
    print("no access")
    return None
async def update_location_time(location_id: int):
    """Stamp the school identified by ``location_id`` with the current time.

    A falsy ``location_id`` is a no-op. Always returns the success dict;
    ``Schools.get`` raises if the id does not exist.
    """
    response = {"msg": "success", "code": 200}
    if not location_id:
        return response
    target = await Schools.get(id=location_id)
    target.update_time = datetime.now()
    await target.save()
    return response
@classes.post("/change_class_img")
async def change_class_img(
    category: str ,
    files:Optional[List[UploadFile]] = File(None)
):
    """Assign a random uploaded image as cover to every non-inner course of a category.

    Args:
        category: Substring matched (case-insensitively) against the course category.
        files: Candidate cover images; at least one is required.

    Returns:
        ``{"msg": "success", "code": 200}`` or ``{"msg": <error>, "code": 500}``.
    """
    try:
        if not files:
            return {"msg": "no files uploaded", "code": 500}
        cover_urls = []
        for file in files:
            # Strip directory components from the client-supplied name.
            safe_name = Path(file.filename or "").name
            if not safe_name:
                continue
            contents = await file.read()
            with open(f"{IMAGEDIR}{safe_name}", "wb") as f:
                f.write(contents)
            cover_urls.append(f"{IMAGEDIR_short}{safe_name}")
        if not cover_urls:
            return {"msg": "no files uploaded", "code": 500}
        class_name_list = await Class_name.filter(
            Q(category__icontains=category) & Q(is_inner=0)
        ).all()
        for class_name_obj in class_name_list:
            # Pick among the files actually uploaded; the previous fixed range
            # of four raised KeyError for fewer uploads and ignored extra ones.
            class_name_obj.cover_img = random.choice(cover_urls)
            await class_name_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
# Field container describing a teacher resume; it declares annotations only
# and defines no methods. No use of it is visible in this part of the file.
class resume():
# NOTE(review): this assigns the UploadFile class itself as a class attribute
# rather than annotating the field — presumably `imgs: UploadFile` was meant; confirm.
imgs = UploadFile
teacher_name : str
work_type : str
experience : str
expertise : str
license : str
media : str
introduction: str
# @classes.post("/check_city")
# async def check_city(
# Lng: str = None,
# Lat: str = None,
# ):
# try:
# country : Any
# city : Any
# if Lng and Lat:
# url = 'https://maps.googleapis.com/maps/api/geocode/json?'
# params = {'key':'AIzaSyDETbkTAmZNOECx2u4rmNudHvyoQTgDbc4', 'language':'zh-TW','latlng':''}
# params['latlng'] = Lat + ',' + Lng
# #print(params['address'])
# res = requests.get(url, params=params)
# mes = json.loads(res.text)
# list = mes['results'][0]['address_components']
# for tmp in list :
# if "administrative_area_level_1" in tmp['types']:
# city = tmp["long_name"]
# if "administrative_area_level_2" in tmp['types']:
# country = tmp["long_name"]
# else :
# return {"msg": "please input Lng and Lat", "code": 500}
# return {"msg": "success", "code": 200, "result": city+','+country}
# except Exception as e:
# return {"msg": str(e), "code": 500}
@classes.post("/insert_school")
async def insert_school(
    request: Request,
    location_name: str = Form(default=''),
    Lng: str = Form(default=None),
    Lat: str = Form(default=None),
    address : str = Form(default=''),
    school_introduction : str = Form(default=''),
    email : str = Form(default=''),
    phone : str = Form(default=''),
    access_token:str = Form(default=None),
    teachers_list :str = Form(default=None),
    is_pass_proposal : int = Form(default=0),
    principal_user_email : str = Form(default=None),
):
    """Create a school (location / proposal) record.

    When ``Lat`` or ``Lng`` is missing, the coordinates are looked up from
    ``address`` through the Google Geocoding API; a failed lookup is logged
    and the record is still created with whatever coordinates are available.

    Returns:
        ``{"msg": "success", "code": 200, "location_id": <id>}`` on success,
        ``{"msg": "沒有權限", "code": 500}`` when a token is supplied but unknown,
        or ``{"msg": <error>, "code": 500}`` on any other failure.
    """
    try:
        user_id = None
        if access_token:
            user_id = await check_token(access_token)
            if user_id is None:
                return {"msg": "沒有權限", "code": 500}

        if Lat is None or Lng is None:
            try:
                # NOTE(review): the API key is hard-coded in source; it should be
                # moved to configuration and rotated.
                url = 'https://maps.googleapis.com/maps/api/geocode/json?'
                params = {'key':'AIzaSyDETbkTAmZNOECx2u4rmNudHvyoQTgDbc4', 'address': address}
                # A timeout keeps an unresponsive geocoder from hanging this
                # request (the call is blocking inside an async handler).
                res = requests.get(url, params=params, timeout=10)
                mes = json.loads(res.text)
                Lat = mes['results'][0]['geometry']['location']['lat']
                Lng = mes['results'][0]['geometry']['location']['lng']
            except Exception:
                # Geocoding is best-effort; `Exception` (not a bare except) so
                # task cancellation is not swallowed.
                my_log("error",__name__,f"not get lat and lng")

        principal_user_id = None
        if principal_user_email:
            try:
                # `is_avtive` is the spelling used for this filter throughout the file.
                principal_user = await User.get(email=principal_user_email,is_avtive=1)
                principal_user_id = principal_user.id
            except Exception:
                my_log("info",__name__,f"no this teacher")

        new_school = await Schools.create(
            name=location_name,
            longitude=Lng,
            latitude=Lat,
            address=address,
            update_time=datetime.now(),
            introduction=school_introduction,
            email=email,
            phone=phone,
            create_user_id=user_id,
            is_delete=0,
            is_pass_proposal=is_pass_proposal,
            principal_user_id=principal_user_id,
            teachers=teachers_list,
        )
        client_ip = request.client.host
        my_log("info",__name__,f"Client IP: {client_ip} - new school:{new_school.id}")
        return {"msg": "success", "code": 200, "location_id": new_school.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/insert_class_name")
async def insert_class_name(
    name: str = Form(default=''),
    location_id: int = Form(default=1),
    category: str = Form(default=''),
    introduction: str = Form(default=''),
    organizer: str = Form(default=''),
    cover_img_file:UploadFile = File(default=''),
    group_id : int = Form(default=1),
    group_sort :str = Form(default=''),
    recommend: int = Form(default=0),
    special_class_list_name : str = Form(default=None),
    is_inner : int = Form(default=0),
    is_check : int = Form(default=0),
    access_token:str = Form(default=None),
    teachers_resume : str = Form(default=None),
    syllabus : str = Form(default=None)
):
    """Create a course (Class_name) and assign its course code.

    The code is ``<category prefix><YYYYMMDD><row id>``; the prefix comes from
    ``encode_set`` and falls back to ``"OTH"``.

    Returns:
        ``{"msg": "success", "code": 200, "new_class_name_id": <id>}`` on success,
        ``{"msg": "沒有權限", "code": 500}`` when a token is supplied but unknown,
        or ``{"msg": <error>, "code": 500}`` on any other failure.
    """
    try:
        # Authorise before touching the filesystem.
        user_id = None
        if access_token:
            user_id = await check_token(access_token)
            if user_id is None:
                return {"msg": "沒有權限", "code": 500}

        cover_img = ''
        if cover_img_file != '':
            # Keep only the final path component of the client-supplied name.
            safe_name = Path(cover_img_file.filename or "").name
            if safe_name:
                contents = await cover_img_file.read()
                with open(f"{IMAGEDIR}{safe_name}", "wb") as f:
                    f.write(contents)
                cover_img = f"{IMAGEDIR_short}{safe_name}"

        now = datetime.now()
        encode = encode_set.get(category, "OTH") + now.strftime("%Y%m%d")
        new_class_name = await Class_name.create(
            name=name,
            school_id=location_id,
            category=category,
            introduction=introduction,
            organizer=organizer,
            cover_img=cover_img,
            group_id=group_id,
            group_sort=group_sort,
            recommend=recommend,
            special_class_list_name=special_class_list_name,
            is_inner=is_inner,
            is_check=is_check,
            create_user_id=user_id,
            create_time=now,
            is_delete=0,
            is_record_point=0,
            teachers_resume=teachers_resume,
            syllabus=syllabus,
        )
        # The row id is only known after creation, so the code is completed here.
        new_class_name.encode = encode + str(new_class_name.id)
        await new_class_name.save()

        try:
            # Previously called without `await`, so the timestamp was never updated.
            await update_location_time(location_id=location_id)
        except Exception as e:
            # The course already exists; a stale timestamp must not fail the request.
            my_log("error",__name__,f"update location time fail: {e}")
        return {"msg": "success", "code": 200, "new_class_name_id": new_class_name.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
# @classes.get("/insert_encode")
# async def insert_encode():
# try:
# class_name = await Class_name.all()
# for classes in class_name:
# encode = ""
# if encode_set.__contains__(classes.category) :
# encode = encode_set[classes.category]
# else:
# encode = "OTH"
# now = classes.create_time
# encode += now.strftime("%Y%m%d")
# encode += str(classes.id)
# classes.encode = encode
# await classes.save()
# except Exception as e:
# return {"msg": str(e), "code": 500}
# 創建場次
@classes.post("/insert_event")
async def insert_event(
    request : Request,
    name_id: int = Form(default=0),
    event: str = Form(default=''),
    start_time: datetime = Form(default=None),
    end_time: datetime = Form(default=None),
    contact: str = Form(default=''),
    lecturer: str = Form(default=''),
    location: str = Form(default=''),
    content: str = Form(default=''),
    URL: str = Form(default=''),
    people : str = Form(default=''),
    fee_method: str = Form(default=''),
    registration_way: str = Form(default=''),
    registration_start: datetime = Form(default=None),
    registration_end: datetime = Form(default=None),
    number_limit: int = Form(default=None),
    remark : str = Form(default=''),
    ATM_address: str = Form(default=''),
    access_token:str = Form(default=None),
    fee_payment: str = Form(default=''),
    number_minimum : int = Form(default=None),
    files_url = Depends(create_upload_files)
):
    """Create an event (Class_list) for a course, plus its registration window.

    After the event is stored, a Class_date row is created when registration
    data was supplied, and the creating user is e-mailed a notification.

    Returns:
        ``{"msg": "success", "code": 200, "class_id": <id>}`` on success, or a
        ``code: 500`` dict with a message. Note that the "no resume" and mail
        error responses are returned after the event row already exists.
    """
    try:
        user_id = None
        if access_token:
            user_id = await check_token(access_token)
            if user_id is None:
                return {"msg": "沒有權限", "code": 500}

        # The course must exist.
        if not await Class_name.filter(id=name_id).all():
            return {"msg": "沒有此課程", "code": 500}
        # The upload dependency reports problems through a "msg" key.
        if files_url and "msg" in files_url:
            return {"msg": files_url["msg"], "code": 500}
        # The time range must be valid.
        if start_time and end_time and start_time >= end_time:
            return {"msg": "時間輸入錯誤", "code": 500}

        new_class = await Class_list.create(
            name_id=name_id,
            event=event,
            start_time=start_time,
            end_time=end_time,
            contact=contact,
            lecturer=lecturer,
            location=location,
            content=content,
            URL=URL,
            people=people,
            fee_method=fee_method,
            registration_way=registration_way,
            remark=remark,
            ATM_address=ATM_address,
            create_user_id=user_id,
            fee_payment=fee_payment,
            event_create_time=datetime.now(),
            files=files_url,
        )
        client_ip = request.client.host

        try:
            if (registration_start and registration_end and number_limit) or number_minimum:
                await Class_date.create(
                    class_list_id=new_class.id,
                    registration_start=registration_start,
                    registration_end=registration_end,
                    number_limit=number_limit,
                    amount_left=number_limit,
                    number_minimum=number_minimum,
                )
        except Exception as e:
            # Still best-effort, but recorded in the log instead of a bare print.
            my_log("error",__name__,f"Client IP: {client_ip} - input class_date fail: {e}")

        # Notify the creator that the course was submitted.
        if user_id:
            try:
                user = await User.get_or_none(id=user_id)
                user_info = await User_information.get_or_none(user_id=user_id)
                if user is None or user_info is None:
                    my_log("info",__name__,f"Client IP: {client_ip} - {user_id} has no user resume")
                    return {"msg": "請去建立工藝家履歷", "code": 500}
                subject = "創建課程通知"
                with open("/var/www/ntcri/assets/edm/success_open_class/index.html", 'r', encoding='utf-8') as html_file:
                    html_template = html_file.read()
                message = html_template.replace("{username}",user_info.name)
                send_email(user.email,"",subject,message)
            except Exception as e:
                return {"msg": str(e), "code": 500}
        return {"msg": "success", "code": 200, "class_id": new_class.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/auto_create_session")
async def auto_create_session(
    class_event_id : int = Form(default=0),
    week_day_str : str = Form(default="[]"),
    start_week_time : str = Form(default="[]"),
    end_week_time : str = Form(default="[]"),
):
    """Regenerate the sessions of an event from a weekly schedule.

    Walks every day from the event's start_time to its end_time; for each day
    whose weekday (0 = Monday) is enabled in ``week_day_str`` a session is
    created using that weekday's start and end clock time. Existing sessions
    of the event are replaced.

    Args:
        class_event_id: Id of the event (Class_list row).
        week_day_str: Python-literal sequence indexed by weekday; truthy = enabled.
        start_week_time: Python-literal sequence of "HH:MM:SS" strings per weekday.
        end_week_time: Python-literal sequence of "HH:MM:SS" strings per weekday.

    Returns:
        ``{"msg": "success", "code": 200}`` or ``{"msg": <error>, "code": 500}``.
    """
    # Local import: only `ast.Or` is imported at module level.
    from ast import literal_eval

    time_format = '%Y-%m-%d %H:%M:%S'
    try:
        # literal_eval accepts the same literal lists as before but cannot
        # execute code; these values come straight from the request form.
        week_day = literal_eval(week_day_str)
        start_times = literal_eval(start_week_time)
        end_times = literal_eval(end_week_time)

        class_obj = await Class_list.get_or_none(id=class_event_id)
        if class_obj is None:
            return {"msg": "no this class", "code": 500}
        if class_obj.start_time is None or class_obj.end_time is None:
            return {"msg": "class has no start_time or end_time", "code": 500}

        # Plan every session first so a malformed schedule is rejected before
        # the existing sessions are deleted.
        planned = []
        day_cursor = class_obj.start_time
        while day_cursor <= class_obj.end_time:
            weekday = day_cursor.weekday()
            if week_day[weekday]:
                day_text = day_cursor.date().strftime("%Y-%m-%d")
                s_start_time = day_text + " " + start_times[weekday]
                s_end_time = day_text + " " + end_times[weekday]
                duration = (
                    datetime.strptime(s_end_time, time_format)
                    - datetime.strptime(s_start_time, time_format)
                )
                total_minutes = int(duration.total_seconds() // 60)
                # Whole hours plus the remaining minutes as a one-decimal fraction.
                hour = total_minutes // 60 + round((total_minutes % 60) / 60, 1)
                planned.append((s_start_time, s_end_time, hour))
            day_cursor = day_cursor + timedelta(days=1)

        await Class_detail.filter(class_list_id=class_event_id).delete()
        for session, (s_start_time, s_end_time, hour) in enumerate(planned, start=1):
            await Class_detail.create(
                class_list_id=class_event_id,
                start_time=s_start_time,
                end_time=s_end_time,
                hour=hour,
                sessions=session,
                content="",
            )
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/insert_session")
async def insert_session(
    class_event_id : int = Form(default=0),
    start_time: datetime = Form(default=None),
    end_time: datetime = Form(default=None),
    content : str = Form(default='')
):
    """Create a session (Class_detail) for an event.

    The new session number is one more than the highest existing number for
    the event. Omitted times default to the time of the request.

    Returns:
        ``{"msg": "success", "code": 200, "new_session_id": <id>}`` or
        ``{"msg": <error>, "code": 500}``.
    """
    try:
        # The old `Form(default=datetime.now())` was evaluated once at import,
        # so omitted times were stuck at server start-up time.
        now = datetime.now()
        if start_time is None:
            start_time = now
        if end_time is None:
            end_time = now

        existing = await Class_detail.filter(class_list_id=class_event_id).all()
        session = max([0] + [session_obj.sessions for session_obj in existing])

        time_difference = end_time - start_time
        # NOTE(review): every existing session of the event is deleted before the
        # new one is inserted, even though the new number continues from the old
        # maximum. Behaviour is kept as found; confirm whether this delete is intended.
        await Class_detail.filter(class_list_id=class_event_id).delete()
        total_minutes = int(time_difference.total_seconds() / 60)
        # Whole hours plus the remaining minutes as a one-decimal fraction.
        hour = total_minutes // 60 + round((total_minutes % 60) / 60, 1)
        new_session = await Class_detail.create(
            class_list_id=class_event_id,
            start_time=start_time,
            end_time=end_time,
            sessions=session + 1,
            content=content,
            hour=hour,
        )
        return {"msg": "success", "code": 200, "new_session_id": new_session.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/update_school")
async def update_school(
    location_id: int = Form(default=0),
    location_name: str = Form(default=''),
    Lng: str = Form(default=''),
    Lat: str = Form(default=''),
    address : str = Form(default=''),
    school_introduction : str = Form(default=None),
    email : str = Form(default=None),
    phone : str = Form(default=None),
    teachers_list : List[str] = Form(default=[]),
    is_pass_proposal : int = Form(default=None),
    principal_user_email : str = Form(default=None)
):
    """Update a school (location / proposal); only supplied fields are changed.

    Changing ``is_pass_proposal`` may e-mail the creator:
    from 2 to 0 sends the "re-submitted" mail, and setting 1 sends the
    "approved" mail. Mail problems are logged and do not block the update,
    except that a missing creator account returns an error before saving.

    Returns:
        ``{"msg": "success", "code": 200}`` or ``{"msg": <error>, "code": 500}``.
    """
    try:
        school = await Schools.get(id=location_id)
        if location_name.strip() != '':
            school.name = location_name
        # Blank-check both coordinates the same way (Lng used to skip strip()).
        if Lng.strip() != '':
            school.longitude = Lng
        if Lat.strip() != '':
            school.latitude = Lat
        if address.strip() != '':
            school.address = address
        if school_introduction:
            school.introduction = school_introduction
        if email:
            school.email = email
        if phone:
            school.phone = phone

        if is_pass_proposal is not None:
            creator_id = school.create_user_id

            if is_pass_proposal == 0 and school.is_pass_proposal == 2 and creator_id:
                # A rejected proposal was submitted again: notify the creator.
                try:
                    try:
                        user = await User.get(id=creator_id)
                        user_info = await User_information.get(user_id=creator_id)
                    except Exception:
                        return {"msg": "請去建立工藝家履歷", "code": 500}
                    subject = "重新提案通知"
                    with open("/var/www/ntcri/assets/edm/porposal/index.html", 'r', encoding='utf-8') as html_file:
                        html_template = html_file.read()
                    message = html_template.replace("{username}",user_info.name)
                    send_email(user.email,"",subject,message)
                except Exception as e:
                    # Mail is best-effort; `Exception` rather than a bare except
                    # so task cancellation is not swallowed.
                    my_log("info",__name__,f"have no email: {e}")

            if is_pass_proposal == 1 and creator_id:
                # Proposal approved: notify the creator.
                try:
                    user = await User.get_or_none(id=creator_id)
                    user_info = await User_information.get(user_id=creator_id)
                    if user is None:
                        return {"msg": "請去建立工藝家履歷", "code": 500}
                    subject = "提案審核通過通知"
                    with open("/var/www/ntcri/assets/edm/pass_porposal/index.html", 'r', encoding='utf-8') as html_file:
                        html_template = html_file.read()
                    message = html_template.replace("{username}",user_info.name)
                    send_email(user.email,"",subject,message)
                except Exception as e:
                    my_log("info",__name__,f"have no email: {e}")

            school.is_pass_proposal = is_pass_proposal

        if principal_user_email:
            try:
                # `is_avtive` is the spelling used for this filter throughout the file.
                principal_user = await User.get(email=principal_user_email,is_avtive=1)
                school.principal_user_id = principal_user.id
            except Exception:
                my_log("info",__name__,f"no this teacher")

        if teachers_list:
            teachers = []
            for teacher in teachers_list:
                try:
                    teacher_info = await User.get(email=teacher,is_avtive=1)
                    teachers.append(teacher_info.id)
                except Exception:
                    my_log("info",__name__,f"no this teacher")
            # Stored as the string form of the id list.
            school.teachers = str(teachers)

        await school.save()
        try:
            # Previously called without `await`, so the timestamp was never updated.
            await update_location_time(location_id=location_id)
        except Exception as e:
            # The update itself is already saved; do not fail the request here.
            my_log("error",__name__,f"update location time fail: {e}")
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/update_class_name")
async def update_class_name(
    request : Request,
    class_name_id: int = Form(default=0),
    name: str = Form(default=None),
    location_id: int = Form(default=None),
    category: str = Form(default=None),
    introduction: str = Form(default=None),
    organizer: str = Form(default=None),
    cover_img_file:UploadFile = File(default=""),
    group_id : int = Form(default=None),
    group_sort : str = Form(default=None),
    recommend: int = Form(default=None),
    special_class_list_name : str = Form(default=None),
    is_inner : int = Form(default=None),
    is_check : int = Form(default=None),
    teachers_resume : str = Form(default=None),
    syllabus : str = Form(default=None)
):
    """Update a course (Class_name); only supplied fields are changed.

    When ``is_check`` moves from 0 to 1 the creator is e-mailed an approval
    notice; a missing creator account or profile returns an error before saving.

    Returns:
        ``{"msg": "success", "code": 200}`` or ``{"msg": <error>, "code": 500}``.
    """
    try:
        class_name = await Class_name.get(id=class_name_id)

        # Plain "set when supplied" fields.
        simple_fields = {
            "name": name,
            "school_id": location_id,
            "category": category,
            "introduction": introduction,
            "organizer": organizer,
            "group_id": group_id,
            "is_inner": is_inner,
            "syllabus": syllabus,
            "teachers_resume": teachers_resume,
            "group_sort": group_sort,
            "recommend": recommend,
        }
        for field_name, value in simple_fields.items():
            if value is not None:
                setattr(class_name, field_name, value)

        if cover_img_file != "":
            # Keep only the final path component of the client-supplied name.
            safe_name = Path(cover_img_file.filename or "").name
            if safe_name:
                contents = await cover_img_file.read()
                with open(f"{IMAGEDIR}{safe_name}", "wb") as f:
                    f.write(contents)
                class_name.cover_img = f"{IMAGEDIR_short}{safe_name}"

        client_ip = request.client.host
        if is_check is not None:
            user_id = class_name.create_user_id
            if class_name.is_check == 0 and is_check == 1 and user_id:
                try:
                    user = await User.get(id=user_id)
                    user_info = await User_information.get(user_id=user_id)
                except Exception:
                    my_log("info",__name__,f"Client IP: {client_ip} - {user_id} has no user resume")
                    return {"msg": "請去建立工藝家履歷", "code": 500}
                subject = "課程核准通知"
                with open("/var/www/ntcri/assets/edm/pass_check/index.html", 'r', encoding='utf-8') as html_file:
                    html_template = html_file.read()
                message = html_template.replace("{username}",user_info.name)
                # send_email reports failure through its return value, so inspect
                # it; the old try/except around the call could never fire.
                mail_result = send_email(user.email,"",subject,message)
                if mail_result.get("code") == 500:
                    my_log("error",__name__,f"Client IP: {client_ip} - send email fail")
            class_name.is_check = is_check

        if special_class_list_name:
            class_name.special_class_list_name = special_class_list_name

        await class_name.save()

        if location_id is not None:
            try:
                # Previously called without `await`, so the timestamp was never updated.
                await update_location_time(location_id=location_id)
            except Exception as e:
                # The course is already saved; do not fail the request here.
                my_log("error",__name__,f"Client IP: {client_ip} - update location time fail: {e}")
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/cancel_proposal")
async def cancel_proposal(
    request: Request,
    location_id: int = Form(default=0),
    access_token:str = Form(default=None),
    message:str = Form(default=None)
):
    """Reject a proposal (sets ``is_pass_proposal`` to 2) and e-mail its creator.

    Only callers whose token resolves to a user with ``is_superuser == 2`` may
    do this.

    Args:
        location_id: Id of the school / proposal.
        access_token: Token of the acting administrator.
        message: Rejection reason inserted into the mail.

    Returns:
        ``{"msg": "success", "code": 200}``; ``{"msg": "no access", "code": 200}``
        or ``{"msg": "沒有此課程", "code": 200}`` when refused;
        ``{"msg": <error>, "code": 500}`` on failure.
    """
    # Resolved up front: it used to be read in the "not found" branch before
    # being assigned, which turned that response into a 500.
    client_ip = request.client.host
    try:
        user_id = None
        if access_token:
            user_id = await check_token(access_token)
        if not user_id:
            return {"msg": "no access", "code": 200}
        user = await User.get(id=user_id)
        if user.is_superuser != 2:
            return {"msg": "no access", "code": 200}

        school = await Schools.get_or_none(id=location_id)
        if school is None:
            my_log("info",__name__,f"Client IP: {client_ip} - no this class_name_id:{location_id}")
            return {"msg": "沒有此課程", "code": 200}

        school.is_pass_proposal = 2
        await school.save()

        if school.create_user_id:
            creater = await User.get(id=school.create_user_id)
            # Look up the creator's profile; the admin's id was used here before,
            # so the mail greeted the creator with the administrator's name.
            user_info = await User_information.get_or_none(user_id=school.create_user_id)
            username = user_info.name if user_info and user_info.name else ""
            subject = '提案駁回通知'
            with open("/var/www/ntcri/assets/edm/porposal_refuse/index.html", 'r', encoding='utf-8') as html_file:
                html_template = html_file.read()
            email_message = html_template.replace("{username}",username)
            # `message` is optional; str.replace rejects None.
            email_message = email_message.replace("{msg}",message or "")
            send_email(creater.email,"",subject,email_message)
        else:
            my_log("info",__name__,f"{client_ip} - no creater")

        my_log("info",__name__,f"{client_ip} - school_id:{location_id} change is_pass_proposal to 2")
        return {"msg": "success", "code": 200}
    except Exception as e:
        my_log("error",__name__,f"{client_ip} - An exception occurred: {e} \n Request : {request}")
        return {"msg": str(e), "code": 500}
@classes.post("/cancel_class_check")
async def cancel_class_check(
    request: Request,
    class_name_id: int = Form(default=0),
    access_token:str = Form(default=None),
    message:str = Form(default=None)
):
    """Reject a course (sets ``is_check`` to 2) and e-mail its creator.

    Only callers whose token resolves to a user with ``is_superuser == 2`` may
    do this.

    Args:
        class_name_id: Id of the course.
        access_token: Token of the acting administrator.
        message: Rejection reason inserted into the mail.

    Returns:
        ``{"msg": "success", "code": 200}``; ``{"msg": "no access", "code": 200}``
        or ``{"msg": "沒有此課程", "code": 200}`` when refused;
        ``{"msg": <error>, "code": 500}`` on failure.
    """
    # Resolved up front: it used to be read in the "not found" branch before
    # being assigned, which turned that response into a 500.
    client_ip = request.client.host
    try:
        user_id = None
        if access_token:
            user_id = await check_token(access_token)
        if not user_id:
            return {"msg": "no access", "code": 200}
        user = await User.get(id=user_id)
        if user.is_superuser != 2:
            return {"msg": "no access", "code": 200}

        class_name = await Class_name.get_or_none(id=class_name_id)
        if class_name is None:
            my_log("info",__name__,f"Client IP: {client_ip} - no this class_name_id:{class_name_id}")
            return {"msg": "沒有此課程", "code": 200}

        class_name.is_check = 2
        await class_name.save()

        if class_name.create_user_id:
            creater = await User.get(id=class_name.create_user_id)
            # Look up the creator's profile; the admin's id was used here before,
            # so the mail greeted the creator with the administrator's name.
            user_info = await User_information.get_or_none(user_id=class_name.create_user_id)
            username = user_info.name if user_info and user_info.name else ""
            subject = '開課駁回通知'
            with open("/var/www/ntcri/assets/edm/class_refuse/index.html", 'r', encoding='utf-8') as html_file:
                html_template = html_file.read()
            email_message = html_template.replace("{username}",username)
            # `message` is optional; str.replace rejects None.
            email_message = email_message.replace("{msg}",message or "")
            send_email(creater.email,"",subject,email_message)
        else:
            my_log("info",__name__,f"{client_ip} - no creater")

        my_log("info",__name__,f"{client_ip} - class_name_id:{class_name_id} change is_check to 2")
        return {"msg": "success", "code": 200}
    except Exception as e:
        my_log("error",__name__,f"{client_ip} - An exception occurred: {e} \n Request : {request}")
        return {"msg": str(e), "code": 500}
@classes.post("/update_event")
async def update_event(
    id: int = Form(default=0),
    name_id: int = Form(default=None),
    event: str = Form(default=None),
    start_time: datetime = Form(default=None),
    end_time: datetime = Form(default=None),
    contact: str = Form(default=None),
    lecturer: str = Form(default=None),
    location: str = Form(default=None),
    content: str = Form(default=None),
    URL: str = Form(default=None),
    people : str = Form(default=None),
    fee_method: str = Form(default=None),
    registration_way: str = Form(default=None),
    registration_start: datetime = Form(default=None),
    registration_end: datetime = Form(default=None),
    number_limit: int = Form(default=None),
    remark : str = Form(default=None),
    ATM_address :str = Form(default=None),
    fee_payment : str = Form(default=None),
    number_minimum: int = Form(default=None),
    files_url = Depends(create_upload_files)
):
    """Update an event (Class_list) and its registration window (Class_date).

    Only supplied fields are changed. The Class_date row is created from the
    supplied values when it does not exist yet.

    Returns:
        ``{"msg": "success", "code": 200}``, ``{"msg": "no this event", "code": 500}``
        or ``{"msg": <error>, "code": 500}``.
    """
    try:
        class_obj = await Class_list.get_or_none(id=id)
        if class_obj is None:
            return {"msg": "no this event", "code": 500}

        # Plain "set when supplied" fields.
        supplied = {
            "name_id": name_id,
            "event": event,
            "start_time": start_time,
            "end_time": end_time,
            "lecturer": lecturer,
            "location": location,
            "contact": contact,
            "content": content,
            "URL": URL,
            "people": people,
            "fee_method": fee_method,
            "registration_way": registration_way,
            "remark": remark,
            "ATM_address": ATM_address,
            "fee_payment": fee_payment,
        }
        for field_name, value in supplied.items():
            if value is not None:
                setattr(class_obj, field_name, value)

        if files_url:
            # The upload dependency reports problems through a "msg" key.
            if "msg" in files_url:
                return {"msg": files_url["msg"], "code": 500}
            # insert_event stores uploads in the `files` field; assigning to
            # `files_url` set an attribute that was never saved.
            class_obj.files = files_url

        class_date_obj, created = await Class_date.get_or_create(
            class_list_id=id,
            defaults={
                "registration_start": registration_start,
                "registration_end": registration_end,
                "number_limit": number_limit,
                "amount_left": number_limit,
                "number_minimum": number_minimum,
            },
        )
        if not created:
            if registration_start:
                class_date_obj.registration_start = registration_start
            if registration_end:
                class_date_obj.registration_end = registration_end
            if number_limit:
                # NOTE(review): amount_left is not adjusted when the limit
                # changes — confirm whether it should be.
                class_date_obj.number_limit = number_limit
            if number_minimum is not None:
                class_date_obj.number_minimum = number_minimum
            await class_date_obj.save()

        await class_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/update_session")
async def update_session(
    session_id : int = Form(default=0),
    class_event_id : int = Form(default=0),
    start_time: datetime = Form(default=None),
    end_time: datetime = Form(default=None),
    sessions: str = Form(default=None),
    content : str = Form(default='')
):
    """Update a session (Class_detail); only supplied fields are changed.

    Args:
        session_id: Id of the session to update.
        class_event_id: New owning event id; 0 leaves it unchanged.
        start_time: New start time; omitted leaves it unchanged.
        end_time: New end time; omitted leaves it unchanged.
        sessions: New session number; omitted or blank leaves it unchanged.
        content: New content; blank leaves it unchanged.

    Returns:
        ``{"msg": "success", "code": 200}`` or ``{"msg": <error>, "code": 500}``.
    """
    try:
        class_session_obj = await Class_detail.get(id=session_id)
        if class_event_id != 0:
            class_session_obj.class_list_id = class_event_id
        # The old defaults were `datetime.now()` evaluated at import time, so an
        # omitted time overwrote the stored value with the server start-up time.
        if start_time is not None:
            class_session_obj.start_time = start_time
        if end_time is not None:
            class_session_obj.end_time = end_time
        # The old default was the int 0, which made `.strip()` raise whenever
        # the field was omitted.
        if sessions is not None and str(sessions).strip() != '':
            class_session_obj.sessions = sessions
        if content.strip() != '':
            class_session_obj.content = content
        await class_session_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/delete_school")
async def delete_school(location_id: int):
    """Toggle the soft-delete flag of a school (deleted <-> restored).

    A falsy ``location_id`` is a no-op that still reports success.

    Returns:
        ``{"msg": "success", "code": 200}``, or
        ``{"msg": "無法找到據點", "code": 500}`` when the school does not exist.
    """
    if location_id:
        # get_or_none separates "not found" from real failures; the old bare
        # except reported every error (even task cancellation) as not found.
        school = await Schools.get_or_none(id=location_id)
        if school is None:
            return {"msg": "無法找到據點", "code": 500}
        # Flip between 0 and 1; an unset flag counts as 0.
        school.is_delete = ((school.is_delete or 0) + 1) % 2
        await school.save()
    return {"msg": "success", "code": 200}
@classes.post("/delete_session")
async def delete_session(id: int):
    """Delete the session (Class_detail) with the given id; a falsy id is a no-op."""
    outcome = {"msg": "success", "code": 200}
    if not id:
        return outcome
    await Class_detail.filter(id=id).delete()
    return outcome
@classes.post("/delete_event")
async def delete(id: int):
    """Delete an event together with its sessions and registration window.

    A falsy id is a no-op. Rows are removed in the order sessions, event,
    registration window.
    """
    outcome = {"msg": "success", "code": 200}
    if not id:
        return outcome
    sessions_of_event = Class_detail.filter(class_list_id=id)
    await sessions_of_event.delete()
    event_row = Class_list.filter(id=id)
    await event_row.delete()
    registration_window = Class_date.filter(class_list_id=id)
    await registration_window.delete()
    return outcome
@classes.post("/delete_class_name")
async def delete(id: int):
    """Toggle the soft-delete flag of a course (deleted <-> restored).

    Events, sessions and registration windows of the course are left untouched.
    A falsy id is a no-op that still reports success.

    Returns:
        ``{"msg": "success", "code": 200}``, or
        ``{"msg": "無法找到課程", "code": 500}`` when the course does not exist.
    """
    if id:
        # get_or_none separates "not found" from real failures; the old bare
        # except reported every error (even task cancellation) as not found.
        class_name = await Class_name.get_or_none(id=id)
        if class_name is None:
            return {"msg": "無法找到課程", "code": 500}
        # Opposite state: a deleted course is restored; an unset flag counts as 0.
        class_name.is_delete = ((class_name.is_delete or 0) + 1) % 2
        await class_name.save()
    return {"msg": "success", "code": 200}
# Reports a single human-readable state string for a course or one of its events.
# Events are selected by class_name_id (all events of the course) or, failing that,
# by class_event_id; special_class_list_name picks which event model is queried.
# Returns {"msg": "success", "code": 200, "result": {"state": <text>}} or a code-500 dict.
# NOTE(review): the original indentation was lost, so the nesting of the branches
# below (in particular which `if` the `else` before "尚未創建課程" belongs to) must be
# confirmed against the running version before this function is restructured.
@classes.get("/get_class_state")
async def check_date_state(
class_name_id : Optional[int] = None,
class_event_id : Optional[int] = None,
special_class_list_name : Optional[str] = None,
):
# Build the event filter; class_name_id takes precedence over class_event_id.
if class_name_id:
Q_word = Q(name_id=class_name_id)
elif class_event_id:
Q_word = Q(id=class_event_id)
else:
return {"msg": 'please input ID', "code": 500}
# Choose the event model; any other value falls back to Class_list.
if special_class_list_name == "one_day_class":
class_list = await One_day_class.filter(Q_word).all()
elif special_class_list_name == "outter_class_list":
class_list = await Outter_class_list.filter(Q_word).all()
else :
class_list = await Class_list.filter(Q_word).all()
# Default state, kept when no check below overrides it.
result = {"state": "尚未開放報名"}
try:
if class_list == [] :
result["state"] = "尚未創建場次"
for class_obj in class_list:
# Course-level checks: review status, proposal status, closed flag, running period.
try:
class_name = await Class_name.get_or_none(id = class_obj.name_id)
# NOTE(review): class_name may be None here (get_or_none), but it is dereferenced
# before the `if class_name:` test below; the resulting AttributeError is swallowed
# by the `except Exception` that closes this try.
if class_name.is_check == 0 :
result["state"] = "課程等待審核"
break
if class_name.is_check == 2 :
result["state"] = "課程審核未通過"
break
if class_name:
# Debug output.
print(class_name.is_record_point)
print(class_name.name)
school = await Schools.get_or_none(id = class_name.school_id)
print(class_name.school_id)
if school :
print(school.is_pass_proposal)
if school.is_pass_proposal == 0 :
result["state"] = "正在提案階段"
break
if school.is_pass_proposal == 2 :
result["state"] = "提案已被駁回"
break
if class_name.is_record_point == 1 :
result["state"] = "課程已關閉"
break
# Timestamps are compared as naive datetimes against the local clock.
if class_obj.start_time and class_obj.end_time:
if class_obj.start_time.replace(tzinfo=None) <= datetime.now() and class_obj.end_time.replace(tzinfo=None) >= datetime.now():
result["state"] = "開課中"
break
elif class_obj.end_time.replace(tzinfo=None) < datetime.now():
result["state"] = "課程已結束"
else:
pass
else :
result["state"] = "尚未創建課程"
except Exception as e:
# Any failure in the checks above is ignored and evaluation continues below.
pass
# Registration checks: one-day classes use reg_deadline, others use their Class_date row.
if special_class_list_name == "one_day_class":
if class_obj.reg_deadline.replace(tzinfo=None) >= datetime.now() :
result["state"] = "報名中"
break
elif class_obj.reg_deadline.replace(tzinfo=None) < datetime.now():
result["state"] = "報名截止(報名額滿)"
break
else :
try:
class_date_obj = await Class_date.get(class_list_id=class_obj.id)
if class_date_obj.registration_start.replace(tzinfo=None) <= datetime.now() and class_date_obj.registration_end.replace(tzinfo=None) >= datetime.now() :
if class_date_obj.amount_left == 0:
result["state"] = "報名截止(報名額滿)"
break
else:
result["state"] = "報名中"
break
elif class_date_obj.registration_end.replace(tzinfo=None) < datetime.now():
result["state"] = "報名截止(人數已滿)"
break
except Exception as e:
# A missing Class_date row (or missing dates) leaves the current state unchanged.
pass
return {"msg": "success", "code": 200, "result": result}
except Exception as e:
return {"msg": str(e), "code": 500}
@classes.get("/get_event")
async def search_event(
    class_name_id: Optional[int] = None,
    event_id: Optional[int] = None,
    access_token: Optional[str] = None
):
    """List the events of a class, each annotated with its current state.

    Args:
        class_name_id: Id of the ``Class_name`` whose events are listed.
        event_id: Id of a single event; only consulted when
            ``class_name_id`` is falsy.
        access_token: Optional token. When ``check_token`` resolves it to a
            user id, only events created by that user are returned.

    Returns:
        ``{"msg": "success", "code": 200, "classes": [...]}`` on success,
        otherwise ``{"msg": <reason>, "code": 500}``.
    """
    try:
        # NOTE(review): the class is always looked up by class_name_id first,
        # so a request that carries only event_id is rejected right here and
        # the ``elif event_id`` branch below looks unreachable in practice.
        try:
            class_name_obj = await Class_name.get(id=class_name_id)
        except Exception:
            return {"msg": "no this class id", "code": 500}
        special_class_list_name = class_name_obj.special_class_list_name

        Q_word = Q()
        if class_name_id:
            Q_word = Q_word | Q(name_id=class_name_id)
        elif event_id:
            Q_word = Q_word | Q(id=event_id)
        else:
            return {"msg": "please input class_name_id or event_id", "code": 500}

        if access_token:
            user_id = await check_token(access_token)
            if user_id:
                Q_word = Q_word & Q(create_user_id=user_id)

        # Pick the table that stores this class's events; anything that is
        # not one of the two special names falls back to Class_list.
        if special_class_list_name == 'one_day_class':
            event_model = One_day_class
        elif special_class_list_name == 'outter_class_list':
            event_model = Outter_class_list
        else:
            event_model = Class_list
        class_list = await event_model.filter(Q_word).all().order_by("-start_time")

        # Named ``events`` so the module-level ``classes`` router is not shadowed.
        events = []
        for class_obj in class_list:
            class_name_obj = await Class_name.get(id=class_obj.name_id)
            class_name = class_name_obj.name
            try:
                class_data = class_obj.show_data()
                class_data["class_name"] = class_name
                state = await check_date_state(
                    class_event_id=class_obj.id,
                    special_class_list_name=special_class_list_name,
                )
                class_data["state"] = state["result"]["state"]
                try:
                    class_date_obj = await Class_date.get(class_list_id=class_obj.id)
                    class_data["registration_start"] = class_date_obj.registration_start
                    class_data["registration_end"] = class_date_obj.registration_end
                    class_data["number_limit"] = class_date_obj.number_limit
                    class_data["amount_left"] = class_date_obj.amount_left
                    class_data["number_minimum"] = class_date_obj.number_minimum
                    # The former ``model_fields.append(a, b, c, d)`` call was
                    # removed: list.append takes one argument, so it always
                    # raised and triggered the message below even when the
                    # registration data had been found.
                except Exception:
                    print("沒有報名時間&人數限制的資料")
            except Exception:
                class_data = {
                    "msg": "fail to get data"
                }
            events.append(class_data)
        return {"msg": "success", "code": 200, "classes": events}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_school")
async def get_school(
    location_id: Optional[int] = None,
    keyword: Optional[str] = None,
    location_keyword: Optional[str] = None,
    page_num: Optional[int] = None,
    page_amount: Optional[int] = None,
    category: Optional[str] = None,
    access_token: Optional[str] = None,
    is_delete: Optional[int] = None,
    is_pass_proposal: Optional[int] = None,
    principal_user_email: Optional[str] = None
):
    """Search schools and return them newest-updated first.

    Args:
        location_id: Exact school id.
        keyword: Substring matched against the school name.
        location_keyword: Substring matched against the address.
        page_num, page_amount: 1-based page and page size; pagination is
            applied only when both are truthy.
        category: Comma-separated terms; a school matches when its category
            contains any of them.
        access_token: When it resolves to a user, restricts the result to
            schools created by that user.
        is_delete: Deletion flag to match; defaults to 0 when omitted.
        is_pass_proposal: Proposal state to match.
        principal_user_email: Restricts to schools whose principal is the
            user with this e-mail.

    Returns:
        ``{"msg": "success", "code": 200, "total_num": int, "schools": [...]}``
        or ``{"msg": <reason>, "code": 500}``.
    """
    try:
        await copy()
        my_log("info", __name__, "get_school")
        Q_word = Q()
        if location_id:
            Q_word = Q_word & Q(id=location_id)
        if keyword:
            Q_word = Q_word & Q(name__icontains=keyword)
        if location_keyword:
            Q_word = Q_word & Q(address__icontains=location_keyword)
        if is_pass_proposal is not None:
            Q_word = Q_word & Q(is_pass_proposal=is_pass_proposal)
        if principal_user_email:
            try:
                # NOTE(review): "is_avtive" is spelled exactly as in the
                # original code; confirm it matches the User model field.
                principal_user = await User.get(email=principal_user_email, is_avtive=1)
            except Exception:
                my_log("info", __name__, f"no this teacher")
                # Previously execution continued and failed with a NameError
                # on ``principal_user``; report the real reason instead.
                return {"msg": "no this teacher", "code": 500}
            Q_word = Q_word & Q(principal_user_id=principal_user.id)
        if is_delete is not None:
            Q_word = Q_word & Q(is_delete=is_delete)
        else:
            Q_word = Q_word & Q(is_delete=0)
        if access_token:
            user_id = await check_token(access_token)
            if user_id:
                Q_word = Q_word & Q(create_user_id=user_id)
        if category:
            category_q = Q()
            for tmp_word in category.split(","):
                category_q = category_q | Q(category__icontains=tmp_word)
            Q_word = Q_word & category_q

        school_list = Schools.filter(Q_word)
        count = await school_list.all().count()
        if page_num and page_amount:
            school_list = school_list.offset((page_num - 1) * page_amount).limit(page_amount)
        school_list = await school_list.all().order_by("-update_time")

        schools = []
        for school_obj in school_list:
            try:
                school_data = school_obj.show_data()
                # ``x == (None or '')`` only ever compared against '', so a
                # missing (None) coordinate was never geocoded.
                if school_data["Lng"] in (None, '') or school_data["Lat"] in (None, ''):
                    try:
                        # The geocoding response is requested as JSON.
                        # NOTE(review): the API key is hard-coded; it should
                        # live in configuration rather than in source.
                        url = 'https://maps.googleapis.com/maps/api/geocode/json?'
                        params = {
                            'key': 'AIzaSyDETbkTAmZNOECx2u4rmNudHvyoQTgDbc4',
                            'address': school_data["address"],
                        }
                        # NOTE(review): requests is synchronous and blocks the
                        # event loop; the timeout at least bounds the stall.
                        res = requests.get(url, params=params, timeout=10)
                        mes = json.loads(res.text)
                        location = mes['results'][0]['geometry']['location']
                        school_data["Lat"] = location['lat']
                        school_data["Lng"] = location['lng']
                        school_obj.longitude = school_data["Lng"]
                        # Was assigned from "Lng", persisting a wrong latitude.
                        school_obj.latitude = school_data["Lat"]
                        await school_obj.save()
                    except Exception:
                        my_log("error", __name__, f"not get lat and lng")
                schools.append(school_data)
            except Exception:
                # Was the set literal {"msg : data wrong"}; emit a dict like
                # the other endpoints' failure placeholders.
                schools.append({"msg": "data wrong"})
        return {"msg": "success", "code": 200, "total_num": count, "schools": schools}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_group_name")
async def get_school_group(
    id : Optional[int] = 0
):
    """Return every group, or only the group with the given ``id``.

    ``id == 0`` (the default) selects all ``Group_name`` rows; any other
    value fetches exactly that row. The result is
    ``{"msg": "success", "code": 200, "school_groups": [...]}`` or
    ``{"msg": <error>, "code": 500}`` when the lookup fails.
    """
    try:
        if id == 0:
            groups = await Group_name.all()
        else:
            groups = [await Group_name.get(id=id)]
        print(groups)
        school_groups = [
            {
                "group_id": group.id,
                "group_name": group.group_name,
                "describe": group.describe
            }
            for group in groups
        ]
        return {"msg": "success", "code": 200, "school_groups": school_groups}
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
@classes.post("/update_group_name")
async def update_school_group(
    id : int = Form(default=0),
    group_name : str = Form(default=''),
    describe : str = Form(default=''),
):
    """Update the name and/or description of one group.

    A field is written only when the submitted value is non-blank after
    stripping whitespace. Returns ``{"msg": "success", "code": 200}`` or
    ``{"msg": <error>, "code": 500}``.
    """
    try:
        target = await Group_name.get(id=id)
        submitted = (("group_name", group_name), ("describe", describe))
        for attribute, value in submitted:
            # Blank submissions mean "leave this field unchanged".
            if value.strip() != '':
                setattr(target, attribute, value)
        await target.save()
        return {"msg": "success", "code": 200}
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
@classes.get("/get_class_name")
async def get_class_name(
    location_id: str = None,
    class_name_id: Optional[int] = None,
    group_id: Optional[int] = None,
    group_sort: Optional[str] = None,
    exclude_word: Optional[str] = None,
    category: Optional[str] = None,
    page_num: Optional[int] = None,
    page_amount: Optional[int] = None,
    recommend: Optional[int] = None,
    is_inner: Optional[int] = None,
    is_check: Optional[int] = None,
    access_token: Optional[str] = None,
    is_delete: Optional[int] = None,
    keyword: Optional[str] = None,
    has_user: Optional[int] = None,
    is_record_point: Optional[int] = None,
    location_keyword: Optional[str] = None,
    encode: Optional[str] = None,
    syllabus: Optional[str] = None
):
    """Search classes and return them, newest id first, with school data and state.

    Args:
        location_id: A literal school id or list of ids, e.g. ``"3"`` or
            ``"[1, 2]"``.
        class_name_id: Exact class id.
        group_id, group_sort, encode: Exact-match filters.
        exclude_word: Comma-separated terms; classes whose category or
            group_sort contains any of them are dropped.
        category: Comma-separated terms; any match on category is enough.
        page_num, page_amount: 1-based page and page size (both required).
        recommend, is_inner, is_record_point: Exact-match flags.
        is_check: Review state; ``3`` selects closed classes
            (``is_record_point == 1``), any other value selects classes in
            that review state that are not closed.
        access_token: When given it must resolve to a user, and the result
            is limited to classes created by that user.
        is_delete: Deletion flag; ``2`` disables the filter, omitted means 0.
        keyword: Substring searched in name, category, introduction,
            organizer, group_sort and syllabus.
        has_user: ``1`` keeps only classes that have a creator.
        location_keyword: Substring matched against the school address.
        syllabus: Substring matched against the syllabus.

    Returns:
        ``{"msg": "success", "code": 200, "total_num": int, "classes": [...]}``
        or ``{"msg": <reason>, "code": 500}``.
    """
    # Function-scope import: the file only imports ``Or`` from ast.
    import ast

    try:
        await copy()
        Q_word = Q()
        if location_keyword:
            school_list = await Schools.filter(Q(address__icontains=location_keyword)).all()
            schools = [school.id for school in school_list]
            Q_word = Q_word & Q(school_id__in=schools)
        if group_id:
            Q_word = Q_word & Q(group_id=group_id)
        if group_sort:
            Q_word = Q_word & Q(group_sort=group_sort)
        if encode:
            Q_word = Q_word & Q(encode=encode)
        if syllabus:
            Q_word = Q_word & Q(syllabus__icontains=syllabus)
        if category:
            Q_word2 = Q()
            for tmp_word in category.split(","):
                Q_word2 = Q_word2 | Q(category__icontains=tmp_word)
            Q_word = Q_word & Q_word2
        if location_id:
            # SECURITY: this used to be ``eval(location_id)`` on a raw query
            # parameter, i.e. arbitrary code execution. literal_eval accepts
            # the same literal inputs ("3", "[1, 2]") and nothing else.
            location_id_list = ast.literal_eval(location_id)
            if not isinstance(location_id_list, list):
                location_id_list = [location_id_list]
            Q_word = Q_word & Q(school_id__in=location_id_list)
        if class_name_id:
            Q_word = Q_word & Q(id=class_name_id)
        if recommend:
            Q_word = Q_word & Q(recommend=recommend)
        if is_inner is not None:
            Q_word = Q_word & Q(is_inner=is_inner)
        if is_check is not None:
            if is_check == 3:
                Q_word = Q_word & Q(is_record_point=1)  # closed
            else:
                # ``is_record_point__in=[0, None]`` cannot match NULL rows in
                # SQL, so the NULL case is spelled out explicitly.
                not_closed = Q(is_record_point=0) | Q(is_record_point__isnull=True)
                Q_word = Q_word & Q(is_check=is_check) & not_closed
        # The ``== 2`` test used to sit behind ``is not None`` and could never
        # run; it is checked first so that 2 really means "no filter".
        if is_delete == 2:
            pass
        elif is_delete is not None:
            Q_word = Q_word & Q(is_delete=is_delete)
        else:
            Q_word = Q_word & Q(is_delete=0)
        if keyword:
            Q_word = Q_word & (
                Q(name__icontains=keyword)
                | Q(category__icontains=keyword)
                | Q(introduction__icontains=keyword)
                | Q(organizer__icontains=keyword)
                | Q(group_sort__icontains=keyword)
                | Q(syllabus__icontains=keyword)
            )
        if is_record_point:
            Q_word = Q_word & Q(is_record_point=is_record_point)
        if access_token:
            user_id = await check_token(access_token)
            if user_id:
                Q_word = Q_word & Q(create_user_id=user_id)
            else:
                return {"msg": "no access", "code": 500}
        if has_user == 1:
            Q_word = Q_word & Q(create_user_id__isnull=False)

        class_name_list = Class_name.all().filter(Q_word)
        if exclude_word:
            for tmp_word in exclude_word.split(","):
                class_name_list = class_name_list.exclude(
                    Q(category__icontains=tmp_word) | Q(group_sort__icontains=tmp_word)
                )
        count = await class_name_list.all().count()
        if page_num and page_amount:
            class_name_list = class_name_list.offset((page_num - 1) * page_amount).limit(page_amount)
        class_name_list = await class_name_list.all().order_by("-id")

        classes_name = []
        for class_name_obj in class_name_list:
            # A class whose school cannot be loaded is still returned, just
            # without the school fields merged in.
            try:
                school_obj = await Schools.get(id=class_name_obj.school_id)
                school_obj_id = school_obj.id
            except Exception:
                school_obj_id = 0
            try:
                special_class_list_name = class_name_obj.special_class_list_name
                if special_class_list_name:
                    state = await check_date_state(
                        class_name_id=class_name_obj.id,
                        special_class_list_name=special_class_list_name,
                    )
                else:
                    state = await check_date_state(class_name_id=class_name_obj.id)
                class_data = class_name_obj.show_data()
                if school_obj_id:
                    class_data.update(school_obj.show_data())
                class_data["state"] = state["result"]["state"]
                if class_data["state"] == "課程已關閉":
                    class_data["is_check"] = 3
            except Exception as e:
                class_data = {
                    "msg": str(e)
                }
            classes_name.append(class_data)
        return {"msg": "success", "code": 200, "total_num": count, "classes": classes_name}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_session")
async def get_session(
    event_id : Optional[int] = None
):
    """Return the sessions of one event plus a weekly summary.

    Sessions are read from ``Class_detail`` ordered by start time. For an
    event with exactly one session ``data_form`` is the string
    ``"one day class"``; otherwise it holds three 7-slot lists indexed by
    ``datetime.weekday()``: a 0/1 flag for days that have a session, and
    the ``HH:MM`` start and end times recorded for each weekday.
    """
    try:
        if not event_id:
            return {"msg": "please input event_id", "code": 500}

        sessions = await Class_detail.filter(class_list_id=event_id).all().order_by("start_time")

        week_day = [0] * 7
        start_week_time = [None] * 7
        end_week_time = [None] * 7
        classe_sessions = []
        for session in sessions:
            begins = session.start_time
            ends = session.end_time
            entry = {
                "session_id": session.id,
                "class_event_id": session.class_list_id,
                "start_time": begins,
                "end_time": ends,
                "sessions": session.sessions,
                "content": session.content
            }
            begin_slot = begins.weekday()
            week_day[begin_slot] = 1
            start_week_time[begin_slot] = begins.time().strftime("%H:%M")
            # The end time is filed under the weekday of the end timestamp.
            end_week_time[ends.weekday()] = ends.time().strftime("%H:%M")
            classe_sessions.append(entry)

        if len(sessions) == 1:
            data_form = "one day class"
        else:
            data_form = {
                "week_day": week_day,
                "start_week_time": start_week_time,
                "end_week_time": end_week_time
            }
        return {"msg": "success", "code": 200, "data_form": data_form, "classe_sessions": classe_sessions}
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
@classes.get("/search_class_like")
async def search_class_like(
    keyword: str,
    location_id: Optional[int] = None,
    group_id: Optional[int] = None,
    group_sort: Optional[str] = None,
    exclude_word: Optional[str] = None,
    category: Optional[str] = None,
    page_num: Optional[int] = None,
    page_amount: Optional[int] = None,
    recommend: Optional[int] = None,
    is_inner: Optional[int] = None,
    is_check: Optional[int] = None
):
    """Keyword search over classes, including lecturer/teacher names.

    Args:
        keyword: Substring searched in name, category, introduction,
            organizer, group_sort and syllabus, and in the lecturer /
            teacher of the class's events.
        location_id: Exact school id.
        group_id, group_sort: Exact-match filters.
        exclude_word: Comma-separated terms; classes whose category or
            group_sort contains any of them are dropped.
        category: Comma-separated terms; any match on category is enough.
        page_num, page_amount: 1-based page and page size (both required).
        recommend, is_inner, is_check: Exact-match flags.

    Returns:
        ``{"msg": "success", "code": 200, "total_num": int, "classes": [...]}``
        or ``{"msg": <reason>, "code": 500}``.
    """
    try:
        Q_word = Q()
        if group_id:
            Q_word = Q_word & Q(group_id=group_id)
        if group_sort:
            Q_word = Q_word & Q(group_sort=group_sort)
        if category:
            # The category terms are OR-ed among themselves and then AND-ed
            # with the other filters. OR-ing them straight into Q_word let any
            # category match bypass the group filters above.
            category_q = Q()
            for tmp_word in category.split(","):
                category_q = category_q | Q(category__icontains=tmp_word)
            Q_word = Q_word & category_q
        if location_id:
            Q_word = Q_word & Q(school_id=location_id)
        if recommend:
            Q_word = Q_word & Q(recommend=recommend)
        if is_inner is not None:
            Q_word = Q_word & Q(is_inner=is_inner)
        if is_check is not None:
            Q_word = Q_word & Q(is_check=is_check)
        if keyword:
            # The lecturer/teacher lookups only feed the keyword clause, so
            # they are skipped entirely for an empty keyword.
            class_name_id = []
            for class_list in await Class_list.filter(Q(lecturer__icontains=keyword)):
                class_name_id.append(class_list.name_id)
            for class_list in await One_day_class.filter(Q(teacher__icontains=keyword)):
                class_name_id.append(class_list.name_id)
            Q_word = Q_word & (
                Q(name__icontains=keyword)
                | Q(category__icontains=keyword)
                | Q(introduction__icontains=keyword)
                | Q(organizer__icontains=keyword)
                | Q(group_sort__icontains=keyword)
                | Q(id__in=class_name_id)
                | Q(syllabus__icontains=keyword)
            )

        class_name_list = Class_name.filter(Q_word)
        # Exclusions are applied before counting so total_num describes the
        # rows that can actually be paged through (as get_class_name does).
        if exclude_word:
            for tmp_word in exclude_word.split(","):
                class_name_list = class_name_list.exclude(
                    Q(category__icontains=tmp_word) | Q(group_sort__icontains=tmp_word)
                )
        count = await class_name_list.all().count()
        if page_num and page_amount:
            class_name_list = class_name_list.offset((page_num - 1) * page_amount).limit(page_amount)
        class_name_list = await class_name_list.all().order_by("-id")

        classes_name = []
        for class_name_obj in class_name_list:
            # A class whose school cannot be loaded is still returned, just
            # without the school fields merged in.
            try:
                school_obj = await Schools.get(id=class_name_obj.school_id)
                school_obj_id = school_obj.id
            except Exception:
                school_obj_id = 0
            try:
                special_class_list_name = class_name_obj.special_class_list_name
                if special_class_list_name:
                    state = await check_date_state(
                        class_name_id=class_name_obj.id,
                        special_class_list_name=special_class_list_name,
                    )
                else:
                    state = await check_date_state(class_name_id=class_name_obj.id)
                class_data = class_name_obj.show_data()
                if school_obj_id:
                    class_data.update(school_obj.show_data())
                class_data["state"] = state["result"]["state"]
            except Exception as e:
                class_data = {
                    "msg": str(e)
                }
            classes_name.append(class_data)
        return {"msg": "success", "code": 200, "total_num": count, "classes": classes_name}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/add_favorite_class")
async def add_favorite_class(
    class_name_id: int,
    user_id = Depends(check_token),
    time_stemp: Optional[datetime] = None
):
    """Mark a class as a favorite of the authenticated user.

    Args:
        class_name_id: Id of the class to favorite.
        user_id: Resolved by ``check_token``; a falsy value is rejected.
        time_stemp: Timestamp stored on a newly created row; defaults to
            the time of the request.

    Returns:
        ``{"msg": "success", "code": 200, "is exist": bool, "id": int}``
        where ``"is exist"`` is True when the favorite already existed, or
        ``{"msg": <reason>, "code": 500}``.
    """
    try:
        if not user_id:
            return {"msg": "no access", "code": 500}
        # A ``datetime.now()`` default in the signature is evaluated once at
        # import time, stamping every favorite with the server start time.
        if time_stemp is None:
            time_stemp = datetime.now()
        favorite, created = await Favorite_course.get_or_create(
            class_name_id=class_name_id,
            user_id=user_id,
            defaults={'time_stemp': time_stemp}
        )
        return {"msg": "success", "code": 200, "is exist": not created, "id": favorite.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_favorite_class")
async def get_favorite_class(
    user_id = Depends(check_token),
    no_details : Optional[int] = None
):
    """List the favorite classes of the user resolved by ``check_token``.

    With a truthy ``no_details`` only the favorite rows are returned.
    Otherwise each row is enriched with the first class returned by
    ``get_class_name`` for its ``class_name_id``; when that lookup yields
    nothing usable the entry gets a ``"msg"`` key instead.
    """
    try:
        favorites = await Favorite_course.filter(user_id=user_id).all()
        favorite_courses = []
        for favorite in favorites:
            entry = {
                "id": favorite.id,
                "user_id": favorite.user_id,
                "class_name_id": favorite.class_name_id,
                "time_stemp": favorite.time_stemp
            }
            if not no_details:
                lookup = await get_class_name(class_name_id=favorite.class_name_id)
                try:
                    # Merge the class fields over the favorite row.
                    entry.update(lookup["classes"][0])
                except:
                    entry["msg"] = "this class doesn't exit"
            favorite_courses.append(entry)
        return {"msg": "success", "code": 200, "favorite_courses": favorite_courses}
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
@classes.post("/delete_favorite_class")
async def delete_favorite_class(
    class_name_id: int,
    user_id = Depends(check_token)
):
    """Remove the given class from the favorites of the resolved user.

    Deletes every ``Favorite_course`` row matching both ``class_name_id``
    and ``user_id``; reports success even when nothing matched.
    """
    try:
        matching = Favorite_course.filter(class_name_id=class_name_id, user_id=user_id)
        await matching.delete()
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
    return {"msg": "success", "code": 200}
@classes.post("/insert_online_course")
async def insert_online_course(
    title: str = Form(default=''),
    category: str = Form(default=''),
    create_time: Optional[str] = Form(default=None),
    content: str = Form(default=''),
    video_url: str = Form(default='')
):
    """Create an online course.

    Args:
        title, category, content, video_url: Stored as submitted.
        create_time: Creation time; when omitted or blank the time of the
            request is used.

    Returns:
        ``{"msg": "success", "code": 200, "online_course_obj": <new id>}``
        or ``{"msg": <reason>, "code": 500}``.
    """
    try:
        # ``Form(default=datetime.now())`` was evaluated once at import time,
        # so every course created without an explicit time was stamped with
        # the server start time. Resolve the default per request instead.
        if not create_time:
            create_time = datetime.now()
        new_online_course = await Online_course.create(
            title=title,
            create_time=create_time,
            category=category,
            content=content,
            video_url=video_url,
            # NOTE(review): new courses are always filed under group 8.
            group_id=8
        )
        return {"msg": "success", "code": 200, "online_course_obj": new_online_course.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/update_online_course")
async def update_online_course(
    id: int = Form(default=0),
    title: str = Form(default=''),
    category: str = Form(default=''),
    create_time: str = Form(default=''),
    content: str = Form(default=''),
    video_url: str = Form(default='')
):
    """Update the submitted fields of one online course.

    A field is written only when its submitted value is non-blank after
    stripping whitespace; omitted fields are left untouched.

    Returns:
        ``{"msg": "success", "code": 200}`` or
        ``{"msg": <reason>, "code": 500}``.
    """
    try:
        online_course_obj = await Online_course.get(id=id)
        # ``create_time`` used to default to ``datetime.now()`` evaluated at
        # import time. An omitted field then either broke ``.strip()`` or
        # overwrote the stored time; '' follows the other fields' convention.
        updates = (
            ("title", title),
            ("category", category),
            ("create_time", create_time),
            ("content", content),
            ("video_url", video_url),
        )
        for field_name, value in updates:
            if value.strip() != '':
                setattr(online_course_obj, field_name, value)
        await online_course_obj.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_online_courese")
async def get_online_courese(
    online_courese_id : Optional[int] = None,
    category:Optional[str] = None,
    group_id : Optional[int] = None,
    page_num : Optional[int] = None,
    page_amount : Optional[int] = None,
    org :Optional[str] = None,
    no_org:Optional[str] = None,
    key_word:Optional[str] = None,
):
    """Search online courses, ordered by ascending id.

    Filters: exact ``group_id`` / ``online_courese_id`` / ``org``;
    ``no_org`` keeps rows whose org is NULL or differs from it;
    ``category`` is a comma-separated list of which any term may match;
    ``key_word`` is searched in title, category and content. Pagination is
    applied only when both ``page_num`` and ``page_amount`` are truthy.
    """
    try:
        criteria = Q()
        if group_id:
            criteria = criteria & Q(group_id=group_id)
        if online_courese_id:
            criteria = criteria & Q(id=online_courese_id)
        if org:
            criteria = criteria & Q(org=org)
        if no_org:
            criteria = criteria & (Q(org__isnull=True) | ~Q(org=no_org))
        if category:
            any_category = Q()
            for term in category.split(","):
                any_category |= Q(category__icontains=term)
            criteria = criteria & any_category
        if key_word:
            text_match = (
                Q(title__icontains=key_word)
                | Q(category__icontains=key_word)
                | Q(content__icontains=key_word)
            )
            criteria = criteria & text_match

        base_query = Online_course.all()
        # The total is taken over the whole filtered set, before paging.
        count = await base_query.all().filter(criteria).count()
        if page_num and page_amount:
            skip = (page_num - 1) * page_amount
            base_query = base_query.offset(skip).limit(page_amount)
        rows = await base_query.all().filter(criteria).order_by("id")

        online_coureses = [row.show_data() for row in rows]
        return {"msg": "success", "code": 200, "total_num": count, "online_coures": online_coureses}
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
@classes.post("/delete_online_course")
async def delete_online_course(
    online_course_id : int
):
    """Delete the online course with the given id.

    Reports success even when no row matched the id.
    """
    try:
        target = Online_course.filter(id=online_course_id)
        await target.delete()
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
    return {"msg": "success", "code": 200}
@classes.get("/get_group_classes_and_articles")
async def get_group_classes_and_articles(
    group_id: int,
    page_num: Optional[int] = None,
    page_amount: Optional[int] = None
):
    """Return the non-deleted classes and articles of one group.

    Both lists are ordered by descending id and share the same page window
    when ``page_num`` and ``page_amount`` are both truthy. The counts are
    taken before pagination.

    Returns:
        ``{"msg": "success", "code": 200, "class_num": int, "classes": [...],
        "article_num": int, "articles": [...]}`` or
        ``{"msg": <reason>, "code": 500}``.
    """
    try:
        class_name_list = Class_name.filter(group_id=group_id, is_delete=0).all().order_by("-id")
        article_list = Article_list.filter(group_id=group_id, is_del=0).all().order_by("-id")
        class_count = await class_name_list.all().count()
        article_count = await article_list.all().count()
        if page_num and page_amount:
            offset = (page_num - 1) * page_amount
            class_name_list = class_name_list.offset(offset).limit(page_amount)
            article_list = article_list.offset(offset).limit(page_amount)
        class_name_list = await class_name_list.all()
        article_list = await article_list.all()

        article_objs = []
        for article_obj in article_list:
            try:
                article_tmp = {
                    "article_id": article_obj.id,
                    "title": article_obj.title,
                    "school_id": article_obj.school_id,
                    "group_sort": article_obj.group_sort,
                    "group_id": article_obj.group_id,
                    "category": article_obj.category,
                    "create_time": article_obj.create_time,
                    "click_time": article_obj.click_time,
                    "depiction": article_obj.depiction,
                    "content": article_obj.content,
                    "files": article_obj.files,
                    "vedio_url": article_obj.vedio_url,
                    "tags": article_obj.tags,
                    "cover_img": article_obj.cover_img
                }
            except Exception:
                article_tmp = {
                    "msg": "fail to get data"
                }
            article_objs.append(article_tmp)

        classes_name = []
        for class_name_obj in class_name_list:
            # ``except Exception`` (not a bare except) so that cancelling the
            # request is not swallowed while awaiting the ORM.
            try:
                school_obj = await Schools.get(id=class_name_obj.school_id)
                school_obj_id = school_obj.id
            except Exception:
                school_obj_id = 0
            try:
                special_class_list_name = class_name_obj.special_class_list_name
                if special_class_list_name:
                    state = await check_date_state(
                        class_name_id=class_name_obj.id,
                        special_class_list_name=special_class_list_name,
                    )
                else:
                    state = await check_date_state(class_name_id=class_name_obj.id)
                class_data = class_name_obj.show_data()
                if school_obj_id:
                    class_data.update(school_obj.show_data())
                class_data["state"] = state["result"]["state"]
            except Exception as e:
                class_data = {
                    "msg": str(e)
                }
            classes_name.append(class_data)
        return {
            "msg": "success",
            "code": 200,
            "class_num": class_count,
            "classes": classes_name,
            "article_num": article_count,
            "articles": article_objs,
        }
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/add_attend_record_by_event")
async def add_attend_record(
    class_id: int = Form(default=0),
):
    """Create absent-by-default attendance rows for a whole event.

    For every session (``Class_detail``) of the event and every
    non-deleted registration, an ``Attend_record`` is created with
    ``is_attend = 0`` unless one already exists for that session and user.

    NOTE(review): a later function in this module is also named
    ``add_attend_record`` and rebinds the module-level name; this handler
    stays reachable only through its route.

    Returns:
        ``{"msg": "success", "code": 200}`` or
        ``{"msg": <reason>, "code": 500}``.
    """
    try:
        if not class_id:
            return {"msg": "Please input right ", "code": 500}
        try:
            # Awaited ``.all()`` already yields lists, so the former
            # ``isinstance(..., list)`` wrappers were dead code.
            class_details = await Class_detail.filter(class_list_id=class_id).all()
            registrations = await Registration.filter(event_id=class_id, is_del=0).all()
            if not class_details:
                return {"msg": "no this class_detail_id session", "code": 500}
        except Exception:
            return {"msg": "get class_detail_id error", "code": 500}

        # The event does not change between sessions: look it up once
        # instead of once per session.
        class_event = await Class_list.get_or_none(id=class_id)
        if class_event is None:
            return {"msg": "no this class", "code": 500}

        for detail in class_details:
            for registration in registrations:
                await Attend_record.get_or_create(
                    class_detail_id=detail.id,
                    user_id=registration.user_id,
                    defaults={
                        'class_name_id': class_event.name_id,
                        'class_event_id': class_id,
                        'is_attend': 0,
                    }
                )
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/add_attend_record")
async def add_attend_record(
    class_detail_id: int = Form(default=0),
    user_id: int = Form(default=0),
    is_attend: int = Form(default=0)
):
    """Create or update one user's attendance for one session.

    Args:
        class_detail_id: Id of the session (``Class_detail``).
        user_id: Id of the attending user.
        is_attend: Attendance flag to store.

    Returns:
        ``{"msg": "created success" | "update success" | "update fail",
        "code": 200, "new_record_id": int}`` or
        ``{"msg": <reason>, "code": 500}``.
    """
    try:
        if not class_detail_id or not user_id:
            return {"msg": "Please input right ", "code": 500}
        try:
            class_detail_obj = await Class_detail.get_or_none(id=class_detail_id)
            # Check before dereferencing: a missing session used to raise
            # AttributeError and surface as "get class_detail_id error".
            if not class_detail_obj:
                return {"msg": "no this class_detail_id", "code": 500}
            class_list = await Class_list.get_or_none(id=class_detail_obj.class_list_id)
            if not class_list:
                return {"msg": "no this class_detail_id", "code": 500}
        except Exception:
            return {"msg": "get class_detail_id error", "code": 500}

        new_record, created = await Attend_record.get_or_create(
            class_detail_id=class_detail_id,
            user_id=user_id,
            defaults={
                "class_name_id": class_list.name_id,
                # NOTE(review): the by-event handler writes and
                # get_attend_record reads ``class_event_id``, so it is set
                # here as well. ``class_list_id`` is kept as originally
                # written; confirm which of the two the model declares.
                "class_event_id": class_detail_obj.class_list_id,
                "class_list_id": class_detail_obj.class_list_id,
                "is_attend": is_attend
            }
        )
        if created:
            msg = "created success"
        else:
            try:
                new_record.is_attend = is_attend
                await new_record.save()
                msg = "update success"
            except Exception:
                msg = "update fail"
        return {"msg": msg, "code": 200, "new_record_id": new_record.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/close_class")
async def close_class(user_id:int = Depends(check_token),class_name_id : int = None,cancel:int=None):
    """Close a class and credit attended hours, or undo a previous close.

    Closing adds the hours of every attended session to each attendee's
    ``User_point`` row, recomputes their points via ``count_point`` and
    sets ``is_record_point`` to 1. With ``cancel == 1`` on an already
    closed class, the same hours are subtracted again and the flag reset.

    Args:
        user_id: Resolved by ``check_token``.
        class_name_id: Id of the class to close or reopen.
        cancel: ``1`` to undo the close of an already closed class.

    Returns:
        ``{"msg": ..., "code": 200}`` on success, ``{"msg": ..., "code": 500}``
        otherwise.

    NOTE(review): a falsy ``user_id`` is not rejected, the cancel path has
    no ownership check, and cancelling does not re-run ``count_point``.
    Confirm these are intended before tightening them.
    """
    try:
        if user_id:
            # Only verifies the account exists; ``get`` raises otherwise.
            await User.get(id=user_id)
        class_name = await Class_name.get_or_none(id=class_name_id)
        if not class_name:
            return {"msg": "no this class", "code": 500}

        if class_name.is_record_point == 1:
            if cancel != 1:
                return {"msg": "class has closed", "code": 500}
            user_attend_record = await Attend_record.filter(class_name_id=class_name_id, is_attend=1)
            for tmp in user_attend_record:
                point = await User_point.get_or_none(user_id=tmp.user_id)
                class_detail = await Class_detail.get_or_none(id=tmp.class_detail_id)
                # Skip incomplete rows rather than aborting half-way and
                # leaving some users adjusted and others not.
                if point is None or class_detail is None or class_detail.hour is None:
                    my_log("error", __name__, f"close_class cancel skipped attend record {tmp.id}")
                    continue
                point.hours -= float(class_detail.hour)
                await point.save()
            class_name.is_record_point = 0
            await class_name.save()
            # A successful cancel used to be reported with code 500.
            return {"msg": "class closed has cancel", "code": 200}

        if class_name.create_user_id and class_name.create_user_id != user_id:
            # A refusal used to be reported with code 200.
            return {"msg": "no access", "code": 500}

        user_attend_record = await Attend_record.filter(class_name_id=class_name_id, is_attend=1)
        for tmp in user_attend_record:
            point, created = await User_point.get_or_create(
                user_id=tmp.user_id,
                defaults={
                    "hours": 0.0,
                    "points": 0.0
                }
            )
            class_detail = await Class_detail.get_or_none(id=tmp.class_detail_id)
            # An abort here would leave the class open with some hours
            # already credited, so a retry would credit them twice.
            if class_detail is None or class_detail.hour is None:
                my_log("error", __name__, f"close_class skipped attend record {tmp.id}")
                continue
            point.hours += float(class_detail.hour)
            await point.save()
            await count_point(tmp.user_id)
        class_name.is_record_point = 1
        await class_name.save()
        return {"msg": "success close class and record point", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
async def count_point(user_id) :
    """Recompute and store a user's point balance.

    The balance is ``int(hours / 3)`` minus the ``point_exchange`` of every
    ``Point_exchange_record`` of the user. Returns a success dict with code
    200, or code 500 when the user has no ``User_point`` row.
    """
    point = await User_point.get_or_none(user_id = user_id)
    exchanges = await Point_exchange_record.filter(user_id = user_id)
    if not point:
        return {"msg":"沒有點數資料", "code": 500}

    balance = int(point.hours/3)
    for exchange in exchanges:
        balance -= exchange.point_exchange
    point.points = balance
    await point.save()
    return {"msg":"success", "code": 200}
@classes.post("/delete_attend_record")
async def delete_attend_record(
    id : int = 0
):
    """Delete one attendance record by id.

    The record is fetched first so that an unknown id is reported as an
    error (code 500) instead of a silent no-op.
    """
    try:
        # Existence check only: ``get`` raises when the id is unknown.
        await Attend_record.get(id=id)
        doomed = Attend_record.filter(id=id)
        await doomed.delete()
    except Exception as exc:
        return {"msg": str(exc), "code": 500}
    return {"msg": "success", "code": 200}
@classes.get("/update_attend_record")
async def update_attend_record(
    id: int = 0,
    class_detail_id: Optional[int] = None,
    user_id: Optional[int] = None,
    is_attend: Optional[int] = None
):
    """Update the supplied fields of one attendance record.

    Args:
        id: Id of the record to update (required, non-zero).
        class_detail_id, user_id, is_attend: New values; a field is left
            untouched when its argument is omitted.

    Returns:
        ``{"msg": "success", "code": 200}`` or
        ``{"msg": <reason>, "code": 500}``.
    """
    if not id:
        # This response used to lack "code", unlike every other error path.
        return {"msg": "please input id", "code": 500}
    try:
        record = await Attend_record.get(id=id)
        if class_detail_id is not None:
            record.class_detail_id = class_detail_id
        if user_id is not None:
            record.user_id = user_id
        if is_attend is not None:
            record.is_attend = is_attend
        await record.save()
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_attend_record")
async def get_attend_record(
    class_event_id: Optional[int] = None,
    class_detail_id: Optional[int] = None,
    user_id: Optional[int] = None,
    is_attend: Optional[int] = None,
    page_num: Optional[int] = None,
    page_amount: Optional[int] = None
):
    """List attendance records with user and session details.

    Args:
        class_event_id: Keep records of any session of this event.
        class_detail_id: Keep records of this session.
        user_id: Keep records of this user.
        is_attend: Keep records with this attendance flag (0 is honoured).
        page_num, page_amount: 1-based page and page size (both required).

    Returns:
        ``{"msg": "success", "code": 200, "total_num": int,
        "attend_record_list": [...]}`` or ``{"msg": <reason>, "code": 500}``.
    """
    try:
        attend_record_list = Attend_record.all()
        if class_event_id:
            class_detail_list = await Class_detail.filter(class_list_id=class_event_id)
            id_list = [detail.id for detail in class_detail_list]
            attend_record_list = Attend_record.filter(class_detail_id__in=id_list)
        if class_detail_id:
            attend_record_list = attend_record_list.filter(class_detail_id=class_detail_id)
        if user_id:
            attend_record_list = attend_record_list.filter(user_id=user_id)
        # ``if is_attend:`` treated 0 as "no filter", so absent records could
        # never be selected on their own.
        if is_attend is not None:
            attend_record_list = attend_record_list.filter(is_attend=is_attend)
        count = await attend_record_list.all().count()
        if page_num and page_amount:
            attend_record_list = attend_record_list.offset((page_num - 1) * page_amount).limit(page_amount)
        attend_record_list = await attend_record_list.all()

        attend_records = []
        for obj in attend_record_list:
            class_detail = await Class_detail.get_or_none(id=obj.class_detail_id)
            if class_detail is None:
                return {"msg": f"class detail number error id={obj.class_detail_id}", "code": 500}
            user_list = await User.filter(id=obj.user_id)
            user_information_list = await User_information.filter(user_id=obj.user_id)
            # One user without an account or profile row used to fail the
            # whole listing with an IndexError; report None for them instead.
            user = user_list[0] if user_list else None
            user_information = user_information_list[0] if user_information_list else None

            class_name = None
            class_list = await Class_list.get_or_none(id=obj.class_event_id)
            if class_list is not None:
                class_name = await Class_name.get_or_none(id=class_list.name_id)

            data = {
                "id": obj.id,
                "class_name_id": obj.class_name_id,
                "class_event_id": class_detail.class_list_id,
                "class_detail_id": obj.class_detail_id,
                "user_id": obj.user_id,
                "is_attend": obj.is_attend,
                "real_name": user_information.name if user_information else None,
                "phone": user_information.phone if user_information else None,
                "email": user.email if user else None
            }
            if class_list and class_name:
                data["class_name"] = class_name.name
            data["start_time"] = class_detail.start_time
            data["end_time"] = class_detail.end_time
            attend_records.append(data)
        return {"msg": "success", "code": 200, "total_num": count, "attend_record_list": attend_records}
    except Exception as e:
        return {"msg": str(e), "code": 500}
import pymysql
@classes.get("/copy")
async def copy(
    from_id: Optional[int] = None
):
    """Mirror classes, schools and events from the external MySQL database.

    Every row of the remote ``class_name`` table is upserted (by name) into
    ``Class_name`` under group 9 / "希望工程", together with its school, its
    first ``class_list`` row and the matching ``Class_date``. A class whose
    related rows cannot be copied is deleted again and logged. Afterwards,
    local classes of that group that were not copied in this run are
    deleted.

    Args:
        from_id: Accepted but not used by the body.

    Returns:
        ``{"msg": "success", "code": 200}`` or
        ``{"msg": <reason>, "code": 500}``.
    """
    try:
        # NOTE(review): connection credentials are hard-coded; they should
        # come from configuration. pymysql is also synchronous and blocks the
        # event loop while this handler runs.
        connection = pymysql.connect(
            host='db.ptt.cx',
            user='choozmo',
            password='pAssw0rd',
            database='test_copy'
        )
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT * FROM class_name")
                connection.commit()
                rows = cursor.fetchall()
                encode = "HOP" + datetime.now().strftime("%Y%m%d")
                class_id_list = []
                for row in rows:
                    class_name, created = await Class_name.update_or_create(
                        name=row[1],
                        defaults={
                            "school_id": 1,
                            "category": row[3],
                            "introduction": row[4],
                            "organizer": row[5],
                            "cover_img": row[6],
                            "group_id": 9,
                            "group_sort": "希望工程",
                            "special_class_list_name": None,
                            "recommend": 0,
                            "is_inner": 1,
                            "is_check": 1,
                            "create_user_id": None,
                            "create_time": datetime.now(),
                            "is_delete": row[10],
                            "is_record_point": 0,
                            "encode": encode
                        }
                    )
                    # Parameterized instead of interpolated into the SQL text.
                    cursor.execute("SELECT * FROM schools where id = %s", (row[2],))
                    connection.commit()
                    school_row = cursor.fetchone()
                    try:
                        school, school_created = await Schools.update_or_create(
                            name=school_row[1],
                            defaults={
                                "longitude": school_row[2],
                                "latitude": school_row[3],
                                "address": school_row[4],
                                "update_time": school_row[5],
                                "school_introduction": None,
                                "email": None,
                                "phone": None,
                                "school_create_user_id": None,
                                "teachers": None,
                                "is_delete": 0,
                                "is_pass_proposal": 1
                            }
                        )
                        class_name.school_id = school.id
                        await class_name.save()

                        cursor.execute("SELECT * FROM class_list where name_id = %s", (row[0],))
                        connection.commit()
                        list_row = cursor.fetchone()
                        event, event_created = await Class_list.update_or_create(
                            name_id=class_name.id,
                            defaults={
                                "event": list_row[2],
                                "start_time": list_row[3],
                                "end_time": list_row[4],
                                "location": list_row[10],
                                "lecturer": list_row[9],
                                "contact": list_row[8],
                                "content": list_row[11],
                                "URL": list_row[12],
                                "people": None,
                                # NOTE(review): fee_method and create_user_id
                                # both read column 19; confirm against the
                                # remote class_list schema.
                                "fee_method": list_row[19],
                                "registration_way": list_row[15],
                                "remark": list_row[16],
                                "ATM_address": list_row[17],
                                "fee_payment": list_row[18],
                                "create_user_id": list_row[19],
                                "event_create_time": datetime.now(),
                                "files": list_row[20]
                            }
                        )
                        date, date_create = await Class_date.update_or_create(
                            class_list_id=event.id,
                            defaults={
                                "registration_start": list_row[5],
                                "registration_end": list_row[6],
                                "number_limit": list_row[13],
                                "amount_left": 0
                            }
                        )
                        class_id_list.append(class_name.id)
                    except Exception:
                        # Best effort: drop the half-copied class and go on.
                        await Class_name.filter(id=class_name.id).delete()
                        my_log("error", __name__, f"{row[0]}copy error")

                # An awaited delete() yields a row count, not the deleted
                # objects. Iterating over it raised TypeError on every call,
                # which also skipped the connection close.
                deleted_count = await Class_name.filter(
                    group_id=9, group_sort="希望工程"
                ).exclude(id__in=class_id_list).delete()
                if deleted_count:
                    my_log("info", __name__, f"class_delete: {deleted_count}")
        finally:
            # Always release the connection, even when the sync fails.
            connection.close()
        # Success used to be reported with code 500.
        return {"msg": "success", "code": 200}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.post("/insert_proposal")
async def insert_class_name(
    class_name: str = Form(default=None),
    school_id: int = Form(default=None),
    category: str = Form(default=None),
    introduction: str = Form(default=None),
    organizer: str = Form(default=None),
    cover_img_file: UploadFile = File(default=None),
    fee_method: str = Form(default=None),
    number_limit: int = Form(default=None),
    number_minimum: int = Form(default=None),
    people: str = Form(default=None),
    files=Depends(create_upload_files),
):
    """Create the proposal of a school, or update it when one already exists.

    The proposal is looked up by ``school_id`` via ``get_or_create``. When a
    new row is created and the school has a creator user, a notification
    e-mail built from the EDM HTML template is sent to that user. When the
    row already exists, every truthy form field overwrites the stored value.

    Args:
        class_name: Proposed class name.
        school_id: Id of the school the proposal belongs to (must exist).
        category: Proposal category.
        introduction: Proposal introduction text.
        organizer: Organizer of the proposed class.
        cover_img_file: Optional cover image; saved under ``IMAGEDIR``.
        fee_method: Fee method text.
        number_limit: Upper bound of participants.
        number_minimum: Lower bound of participants.
        people: Target audience text.
        files: Result of the ``create_upload_files`` dependency; if it
            contains ``"msg"`` it is treated as an upload error.

    Returns:
        ``{"msg": "success", "code": 200, "new_proposal_id": ...}`` on
        success, otherwise ``{"msg": <reason>, "code": 500}``.
    """
    try:
        cover_img = ''
        if cover_img_file is not None:
            # The client controls the filename: keep only its last path
            # component so "../" segments cannot escape IMAGEDIR.
            safe_name = Path(str(cover_img_file.filename)).name
            contents = await cover_img_file.read()
            # Save the uploaded cover image.
            with open(f"{IMAGEDIR}{safe_name}", "wb") as f:
                f.write(contents)
            cover_img = f"{IMAGEDIR_short}{safe_name}"

        # The upload dependency reports failures through a "msg" entry.
        if files and "msg" in files:
            return {"msg": files["msg"], "code": 500}

        school = await Schools.get_or_none(id=school_id)
        if school is None:
            return {"msg": f"no this school id {school_id}", "code": 500}

        new_proposal, created = await Proposal.get_or_create(
            school_id=school_id,
            defaults={
                "class_name": class_name,
                "category": category,
                "introduction": introduction,
                "organizer": organizer,
                "cover_img": cover_img,
                "fee_method": fee_method,
                "number_limit": number_limit,
                "number_minimum": number_minimum,
                "people": people,
                "files": files,
                "create_time": datetime.now(),
            },
        )

        if not created:
            # Partial update: only fields that were actually submitted
            # (truthy) replace the stored values.
            if class_name:
                new_proposal.class_name = class_name
            if category:
                new_proposal.category = category
            if introduction:
                new_proposal.introduction = introduction
            # organizer was stored on creation but never refreshed on
            # update; treat it like every other submitted field.
            if organizer:
                new_proposal.organizer = organizer
            if cover_img_file:
                new_proposal.cover_img = cover_img
            if fee_method:
                new_proposal.fee_method = fee_method
            if number_limit:
                new_proposal.number_limit = number_limit
            if number_minimum:
                new_proposal.number_minimum = number_minimum
            if people:
                new_proposal.people = people
            if files != "{}":
                new_proposal.files = files
            await new_proposal.save()
        else:
            # NOTE(review): other code in this file writes the Schools field
            # as "school_create_user_id" -- confirm "create_user_id" exists.
            user_id = school.create_user_id
            if user_id:
                # Except Exception (not a bare except) so task cancellation
                # is not swallowed inside this coroutine.
                try:
                    try:
                        user = await User.get(id=user_id)
                        user_info = await User_information.get(user_id=user_id)
                    except Exception:
                        # The proposal row has already been created at this
                        # point; the caller is only told to create a resume.
                        return {"msg": "請去建立工藝家履歷", "code": 500}
                    email = user.email
                    subject = "課程提案通知"
                    # The mail body is the EDM HTML template with the
                    # "{username}" placeholder replaced by the user's name.
                    with open("/var/www/ntcri/assets/edm/porposal/index.html", 'r', encoding='utf-8') as html_file:
                        html_template = html_file.read()
                    message = html_template.replace("{username}", user_info.name)
                    send_email(email, "", subject, message)
                except Exception:
                    # Best effort: a failed notification must not fail the
                    # request.
                    print("have no email")
        return {"msg": "success", "code": 200, "new_proposal_id": new_proposal.id}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_proposal")
async def insert_class_name(
    id: Optional[int] = None,
    school_id: Optional[int] = None,
):
    """List proposals together with the data of their school.

    Args:
        id: When truthy, restrict the result to the proposal with this id.
        school_id: When truthy, restrict the result to this school.

    Returns:
        ``{"msg": "success", "code": 200, "proposal_list": [...]}`` where each
        item is ``proposal.show_data()`` plus a ``"school"`` entry, or
        ``{"msg": <reason>, "code": 500}`` when a referenced school is
        missing or any exception is raised.
    """
    try:
        # Start from an empty condition and AND the optional filters onto it.
        criteria = Q(id=id) if id else Q()
        if school_id:
            criteria = criteria & Q(school_id=school_id)
        proposals = await Proposal.all().filter(criteria)

        payload = []
        for item in proposals:
            owner_school = await Schools.get_or_none(id=item.school_id)
            if owner_school is None:
                # A dangling school reference aborts the whole listing.
                return {"msg": f"no this school,id = {item.school_id}", "code": 500}
            entry = item.show_data()
            entry["school"] = owner_school.show_data()
            payload.append(entry)
        return {"msg": "success", "code": 200, "proposal_list": payload}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_favorite_course_count")
async def insert_class_name(
    class_name_id_list: str = None
):
    """Count how many users marked each class as a favourite.

    Args:
        class_name_id_list: Optional Python-literal sequence of class ids,
            e.g. ``"[1, 2, 3]"``. When omitted, every class id present in
            the favourites table is reported once.

    Returns:
        ``{"msg": "success", "code": 200, "result": [{"class_name_id": ...,
        "count": ...}, ...]}``, or ``{"msg": <reason>, "code": 500}`` when
        the argument cannot be parsed or a query fails.
    """
    # Function-scope imports: the module header does not provide these names.
    import ast
    from collections import Counter

    try:
        result = []
        if class_name_id_list:
            # SECURITY: this value comes straight from the query string.
            # It used to be passed to eval(), which executes arbitrary code;
            # literal_eval accepts the same list/tuple literals only.
            requested_ids = list(map(int, ast.literal_eval(class_name_id_list)))
            for entry in requested_ids:
                class_num = await Favorite_course.filter(class_name_id=entry).count()
                result.append({"class_name_id": entry, "count": class_num})
        else:
            # One query for all favourite rows, tallied in memory. This
            # reports each class id exactly once (previously every favourite
            # row produced its own, duplicated, entry and COUNT query).
            rows = await Favorite_course.all().values_list('class_name_id')
            tally = Counter(row[0] for row in rows)
            result = [
                {"class_name_id": class_id, "count": class_num}
                for class_id, class_num in tally.items()
            ]
        return {"msg": "success", "code": 200, "result": result}
    except Exception as e:
        return {"msg": str(e), "code": 500}
@classes.get("/get_school_city_count")
async def insert_class_name(
    school_city_list: str = None
):
    """Count schools whose address contains each given city name.

    Args:
        school_city_list: Optional Python-literal sequence of city strings,
            e.g. ``"['A', 'B,C']"``. An entry may hold several
            comma-separated spellings; their counts are added together.
            When omitted, the module-level ``taiwan_city`` list is used.

    Returns:
        ``{"msg": "success", "code": 200, "result": [{"city": ...,
        "count": ...}, ...]}``, or ``{"msg": <reason>, "code": 500}`` when
        the argument cannot be parsed or a query fails.
    """
    # Function-scope import: the module header does not provide this name.
    import ast

    try:
        result = []
        if school_city_list:
            # SECURITY: this value comes straight from the query string.
            # It used to be passed to eval(), which executes arbitrary code;
            # literal_eval accepts the same list/tuple literals only.
            cities = ast.literal_eval(school_city_list)
        else:
            cities = taiwan_city
        for entry in cities:
            # split(",") yields the entry itself when it has no comma, so a
            # single loop covers both single and multi-spelling entries.
            count = 0
            for keyword in entry.split(","):
                count += await Schools.filter(address__icontains=keyword).count()
            result.append({"city": entry, "count": count})
        return {"msg": "success", "code": 200, "result": result}
    except Exception as e:
        return {"msg": str(e), "code": 500}