from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile,Request from fastapi.responses import FileResponse, StreamingResponse from fastapi_login.exceptions import InvalidCredentialsException from fastapi_login import LoginManager from fastapi.responses import HTMLResponse from typing import List,Optional import pymysql import numpy as np import math import re import pandas as pd import ast import csv import matplotlib.pyplot as plt from matplotlib.ticker import MaxNLocator import matplotlib import io from datetime import datetime,timedelta from oauth2client.service_account import ServiceAccountCredentials from googleapiclient.discovery import build matplotlib.rc('font', family='STSong') plt.rcParams['font.sans-serif']=['SimHei'] plt.rcParams['axes.unicode_minus']=False pd.set_option('display.max_columns', None) data = APIRouter() #USER_INFO_FILE = '/var/www/ntcri_api/app/api/user_information_change.csv' #ATTEND_RECORD_FILE = '/var/www/ntcri_api/app/api/attend_record_syn.csv' #CLASS_DETAIL_FILE = '/var/www/ntcri_api/app/api/class_detail.csv' #CLASS_LIST_FILE = '/var/www/ntcri_api/app/api/class_list.csv' #CLASS_NAME_FILE = '/var/www/ntcri_api/app/api/class_name1.csv' #SCHOOL_FILE = '/var/www/ntcri_api/app/api/school.csv' def call_sql(sql_query): connection = pymysql.connect( host='db.ptt.cx', user='choozmo', password='pAssw0rd', database='test' ) cursor = connection.cursor() cursor.execute(sql_query) results = cursor.fetchall() df = pd.DataFrame(results, columns=[i[0] for i in cursor.description]) cursor.close() connection.close() return df def match(): # based on 'registration' sql_query = """ SELECT `user_information`.`id` AS `user_information_id`, `birthday`, `gender`, `position`, `registration`.`id` AS `registration_id`, `registration`.`user_id`, `registration`.`create_time`, `class_list`.`id` AS `class_list_id`,`class_list`.`start_time`, `class_list`.`end_time`, `fee_method`, `class_name`.`id` AS `class_name_id`, `class_name`.`name`, `school_id`, `category`,`group_id`,`group_sort`, `is_inner` FROM `registration` JOIN `user_information` ON `registration`.`user_id` = `user_information`.`user_id` JOIN `class_list` ON `registration`.`event_id` = `class_list`.`id` JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id` """ # based on 'attend_record' ''' sql_query = """ SELECT `user_information`.`id` AS `user_information_id`,`birthday`, `gender`, `position`, `attend_record`.`id` AS `attend_record_id`, `class_detail_id`, `attend_record`.`user_id`, `is_attend`, `class_detail`.`start_time` ,`class_detail`.`end_time`, `sessions`, `hour`, `class_list`.`id` AS `class_list_id`, `event`, `fee_method`, `class_name`.`id` AS `class_name_id`, `class_name`.`name`, `school_id`, `category`,`group_id`,`group_sort`, `is_inner` FROM `attend_record` JOIN `user_information` ON `attend_record`.`user_id` = `user_information`.`user_id` JOIN `class_detail` ON `attend_record`.`class_detail_id` = `class_detail`.`id` JOIN `class_list` ON `class_detail`.`class_list_id` = `class_list`.`id` JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id` """ ''' match_data = call_sql(sql_query) return match_data def student_info(): def fix_dict_format(s): if isinstance(s, str): return s.encode().decode('unicode_escape') pairs = re.findall(r"'([^']+)':(\d+)", s) fixed_dict = "{" + ", ".join([f"'{key}': {value}" for key, value in pairs]) + "}" return fixed_dict else: return s # 讀檔,把 position 轉成 dictionary 格式 #user_information = pd.read_csv(file) sql_query = """ SELECT * FROM `user_information` """ user_information = call_sql(sql_query) user_information['position'] = user_information['position'].apply(fix_dict_format) user_information['position'].fillna('{}', inplace=True) user_information['position'] = user_information['position'].apply(lambda x: ast.literal_eval(x) if pd.notnull(x) else {}) # only學員 student_information = user_information[user_information['position'].apply(lambda x: x.get('學員', 0)==1)] #user_information[user_information['gender'].str.strip() != '']['gender'] return student_information def count_target_number(df, target, order=None): item_counts = df[target].value_counts(dropna=True) result_df = item_counts.reset_index() result_df.columns = [target, 'Count'] nan_count = df[target].isna().sum() if nan_count > 0: nan_df = pd.DataFrame({target: [np.nan], 'Count': [nan_count]}) result_df = pd.concat([result_df, nan_df], ignore_index=True) if order: result_df[target] = pd.Categorical(result_df[target], categories=order, ordered=True) result_df = result_df.sort_values(by=target) item_list = result_df[target].replace('', '無類別名稱').astype(str).tolist() count_list = result_df['Count'].astype(str).tolist() search_results = {target: str(item_list), "Count": str(count_list)} return search_results def search_specific_items(df, target, search_items): if not search_items: return count_target_number(df, target) all_items = df[target].unique not_in_list = [name for name in search_items if name not in all_items] if not_in_list: return {target: str(not_in_list), "Error": "Not in list"} item_counts = df[df[target].isin(search_items)][target].value_counts() item_list = item_counts.index.astype(str).tolist() count_list = item_counts.values.tolist() search_results = {"Item": item_list, "Count": count_list} return search_results @data.post("/age_bar") async def age_bar(age_group_list: Optional[List[int]] = [18, 25, 35, 45, 55, 65]): def create_labels(age_group_list): labels = [f"{start}-{end-1}" if end != float('inf') else f"{start}+" for start, end in zip(age_group_list[1:-2], age_group_list[2:-1])] labels = [f'{str(age_group_list[1]-1)}-'] + labels + [f'{str(age_group_list[-2])}+'] return labels user_information = student_info() # 生日 birthday = pd.to_datetime(user_information['birthday'], format='%Y-%m-%d') current_date = pd.to_datetime('today') user_information['age'] = ((current_date - birthday).dt.days / 365).round(1).astype(int) bins = [0] + (age_group_list) + [float('inf')] labels = create_labels(bins) user_information['age_group'] = pd.cut(user_information['age'], bins=bins, labels=labels, right=False) age_group_counts = user_information['age_group'].value_counts(sort=False) age_group_label = labels count_list = age_group_counts.values.tolist() search_results = {"Age_group" : str(age_group_label), "Count": str(count_list)} # 按年齡分群 # bins = [0, 18, 25, 35, 45, 55, 65, float('inf')] # labels = ['18-', '18-24', '25-34', '35-44', '45-54', '55-64', '65+'] #user_information['age_group'] = pd.cut(user_information['age'], bins=bins, labels=labels, right=False) return search_results @data.post("/gender_bar") def gender_bar(gender_type: Optional[str] = ''): user_information = student_info() if gender_type.strip() in ["男", "女"]: user_information = user_information[user_information['gender'] == gender_type] gender_counts = (user_information[user_information['gender'].str.strip() != '']['gender']).value_counts() gender_list = gender_counts.index.astype(str).tolist() count_list = gender_counts.values.tolist() search_results = {"Gender" : str(gender_list), "Count": str(count_list)} return search_results @data.post("/reg_gender") async def reg_gender(): target = 'gender' match_data = match() return count_target_number(match_data, 'gender') @data.post("/reg_age") async def reg_age(age_group_list: Optional[List[int]] = [18, 25, 35, 45, 55, 65]): def create_labels(age_group_list): labels = [f"{start}-{end-1}" if end != float('inf') else f"{start}+" for start, end in zip(age_group_list[1:-2], age_group_list[2:-1])] labels = [f'{str(age_group_list[1]-1)}-'] + labels + [f'{str(age_group_list[-2])}+'] return labels match_data = match() # 生日 birthday = pd.to_datetime(match_data['birthday'], format='%Y-%m-%d') current_date = pd.to_datetime('today') match_data['age'] = ((current_date - birthday).dt.days / 365).round(1).astype(int) bins = [0] + (age_group_list) + [float('inf')] labels = create_labels(bins) match_data['age_group'] = pd.cut(match_data['age'], bins=bins, labels=labels, right=False) age_group_counts = match_data['age_group'].value_counts(sort=False) age_group_label = labels count_list = age_group_counts.values.tolist() search_results = {"Age_group" : str(age_group_label), "Count": str(count_list)} # 按年齡分群 # bins = [0, 18, 25, 35, 45, 55, 65, float('inf')] # labels = ['18-', '18-24', '25-34', '35-44', '45-54', '55-64', '65+'] #user_information['age_group'] = pd.cut(user_information['age'], bins=bins, labels=labels, right=False) return search_results def time_analysis_search(df, target, search_items): def time_analysis(df, target, name): df = df[df[target] == name] df['start_time'] = pd.to_datetime(df['start_time']) df['month'] = df['start_time'].dt.to_period('M') monthly_counts = df['month'].value_counts().sort_index() dates_list = monthly_counts.index.astype(str).tolist() count_list = monthly_counts.values.tolist() return (dates_list, count_list) all_items = df[target].unique() search_results = [] for name in search_items or all_items: if name not in all_items: search_results.append({target: name, "Error": "Not in list"}) else: dates_list, count_list = time_analysis(df, target, name) search_results.append({target: name, "Month": str(dates_list), "Count": str(count_list)}) return search_results @data.post("/category_line") async def category_line(category_name: Optional[str] = ''): target = 'category' search_items = category_name.split() match_data = match() search_results = time_analysis_search(match_data, target, search_items) return search_results #@data.post("/group_line") async def group_line(group_id: Optional[str] = ''): #group_name_id = {'未來工藝學群': 1, '技藝工藝學群': 2, '生活工藝學群': 3, '青年工藝學群': 4, '世代工藝學群': 5, # '修護工藝學群': 6, '跨域工藝學群': 7, '線上工藝學群': 8, '希望工程學群': 9} target = 'group_id' search_items = group_id.split() try: search_items = [int(item) for item in search_items] except ValueError as e: return {"msg": str(e)} match_data = match() search_results = time_analysis_search(match_data, target, search_items) return search_results @data.post("/category_bar") async def category_bar(category_name: Optional[str] = ''): target = 'category' search_items = category_name.split() match_data = match() search_results = search_specific_items(match_data, target, search_items) return search_results @data.post("/group_bar") async def group_bar(group_id: Optional[str] = ''): #group_name_id = {'未來工藝學群': 1, '技藝工藝學群': 2, '生活工藝學群': 3, '青年工藝學群': 4, '世代工藝學群': 5, # '修護工藝學群': 6, '跨域工藝學群': 7, '線上工藝學群': 8, '希望工程學群': 9} target = 'group_id' search_items = group_id.split() try: search_items = [int(item) for item in search_items] except ValueError as e: return {"msg": str(e)} match_data = match() search_results = search_specific_items(match_data, target, search_items) return search_results @data.post("/first_course_bar") async def first_course_bar(by_class_name: Optional[int] = 0, by_category: Optional[int] = 0): if by_class_name == by_category == 1: return 'Please select only one.' elif by_class_name == by_category == 0: return 'Please select one.' elif by_class_name not in [0, 1] or by_category not in [0, 1]: return 'Please input 0 or 1.' match_data = match() match_data['create_time'] = pd.to_datetime(match_data['create_time']) earliest_records = match_data.groupby('user_id').agg({'create_time': 'min', 'name': 'first', 'category': 'first'}).reset_index() target = 'name' if by_class_name else 'category' first_class_counts = earliest_records[target].value_counts() course_list = first_class_counts.index.tolist() count_list = first_class_counts.values.tolist() return {target: str(course_list), "Count": str(count_list)} def repeat_time_search(df, target): def repeat_time_analysis(df, target, name): df = df[df[target] == name] counts = df['user_id'].value_counts() repeat_times = counts.value_counts().sort_index() repeat_time_list = repeat_times.index.tolist() count_list = np.array(repeat_times.values).flatten().tolist() return (repeat_time_list, count_list) all_items = df[target].unique() search_results = [] for name in all_items: repeat_time_list, count_list = repeat_time_analysis(df, target, name) search_results.append({target: name, 'repeat_times': str(repeat_time_list), 'count': str(count_list)}) return search_results @data.post("/repeat_participation_bar") async def repeat_participation_bar(by_class_name: Optional[int] = 0, by_category: Optional[int] = 0): if by_class_name == by_category == 1: return 'Please select only one.' elif by_class_name == by_category == 0: return 'Please select one.' elif by_class_name not in [0, 1] or by_category not in [0, 1]: return 'Please input 0 or 1.' target = 'name' if by_class_name else 'category' #search_items = [name.strip("'\"") for name in class_name.split()] match_data = match() search_results = repeat_time_search(match_data, target) return search_results @data.post("/school_city_pie") async def school_city_pie(): #school = pd.read_csv(SCHOOL_FILE) sql_query = """ SELECT * FROM `schools` """ school = call_sql(sql_query) def extract_county_city(address): pattern = r"(.*?[縣市])" if isinstance(address, str): address = address.replace("臺", "台") match = re.search(pattern, address) if match: return match.group(1)[-3:] return None school['county_city'] = school['address'].apply(extract_county_city) city_counts = (school[school['county_city'].str.strip() != '']['county_city']).value_counts() total_count = city_counts.sum() #city_percentages = city_counts / total_count * 100 threshold_percent = 0.01 low_county = city_counts[city_counts / total_count < threshold_percent] city_counts['其他'] = low_county.sum() city_counts = city_counts[city_counts / total_count >= threshold_percent] result = {'City':str(city_counts.index.tolist()), 'Number of school':str(city_counts.values.tolist())} return result @data.post("/class_name_category_pie") async def class_name_category_pie(): sql_query = """ SELECT * FROM `class_name` """ class_name = call_sql(sql_query) results = count_target_number(class_name, 'category') return results #@data.post("/class_detail_category_pie") async def class_detail_category_pie(): sql_query = """ SELECT `class_detail`.`id` AS `class_detail_id`, `class_detail`.`class_list_id`, `class_detail`.`start_time`, `class_list`.`name_id`, `class_name`.`name`, `class_name`.`school_id`, `class_name`.`category`, `class_name`.`group_id` FROM `class_detail` JOIN `class_list` ON `class_detail`.`class_list_id` = `class_list`.`id` JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id` """ match_data = call_sql(sql_query) results = count_target_number(match_data, 'category') return results #@data.post("/registration_list_name") async def registration_list_name(category: Optional[int] = 0, group_id: Optional[int] = 0): if category == group_id == 1: return 'Please select only one.' elif category == group_id == 0: return 'Please select one.' sql_query = """ SELECT `registration`.`event_id`, `class_list`.`name_id`, `class_name`.`name`, `class_name`.`school_id`, `class_name`.`category`, `class_name`.`group_id` FROM `registration` JOIN `class_list` ON `registration`.`event_id` = `class_list`.`id` JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id` """ user_event = call_sql(sql_query) if category: return count_target_number(user_event, 'category') if group_id: return count_target_number(user_event, 'group_id') @data.get("/ga4_data") async def ga4_data(start_day: str = datetime.now().date(),end_day:str = datetime.now().date() ,name:str = "screenPageViews",dimensions:str="",page_size: str = "10"): start_time = datetime.strptime(start_day, "%Y-%m-%d").date() #現在時間 end_time = datetime.strptime(end_day, "%Y-%m-%d").date() SCOPES = ['https://www.googleapis.com/auth/analytics.readonly'] KEY_FILE_LOCATION = "./app/api/ntcri-space-400206-38246577c2b2.json" property_id= 'properties/378680283' credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES) analytics = build('analyticsdata', 'v1beta', credentials=credentials) if dimensions == "": body={ "requests": [ { "dateRanges": [ { "startDate": str(start_time), "endDate": str(end_time) } ], "metrics": [ { "name": name } ], "limit": page_size } ] } else: body={ "requests": [ { "dateRanges": [ { "startDate": str(start_time), "endDate": str(end_time) } ], "metrics": [ { "name": name } ], "dimensions": [ #input the dimensions you need { "name": dimensions } ], "limit": page_size } ] } response = analytics.properties().batchRunReports(property=property_id, body=body).execute() return response