|
- from fastapi import APIRouter, Form, Depends, HTTPException, File, UploadFile,Request
- from fastapi.responses import FileResponse, StreamingResponse
- from fastapi_login.exceptions import InvalidCredentialsException
- from fastapi_login import LoginManager
- from fastapi.responses import HTMLResponse
- from typing import List,Optional
- import pymysql
- import numpy as np
- import math
- import re
- import pandas as pd
- import ast
- import csv
- import matplotlib.pyplot as plt
- from matplotlib.ticker import MaxNLocator
- import matplotlib
- import io
- from datetime import datetime,timedelta
- from oauth2client.service_account import ServiceAccountCredentials
- from googleapiclient.discovery import build
# Global plotting / display configuration, applied once at import time.
# STSong and SimHei are font family names (presumably chosen so that
# Chinese labels render -- confirm the fonts exist on the host).
matplotlib.rc('font', family='STSong')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# Show every column when a DataFrame is printed.
pd.options.display.max_columns = None
# Router on which all endpoints of this module are registered.
data = APIRouter()
- #USER_INFO_FILE = '/var/www/ntcri_api/app/api/user_information_change.csv'
- #ATTEND_RECORD_FILE = '/var/www/ntcri_api/app/api/attend_record_syn.csv'
- #CLASS_DETAIL_FILE = '/var/www/ntcri_api/app/api/class_detail.csv'
- #CLASS_LIST_FILE = '/var/www/ntcri_api/app/api/class_list.csv'
- #CLASS_NAME_FILE = '/var/www/ntcri_api/app/api/class_name1.csv'
- #SCHOOL_FILE = '/var/www/ntcri_api/app/api/school.csv'
def call_sql(sql_query):
    """Execute ``sql_query`` on the MySQL database and return the rows.

    Args:
        sql_query: Complete SQL statement. It is executed verbatim, so it
            must never be assembled from untrusted input.

    Returns:
        pandas.DataFrame with one row per fetched row. Column names come
        from the cursor description; a statement that produces no result
        set yields an empty frame without columns.

    Raises:
        pymysql.MySQLError: if connecting or executing the statement fails.
    """
    # NOTE(review): connection credentials are hard-coded in source;
    # consider loading them from configuration or the environment.
    connection = pymysql.connect(
        host='db.ptt.cx',
        user='choozmo',
        password='pAssw0rd',
        database='test',
    )
    try:
        # The cursor context manager closes the cursor even when
        # execute()/fetchall() raises.
        with connection.cursor() as cursor:
            cursor.execute(sql_query)
            rows = cursor.fetchall()
            # description is None for statements without a result set.
            columns = [column[0] for column in (cursor.description or ())]
    finally:
        # Always release the connection, including on errors.
        connection.close()

    return pd.DataFrame(list(rows), columns=columns)
def match():
    """Load `registration` rows joined with user, class-list and class-name data.

    Returns:
        The DataFrame produced by ``call_sql`` for the inner join of
        `registration`, `user_information`, `class_list` and `class_name`.
    """
    # Current query: based on `registration`.
    registration_query = """
        SELECT
        `user_information`.`id` AS `user_information_id`, `birthday`, `gender`, `position`,
        `registration`.`id` AS `registration_id`, `registration`.`user_id`, `registration`.`create_time`,
        `class_list`.`id` AS `class_list_id`,`class_list`.`start_time`, `class_list`.`end_time`, `fee_method`,
        `class_name`.`id` AS `class_name_id`, `class_name`.`name`, `school_id`, `category`,`group_id`,`group_sort`, `is_inner`
        FROM `registration`
        JOIN `user_information` ON `registration`.`user_id` = `user_information`.`user_id`
        JOIN `class_list` ON `registration`.`event_id` = `class_list`.`id`
        JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id`
    """

    # Disabled alternative, based on `attend_record` (kept for reference):
    #   SELECT
    #   `user_information`.`id` AS `user_information_id`,`birthday`, `gender`, `position`,
    #   `attend_record`.`id` AS `attend_record_id`, `class_detail_id`, `attend_record`.`user_id`, `is_attend`,
    #   `class_detail`.`start_time` ,`class_detail`.`end_time`, `sessions`, `hour`,
    #   `class_list`.`id` AS `class_list_id`, `event`, `fee_method`,
    #   `class_name`.`id` AS `class_name_id`, `class_name`.`name`, `school_id`, `category`,`group_id`,`group_sort`, `is_inner`
    #   FROM `attend_record`
    #   JOIN `user_information` ON `attend_record`.`user_id` = `user_information`.`user_id`
    #   JOIN `class_detail` ON `attend_record`.`class_detail_id` = `class_detail`.`id`
    #   JOIN `class_list` ON `class_detail`.`class_list_id` = `class_list`.`id`
    #   JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id`

    return call_sql(registration_query)
def student_info():
    """Return the `user_information` rows whose position marks a student.

    The `position` column holds text containing a dict literal (possibly
    with backslash escapes such as ``\\uXXXX``). Each value is unescaped,
    parsed with ``ast.literal_eval`` and the rows whose dict maps '學員'
    to 1 are kept.

    Returns:
        A copy of the matching rows, with `position` replaced by the
        parsed dict, so callers can add columns without touching a slice.

    Raises:
        ValueError, SyntaxError: from ``ast.literal_eval`` when a
            non-null position is not a valid Python literal.
    """
    def parse_position(raw):
        """Turn one raw `position` cell into a dict."""
        if not isinstance(raw, str):
            # NULL / NaN cells mean "no positions"; anything else is
            # handed to literal_eval, which rejects it as before.
            return {} if pd.isnull(raw) else ast.literal_eval(raw)
        # Encode with backslashreplace first so characters that are
        # already non-ASCII survive the unicode_escape round trip; a
        # plain .encode() would turn them into mojibake.
        unescaped = raw.encode('latin-1', 'backslashreplace').decode('unicode_escape')
        return ast.literal_eval(unescaped)

    # Load the table and convert `position` to dictionaries.
    sql_query = """
    SELECT * FROM `user_information`
    """
    user_information = call_sql(sql_query)
    user_information['position'] = user_information['position'].apply(parse_position)

    # Students only. astype(bool) keeps the mask boolean for an empty table.
    is_student = user_information['position'].apply(
        lambda position: position.get('學員', 0) == 1
    ).astype(bool)
    return user_information[is_student].copy()
def count_target_number(df, target, order=None):
    """Count how often each value of ``df[target]`` occurs.

    Args:
        df: DataFrame holding the column to summarise.
        target: Name of the column to count.
        order: Optional list of categories. When given, the values are
            converted to an ordered categorical with exactly these
            categories and the rows are sorted by it.

    Returns:
        Dict ``{target: str(labels), "Count": str(counts)}`` where both
        values are the ``str()`` of a list of strings. Missing values are
        counted in one extra trailing row, and an empty-string label is
        reported as '無類別名稱'.
    """
    column = df[target]

    tally = column.value_counts(dropna=True).reset_index()
    tally.columns = [target, 'Count']

    # value_counts skipped the missing values; add them as their own row.
    missing = column.isna().sum()
    if missing > 0:
        missing_row = pd.DataFrame({target: [np.nan], 'Count': [missing]})
        tally = pd.concat([tally, missing_row], ignore_index=True)

    if order:
        tally[target] = pd.Categorical(tally[target], categories=order, ordered=True)
        tally = tally.sort_values(by=target)

    labels = tally[target].replace('', '無類別名稱').astype(str).tolist()
    counts = tally['Count'].astype(str).tolist()
    return {target: str(labels), "Count": str(counts)}
def search_specific_items(df, target, search_items):
    """Count the rows of ``df`` whose ``target`` value is one of ``search_items``.

    Args:
        df: DataFrame to search.
        target: Name of the column to match against.
        search_items: Values to look for. When empty, every value of the
            column is counted via ``count_target_number``.

    Returns:
        * ``count_target_number(df, target)`` when ``search_items`` is empty;
        * ``{target: str(unknown), "Error": "Not in list"}`` when at least
          one requested value does not occur in the column;
        * otherwise ``{"Item": [...], "Count": [...]}`` ordered by
          descending count.
    """
    if not search_items:
        return count_target_number(df, target)

    # unique must be *called*; testing membership against the bound
    # method itself raises TypeError.
    known_items = set(df[target].unique())

    unknown = [item for item in search_items if item not in known_items]
    if unknown:
        return {target: str(unknown), "Error": "Not in list"}

    item_counts = df.loc[df[target].isin(search_items), target].value_counts()
    return {
        "Item": item_counts.index.astype(str).tolist(),
        "Count": item_counts.values.tolist(),
    }
-
-
@data.post("/age_bar")
async def age_bar(age_group_list: Optional[List[int]] = [18, 25, 35, 45, 55, 65]):
    """Count students per age bracket.

    Args:
        age_group_list: Lower bounds of the age brackets. ``None`` or an
            empty list selects the default brackets; the values are
            de-duplicated and sorted before use.

    Returns:
        ``{"Age_group": str(labels), "Count": str(counts)}``. With the
        default brackets the labels are
        ``['17-', '18-24', '25-34', '35-44', '45-54', '55-64', '65+']``.
    """
    default_edges = [18, 25, 35, 45, 55, 65]
    # The parameter is Optional, so None must not reach the list
    # concatenation below; pd.cut also needs strictly increasing edges.
    edges = sorted(set(age_group_list or default_edges))
    bins = [0] + edges + [float('inf')]
    labels = (
        [f"{edges[0] - 1}-"]
        + [f"{low}-{high - 1}" for low, high in zip(edges, edges[1:])]
        + [f"{edges[-1]}+"]
    )

    students = student_info()

    # Age in years derived from the birthday.
    birthday = pd.to_datetime(students['birthday'], format='%Y-%m-%d')
    today = pd.to_datetime('today')
    # A missing birthday yields NaN, which cannot be cast to int: drop it.
    age = ((today - birthday).dt.days / 365).round(1).dropna().astype(int)

    # Group by age bracket; intervals are closed on the left.
    age_group = pd.cut(age, bins=bins, labels=labels, right=False)
    counts = age_group.value_counts(sort=False)

    return {"Age_group": str(labels), "Count": str(counts.values.tolist())}
@data.post("/gender_bar")
def gender_bar(gender_type: Optional[str] = ''):
    """Count students per gender, optionally restricted to one gender.

    Args:
        gender_type: "男" or "女" (surrounding whitespace is ignored) to
            count only that gender. Any other value, including ``None``
            and the empty string, counts all genders.

    Returns:
        ``{"Gender": str(genders), "Count": str(counts)}``; rows whose
        gender is blank are left out.
    """
    students = student_info()

    # Normalise once so the validated value is also the one used for
    # filtering (a padded " 男" used to pass the check but match no row).
    wanted = (gender_type or '').strip()
    if wanted in ("男", "女"):
        students = students[students['gender'] == wanted]

    gender = students['gender']
    gender_counts = gender[gender.str.strip() != ''].value_counts()

    return {
        "Gender": str(gender_counts.index.astype(str).tolist()),
        "Count": str(gender_counts.values.tolist()),
    }
@data.post("/reg_gender")
async def reg_gender():
    """Count the rows returned by ``match()`` per `gender` value.

    Returns:
        The dict built by ``count_target_number`` for the 'gender' column.
    """
    registrations = match()
    return count_target_number(registrations, 'gender')
@data.post("/reg_age")
async def reg_age(age_group_list: Optional[List[int]] = [18, 25, 35, 45, 55, 65]):
    """Count the rows returned by ``match()`` per age bracket.

    Args:
        age_group_list: Lower bounds of the age brackets. ``None`` or an
            empty list selects the default brackets; the values are
            de-duplicated and sorted before use.

    Returns:
        ``{"Age_group": str(labels), "Count": str(counts)}``. With the
        default brackets the labels are
        ``['17-', '18-24', '25-34', '35-44', '45-54', '55-64', '65+']``.
    """
    default_edges = [18, 25, 35, 45, 55, 65]
    # The parameter is Optional, so None must not reach the list
    # concatenation below; pd.cut also needs strictly increasing edges.
    edges = sorted(set(age_group_list or default_edges))
    bins = [0] + edges + [float('inf')]
    labels = (
        [f"{edges[0] - 1}-"]
        + [f"{low}-{high - 1}" for low, high in zip(edges, edges[1:])]
        + [f"{edges[-1]}+"]
    )

    match_data = match()

    # Age in years derived from the birthday.
    birthday = pd.to_datetime(match_data['birthday'], format='%Y-%m-%d')
    today = pd.to_datetime('today')
    # A missing birthday yields NaN, which cannot be cast to int: drop it.
    age = ((today - birthday).dt.days / 365).round(1).dropna().astype(int)

    # Group by age bracket; intervals are closed on the left.
    age_group = pd.cut(age, bins=bins, labels=labels, right=False)
    counts = age_group.value_counts(sort=False)

    return {"Age_group": str(labels), "Count": str(counts.values.tolist())}
-
def time_analysis_search(df, target, search_items):
    """Count rows per calendar month of `start_time` for selected ``target`` values.

    Args:
        df: DataFrame with a `start_time` column and the ``target``
            column. It is not modified.
        target: Name of the column whose values are analysed.
        search_items: Values to report on; when empty, every distinct
            value of ``df[target]`` is reported.

    Returns:
        A list with one dict per requested value:
        ``{target: value, "Month": str(months), "Count": str(counts)}``
        with months as 'YYYY-MM' in ascending order, or
        ``{target: value, "Error": "Not in list"}`` when the value does
        not occur in the column.
    """
    all_items = df[target].unique()

    # Parse the timestamps once, on a separate Series, instead of writing
    # new columns into a filtered slice for every requested value.
    months = pd.to_datetime(df['start_time']).dt.to_period('M')

    names = search_items if search_items else all_items

    search_results = []
    for name in names:
        if name not in all_items:
            search_results.append({target: name, "Error": "Not in list"})
            continue

        monthly_counts = months[df[target] == name].value_counts().sort_index()
        search_results.append({
            target: name,
            "Month": str(monthly_counts.index.astype(str).tolist()),
            "Count": str(monthly_counts.values.tolist()),
        })
    return search_results
@data.post("/category_line")
async def category_line(category_name: Optional[str] = ''):
    """Monthly row counts per category.

    Args:
        category_name: Whitespace-separated category names; an empty
            string selects every category.

    Returns:
        The list built by ``time_analysis_search`` for the 'category'
        column of ``match()``.
    """
    wanted_categories = category_name.split()
    return time_analysis_search(match(), 'category', wanted_categories)
#@data.post("/group_line")
async def group_line(group_id: Optional[str] = ''):
    """Monthly row counts per group id (route registration is disabled).

    Args:
        group_id: Whitespace-separated integer group ids; an empty string
            selects every group.

    Returns:
        ``{"msg": <error text>}`` when a token is not an integer,
        otherwise the list built by ``time_analysis_search`` for the
        'group_id' column of ``match()``.
    """
    # Group name -> id mapping, kept from the original for reference:
    # group_name_id = {'未來工藝學群': 1, '技藝工藝學群': 2, '生活工藝學群': 3, '青年工藝學群': 4, '世代工藝學群': 5,
    #                  '修護工藝學群': 6, '跨域工藝學群': 7, '線上工藝學群': 8, '希望工程學群': 9}
    tokens = group_id.split()
    try:
        wanted_ids = [int(token) for token in tokens]
    except ValueError as err:
        return {"msg": str(err)}

    return time_analysis_search(match(), 'group_id', wanted_ids)
@data.post("/category_bar")
async def category_bar(category_name: Optional[str] = ''):
    """Row counts per category.

    Args:
        category_name: Whitespace-separated category names; an empty
            string selects every category.

    Returns:
        The dict built by ``search_specific_items`` for the 'category'
        column of ``match()``.
    """
    wanted_categories = category_name.split()
    return search_specific_items(match(), 'category', wanted_categories)
@data.post("/group_bar")
async def group_bar(group_id: Optional[str] = ''):
    """Row counts per group id.

    Args:
        group_id: Whitespace-separated integer group ids; an empty string
            selects every group.

    Returns:
        ``{"msg": <error text>}`` when a token is not an integer,
        otherwise the dict built by ``search_specific_items`` for the
        'group_id' column of ``match()``.
    """
    # Group name -> id mapping, kept from the original for reference:
    # group_name_id = {'未來工藝學群': 1, '技藝工藝學群': 2, '生活工藝學群': 3, '青年工藝學群': 4, '世代工藝學群': 5,
    #                  '修護工藝學群': 6, '跨域工藝學群': 7, '線上工藝學群': 8, '希望工程學群': 9}
    tokens = group_id.split()
    try:
        wanted_ids = [int(token) for token in tokens]
    except ValueError as err:
        return {"msg": str(err)}

    return search_specific_items(match(), 'group_id', wanted_ids)
@data.post("/first_course_bar")
async def first_course_bar(by_class_name: Optional[int] = 0, by_category: Optional[int] = 0):
    """Count, per course name or category, how many users registered for it first.

    Exactly one of the two flags must be 1.

    Args:
        by_class_name: 1 to group by class name (`name`).
        by_category: 1 to group by `category`.

    Returns:
        A message string when the flags are invalid, otherwise
        ``{target: str(values), "Count": str(counts)}`` where ``target``
        is 'name' or 'category'.
    """
    if by_class_name == by_category == 1:
        return 'Please select only one.'
    elif by_class_name == by_category == 0:
        return 'Please select one.'
    elif by_class_name not in [0, 1] or by_category not in [0, 1]:
        return 'Please input 0 or 1.'

    match_data = match()
    registrations = match_data.assign(
        create_time=pd.to_datetime(match_data['create_time'])
    )

    # Pick each user's row with the earliest create_time. Aggregating
    # name/category with 'first' took them from whichever row came first
    # in the frame, which is the earliest registration only if the query
    # result happens to be time-ordered (it has no ORDER BY).
    earliest_records = (
        registrations.dropna(subset=['user_id'])
        .sort_values('create_time', kind='mergesort')  # stable sort
        .drop_duplicates('user_id', keep='first')
    )

    target = 'name' if by_class_name else 'category'

    first_class_counts = earliest_records[target].value_counts()
    return {
        target: str(first_class_counts.index.tolist()),
        "Count": str(first_class_counts.values.tolist()),
    }
def repeat_time_search(df, target):
    """Summarise, for each ``target`` value, how often users appear repeatedly.

    Args:
        df: DataFrame with a `user_id` column and the ``target`` column.
        target: Name of the column to group by.

    Returns:
        A list with one dict per distinct ``target`` value, in order of
        first appearance:
        ``{target: value, 'repeat_times': str(times), 'count': str(users)}``
        where ``times`` are the distinct per-user row counts (ascending)
        and ``users`` the number of users having each of them.
    """
    def summarize(name):
        # Rows per user for this value, then how many users share each
        # of those row counts.
        rows_per_user = df.loc[df[target] == name, 'user_id'].value_counts()
        histogram = rows_per_user.value_counts().sort_index()
        return {
            target: name,
            'repeat_times': str(histogram.index.tolist()),
            'count': str(histogram.values.tolist()),
        }

    return [summarize(name) for name in df[target].unique()]
@data.post("/repeat_participation_bar")
async def repeat_participation_bar(by_class_name: Optional[int] = 0, by_category: Optional[int] = 0):
    """Repeat-participation summary per class name or per category.

    Exactly one of the two flags must be 1.

    Args:
        by_class_name: 1 to group by class name (`name`).
        by_category: 1 to group by `category`.

    Returns:
        A message string when the flags are invalid, otherwise the list
        built by ``repeat_time_search`` on ``match()``.
    """
    # Flag validation, checked in this order.
    if by_class_name == by_category == 1:
        return 'Please select only one.'
    if by_class_name == by_category == 0:
        return 'Please select one.'
    if by_class_name not in [0, 1] or by_category not in [0, 1]:
        return 'Please input 0 or 1.'

    if by_class_name:
        target = 'name'
    else:
        target = 'category'

    return repeat_time_search(match(), target)
@data.post("/school_city_pie")
async def school_city_pie():
    """Count schools per county/city, merging small shares into '其他'.

    The county/city is the last three characters of the shortest address
    prefix ending in '縣' or '市' (with '臺' normalised to '台'). Entries
    below 1% of all located schools are summed into a single '其他'
    entry, which is kept whenever it is non-zero.

    Returns:
        ``{'City': str(names), 'Number of school': str(counts)}``.
    """
    sql_query = """
    SELECT * FROM `schools`
    """
    school = call_sql(sql_query)

    county_pattern = re.compile(r"(.*?[縣市])")

    def extract_county_city(address):
        """Return the county/city part of ``address``, or None."""
        if not isinstance(address, str):
            return None
        found = county_pattern.search(address.replace("臺", "台"))
        return found.group(1)[-3:] if found else None

    county_city = school['address'].apply(extract_county_city)
    # Every extracted value ends in '縣' or '市', so dropping the missing
    # ones leaves exactly the non-blank entries; this also avoids the
    # .str accessor, which fails on an empty table.
    city_counts = county_city.dropna().value_counts()

    total_count = city_counts.sum()
    if total_count == 0:
        return {'City': str([]), 'Number of school': str([])}

    threshold = 0.01  # share of the total, i.e. 1%
    is_small = city_counts / total_count < threshold
    major = city_counts[~is_small]
    other_total = int(city_counts[is_small].sum())

    cities = major.index.tolist()
    numbers = major.values.tolist()
    # Keep the merged bucket even if it is itself below the threshold;
    # filtering it again would silently drop those schools.
    if other_total > 0:
        cities.append('其他')
        numbers.append(other_total)

    return {'City': str(cities), 'Number of school': str(numbers)}
-
-
@data.post("/class_name_category_pie")
async def class_name_category_pie():
    """Count the rows of `class_name` per `category`.

    Returns:
        The dict built by ``count_target_number`` for the 'category' column.
    """
    class_names = call_sql("""
    SELECT * FROM `class_name`
    """)
    return count_target_number(class_names, 'category')
#@data.post("/class_detail_category_pie")
async def class_detail_category_pie():
    """Count `class_detail` rows per category (route registration is disabled).

    Returns:
        The dict built by ``count_target_number`` for the 'category'
        column of `class_detail` joined with `class_list` and `class_name`.
    """
    detail_query = """
    SELECT `class_detail`.`id` AS `class_detail_id`, `class_detail`.`class_list_id`, `class_detail`.`start_time`, `class_list`.`name_id`,
    `class_name`.`name`, `class_name`.`school_id`, `class_name`.`category`, `class_name`.`group_id`
    FROM `class_detail`
    JOIN `class_list` ON `class_detail`.`class_list_id` = `class_list`.`id`
    JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id`
    """
    return count_target_number(call_sql(detail_query), 'category')
#@data.post("/registration_list_name")
async def registration_list_name(category: Optional[int] = 0, group_id: Optional[int] = 0):
    """Count registrations per category or per group id (route registration is disabled).

    Args:
        category: Truthy to count by `category`.
        group_id: Truthy to count by `group_id`.

    Returns:
        A message string when both flags are 1 or both are 0, otherwise
        the dict built by ``count_target_number`` for the chosen column
        (``category`` wins when both are truthy).
    """
    if category == group_id == 1:
        return 'Please select only one.'
    if category == group_id == 0:
        return 'Please select one.'

    event_query = """
    SELECT `registration`.`event_id`, `class_list`.`name_id`, `class_name`.`name`, `class_name`.`school_id`, `class_name`.`category`, `class_name`.`group_id`
    FROM `registration`
    JOIN `class_list` ON `registration`.`event_id` = `class_list`.`id`
    JOIN `class_name` ON `class_list`.`name_id` = `class_name`.`id`
    """
    user_event = call_sql(event_query)

    for flag, column in ((category, 'category'), (group_id, 'group_id')):
        if flag:
            return count_target_number(user_event, column)
@data.get("/ga4_data")
async def ga4_data(start_day: Optional[str] = None, end_day: Optional[str] = None, name: str = "screenPageViews", dimensions: str = "", page_size: str = "10"):
    """Run one Google Analytics Data API report and return the raw response.

    Args:
        start_day: First day of the range as 'YYYY-MM-DD'; today when omitted.
        end_day: Last day of the range as 'YYYY-MM-DD'; today when omitted.
        name: Metric name to request.
        dimensions: Single dimension name; no dimension when empty.
        page_size: Value sent as the report's ``limit``.

    Returns:
        The response of ``properties().batchRunReports(...).execute()``.

    Raises:
        HTTPException: status 400 when a day is not formatted as 'YYYY-MM-DD'.
    """
    def resolve_day(day, label):
        """Turn a query value into a date, defaulting to today."""
        # Resolved per request: a default computed in the signature is
        # evaluated once at import time and would also be a date object,
        # which strptime rejects.
        if day is None or day == "":
            return datetime.now().date()
        if isinstance(day, datetime):
            return day.date()
        if not isinstance(day, str):
            # Already a date object (direct Python callers).
            return day
        try:
            return datetime.strptime(day, "%Y-%m-%d").date()
        except ValueError:
            raise HTTPException(status_code=400, detail=f"{label} must be formatted as YYYY-MM-DD")

    start_time = resolve_day(start_day, "start_day")
    end_time = resolve_day(end_day, "end_day")

    SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
    KEY_FILE_LOCATION = "./app/api/ntcri-space-400206-38246577c2b2.json"
    property_id = 'properties/378680283'
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES)
    analytics = build('analyticsdata', 'v1beta', credentials=credentials)

    report_request = {
        "dateRanges": [
            {
                "startDate": str(start_time),
                "endDate": str(end_time),
            }
        ],
        "metrics": [
            {
                "name": name,
            }
        ],
        "limit": page_size,
    }
    # The dimensions entry is only sent when a dimension was requested.
    if dimensions != "":
        report_request["dimensions"] = [
            {
                "name": dimensions,
            }
        ]
    body = {"requests": [report_request]}

    response = analytics.properties().batchRunReports(property=property_id, body=body).execute()
    return response
|