"""FastAPI entry point.

Configures file logging, CORS and static mounts, wires the project routers,
exposes the ad / image-check / HLS stream / health endpoints, and runs a
background job that prunes old stream segments and recordings.
"""
import logging
import os
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from urllib.parse import unquote

import requests as r
import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI, Form, UploadFile, File, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from starlette.responses import JSONResponse

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
log_folder = 'log'
log_file = f'{log_folder}/app.log'

# Log line layout
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'


class URLDecodingFormatter(logging.Formatter):
    """Formatter that percent-decodes the fully formatted log line."""

    def format(self, record):
        """Return the parent-formatted message with URL escapes decoded."""
        return unquote(super().format(record))


# The handler opens the log file immediately, so the folder must exist first.
os.makedirs(log_folder, exist_ok=True)

# Remove every handler already attached to the root logger.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Rotate at midnight and keep 14 days of logs. UTF-8 is set explicitly because
# the formatter can emit decoded non-ASCII text.
file_handler = TimedRotatingFileHandler(
    log_file,
    when='midnight',
    interval=1,
    backupCount=14,
    encoding='utf-8',
)
file_handler.setFormatter(URLDecodingFormatter(log_format, datefmt=date_format))

# Attach the file handler to the root logger.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)

# ---------------------------------------------------------------------------
# Startup heartbeat
# ---------------------------------------------------------------------------
# NOTE(review): the push token is hard-coded; consider moving it to config.
param = 'QHLPQyttnL'


def _send_startup_heartbeat(timeout=10):
    """Push an "up" status to the monitoring endpoint once.

    Args:
        timeout: Seconds to wait for the monitor before giving up.

    Returns:
        The ``requests`` response, or ``None`` if the request failed. A
        failure is logged and never prevents the application from starting.
    """
    url = 'http://cmm.ai:3001/api/push/' + param + '?status=up&msg=OK&ping='
    try:
        return r.get(url, timeout=timeout)
    except r.RequestException:
        logger.warning("Startup heartbeat push failed", exc_info=True)
        return None


# Sent after logging is configured so that a failure ends up in the log file.
result = _send_startup_heartbeat()
# http://cmm.ai:3001/api/push/cDJyD2r3ae?status=up&msg=OK&ping=

# ---------------------------------------------------------------------------
# Application and middleware
# ---------------------------------------------------------------------------
app = FastAPI()

# app.add_middleware(HTTPSRedirectMiddleware)
# app.add_middleware(TrustedHostMiddleware)
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.mount("/static", StaticFiles(directory="static"), name="static")


# Redirect the root path to the interactive docs.
@app.get("/")
async def root():
    """Log the call and redirect to the generated API docs."""
    logging.info("Root endpoint was called")
    return RedirectResponse(url="/docs#")


# Project routers are imported here, after logging and `app` are set up, to
# keep the original import order (presumably deliberate -- TODO confirm).
from api.tts_router import ttsRouter
from api.db_router import dbRouter
from api.tendent_router import tendentRouter
# from api.speech2text import router
# from api.tts_try import ttsTryRouter

app.include_router(ttsRouter, prefix="", tags=["文字轉語音"])
app.include_router(dbRouter, prefix="", tags=["supa 操作相關"])
app.include_router(tendentRouter, prefix="", tags=["天燈"])
# app.include_router(router, prefix='/speech2text', tags=["speech2text"])
# app.include_router(ttsTryRouter, prefix='/ttsTry', tags=["測試本地端tts"])

# ---------------------------------------------------------------------------
# Advertisement endpoint
# ---------------------------------------------------------------------------
_AD_COVER_IMG = "https://cmm.ai:9101/static/ad_img/ad-img.png"

_AD_DESCRIPTION_CH = (
    "國際貴賓卡專屬禮遇\n"
    "●即日起來台北101,提供2024年特別禮遇-申辦台北101國際貴賓卡,可享用國際旅客限定專屬三重好禮:\n"
    "●購物-品牌9折起特別優惠\n"
    "●禮遇-Welcome Pack+ NTD300現金折抵券\n"
    "●退稅-消費2000元以上提供5%快速退稅服務\n"
    "立即申辦"
)

_AD_DESCRIPTION_EN = (
    "TOURIST CARD Exclusive Privileges\n"
    "Starting today at Taipei 101, we are offering special privileges for the year 2024 - apply for the Taipei 101 Tourist Card and enjoy exclusive triple benefits reserved for international travelers.\n"
    "● Shopping - Special offers starting from 10% off brand items.\n"
    "● PRIVILEGES-Welcome Pack + NTD300 cash voucher.\n"
    "● TAX REFUND- Offering 5% expedited processing service.\n"
    "Apply now"
)


def _store_ad(title, description, date):
    """Build a fresh "store" ad payload; fields without content stay empty."""
    return {
        "type": "store",
        "body": {
            "cover_img": _AD_COVER_IMG,
            "title": title,
            "description": description,
            "date": date,
            "price": "",
            "original_price": "",
            "website_url": "",
            "store_info_url": "",
            "included": [],
            "branch": [],
            "location": "",
        },
    }


@app.get("/ad")
def read_root(language :str = "ch"):
    """Return the ad payload.

    Args:
        language: ``"ch"`` selects the Chinese text; any other value selects
            the English text.

    Returns:
        ``{"data": <ad payload>}``.
    """
    if language == "ch":
        message = _store_ad("台北101國際貴賓卡", _AD_DESCRIPTION_CH, "即日起")
    else:
        message = _store_ad(
            "Taipei 101 International VIP Card",
            _AD_DESCRIPTION_EN,
            "Starting from today",
        )
    return {"data": message}


# ---------------------------------------------------------------------------
# Image check endpoint
# ---------------------------------------------------------------------------
from api.image_operate import remove_background,detect_face


def _timestamped_name(filename, now):
    """Return ``<MM-DD-HH-MM-SS>-<basename>`` for an uploaded file.

    The client-supplied name is untrusted: directory components (with either
    slash style) are stripped so the result is always a single path segment,
    and a missing or empty name falls back to ``"upload"``.
    """
    raw_name = (filename or "upload").replace("\\", "/")
    safe_name = os.path.basename(raw_name) or "upload"
    return now.strftime("%m-%d-%H-%M-%S") + "-" + safe_name


@app.post("/image_check")
async def image_check(image_file : UploadFile):
    """Save the uploaded image and return the result of ``detect_face``.

    Args:
        image_file: The uploaded image.

    Returns:
        Whatever ``detect_face`` returns for the saved file.
    """
    imgname = _timestamped_name(image_file.filename, datetime.now())
    image_path = f"/home/mia/101/static/image/{imgname}"

    # Read the upload first so a failed read does not leave an empty file.
    contents = await image_file.read()
    with open(image_path, "wb") as save_img:
        save_img.write(contents)

    # await remove_background(f"/home/mia/101/static/image/{imgname}",f"/home/mia/101/static/image/remove/{imgname}")
    result = await detect_face(image_path)
    return result


# ---------------------------------------------------------------------------
# HLS stream
# ---------------------------------------------------------------------------
@app.get("/stream.m3u8")
async def get_m3u8():
    """Serve the HLS playlist file."""
    return FileResponse("static/stream/stream.m3u8")


# @app.get("/segment/{segment_name}")
# async def get_segment(segment_name: str):
#     return FileResponse(f"/home/mia/101/static/stream/segment_{segment_name}")
app.mount("/segment", StaticFiles(directory="static/stream"), name="stream")

# ---------------------------------------------------------------------------
# Periodic cleanup of generated media
# ---------------------------------------------------------------------------
scheduler = BackgroundScheduler()
TS_DIRECTORY = Path("/home/mia/101/static/stream")
MP3_DIRECTORY = Path("/home/mia/101/static/mp3")
MAX_KEPT_FILES = 20


def _prune_oldest(directory, pattern, keep=MAX_KEPT_FILES):
    """Delete the oldest files matching *pattern*, keeping the newest *keep*.

    Files are ordered by modification time, oldest first. A file that
    disappears between listing and stat/removal is skipped instead of
    aborting the whole run.
    """
    dated = []
    for path in directory.glob(pattern):
        try:
            dated.append((path.stat().st_mtime, path))
        except OSError:
            # The file vanished (or is unreadable) after the glob; ignore it.
            continue

    surplus = len(dated) - keep
    if surplus <= 0:
        return

    dated.sort(key=lambda pair: pair[0])  # oldest modification time first
    for _, path in dated[:surplus]:
        try:
            os.remove(path)
        except FileNotFoundError:
            continue
        except OSError as e:
            logger.warning(f"Error deleting file {path}: {e}")
        else:
            logger.debug(f"Deleted old file: {path}")


def clean_old_files():
    """Keep only the newest stream segments and MP3 recordings."""
    _prune_oldest(TS_DIRECTORY, "segment_*.ts")
    _prune_oldest(MP3_DIRECTORY, "recording_*.mp3")


# Schedule the cleanup job once a minute.
scheduler.add_job(clean_old_files, 'interval', minutes=1)
scheduler.start()


@app.on_event("shutdown")
def shutdown_event():
    """Stop the background scheduler when the application shuts down."""
    scheduler.shutdown()


# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
@app.get("/health")  # URL polled by Kuma for monitoring
async def health_check():
    """Return a static ``{"status": "ok"}`` payload with HTTP 200."""
    return JSONResponse(content={"status": "ok"}, status_code=200)


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=9101, reload=False, log_config=None)