"""FastAPI application exposing a LINE bot webhook and a few helper endpoints.

Endpoints:
    GET  /aws            HTML report built from the ``hhh.aws_monitor`` table.
    GET  /msg/{item_id}  Push a fixed coupon message to the given LINE id.
    POST /callback       LINE webhook entry point (dispatches to ``handler``).
"""
import codecs
import copy
import datetime
import html
import json

import dataset
import fastapi
import fastapi.staticfiles as fastapiStaticfiles
import linebot
import linebot.models as linebotModels
import suggests
from fastapi.responses import HTMLResponse
from GoogleNews import GoogleNews
from linebot import (
    LineBotApi, WebhookHandler
)
from linebot.exceptions import (
    InvalidSignatureError
)
# FollowEvent, QuickReply and QuickReplyButton are used below, so this import
# must be live (in the collapsed source it could be read as commented out).
from linebot.models import (
    MessageEvent, TextMessage, TextSendMessage,
    SourceUser, SourceGroup, SourceRoom,
    TemplateSendMessage, ConfirmTemplate, MessageAction,
    ButtonsTemplate, ImageCarouselTemplate, ImageCarouselColumn, URIAction,
    PostbackAction, DatetimePickerAction,
    CameraAction, CameraRollAction, LocationAction,
    CarouselTemplate, CarouselColumn, PostbackEvent,
    StickerMessage, StickerSendMessage, LocationMessage, LocationSendMessage,
    ImageMessage, VideoMessage, AudioMessage, FileMessage,
    UnfollowEvent, FollowEvent, JoinEvent, LeaveEvent, BeaconEvent,
    MemberJoinedEvent, MemberLeftEvent,
    FlexSendMessage, BubbleContainer, ImageComponent, BoxComponent,
    TextComponent, IconComponent, ButtonComponent,
    SeparatorComponent, QuickReply, QuickReplyButton,
    ImageSendMessage)

# Deployment notes kept from the original file:
#uvicorn newbot:app --host 0.0.0.0 --port 5443 --ssl-keyfile=/etc/letsencrypt/live/api.ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/api.ptt.cx/fullchain.pem
#uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile=/etc/letsencrypt/live/ptt.cx/privkey.pem --ssl-certfile=/etc/letsencrypt/live/ptt.cx/chain.pem
#uvicorn main:app --host 0.0.0.0 --port 443 --key-file=/etc/letsencrypt/live/ptt.cx/privkey.pem --certfile=/etc/letsencrypt/live/ptt.cx/cert.pem
# --keyfile=./key.pem --certfile=./cert.pem
# --ssl-cert-reqs 1
# --ssl-ca-certs=/etc/letsencrypt/live/ptt.cx/fullchain.crt

app = fastapi.FastAPI()
#app.mount(
#    '/static', fastapiStaticfiles.StaticFiles(directory='static'), name='static')

# Process-wide conversation flags toggled by message_text().
# NOTE(review): these are shared by every chat talking to the bot, so one
# user's 's_news' command affects the next message from any user.
seo = False
s_news = False

# SECURITY NOTE(review): the channel access token, channel secret and database
# password below are hard-coded in source. They are kept so the module keeps
# working unchanged, but they should be rotated and loaded from configuration.
line_bot_api = LineBotApi('Gub58t+8u9z7nTBrZLLvXnCbwDR1Gmyrew3nGFlKRxsmvH/oIMGjjup2DygA4XhV1NenM6dFTO8yvpbtCDezCqP2BAV4eVn2Y63/EYqeTCw1S+oJ+BiLLzC8DdVrWu9jSfp7wXqrVNUxcYdk54eoWAdB04t89/1O/w1cDnyilFU=')
handler = WebhookHandler('f761bc6038c94a3baa815124e33dea50')
#line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
#handler = WebhookHandler('YOUR_CHANNEL_SECRET')

_HHH_DB_URL = 'mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4'
_YODB_DB_URL = 'mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4'

# The original code referenced ``baseUrl`` without ever defining it, which
# raised NameError on the '叫' command.
# NOTE(review): value taken from the host used for the /aws report link;
# confirm this is where /static/audio is actually served.
baseUrl = 'https://api.ptt.cx:5443'

# Sent instead of an empty string when a list-based reply has no entries.
_EMPTY_REPLY_TEXT = '查無資料'


def _lines_to_text(lines):
    """Join *lines* with newlines, falling back to a placeholder when empty."""
    return "\n".join(str(line) for line in lines) or _EMPTY_REPLY_TEXT


def _load_flex_template():
    """Load the Flex Message container template from ``test.json``."""
    with open('test.json', 'r', encoding='utf-8') as fh:
        return json.load(fh)


def _rank_row(rank, text):
    """Build one baseline box row: a grey "第N名" label followed by *text*."""
    return {
        'type': 'box',
        'layout': 'baseline',
        'contents': [
            {
                "type": "text",
                "text": "第" + str(rank) + "名",
                "size": "sm",
                "color": "#aaaaaa",
                "flex": 2
            },
            {
                "type": "text",
                "text": text,
                "flex": 5,
                "weight": "regular"
            }
        ],
    }


def get_aws():
    """Return the 4 newest ``hhh.aws_monitor`` rows as "<dt>\\n<area>\\n" text."""
    db = dataset.connect(_HHH_DB_URL)
    rows = db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 4;')
    return ''.join(f"{row['dt']}\n{row['area']}\n" for row in rows)


def get_idea():
    """Return 10 random ``hhh.gsc_weekly`` queries with clicks > 500, one per line."""
    db = dataset.connect(_HHH_DB_URL)
    rows = db.query('SELECT query FROM hhh.gsc_weekly where clicks > 500 order by rand() limit 10;')
    return ''.join(f"{row['query']}\n" for row in rows)


def get_hotkeys(past=False):
    """Return the ``ts_word`` values recorded on the newest ``ts_date`` day.

    Args:
        past: when true, use the day before the newest ``ts_date`` instead.

    Returns:
        A list of words ordered by ``ts_date`` descending; an empty list when
        ``trending_searches`` has no rows.
    """
    db = dataset.connect(_YODB_DB_URL)

    # Only the newest date is needed, so ask the database for a single row.
    newest = None
    for row in db.query('SELECT ts_date FROM trending_searches ORDER BY ts_date DESC LIMIT 1'):
        newest = row['ts_date']
    if newest is None:
        return []

    if past:
        newest = newest - datetime.timedelta(days=1)
    day_start = newest.strftime("%Y-%m-%d 00:00:00")
    day_end = newest.strftime("%Y-%m-%d 23:59:59")

    # Bound parameters instead of concatenating values into the SQL text.
    rows = db.query(
        'SELECT * FROM trending_searches '
        'WHERE :day_end >= ts_date and :day_start <= ts_date '
        'ORDER BY ts_date DESC',
        day_start=day_start, day_end=day_end)
    return [row['ts_word'] for row in rows]


@app.get("/aws")
async def aws():
    """Render the 30 newest ``hhh.aws_monitor`` rows as an HTML report.

    NOTE(review): the HTML tags inside the original string literals were lost
    in the source as received; the table markup here is a reconstruction and
    should be checked against the intended layout.
    """
    parts = [
        '製表時間: ' + str(datetime.datetime.now()) + '<br/><br/>\n\n',
        '<table>',
    ]
    db = dataset.connect(_HHH_DB_URL)
    rows = db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 30;')
    for row in rows:
        parts.append("\n")
        # Escape database values so they cannot inject markup into the page.
        parts.append(
            '<tr><td>' + html.escape(str(row['dt'])) + '</td><td>'
            + html.escape(str(row['area'])) + '</td></tr>')
    parts.append('\n</table>')
    return HTMLResponse(content=''.join(parts), status_code=200)


@app.get("/msg/{item_id}")
async def coffee_msg(item_id):
    """Push the fixed coupon message to the LINE id given in the path.

    NOTE(review): this endpoint has no authentication in this file.
    """
    line_bot_api.push_message(item_id, TextSendMessage(text='開啟下方完整券樣(密碼9888)至櫃台掃碼兌換。https://txp.rs/v/EvX69b4Xq9'))
    return {"code": "ok"}


@app.post('/callback')
async def callback(request: fastapi.Request):
    """LINE webhook: verify the signature and dispatch events to ``handler``.

    Raises:
        fastapi.HTTPException: 400 when the ``X-Line-Signature`` header is
            missing or does not match the request body.
    """
    signature = request.headers.get('X-Line-Signature')
    if signature is None:
        raise fastapi.HTTPException(
            status_code=400, detail='Missing X-Line-Signature header')
    body = await request.body()
    try:
        handler.handle(body.decode('utf-8'), signature)
    except InvalidSignatureError as err:
        raise fastapi.HTTPException(
            status_code=400, detail='Invalid signature') from err
    return 'OK'


def get_news_by_kw(keyword):
    """Search Google News (zh-TW) for *keyword*; return "新聞:" plus comma-joined titles."""
    googlenews = GoogleNews(lang='zh-TW')
    googlenews.set_lang('zh-TW')
    googlenews.search(keyword)
    # Other fields available on each result: r['desc'], r['link'], r['datetime']
    return "新聞:" + ','.join(r['title'] for r in googlenews.results())


def flex_test():
    """Return the ``test.json`` Flex container with three sample rows appended."""
    js = _load_flex_template()
    for i in range(3):
        js['body']['contents'] = js['body']['contents'] + [
            _rank_row(i, "hello, world" + str(1))]
    return js


@handler.add(FollowEvent)
def handle_follow(event):
    """Log the user id of an account that just followed the bot."""
    print(event.source.user_id)
    # do something


@handler.add(linebotModels.MessageEvent, message=linebotModels.TextMessage)
def message_text(event):
    """Handle an incoming text message by command keyword.

    Each recognised command sends exactly one reply and returns, because a
    LINE reply token can only be used once. Unrecognised text is treated as
    the keyword for a pending news search (``s_news``) or suggestion lookup
    (``seo``) when the corresponding module flag is set.
    """
    global seo
    global s_news
    text = event.message.text
    token = event.reply_token

    # Disabled in the original:
    # if event.message.text == 'push':
    #     line_bot_api.push_message(
    #         event.source.user_id, [
    #             TextSendMessage(text='PUSH!'),
    #         ]
    #     )

    if text == 's_news':
        s_news = True
        line_bot_api.reply_message(
            token, TextSendMessage(text='請輸入要搜尋新聞的關鍵字:'))
        return

    if text == 'q_aws':
        line_bot_api.reply_message(
            token,
            [TextSendMessage(text=get_aws()),
             TextSendMessage(text='完整報告: https://api.ptt.cx:5443/aws')])
        return

    if text == 'q_idea':
        # NOTE(review): this also arms the news-search flag, so the next
        # free-text message is searched as news. Kept as in the original;
        # confirm it is intended and not a copy-paste leftover.
        s_news = True
        line_bot_api.reply_message(token, TextSendMessage(text=get_idea()))
        return

    ############################
    if text == 'hotkeys' or text == 'past_hotkeys':
        # get_hotkeys() returns a list; a text message needs a string.
        # NOTE(review): the AWS summary is sent first, as in the original.
        words = get_hotkeys(text == 'past_hotkeys')
        line_bot_api.reply_message(
            token,
            [TextSendMessage(text=get_aws()),
             TextSendMessage(text=_lines_to_text(words))])
        return

    # Disabled in the original:
    # if event.message.text == 'seo':
    #     seo=True
    #     line_bot_api.reply_message(
    #         event.reply_token,
    #         TextSendMessage(text='請輸入要找的關鍵字:'))
    #     return

    if text == 'js test':
        line_bot_api.reply_message(
            token, FlexSendMessage('ChoozMo', flex_test()))
        return

    if text == '熱門關鍵字':
        js = _load_flex_template()
        rows = [_rank_row(rank, word)
                for rank, word in enumerate(get_hotkeys(past=False), start=1)]
        js['body']['contents'] = js['body']['contents'] + rows
        # The original sent an unbound ``FlexMessage`` here instead of ``js``.
        line_bot_api.reply_message(token, FlexSendMessage('ChoozMo', js))
        return

    if text == '關鍵字歷史走勢':
        line_bot_api.reply_message(
            token, TextSendMessage(text=_lines_to_text(get_hotkeys(True))))
        return

    if text == '叫':
        line_bot_api.reply_message(
            token,
            linebotModels.AudioSendMessage(
                original_content_url=f'{baseUrl}/static/audio/noot_noot.mp3',
                duration=1000))
        return

    if text == 'ok':
        line_bot_api.reply_message(
            token, TextSendMessage(text='最欣賞ChoozMo團隊說OK的人!!'))
        return

    if text == 'c' or text == 'C':
        line_bot_api.reply_message(
            token,
            TextSendMessage(
                text='快速鍵',
                quick_reply=QuickReply(
                    items=[
                        QuickReplyButton(
                            action=MessageAction(label="熱門關鍵字", text="hotkeys")
                        ),
                        QuickReplyButton(
                            action=MessageAction(label="關鍵字歷史走勢", text="past_hotkeys")
                        ),
                        #QuickReplyButton(
                        #    action=MessageAction(label="幸福空間靈感", text="q_idea")
                        #),
                        #QuickReplyButton(
                        #    action=MessageAction(label="查AWS", text="q_aws")
                        #),
                        #QuickReplyButton(
                        #    action=MessageAction(label="關聯字", text="seo")
                        #),
                    ])))
        return

    # Not a command: treat the text as the keyword for a pending request.
    if s_news:
        line_bot_api.reply_message(
            token, TextSendMessage(text=get_news_by_kw(text)))
        s_news = False
        return

    if seo:
        found = suggests.suggests.get_suggests(text, source='google')
        line_bot_api.reply_message(
            token,
            TextSendMessage(text='相關字:' + ','.join(found['suggests'])))
        seo = False
        # print('test')


#if __name__ == "__main__":
#    app.run(host='0.0.0.0', port=443,ssl_context=('/etc/letsencrypt/live/ptt.cx/fullchain.pem', '/etc/letsencrypt/live/ptt.cx/privkey.pem'))
#    app.run(host='0.0.0.0', port=14404,ssl_context=('/tmp/cert.pem','/tmp/chain.pem' ))