'
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4')
cursor=db.query('SELECT from_unixtime(eventtime) as dt,area FROM hhh.aws_monitor order by eventtime desc limit 30;')
for c in cursor:
result+="
"+str(c['dt'])+"
"+c['area']+"
\n"
result+='
'
return HTMLResponse(content=result, status_code=200)
@app.get("/msg/{item_id}")
async def coffee_msg(item_id):
True
line_bot_api.push_message(item_id, TextSendMessage(text='開啟下方完整券樣(密碼9888)至櫃台掃碼兌換。https://txp.rs/v/EvX69b4Xq9'))
return {"code":"ok" }
@app.post('/callback')
async def callback(request: fastapi.Request):
signature = request.headers['X-Line-Signature']
body = await request.body()
handler.handle(body.decode('utf-8'), signature)
return 'OK'
@app.get('/get_trend_image')
async def get_trend_image(kw_id):
#def get_trend_image(kw):
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')
kw = ''
sql_get_newest_data_date = 'SELECT iot_kword FROM interest_over_time WHERE iotid="'+kw_id+'"'
for row in db.query(sql_get_newest_data_date):
kw=row['iot_kword']
sql_get_newest_data_date = 'SELECT * FROM interest_over_time WHERE iot_kword="'+kw+'" ORDER BY iot_date'
x_axis = []
y_axis = []
for row in db.query(sql_get_newest_data_date):
x_axis += [row['iot_date']]
y_axis += [row['iot_value']]
x = np.array(x_axis)
y = np.array(y_axis)
# create an array of numbers for the dates
x_dates = np.array([dates.date2num(i) for i in x])
# create more uniform intervals in x axis and use spline to interpolate data
x_smooth = np.linspace(x_dates.min(), x_dates.max(), 200)
#y_smooth = spline(x_dates, y, x_smooth)
y_smooth = make_interp_spline(x_dates, y)(x_smooth)
# creating a new date array from the new date number array
x_new = np.array([dates.num2date(i) for i in x_smooth])
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y/%m'))
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=100)) #座標軸刻度1天
#plt.figure()
plt.plot(x_new, y_smooth)
target_path = 'trend_image/'+kw+'.png'
plt.savefig(target_path)
plt.clf()
return FileResponse(target_path)
def get_news_by_kw(keyword):
googlenews = GoogleNews(lang='zh-TW')
kw=keyword
googlenews.set_lang('zh-TW')
googlenews.search(kw)
resultstr="新聞:"
idx=0
rs=googlenews.results()
for r in rs:
if idx>0:
resultstr+=','
else:
idx+=1
resultstr+=r['title']
return resultstr
# print(r['desc'])
# print(r['link'])
# print(r['datetime'])
def flex_test():
js=json.load(open('test.json','r',encoding='utf-8'))
for i in range(3):
row_dict = {}
row_dict['type'] = 'box'
row_dict['layout'] = 'baseline'
row_dict['contents']= [
{"type": "text","text": "第"+str(i)+"名","size": "sm","color": "#aaaaaa","flex": 2},
{"type": "text","text": "hello, world"+str(1),"flex": 5,"weight": "regular"}
]
js['body']['contents'] = js['body']['contents'] + [row_dict]
return js
def make_bubble(title,kw):
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/yodb?charset=utf8mb4')
sql_get_newest_data_date = 'SELECT iotid FROM interest_over_time WHERE iot_kword="'+kw+'"'
id=''
for row in db.query(sql_get_newest_data_date):
id=row['iotid']
id = random.randint(1,1050)
box_dict = {
"type": "bubble",
"hero": {
"type": "image",
"url": "https://api.ptt.cx:6443/get_trend_image?kw_id="+str(id),
"size": "full",
"gravity": "top",
"margin": "none",
"aspectRatio": "20:13"
},
"body": {
"type": "box",
"layout": "vertical",
"contents": [
{
"type": "text",
"text": title+"-"+kw,
"size": "xl",
"weight": "bold",
"align": "center"
}
]
}
}
return box_dict
def make_box(num, content):
row_dict = {}
row_dict['type'] = 'box'
row_dict['layout'] = 'baseline'
#row_dict["action"]= {
# "type": "uri",
# "label": "action",
# "uri": "https://store.line.me"
#}
row_dict['contents']= [
{
"type": "text",
"text": "第"+str(num)+"名",
"size": "sm",
"color": "#aaaaaa",
"flex": 2
},
{
"type": "text",
"text": content,
"flex": 5,
"weight": "regular"
}
]
return row_dict
def make_carousel():
carousel_dict = {"type": "carousel","contents": []}
carousel_idx = -1
for i in range(26):
if i%12 ==0:
carousel_dict['contents'] = carousel_dict['contents']+[make_bubble()]
carousel_idx+=1
carousel_dict['contents'][carousel_idx]['body']['contents'] += [make_box(i,'data'+str(i))]
return carousel_dict
@handler.add(FollowEvent)
def handle_follow(event):
print(event.source.user_id)
# do something
@handler.add(linebotModels.MessageEvent, message=linebotModels.TextMessage)
def message_text(event):
global seo
global s_news
## if event.message.text == 'push':
# line_bot_api.push_message(
# event.source.user_id, [
# # TextSendMessage(text='PUSH!'),
# ]
# )
notify_group('User :'+event.message.text)
if event.message.text == 's_news':
s_news=True
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text='請輸入要搜尋新聞的關鍵字:'))
return
if event.message.text == 'q_aws':
line_bot_api.reply_message(
event.reply_token,[TextSendMessage(text=get_aws()),TextSendMessage(text='完整報告: https://api.ptt.cx:5443/aws')])
return
if event.message.text == 'q_idea':
s_news=True
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=get_idea()))
return
############################
if event.message.text == 'hotkeys':
js = get_kw(False)
line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js))
return
if event.message.text == 'past_hotkeys':
js = get_kw(True)
line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js))
return
if event.message.text == '合作提案':
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text='合作提案請寄信至 : jared@choozmo.com 信箱'))
return
if event.message.text == '更多功能':
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(
text='更多功能',
quick_reply=QuickReply(
items=[
QuickReplyButton(
action=MessageAction(label="合作提案", text="合作提案")
),
])))
return
'''
if event.message.text == 'seo':
seo=True
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text='請輸入要找的關鍵字:'))
return
'''
if event.message.text=='js test':
FlexMessage = flex_test()
line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',FlexMessage))
return
if event.message.text=='熱門關鍵字':
js = get_kw(False)
line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js))
return
if event.message.text=='關鍵字歷史走勢':
js = get_kw(True)
line_bot_api.reply_message(event.reply_token, FlexSendMessage('ChoozMo',js))
return
if event.message.text == '叫':
line_bot_api.reply_message(
event.reply_token, linebotModels.AudioSendMessage(
original_content_url=f'{baseUrl}/static/audio/noot_noot.mp3', duration=1000))
if event.message.text=='ok':
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text='最欣賞ChoozMo團隊說OK的人!!'))
'''
if event.message.text=='c' or event.message.text=='C' :
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(
text='快速鍵',
quick_reply=QuickReply(
items=[
QuickReplyButton(
action=MessageAction(label="熱門關鍵字", text="hotkeys")
),
QuickReplyButton(
action=MessageAction(label="關鍵字歷史走勢", text="past_hotkeys")
),
QuickReplyButton(
action=MessageAction(label="更多功能", text="更多功能")
),
#QuickReplyButton(
# action=MessageAction(label="幸福空間靈感", text="q_idea")
#),
#QuickReplyButton(
# action=MessageAction(label="查AWS", text="q_aws")
#),
#QuickReplyButton(
# action=MessageAction(label="關聯字", text="seo")
#),
])))
else:
if s_news:
result=get_news_by_kw(event.message.text)
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=result))
s_news=False
return
if seo:
res='相關字:'
s = suggests.suggests.get_suggests(event.message.text, source='google')
idx=0
for sg in s['suggests']:
if idx>0:
res+=','
else:
idx+=1
res+=sg
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=res))
seo=False
# print('test')
'''
#if __name__ == "__main__":
# app.run(host='0.0.0.0', port=443,ssl_context=('/etc/letsencrypt/live/ptt.cx/fullchain.pem', '/etc/letsencrypt/live/ptt.cx/privkey.pem'))
# app.run(host='0.0.0.0', port=14404,ssl_context=('/tmp/cert.pem','/tmp/chain.pem' ))