- import copy
- import fastapi
- import fastapi.staticfiles as fastapiStaticfiles
- import linebot
- import linebot.models as linebotModels
- import suggests
- from GoogleNews import GoogleNews
- import dataset
- import datetime
- from fastapi.responses import HTMLResponse
- import matplotlib.dates as mdates
- import matplotlib.pyplot as plt
- from starlette.responses import StreamingResponse
- from fastapi.responses import FileResponse
- import numpy as np
- from pylab import plt
- from scipy.interpolate import make_interp_spline
- from matplotlib import dates
- import dataset
- import matplotlib.dates as mdates
- import random
- #
- from linebot.models import (
- MessageEvent, TextMessage, TextSendMessage,
- SourceUser, SourceGroup, SourceRoom,
- TemplateSendMessage, ConfirmTemplate, MessageAction,
- ButtonsTemplate, ImageCarouselTemplate, ImageCarouselColumn, URIAction,
- PostbackAction, DatetimePickerAction,
- CameraAction, CameraRollAction, LocationAction,
- CarouselTemplate, CarouselColumn, PostbackEvent,
- StickerMessage, StickerSendMessage, LocationMessage, LocationSendMessage,
- ImageMessage, VideoMessage, AudioMessage, FileMessage,
- UnfollowEvent, FollowEvent, JoinEvent, LeaveEvent, BeaconEvent,
- MemberJoinedEvent, MemberLeftEvent,
- FlexSendMessage, BubbleContainer, ImageComponent, BoxComponent,
- TextComponent, IconComponent, ButtonComponent,
- SeparatorComponent, QuickReply, QuickReplyButton,
- ImageSendMessage)
- #uvicorn newbot:app --host --port 5443 --ssl-keyfile=/etc/letsencrypt/live/api.ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/api.ptt.cx/fullchain.pem
- #uvicorn main:app --host --port 443 --ssl-keyfile=/etc/letsencrypt/live/ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/ptt.cx/chain.pem
- #uvicorn main:app --host --port 443 --key-file=/etc/letsencrypt/live/ptt.cx/privkey.pem --certfile=/etc/letsencrypt/live/ptt.cx/cert.pem
- # --keyfile=./key.pem --certfile=./cert.pem
- # --ssl-cert-reqs 1
- #
- # --ssl-ca-certs=/etc/letsencrypt/live/ptt.cx/fullchain.crt
# ASGI application object; the route decorators further down register their
# handlers on it.
app = fastapi.FastAPI()
- #app.mount(
- # '/static', fastapiStaticfiles.StaticFiles(directory='static'), name='static')
- from linebot import (
- LineBotApi, WebhookHandler
- )
- from linebot.exceptions import (
- InvalidSignatureError
- )
- from linebot.models import (
- MessageEvent, TextMessage,ImageSendMessage, TextSendMessage,FlexSendMessage, TemplateSendMessage,CarouselTemplate,ConfirmTemplate,PostbackAction,MessageAction,CarouselColumn,URIAction
- )
- import json
- import codecs
import os

# Conversation-mode flags read and written by message_text(). They are module
# globals, so the state is shared by every chat that talks to this process.
seo = False
s_news = False

# LINE credentials are taken from the environment when available. The literals
# are kept only as a backward-compatible fallback so existing deployments keep
# working.
# NOTE(review): these secrets are committed in source; rotate them and drop the
# fallbacks once LINE_CHANNEL_ACCESS_TOKEN / LINE_CHANNEL_SECRET are configured.
line_bot_api = LineBotApi(os.environ.get(
    'LINE_CHANNEL_ACCESS_TOKEN',
    'Gub58t+8u9z7nTBrZLLvXnCbwDR1Gmyrew3nGFlKRxsmvH/oIMGjjup2DygA4XhV1NenM6dFTO8yvpbtCDezCqP2BAV4eVn2Y63/EYqeTCw1S+oJ+BiLLzC8DdVrWu9jSfp7wXqrVNUxcYdk54eoWAdB04t89/1O/w1cDnyilFU='))
handler = WebhookHandler(os.environ.get(
    'LINE_CHANNEL_SECRET',
    'f761bc6038c94a3baa815124e33dea50'))
- #line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
- #handler = WebhookHandler('YOUR_CHANNEL_SECRET')
def get_aws():
    """Return the latest AWS monitor events as plain text.

    The query yields at most four rows, newest first. Each row contributes
    two lines: the event time (``from_unixtime(eventtime)``) followed by the
    row's ``area`` value.
    """
    conn = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
    events = conn.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 4;')
    return ''.join(
        str(event['dt']) + "\n" + event['area'] + "\n"
        for event in events
    )
def get_idea():
    """Return up to ten randomly sampled search queries, one per line.

    Rows are drawn from ``hhh.gsc_weekly`` where ``clicks > 500``; the
    database does the random ordering.
    """
    conn = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
    sampled = conn.query('SELECT query FROM hhh.gsc_weekly where clicks > 500 order by rand() limit 10;')
    lines = [str(entry['query']) + "\n" for entry in sampled]
    return ''.join(lines)
def get_kw_ls(past=False):
    """Return the distinct trending keywords recorded on the newest day.

    Args:
        past: When true, use the day before the newest recorded day instead.

    Returns:
        list: ``ts_word`` values ordered by ``ts_date`` descending with
        duplicates removed (first occurrence kept). Empty when
        ``trending_searches`` has no usable newest timestamp.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')

    # Only the newest timestamp is needed, so ask the database for one row
    # instead of streaming the whole table and breaking after the first.
    newest = None
    for row in db.query('SELECT ts_date FROM trending_searches ORDER BY ts_date DESC LIMIT 1'):
        newest = row['ts_date']
    if newest is None:
        # Empty table: previously this crashed on ''.strftime(...).
        return []

    if past:
        newest = newest - datetime.timedelta(days=1)

    day_start = newest.strftime("%Y-%m-%d 00:00:00")
    day_end = newest.strftime("%Y-%m-%d 23:59:59")
    # Bound parameters instead of splicing the values into the SQL text.
    rows = db.query(
        'SELECT ts_word FROM trending_searches '
        'WHERE ts_date >= :day_start AND ts_date <= :day_end '
        'ORDER BY ts_date DESC',
        day_start=day_start,
        day_end=day_end,
    )
    # dict.fromkeys removes duplicates while preserving first-seen order.
    return list(dict.fromkeys(row['ts_word'] for row in rows))
def get_kw(past=False):
    """Build a Flex carousel dict ranking the keywords from get_kw_ls().

    Keywords are numbered from 1. A new bubble is started for every group of
    twelve, created by make_bubble() from the group's first keyword; each
    keyword adds one make_box() row to the current bubble's body.

    Args:
        past: Forwarded to get_kw_ls(); also selects the bubble heading.

    Returns:
        dict: ``{"type": "carousel", "contents": [bubble, ...]}``.
    """
    keywords = get_kw_ls(past)
    if past == False:
        heading = '熱門搜尋關鍵字'
    else:
        heading = '歷史關鍵字走勢'

    bubbles = []
    for rank, keyword in enumerate(keywords, start=1):
        # Ranks 1, 13, 25, ... open a fresh bubble.
        if (rank - 1) % 12 == 0:
            bubbles.append(make_bubble(heading, keyword))
        bubbles[-1]['body']['contents'].append(make_box(rank, keyword))
    return {"type": "carousel", "contents": bubbles}
@app.get("/aws")
async def aws():
    """Render the most recent AWS monitor events as an HTML table.

    Returns:
        HTMLResponse: a Bootstrap-styled page with the generation time and
        one table row (time, area) per event; the query returns at most 30
        rows, newest first.
    """
    import html  # local import: only this handler needs it

    parts = [
        '<html><head><link href="https://getbootstrap.com/docs/4.1/dist/css/bootstrap.min.css" rel="stylesheet"></head><body>',
        '製表時間: ' + str(datetime.datetime.now()) + '</br></br>\n\n',
        '<table class="table table-striped">',
    ]
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
    cursor = db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 30;')
    # Escape database values so markup characters in them cannot alter the page.
    parts.extend(
        "<tr><td>" + html.escape(str(c['dt'])) + "</td><td>"
        + html.escape(str(c['area'])) + "</td></tr>\n"
        for c in cursor
    )
    parts.append('</table></body></html>')
    return HTMLResponse(content=''.join(parts), status_code=200)
@app.get("/msg/{item_id}")
async def coffee_msg(item_id: str):
    """Push the fixed coupon text to the LINE recipient ``item_id``.

    Args:
        item_id: Path segment passed to ``push_message`` as the recipient.

    Returns:
        dict: ``{"code": "ok"}`` once the push call has returned.

    NOTE(review): this handler performs no authentication itself, so any
    caller of the route can trigger a push; confirm that is intended.
    """
    line_bot_api.push_message(item_id, TextSendMessage(text='開啟下方完整券樣(密碼9888)至櫃台掃碼兌換。https://txp.rs/v/EvX69b4Xq9'))
    return {"code": "ok"}
@app.post('/callback')
async def callback(request: fastapi.Request):
    """Receive a LINE webhook call and dispatch it to the registered handlers.

    Args:
        request: Incoming request; its raw body and ``X-Line-Signature``
            header are handed to ``handler.handle``.

    Returns:
        str: ``'OK'`` after the handlers have run.

    Raises:
        fastapi.HTTPException: 400 when the signature header is missing or
            the signature check fails.
    """
    signature = request.headers.get('X-Line-Signature')
    if signature is None:
        # Previously a KeyError here surfaced as a 500.
        raise fastapi.HTTPException(status_code=400, detail='Missing X-Line-Signature header')

    body = await request.body()
    try:
        handler.handle(body.decode('utf-8'), signature)
    except InvalidSignatureError:
        # A bad signature is a client error, not a server fault.
        raise fastapi.HTTPException(status_code=400, detail='Invalid signature')
    return 'OK'
@app.get('/get_trend_image')
async def get_trend_image(kw_id):
    """Plot the interest-over-time series of one keyword and return the PNG.

    Args:
        kw_id: ``iotid`` of a row in ``interest_over_time``; the row's
            ``iot_kword`` selects the series to plot.

    Returns:
        FileResponse: the image written under ``trend_image/``.

    Raises:
        fastapi.HTTPException: 404 when the id is unknown or has no samples.
    """
    import os
    import re

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')

    # kw_id comes straight from the query string, so bind it as a parameter
    # rather than splicing it into the SQL text.
    kw = ''
    for row in db.query('SELECT iot_kword FROM interest_over_time WHERE iotid = :kw_id', kw_id=kw_id):
        kw = row['iot_kword']
    if not kw:
        raise fastapi.HTTPException(status_code=404, detail='Unknown keyword id')

    x_axis = []
    y_axis = []
    series = db.query(
        'SELECT iot_date, iot_value FROM interest_over_time '
        'WHERE iot_kword = :kw ORDER BY iot_date',
        kw=kw,
    )
    for row in series:
        x_axis.append(row['iot_date'])
        y_axis.append(row['iot_value'])
    if not x_axis:
        raise fastapi.HTTPException(status_code=404, detail='No data for keyword')

    y = np.array(y_axis)
    # Matplotlib date numbers give the spline a numeric x axis.
    x_dates = np.array([dates.date2num(d) for d in x_axis])

    if len(x_dates) >= 4:
        # Resample on 200 evenly spaced points and smooth with the default
        # cubic spline, then convert the numbers back to datetimes.
        x_smooth = np.linspace(x_dates.min(), x_dates.max(), 200)
        y_plot = make_interp_spline(x_dates, y)(x_smooth)
        x_plot = np.array([dates.num2date(n) for n in x_smooth])
    else:
        # A cubic spline needs at least four samples; plot the raw points.
        x_plot = np.array(x_axis)
        y_plot = y

    # Keep path separators in the keyword from escaping the output directory.
    safe_name = re.sub(r'[\\/]', '_', kw)
    os.makedirs('trend_image', exist_ok=True)
    target_path = 'trend_image/' + safe_name + '.png'

    # A private figure avoids sharing pyplot's implicit current figure
    # between requests.
    fig, ax = plt.subplots()
    try:
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y/%m'))
        ax.xaxis.set_major_locator(mdates.DayLocator(interval=100))  # one tick every 100 days
        ax.plot(x_plot, y_plot)
        fig.savefig(target_path)
    finally:
        plt.close(fig)
    return FileResponse(target_path)
def get_news_by_kw(keyword):
    """Search Google News (zh-TW) and return the result titles as one string.

    Args:
        keyword: Search term passed to ``GoogleNews.search``.

    Returns:
        str: ``"新聞:"`` followed by the titles separated by commas.
    """
    client = GoogleNews(lang='zh-TW')
    client.set_lang('zh-TW')
    client.search(keyword)
    headlines = [article['title'] for article in client.results()]
    return "新聞:" + ','.join(headlines)
- # print(r['desc'])
- # print(r['link'])
- # print(r['datetime'])
def flex_test():
    """Load ``test.json`` and append three sample ranking rows to its body.

    Returns:
        dict: the parsed JSON document with three extra baseline boxes
        appended to ``js['body']['contents']``.
    """
    # Context manager so the file handle is closed (it was leaked before).
    with open('test.json', 'r', encoding='utf-8') as fh:
        js = json.load(fh)

    for i in range(3):
        row_dict = {
            'type': 'box',
            'layout': 'baseline',
            'contents': [
                {"type": "text", "text": "第" + str(i) + "名", "size": "sm", "color": "#aaaaaa", "flex": 2},
                # The suffix is the constant 1, not the loop index, as in the original.
                {"type": "text", "text": "hello, world" + str(1), "flex": 5, "weight": "regular"},
            ],
        }
        js['body']['contents'] = js['body']['contents'] + [row_dict]
    return js
def make_bubble(title, kw):
    """Build one Flex bubble dict: a trend image on top and a heading below.

    Args:
        title: Heading prefix.
        kw: Keyword appended to the heading and looked up in
            ``interest_over_time``.

    Returns:
        dict: a bubble whose ``body.contents`` starts with the heading text
        ``title + "-" + kw``; callers append ranking rows to that list.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')

    # Bound parameter: a keyword containing quotes no longer breaks (or
    # rewrites) the statement.
    trend_id = ''
    for row in db.query('SELECT iotid FROM interest_over_time WHERE iot_kword = :kw', kw=kw):
        trend_id = row['iotid']

    # NOTE(review): the id found above is immediately replaced by a random
    # one, so the image is not tied to `kw`. Kept as in the original (assumed
    # to run unconditionally after the lookup) — confirm whether the random
    # override is still wanted.
    trend_id = random.randint(1, 1050)

    return {
        "type": "bubble",
        "hero": {
            "type": "image",
            "url": "https://api.ptt.cx:6443/get_trend_image?kw_id=" + str(trend_id),
            "size": "full",
            "gravity": "top",
            "margin": "none",
            "aspectRatio": "20:13",
            "uri": "https://line.me/R/nv/suggestSettings",
        },
        "body": {
            "type": "box",
            "layout": "vertical",
            "contents": [
                {
                    "type": "text",
                    "text": title + "-" + kw,
                    "size": "xl",
                    "weight": "bold",
                    "align": "center",
                },
            ],
        },
    }
def make_box(num, content):
    """Return one baseline row: a grey rank label followed by the content text.

    Args:
        num: Rank number rendered inside the label.
        content: Text shown next to the label.

    Returns:
        dict: a ``box`` component with two text children.
    """
    rank_label = {
        "type": "text",
        "text": "第" + str(num) + "名",
        "size": "sm",
        "color": "#aaaaaa",
        "flex": 2,
    }
    entry = {
        "type": "text",
        "text": content,
        "flex": 5,
        "weight": "regular",
        "uri": "https://github.com/OpenShot/libopenshot",
    }
    return {
        'type': 'box',
        'layout': 'baseline',
        'contents': [rank_label, entry],
    }
def make_carousel(title='熱門搜尋關鍵字'):
    """Build a sample carousel of 26 placeholder rows, twelve rows per bubble.

    The original called ``make_bubble()`` without its two required arguments
    and therefore always raised ``TypeError``. Each bubble is now created
    from ``title`` and the first placeholder of its group, mirroring get_kw().

    Args:
        title: Heading prefix passed to make_bubble().

    Returns:
        dict: ``{"type": "carousel", "contents": [bubble, ...]}``.
    """
    bubbles = []
    for i in range(26):
        label = 'data' + str(i)
        # Rows 0, 12 and 24 open a new bubble.
        if i % 12 == 0:
            bubbles.append(make_bubble(title, label))
        bubbles[-1]['body']['contents'].append(make_box(i, label))
    return {"type": "carousel", "contents": bubbles}
@handler.add(FollowEvent)
def handle_follow(event):
    """Print the user id carried by a follow event."""
    follower_id = event.source.user_id
    print(follower_id)
@handler.add(linebotModels.MessageEvent, message=linebotModels.TextMessage)
def message_text(event):
    """Answer a text message: run a known command or continue a pending prompt.

    Command words are matched exactly against the message text. Any other
    text is treated as the answer to a pending prompt: a news search when
    ``s_news`` is set, otherwise a suggestion lookup when ``seo`` is set.
    Every branch now returns after replying; previously '叫' and 'ok' fell
    through into the free-text branch and could attempt a second reply with
    the same reply token.

    ``seo`` and ``s_news`` are module globals, so a pending prompt is shared
    by every chat served by this process.

    Args:
        event: LINE message event; ``event.message.text`` and
            ``event.reply_token`` are used.
    """
    global seo
    global s_news

    text = event.message.text
    token = event.reply_token

    if text == 's_news':
        s_news = True
        line_bot_api.reply_message(
            token, TextSendMessage(text='請輸入要搜尋新聞的關鍵字:'))
        return

    if text == 'q_aws':
        line_bot_api.reply_message(
            token,
            [TextSendMessage(text=get_aws()),
             TextSendMessage(text='完整報告: https://api.ptt.cx:5443/aws')])
        return

    if text == 'q_idea':
        # NOTE(review): this also arms the news prompt, exactly as the
        # original did; it looks copied from the 's_news' branch — confirm.
        s_news = True
        line_bot_api.reply_message(token, TextSendMessage(text=get_idea()))
        return

    if text in ('hotkeys', '熱門關鍵字'):
        line_bot_api.reply_message(token, FlexSendMessage('ChoozMo', get_kw(False)))
        return

    if text in ('past_hotkeys', '關鍵字歷史走勢'):
        line_bot_api.reply_message(token, FlexSendMessage('ChoozMo', get_kw(True)))
        return

    if text == '更多功能':
        line_bot_api.reply_message(
            token, TextSendMessage(text='功能建置中,請耐心等候'))
        return

    # The 'seo' command that would set `seo` is disabled, so the suggestion
    # branch at the end only runs if the flag is set some other way.

    if text == 'js test':
        line_bot_api.reply_message(token, FlexSendMessage('ChoozMo', flex_test()))
        return

    if text == '叫':
        # NOTE(review): `baseUrl` is not defined in the visible part of this
        # file; unless it is provided elsewhere this branch raises NameError.
        line_bot_api.reply_message(
            token,
            linebotModels.AudioSendMessage(
                original_content_url=f'{baseUrl}/static/audio/noot_noot.mp3',
                duration=1000))
        return

    if text == 'ok':
        line_bot_api.reply_message(
            token, TextSendMessage(text='最欣賞ChoozMo團隊說OK的人!!'))
        return

    if text in ('c', 'C'):
        # Quick-reply shortcuts; each button sends the matching command word.
        line_bot_api.reply_message(
            token,
            TextSendMessage(
                text='快速鍵',
                quick_reply=QuickReply(
                    items=[
                        QuickReplyButton(
                            action=MessageAction(label="熱門關鍵字", text="hotkeys")
                        ),
                        QuickReplyButton(
                            action=MessageAction(label="關鍵字歷史走勢", text="past_hotkeys")
                        ),
                        QuickReplyButton(
                            action=MessageAction(label="更多功能", text="更多功能")
                        ),
                    ])))
        return

    # Free text: answer whichever prompt is pending, news search first.
    if s_news:
        result = get_news_by_kw(text)
        line_bot_api.reply_message(token, TextSendMessage(text=result))
        s_news = False
        return

    if seo:
        found = suggests.suggests.get_suggests(text, source='google')
        res = '相關字:' + ','.join(found['suggests'])
        line_bot_api.reply_message(token, TextSendMessage(text=res))
        seo = False
- # print('test')
- #if __name__ == "__main__":
- # app.run(host='', port=443,ssl_context=('/etc/letsencrypt/live/ptt.cx/fullchain.pem', '/etc/letsencrypt/live/ptt.cx/privkey.pem'))
- # app.run(host='', port=14404,ssl_context=('/tmp/cert.pem','/tmp/chain.pem' ))