import copy import fastapi import fastapi.staticfiles as fastapiStaticfiles import linebot import linebot.models as linebotModels import suggests from GoogleNews import GoogleNews import dataset import datetime from fastapi.responses import HTMLResponse import matplotlib.dates as mdates import matplotlib.pyplot as plt from starlette.responses import StreamingResponse from fastapi.responses import FileResponse import numpy as np from pylab import plt from scipy.interpolate import make_interp_spline from matplotlib import dates import dataset import matplotlib.dates as mdates import random import requests # from linebot.models import ( MessageEvent, TextMessage, TextSendMessage, SourceUser, SourceGroup, SourceRoom, TemplateSendMessage, ConfirmTemplate, MessageAction, ButtonsTemplate, ImageCarouselTemplate, ImageCarouselColumn, URIAction, PostbackAction, DatetimePickerAction, CameraAction, CameraRollAction, LocationAction, CarouselTemplate, CarouselColumn, PostbackEvent, StickerMessage, StickerSendMessage, LocationMessage, LocationSendMessage, ImageMessage, VideoMessage, AudioMessage, FileMessage, UnfollowEvent, FollowEvent, JoinEvent, LeaveEvent, BeaconEvent, MemberJoinedEvent, MemberLeftEvent, FlexSendMessage, BubbleContainer, ImageComponent, BoxComponent, TextComponent, IconComponent, ButtonComponent, SeparatorComponent, QuickReply, QuickReplyButton, ImageSendMessage) #uvicorn newbot:app --host 0.0.0.0 --port 5443 --ssl-keyfile=/etc/letsencrypt/live/api.ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/api.ptt.cx/fullchain.pem #uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile=/etc/letsencrypt/live/ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/ptt.cx/chain.pem #uvicorn main:app --host 0.0.0.0 --port 443 --key-file=/etc/letsencrypt/live/ptt.cx/privkey.pem --certfile=/etc/letsencrypt/live/ptt.cx/cert.pem # --keyfile=./key.pem --certfile=./cert.pem # --ssl-cert-reqs 1 # # --ssl-ca-certs=/etc/letsencrypt/live/ptt.cx/fullchain.crt app = fastapi.FastAPI() #app.mount( # '/static', fastapiStaticfiles.StaticFiles(directory='static'), name='static') from linebot import ( LineBotApi, WebhookHandler ) from linebot.exceptions import ( InvalidSignatureError ) from linebot.models import ( MessageEvent, TextMessage,ImageSendMessage, TextSendMessage,FlexSendMessage, TemplateSendMessage,CarouselTemplate,ConfirmTemplate,PostbackAction,MessageAction,CarouselColumn,URIAction ) import json import codecs seo=False s_news=False line_bot_api = LineBotApi('Gub58t+8u9z7nTBrZLLvXnCbwDR1Gmyrew3nGFlKRxsmvH/oIMGjjup2DygA4XhV1NenM6dFTO8yvpbtCDezCqP2BAV4eVn2Y63/EYqeTCw1S+oJ+BiLLzC8DdVrWu9jSfp7wXqrVNUxcYdk54eoWAdB04t89/1O/w1cDnyilFU=') handler = WebhookHandler('f761bc6038c94a3baa815124e33dea50') #line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN') #handler = WebhookHandler('YOUR_CHANNEL_SECRET') def notify_group(msg): glist=['1dbtJHbWVbrooXmQqc4r8OyRWDryjD4TMJ6DiDsdgsX'] for gid in glist: headers = { "Authorization": "Bearer " + gid, "Content-Type": "application/x-www-form-urlencoded" } params = {"message": msg} r = requests.post("https://notify-api.line.me/api/notify",headers=headers, params=params) def get_aws(): result='' db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4') cursor=db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 4;') for c in cursor: result+=str(c['dt'])+"\n"+c['area']+"\n" return result def get_idea(): result='' db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4') cursor=db.query('SELECT query FROM hhh.gsc_weekly where clicks > 500 order by rand() limit 10;') for c in cursor: result+=str(c['query'])+"\n" return result def get_kw_ls(past=False): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4') now = '' sql_get_newest_data_date = 'SELECT * FROM trending_searches ORDER BY ts_date DESC' for row in db.query(sql_get_newest_data_date): now = row['ts_date'] break if past: now =now-datetime.timedelta(days=1) now_start = now.strftime("%Y-%m-%d 00:00:00") now_end = now.strftime("%Y-%m-%d 23:59:59") sqls = 'SELECT * FROM trending_searches WHERE "'+now_end+'">= ts_date and "'+now_start+'"<=ts_date ORDER BY ts_date DESC' ls = [] for row in db.query(sqls): ls.append(row['ts_word']) ls = list(dict.fromkeys(ls)) return ls def get_kw(past=False): keys = get_kw_ls(past) title = '熱門搜尋關鍵字' if past == False else '歷史關鍵字走勢' carousel_dict = {"type": "carousel","contents": []} carousel_idx = -1 num = 1 for k in keys: if (num-1)%12 ==0: carousel_dict['contents'] = carousel_dict['contents']+[make_bubble(title,k)] carousel_idx+=1 carousel_dict['contents'][carousel_idx]['body']['contents'] += [make_box(num,k)] num+=1 return carousel_dict @app.get("/aws") async def aws(): result='' result+='製表時間: '+str(datetime.datetime.now())+'

\n\n' result+='' db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4') cursor=db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 30;') for c in cursor: result+="\n" result+='
"+str(c['dt'])+""+c['area']+"
' return HTMLResponse(content=result, status_code=200) @app.get("/msg/{item_id}") async def coffee_msg(item_id): True line_bot_api.push_message(item_id, TextSendMessage(text='開啟下方完整券樣(密碼9888)至櫃台掃碼兌換。https://txp.rs/v/EvX69b4Xq9')) return {"code":"ok" } @app.post('/callback') async def callback(request: fastapi.Request): signature = request.headers['X-Line-Signature'] body = await request.body() handler.handle(body.decode('utf-8'), signature) return 'OK' @app.get('/get_trend_image') async def get_trend_image(kw_id): #def get_trend_image(kw): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4') kw = '' sql_get_newest_data_date = 'SELECT iot_kword FROM interest_over_time WHERE iotid="'+kw_id+'"' for row in db.query(sql_get_newest_data_date): kw=row['iot_kword'] sql_get_newest_data_date = 'SELECT * FROM interest_over_time WHERE iot_kword="'+kw+'" ORDER BY iot_date' x_axis = [] y_axis = [] for row in db.query(sql_get_newest_data_date): x_axis += [row['iot_date']] y_axis += [row['iot_value']] x = np.array(x_axis) y = np.array(y_axis) # create an array of numbers for the dates x_dates = np.array([dates.date2num(i) for i in x]) # create more uniform intervals in x axis and use spline to interpolate data x_smooth = np.linspace(x_dates.min(), x_dates.max(), 200) #y_smooth = spline(x_dates, y, x_smooth) y_smooth = make_interp_spline(x_dates, y)(x_smooth) # creating a new date array from the new date number array x_new = np.array([dates.num2date(i) for i in x_smooth]) plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y/%m')) plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=100)) #座標軸刻度1天 #plt.figure() plt.plot(x_new, y_smooth) target_path = 'trend_image/'+kw+'.png' plt.savefig(target_path) plt.clf() return FileResponse(target_path) def get_news_by_kw(keyword): googlenews = GoogleNews(lang='zh-TW') kw=keyword googlenews.set_lang('zh-TW') googlenews.search(kw) resultstr="新聞:" idx=0 rs=googlenews.results() for r in rs: if idx>0: resultstr+=',' else: idx+=1 resultstr+=r['title'] return resultstr # print(r['desc']) # print(r['link']) # print(r['datetime']) def flex_test(): js=json.load(open('test.json','r',encoding='utf-8')) for i in range(3): row_dict = {} row_dict['type'] = 'box' row_dict['layout'] = 'baseline' row_dict['contents']= [ {"type": "text","text": "第"+str(i)+"名","size": "sm","color": "#aaaaaa","flex": 2}, {"type": "text","text": "hello, world"+str(1),"flex": 5,"weight": "regular"} ] js['body']['contents'] = js['body']['contents'] + [row_dict] return js def make_bubble(title,kw): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4') sql_get_newest_data_date = 'SELECT iotid FROM interest_over_time WHERE iot_kword="'+kw+'"' id='' for row in db.query(sql_get_newest_data_date): id=row['iotid'] id = random.randint(1,1050) box_dict = { "type": "bubble", "hero": { "type": "image", "url": "https://api.ptt.cx:6443/get_trend_image?kw_id="+str(id), "size": "full", "gravity": "top", "margin": "none", "aspectRatio": "20:13" }, "body": { "type": "box", "layout": "vertical", "contents": [ { "type": "text", "text": title+"-"+kw, "size": "xl", "weight": "bold", "align": "center" } ] } } return box_dict def make_box(num, content): row_dict = {} row_dict['type'] = 'box' row_dict['layout'] = 'baseline' #row_dict["action"]= { # "type": "uri", # "label": "action", # "uri": "https://store.line.me" #} row_dict['contents']= [ { "type": "text", "text": "第"+str(num)+"名", "size": "sm", "color": "#aaaaaa", "flex": 2 }, { "type": "text", "text": content, "flex": 5, "weight": "regular" } ] return row_dict def make_carousel(): carousel_dict = {"type": "carousel","contents": []} carousel_idx = -1 for i in range(26): if i%12 ==0: carousel_dict['contents'] = carousel_dict['contents']+[make_bubble()] carousel_idx+=1 carousel_dict['contents'][carousel_idx]['body']['contents'] += [make_box(i,'data'+str(i))] return carousel_dict @handler.add(FollowEvent) def handle_follow(event): print(event.source.user_id) # do something @handler.add(linebotModels.MessageEvent, message=linebotModels.TextMessage) def message_text(event): global seo global s_news ## if event.message.text == 'push': # line_bot_api.push_message( # event.source.user_id, [ # # TextSendMessage(text='PUSH!'), # ] # ) notify_group('User :'+event.message.text) if event.message.text == 's_news': s_news=True line_bot_api.reply_message( event.reply_token, TextSendMessage(text='請輸入要搜尋新聞的關鍵字:')) return if event.message.text == 'q_aws': line_bot_api.reply_message( event.reply_token,[TextSendMessage(text=get_aws()),TextSendMessage(text='完整報告: https://api.ptt.cx:5443/aws')]) return if event.message.text == 'q_idea': s_news=True line_bot_api.reply_message( event.reply_token, TextSendMessage(text=get_idea())) return ############################ if event.message.text == 'hotkeys': js = get_kw(False) line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js)) return if event.message.text == 'past_hotkeys': js = get_kw(True) line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js)) return if event.message.text == '合作提案': line_bot_api.reply_message( event.reply_token, TextSendMessage(text='合作提案請寄信至 : jared@choozmo.com 信箱')) return if event.message.text == '更多功能': line_bot_api.reply_message( event.reply_token, TextSendMessage( text='更多功能', quick_reply=QuickReply( items=[ QuickReplyButton( action=MessageAction(label="合作提案", text="合作提案") ), ]))) return ''' if event.message.text == 'seo': seo=True line_bot_api.reply_message( event.reply_token, TextSendMessage(text='請輸入要找的關鍵字:')) return ''' if event.message.text=='js test': FlexMessage = flex_test() line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',FlexMessage)) return if event.message.text=='熱門關鍵字': js = get_kw(False) line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js)) return if event.message.text=='關鍵字歷史走勢': js = get_kw(True) line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js)) return if event.message.text == '叫': line_bot_api.reply_message( event.reply_token, linebotModels.AudioSendMessage( original_content_url=f'{baseUrl}/static/audio/noot_noot.mp3', duration=1000)) if event.message.text=='ok': line_bot_api.reply_message( event.reply_token, TextSendMessage(text='最欣賞ChoozMo團隊說OK的人!!')) ''' if event.message.text=='c' or event.message.text=='C' : line_bot_api.reply_message( event.reply_token, TextSendMessage( text='快速鍵', quick_reply=QuickReply( items=[ QuickReplyButton( action=MessageAction(label="熱門關鍵字", text="hotkeys") ), QuickReplyButton( action=MessageAction(label="關鍵字歷史走勢", text="past_hotkeys") ), QuickReplyButton( action=MessageAction(label="更多功能", text="更多功能") ), #QuickReplyButton( # action=MessageAction(label="幸福空間靈感", text="q_idea") #), #QuickReplyButton( # action=MessageAction(label="查AWS", text="q_aws") #), #QuickReplyButton( # action=MessageAction(label="關聯字", text="seo") #), ]))) else: if s_news: result=get_news_by_kw(event.message.text) line_bot_api.reply_message( event.reply_token, TextSendMessage(text=result)) s_news=False return if seo: res='相關字:' s = suggests.suggests.get_suggests(event.message.text, source='google') idx=0 for sg in s['suggests']: if idx>0: res+=',' else: idx+=1 res+=sg line_bot_api.reply_message( event.reply_token, TextSendMessage(text=res)) seo=False # print('test') ''' #if __name__ == "__main__": # app.run(host='0.0.0.0', port=443,ssl_context=('/etc/letsencrypt/live/ptt.cx/fullchain.pem', '/etc/letsencrypt/live/ptt.cx/privkey.pem')) # app.run(host='0.0.0.0', port=14404,ssl_context=('/tmp/cert.pem','/tmp/chain.pem' ))