import copy
import fastapi
import fastapi.staticfiles as fastapiStaticfiles
import linebot
import linebot.models as linebotModels
import suggests
from GoogleNews import GoogleNews
import dataset
import datetime
from fastapi.responses import HTMLResponse
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
#
from linebot.models import (
MessageEvent, TextMessage, TextSendMessage,
SourceUser, SourceGroup, SourceRoom,
TemplateSendMessage, ConfirmTemplate, MessageAction,
ButtonsTemplate, ImageCarouselTemplate, ImageCarouselColumn, URIAction,
PostbackAction, DatetimePickerAction,
CameraAction, CameraRollAction, LocationAction,
CarouselTemplate, CarouselColumn, PostbackEvent,
StickerMessage, StickerSendMessage, LocationMessage, LocationSendMessage,
ImageMessage, VideoMessage, AudioMessage, FileMessage,
UnfollowEvent, FollowEvent, JoinEvent, LeaveEvent, BeaconEvent,
MemberJoinedEvent, MemberLeftEvent,
FlexSendMessage, BubbleContainer, ImageComponent, BoxComponent,
TextComponent, IconComponent, ButtonComponent,
SeparatorComponent, QuickReply, QuickReplyButton,
ImageSendMessage)
#uvicorn newbot:app --host 0.0.0.0 --port 5443 --ssl-keyfile=/etc/letsencrypt/live/api.ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/api.ptt.cx/fullchain.pem
#uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile=/etc/letsencrypt/live/ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/ptt.cx/chain.pem
#uvicorn main:app --host 0.0.0.0 --port 443 --key-file=/etc/letsencrypt/live/ptt.cx/privkey.pem --certfile=/etc/letsencrypt/live/ptt.cx/cert.pem
# --keyfile=./key.pem --certfile=./cert.pem
# --ssl-cert-reqs 1
#
# --ssl-ca-certs=/etc/letsencrypt/live/ptt.cx/fullchain.crt
# ASGI application object; the @app.get / @app.post decorators in this module
# register their routes on it.
app = fastapi.FastAPI()
#app.mount(
# '/static', fastapiStaticfiles.StaticFiles(directory='static'), name='static')
from linebot import (
LineBotApi, WebhookHandler
)
from linebot.exceptions import (
InvalidSignatureError
)
from linebot.models import (
MessageEvent, TextMessage,ImageSendMessage, TextSendMessage,FlexSendMessage, TemplateSendMessage,CarouselTemplate,ConfirmTemplate,PostbackAction,MessageAction,CarouselColumn,URIAction
)
import json
import codecs
# Conversation-mode flags read and written by message_text via `global`.
# They are module-level, so a single value is shared by every chat handled by
# this process rather than being tracked per user.
seo=False
s_news=False
# NOTE(review): the channel access token and channel secret are hard-coded
# below; consider loading them from the environment and rotating these values.
line_bot_api = LineBotApi('Gub58t+8u9z7nTBrZLLvXnCbwDR1Gmyrew3nGFlKRxsmvH/oIMGjjup2DygA4XhV1NenM6dFTO8yvpbtCDezCqP2BAV4eVn2Y63/EYqeTCw1S+oJ+BiLLzC8DdVrWu9jSfp7wXqrVNUxcYdk54eoWAdB04t89/1O/w1cDnyilFU=')
handler = WebhookHandler('f761bc6038c94a3baa815124e33dea50')
#line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
#handler = WebhookHandler('YOUR_CHANNEL_SECRET')
def get_aws():
    """Return the four newest rows of ``hhh.aws_monitor`` as plain text.

    Each row contributes two lines: the event time (``eventtime`` passed
    through ``from_unixtime``) and then the ``area`` value.
    """
    database = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
    rows = database.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 4;')
    return ''.join(str(row['dt']) + "\n" + row['area'] + "\n" for row in rows)
def get_idea():
    """Return ten random queries from ``hhh.gsc_weekly``, one per line.

    Only rows with more than 500 clicks are eligible; the database picks the
    ten rows at random on every call.
    """
    database = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
    rows = database.query('SELECT query FROM hhh.gsc_weekly where clicks > 500 order by rand() limit 10;')
    return ''.join(str(row['query']) + "\n" for row in rows)
def get_kw_ls(past=False):
    """Return the distinct trending search words for one calendar day.

    The reference day is the date of the newest row in ``trending_searches``.
    With ``past=True`` the day before that one is used instead.

    Args:
        past: When true, report the day preceding the newest data.

    Returns:
        The ``ts_word`` values of the selected day ordered by ``ts_date``
        descending, with duplicates removed (first occurrence kept). An empty
        list is returned when the table has no rows.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')

    # Only the newest timestamp is needed, so let the database return one row
    # instead of streaming the whole table and breaking after the first.
    newest = None
    for row in db.query('SELECT ts_date FROM trending_searches ORDER BY ts_date DESC LIMIT 1'):
        newest = row['ts_date']
    if newest is None:
        # Empty table: there is no reference day to report on.
        return []

    if past:
        newest = newest - datetime.timedelta(days=1)
    day_start = newest.strftime("%Y-%m-%d 00:00:00")
    day_end = newest.strftime("%Y-%m-%d 23:59:59")

    # Bound parameters replace the hand-quoted date strings.
    rows = db.query(
        'SELECT ts_word FROM trending_searches '
        'WHERE ts_date >= :day_start AND ts_date <= :day_end '
        'ORDER BY ts_date DESC',
        day_start=day_start,
        day_end=day_end,
    )
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(row['ts_word'] for row in rows))
def get_kw(past=False):
    """Build a Flex carousel dict listing the trending keywords.

    Keywords come from ``get_kw_ls(past)`` and are numbered from 1. A new
    bubble (from ``make_bubble``) is started for every group of 12 keywords,
    and each keyword becomes one row (from ``make_box``) in the current bubble.

    Args:
        past: Forwarded to ``get_kw_ls``; also selects the bubble title.

    Returns:
        ``{"type": "carousel", "contents": [...bubbles...]}``.
    """
    words = get_kw_ls(past)
    # Equality (not truthiness) is kept on purpose so the title choice matches
    # the established behaviour for every possible ``past`` value.
    if past == False:  # noqa: E712
        heading = '熱門搜尋關鍵字'
    else:
        heading = '歷史關鍵字走勢'

    bubbles = []
    for rank, word in enumerate(words, start=1):
        if (rank - 1) % 12 == 0:
            bubbles.append(make_bubble(heading))
        bubbles[-1]['body']['contents'].append(make_box(rank, word))
    return {"type": "carousel", "contents": bubbles}
@app.get("/aws")
async def aws():
    """Serve the 30 newest ``hhh.aws_monitor`` rows as an HTML response.

    The body starts with a report-time header, followed by one line per row
    containing the event time and the area.

    Returns:
        An ``HTMLResponse`` with status code 200.
    """
    # NOTE(review): the opening and closing string literals of this report
    # were split across two physical lines (a syntax error). They are written
    # as an explicit newline here; if they originally carried markup, it is
    # not recoverable from this file and should be restored.
    parts = ['\n', '製表時間: ' + str(datetime.datetime.now()) + '\n\n']

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
    cursor = db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 30;')
    parts.extend(str(c['dt']) + " " + c['area'] + " \n" for c in cursor)

    parts.append('\n')
    return HTMLResponse(content=''.join(parts), status_code=200)
@app.get("/msg/{item_id}")
async def coffee_msg(item_id):
    """Push the coupon redemption text to the LINE recipient ``item_id``.

    Returns:
        ``{"code": "ok"}`` once the push call has returned.
    """
    coupon_notice = TextSendMessage(text='開啟下方完整券樣(密碼9888)至櫃台掃碼兌換。https://txp.rs/v/EvX69b4Xq9')
    line_bot_api.push_message(item_id, coupon_notice)
    return {"code": "ok"}
@app.post('/callback')
async def callback(request: fastapi.Request):
    """Receive a LINE webhook call and hand it to the webhook handler.

    Args:
        request: Incoming request; its raw body and ``X-Line-Signature``
            header are passed to ``handler.handle``.

    Returns:
        The string ``'OK'`` when the events were dispatched.

    Raises:
        fastapi.HTTPException: 400 when the signature header is missing or
            the handler rejects the signature.
    """
    signature = request.headers.get('X-Line-Signature')
    if signature is None:
        # Indexing the headers would raise KeyError and surface as a 500.
        raise fastapi.HTTPException(status_code=400, detail='Missing X-Line-Signature header')

    body = await request.body()
    try:
        handler.handle(body.decode('utf-8'), signature)
    except InvalidSignatureError as exc:
        # A bad signature is a client error, not a server fault.
        raise fastapi.HTTPException(status_code=400, detail='Invalid signature') from exc
    return 'OK'
@app.get('/get_trend_image')
async def get_trend_image(kw):
    """Plot the stored interest-over-time series for ``kw`` and return the PNG.

    Rows of ``interest_over_time`` whose ``iot_kword`` equals ``kw`` are
    plotted (``iot_date`` on x, ``iot_value`` on y) and saved to
    ``trend_image/<kw>.png``, which is then served.

    Args:
        kw: Keyword taken from the query string.

    Returns:
        A ``FileResponse`` for the generated image.

    Raises:
        fastapi.HTTPException: 400 when ``kw`` contains a path separator.
    """
    # FileResponse is not among the module-level imports, so bring it into
    # scope here.
    from fastapi.responses import FileResponse

    # kw becomes part of a file name; refuse anything that could leave the
    # trend_image/ directory.
    if '/' in kw or '\\' in kw:
        raise fastapi.HTTPException(status_code=400, detail='Invalid keyword')

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')
    # kw is untrusted input, so it is bound as a parameter rather than being
    # concatenated into the statement.
    rows = db.query(
        'SELECT * FROM interest_over_time WHERE iot_kword=:kw ORDER BY iot_date',
        kw=kw,
    )
    x_axis = []
    y_axis = []
    for row in rows:
        x_axis.append(row['iot_date'])
        y_axis.append(row['iot_value'])
    # Format example:
    # x = ['7/15', '7/18', '7/19', '7/24', '7/25', '7/26']
    # y = [1, 5, 2, 7, 9, 1]

    target_path = 'trend_image/' + kw + ".png"
    # Use a dedicated figure per request: drawing on pyplot's implicit global
    # figure would overlay the curves of earlier requests and never free them.
    fig, ax = plt.subplots()
    try:
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y/%m/%d'))
        ax.xaxis.set_major_locator(mdates.DayLocator(interval=30))  # one tick every 30 days
        ax.plot(x_axis, y_axis, 'r-^')  # red line with triangle markers
        # bbox_inches='tight' trims the empty margin around the chart.
        fig.savefig(target_path, bbox_inches='tight')
    finally:
        plt.close(fig)
    return FileResponse(target_path)
def get_news_by_kw(keyword):
    """Search GoogleNews (zh-TW) for ``keyword`` and return the headlines.

    Returns:
        The prefix ``"新聞:"`` followed by the result titles joined with
        commas; just the prefix when there are no results.
    """
    news_client = GoogleNews(lang='zh-TW')
    news_client.set_lang('zh-TW')
    news_client.search(keyword)
    headlines = [item['title'] for item in news_client.results()]
    return "新聞:" + ','.join(headlines)
# print(r['desc'])
# print(r['link'])
# print(r['datetime'])
def flex_test():
    """Load ``test.json`` and append three sample ranking rows to its body.

    Returns:
        The parsed JSON document with three extra ``box`` entries appended to
        ``js['body']['contents']``.
    """
    # The context manager closes the file; a bare open() left the handle open.
    with open('test.json', 'r', encoding='utf-8') as fp:
        js = json.load(fp)

    for i in range(3):
        row_dict = {
            'type': 'box',
            'layout': 'baseline',
            'contents': [
                {"type": "text", "text": "第" + str(i) + "名", "size": "sm", "color": "#aaaaaa", "flex": 2},
                {"type": "text", "text": "hello, world" + str(1), "flex": 5, "weight": "regular"},
            ],
        }
        js['body']['contents'].append(row_dict)
    return js
def make_bubble(title):
    """Build a Flex bubble dict with a hero image and a titled body.

    Args:
        title: Text shown as the first (bold, centred) entry of the body.

    Returns:
        A freshly built ``bubble`` dict; callers append rows to
        ``result['body']['contents']``.
    """
    hero_image = {
        "type": "image",
        "url": "https://scdn.line-apps.com/n/channel_devcenter/img/fx/01_1_cafe.png",
        "size": "full",
        "gravity": "top",
        "margin": "none",
        "aspectRatio": "20:13",
    }
    heading = {
        "type": "text",
        "text": title,
        "size": "xl",
        "weight": "bold",
        "align": "center",
    }
    body = {"type": "box", "layout": "vertical", "contents": [heading]}
    return {"type": "bubble", "hero": hero_image, "body": body}
def make_box(num, content):
    """Build one baseline row: a grey rank label followed by the content text.

    Args:
        num: Rank shown in the label as ``第<num>名``.
        content: Text of the second, wider cell.

    Returns:
        A ``box`` dict with two text components.
    """
    rank_label = {
        "type": "text",
        "text": "第" + str(num) + "名",
        "size": "sm",
        "color": "#aaaaaa",
        "flex": 2,
    }
    value_cell = {
        "type": "text",
        "text": content,
        "flex": 5,
        "weight": "regular",
    }
    return {'type': 'box', 'layout': 'baseline', 'contents': [rank_label, value_cell]}
def make_carousel(title='ChoozMo'):
    """Build a demo carousel of 26 placeholder rows, 12 rows per bubble.

    Args:
        title: Heading passed to ``make_bubble`` for every bubble. The
            default is only a placeholder so that existing zero-argument
            calls work.

    Returns:
        ``{"type": "carousel", "contents": [...bubbles...]}`` with rows
        labelled ``data0`` .. ``data25``.
    """
    bubbles = []
    for i in range(26):
        if i % 12 == 0:
            # make_bubble requires a title; calling it with no argument
            # raised TypeError on the very first iteration.
            bubbles.append(make_bubble(title))
        bubbles[-1]['body']['contents'].append(make_box(i, 'data' + str(i)))
    return {"type": "carousel", "contents": bubbles}
@handler.add(FollowEvent)
def handle_follow(event):
    """Print the user id carried by a follow event."""
    follower_id = event.source.user_id
    print(follower_id)
    # Placeholder: no further follow-up action is taken yet.
@handler.add(linebotModels.MessageEvent, message=linebotModels.TextMessage)
def message_text(event):
    """Answer an incoming LINE text message.

    Fixed command words are handled first. Any other text is treated as the
    keyword for a pending news search (``s_news``) or suggestion lookup
    (``seo``) when the corresponding module-level flag is set.

    Every branch replies at most once and then returns, because the reply
    token of an event can only be used for a single reply.

    Args:
        event: The message event; ``event.message.text`` and
            ``event.reply_token`` are read.
    """
    global seo
    global s_news
    incoming = event.message.text

    if incoming == 's_news':
        # Arm news-search mode: the next free-text message is the keyword.
        s_news = True
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text='請輸入要搜尋新聞的關鍵字:'))
        return

    if incoming == 'q_aws':
        line_bot_api.reply_message(
            event.reply_token,
            [TextSendMessage(text=get_aws()),
             TextSendMessage(text='完整報告: https://api.ptt.cx:5443/aws')])
        return

    if incoming == 'q_idea':
        # NOTE(review): this also arms news-search mode, so the next free-text
        # message triggers a news search. Confirm this is intended and not a
        # copy of the 's_news' branch.
        s_news = True
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text=get_idea()))
        return

    if incoming in ('hotkeys', '熱門關鍵字'):
        line_bot_api.reply_message(
            event.reply_token, FlexSendMessage('ChoozMo', get_kw(False)))
        return

    if incoming in ('past_hotkeys', '關鍵字歷史走勢'):
        line_bot_api.reply_message(
            event.reply_token, FlexSendMessage('ChoozMo', get_kw(True)))
        return

    # The 'seo' command (set the seo flag and reply '請輸入要找的關鍵字:') is
    # currently disabled, so the seo branch at the bottom is only reachable if
    # the flag is set elsewhere.

    if incoming == 'js test':
        line_bot_api.reply_message(
            event.reply_token, FlexSendMessage('ChoozMo', flex_test()))
        return

    if incoming == '叫':
        # NOTE(review): baseUrl is not defined in the visible part of this
        # module; unless it is provided elsewhere this branch raises NameError.
        line_bot_api.reply_message(
            event.reply_token, linebotModels.AudioSendMessage(
                original_content_url=f'{baseUrl}/static/audio/noot_noot.mp3', duration=1000))
        # Return so the already-used reply token is not reused below.
        return

    if incoming == 'ok':
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text='最欣賞ChoozMo團隊說OK的人!!'))
        # Return so the already-used reply token is not reused below.
        return

    if incoming in ('c', 'C'):
        # Quick-reply shortcuts. Buttons for q_idea, q_aws and seo are
        # currently disabled.
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(
                text='快速鍵',
                quick_reply=QuickReply(
                    items=[
                        QuickReplyButton(
                            action=MessageAction(label="熱門關鍵字", text="hotkeys")
                        ),
                        QuickReplyButton(
                            action=MessageAction(label="關鍵字歷史走勢", text="past_hotkeys")
                        ),
                    ])))
        return

    if s_news:
        result = get_news_by_kw(incoming)
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text=result))
        s_news = False
        return

    if seo:
        suggestions = suggests.suggests.get_suggests(incoming, source='google')
        res = '相關字:' + ','.join(suggestions['suggests'])
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text=res))
        seo = False
#if __name__ == "__main__":
# app.run(host='0.0.0.0', port=443,ssl_context=('/etc/letsencrypt/live/ptt.cx/fullchain.pem', '/etc/letsencrypt/live/ptt.cx/privkey.pem'))
# app.run(host='0.0.0.0', port=14404,ssl_context=('/tmp/cert.pem','/tmp/chain.pem' ))