"""LINE bot webhook handlers and NFT REST endpoints served by FastAPI."""

import uvicorn
import fastapi
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
    MessageEvent,
    TextMessage,
    TextSendMessage,
    FollowEvent,
    TemplateSendMessage,
    ButtonsTemplate,
    URITemplateAction,
)
import dataset
import requests
import json
import qrcode
# from PIL import Image
# import base64, io
from random import randrange
import models
import datetime as dt

app = fastapi.FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# NOTE(review): the LINE channel token/secret, the database password and the
# account-API basic-auth header below are committed in source. They should be
# loaded from the environment (and rotated) rather than kept in the repository.

# Connection URL shared by every handler in this part of the module.
_DB_URL = 'mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4'

# bot config
line_bot_api = LineBotApi("SJT7VPT4RMQFLcS27jQBy3FcC24gtDrkcwJWZ5Xzqesr5T78LOKudHEJzt0k3b2S7n4KPwf27J7DVz2c8NQ4plSaaQylEeB1cYrfejaE/RPG/lCIQBYe4iBTzo26s4i2PcmT89837per/lTyvhVIKAdB04t89/1O/w1cDnyilFU=")
handler = WebhookHandler("411ae3ef7e766739ed2c2c27b249d010")

# (keyword, prompt text, button label, page URL prefix) checked in order against
# incoming text messages; the sender's LINE user id is appended to the prefix.
_KEYWORD_PAGES = (
    ('我要發送', '點擊並打開收藏的NFT,可以選擇想要發送的NFT給對方!',
     '打開發送頁', 'https://ark.cards/collect.html?'),
    ('我要接收', '點擊並打開接收頁面,即可分享接收地址給對方!',
     '打開接收頁', 'https://ark.cards/qr-code.html?'),
    ('NFT商店', '點擊並打開NFT商品頁,就可以購買您所想要的NFT商品哦!',
     '打開NFT商品頁', 'https://ark.cards/shop.html?'),
    ('NFT收藏', '點擊並打開收藏的NFT,可以查看收到的NFT!',
     '打開收藏頁', 'https://ark.cards/collect.html?'),
)


# callback event
@app.post("/callback")
async def callback(request: fastapi.Request):
    """Receive a LINE webhook call and dispatch it to the registered handlers.

    Returns:
        The string ``'OK'`` once the handlers have run.

    Raises:
        fastapi.HTTPException: 400 when the ``X-Line-Signature`` header is
            missing or does not match the request body.
    """
    signature = request.headers.get('X-Line-Signature')
    if signature is None:
        raise fastapi.HTTPException(
            status_code=400, detail='Missing X-Line-Signature header')
    body = await request.body()
    try:
        handler.handle(body.decode('utf-8'), signature)
    except InvalidSignatureError as err:
        # A bad signature is a client error, not an internal server error.
        raise fastapi.HTTPException(
            status_code=400, detail='Invalid signature') from err
    return 'OK'


def _create_account_address(line_user_id):
    """Create an account through the account API and return its address."""
    url = 'https://nft-api-staging.joyso.io/api/v1/accounts'
    headers = {'Authorization': 'Basic bmZ0OmMxOTEzOWMzYjM3YjdjZWU3ZmY3OTFiZGU3NzdjZWNl'}
    # setup for temp use (unique id): a random numeric suffix is appended to
    # the LINE user id to form the uid sent to the API.
    uid = line_user_id + str(randrange(99999))
    # The timeout keeps a stalled API call from blocking the webhook forever.
    response = requests.post(
        url=url, headers=headers, data='uid=' + uid, timeout=10)
    return json.loads(response.text)['account']['address']


def _save_qrcode(address, line_user_id):
    """Render ``address`` as a QR code PNG named after the LINE user id."""
    qr = qrcode.QRCode(version=1, box_size=10, border=5)
    qr.add_data(address)
    qr.make(fit=True)
    image = qr.make_image(fill='black', back_color='white')
    image.save("/var/www/ArkCard-Linebot/ArkCard-web/qrcode/" + line_user_id + '.png')


# follow event
@handler.add(FollowEvent)
def handle_follow(event):
    """Greet a follower, registering them first if they are not in the db.

    For a user id that is not yet in the ``users`` table this creates an
    account through the account API, stores a QR code image of the returned
    address, and inserts the (userid, useraddress) row.
    """
    # get user id when follow
    real_user_id = event.source.user_id

    db = dataset.connect(_DB_URL)
    try:
        table = db['users']
        known_user = bool(table.find_one(userid=real_user_id))
        if not known_user:
            # brand-new user: create the account, QR code and db row
            user_address = _create_account_address(real_user_id)
            _save_qrcode(user_address, real_user_id)
            table.insert(dict(userid=real_user_id, useraddress=user_address))
    finally:
        # Close even when the API call, image write or insert fails.
        db.close()

    greeting = '很高興再見到您!' if known_user else '歡迎加入好友'
    line_bot_api.reply_message(
        event.reply_token, TextSendMessage(text=greeting))


def _reply_with_button(event, text, label, uri):
    """Reply to ``event`` with a one-button template that opens ``uri``."""
    template = ButtonsTemplate(
        title=' ',
        text=text,
        actions=[URITemplateAction(label=label, uri=uri)],
    )
    line_bot_api.reply_message(
        event.reply_token,
        TemplateSendMessage(alt_text="Receive", template=template))


# message handler
@handler.add(MessageEvent, message=TextMessage)
def message(event):
    """Answer a text message with a button linking to the matching page.

    The first keyword of ``_KEYWORD_PAGES`` contained in the message wins;
    messages without any keyword get a link to the main site.
    """
    incoming = event.message.text
    for keyword, text, label, page in _KEYWORD_PAGES:
        if keyword in incoming:
            _reply_with_button(event, text, label, page + event.source.user_id)
            return
    _reply_with_button(
        event, '更多的服務內容,歡迎請上我們的官網!', 'ArkCard的官網', 'https://ark.cards')


@app.post("/push/")
def push_text(user, message):
    """Push the text ``message`` to the LINE user ``user``.

    NOTE(review): this route performs no authentication in the visible code,
    so any caller can make the bot message any user id - confirm that access
    is restricted elsewhere.
    """
    line_bot_api.push_message(user, TextSendMessage(text=message))


# nft collection api
@app.get("/collection/{userid}")
def collection(userid):
    """Return the NFTs associated with ``userid``.

    Returns:
        ``{0: drops, 1: owned}`` where ``drops`` maps a running index to the
        ``nft`` row referenced by each of the user's ``nftdrops`` rows and
        ``owned`` maps a running index to each ``nft`` row whose ``userid``
        matches; or an error string when the user has neither.
    """
    db = dataset.connect(_DB_URL)
    try:
        drops_table = db['nftdrops']
        nft_table = db['nft']
        if (not drops_table.find_one(userid=userid)
                and not nft_table.find_one(userid=userid)):
            return "error: user don't have any nft"
        dropped = {
            index: nft_table.find_one(id=row['nftid'])
            for index, row in enumerate(drops_table.find(userid=userid))
        }
        owned = dict(enumerate(nft_table.find(userid=userid)))
        return {0: dropped, 1: owned}
    finally:
        # Previously placed after ``return`` and therefore never executed.
        db.close()


# receive handler
@app.get("/receive/{userid}")
def receive(userid):
    """Return the stored ``userid``/``useraddress`` pair for ``userid``.

    Returns:
        A dict with keys ``userid`` and ``useraddress``, or the string
        ``"ERROR: User Not Found"`` when no such row exists.
    """
    db = dataset.connect(_DB_URL)
    try:
        user = db['users'].find_one(userid=userid)
    finally:
        db.close()
    if not user:
        return "ERROR: User Not Found"
    return {"userid": user['userid'], "useraddress": user['useraddress']}


# send handler
# NOTE(review): this definition reuses the name ``receive`` and rebinds the
# module-level name from the GET handler above. Both routes stay registered
# because the decorators run at definition time; the name is kept unchanged
# here so nothing that refers to it breaks.
@app.post("/send")
async def receive(userModel: models.TransactionNft):
    """Transfer the NFT ``userModel.nftid`` to the owner of ``userModel.address``.

    Returns:
        ``{'msg': 'OK'}`` on success, ``{'msg': 'user address not found'}``
        when no user has that address, or ``{'msg': 'nft not found'}`` when
        the id is in neither the ``nftdrops`` nor the ``nft`` table.
    """
    db = dataset.connect(_DB_URL)
    try:
        users = db['users']
        nft_table = db['nft']
        drops_table = db['nftdrops']

        nftid = userModel.nftid
        # first confirm that the receiving user exists
        recipient = users.find_one(useraddress=userModel.address)
        if not recipient:
            return {'msg': 'user address not found'}
        userid = recipient['userid']

        if drops_table.find_one(nftid=nftid):
            # NOTE(review): this update is keyed on nftid only, so every
            # nftdrops row with this nftid is reassigned to the recipient.
            # The request carries no sender here - confirm this is intended.
            drops_table.update(dict(nftid=nftid, userid=userid), ['nftid'])
            title = nft_table.find_one(id=nftid)['title']
            push_text(userid, f"您的NFT : {title}, 已劃轉成功!")
            return {'msg': 'OK'}

        nft_row = nft_table.find_one(id=nftid)
        if not nft_row:
            return {'msg': 'nft not found'}

        previous_owner = nft_row['userid']
        nft_table.update(dict(id=nftid, userid=userid), ['id'])
        title = nft_row['title']
        push_text(userid, f"您的NFT : {title}, 已收到!")
        if previous_owner:
            # Only notify a sender when the NFT actually had an owner.
            push_text(previous_owner, f"您的NFT : {title}, 已發送成功!")
        return {'msg': 'OK'}
    finally:
        db.close()


# shop handler
@app.get("/shop/{userid}")
def shop(userid):
    """Return up to five unowned NFTs (id < 1001) keyed by a running index.

    ``userid`` is part of the route but is not used by the query.
    """
    db = dataset.connect(_DB_URL)
    try:
        sql = 'SELECT DISTINCT(title), id, imgurl, userid FROM arkcard.nft WHERE id<1001 and userid IS NULL GROUP BY title LIMIT 5'
        # Materialise the rows before the connection is closed.
        return dict(enumerate(db.query(sql)))
    finally:
        # Previously placed after ``return`` and therefore never executed.
        db.close()


@app.post("/buy")
async def buy(userModel: models.BuyNft):
    """Assign the NFT ``userModel.nftid`` to ``userModel.userid`` and notify them.

    Returns:
        A user-facing success string, or a user-facing error string when the
        NFT id does not exist.

    NOTE(review): the visible code does not check whether the NFT already has
    an owner before reassigning it - confirm that this is acceptable.
    """
    db = dataset.connect(_DB_URL)
    try:
        nft_table = db['nft']
        nftid = userModel.nftid
        userid = userModel.userid

        nft_row = nft_table.find_one(id=nftid)
        if not nft_row:
            print("error: nft not found")
            return "該NFT商品不存在!如果有疑問,請洽網站的服務信箱!"

        nft_row['userid'] = userid
        nft_table.update(dict(nft_row), ['id'])
        # push a confirmation message to the buyer
        push_text(userid, f"您的NFT : {nft_row['title']}, 已購買成功!")
        return "您已購買成功!"
    finally:
        db.close()
@app.post("/event")
async def nftdrops(userModel: models.NftDrops):
    """Register the two event NFTs for a user who has no ``nftdrops`` row yet.

    Inserts one row per event NFT id (``'1001'`` and ``'1002'``) for event
    ``'1'`` with the user's id, email and the current local time.

    Returns:
        ``"新增成功"`` when the rows were inserted, ``"已有資料"`` when the
        user already has a row in ``nftdrops``.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
    try:
        table3 = db['nftdrops']
        # fixed event / NFT ids handed out by this endpoint
        eventid = '1'
        drop_nft_ids = ('1001', '1002')
        userid = userModel.userid
        email = userModel.email
        now = dt.datetime.now()

        # the user already has drop rows: report it and change nothing
        if table3.find_one(userid=userid):
            return "已有資料"

        for nftid in drop_nft_ids:
            table3.insert(dict(eventid=eventid, nftid=nftid, userid=userid,
                               email=email, time=now))
        return "新增成功"
    finally:
        # Close the connection even when a query raises.
        db.close()


if __name__ == '__main__':
    # uvicorn takes the certificate and key as separate ssl_certfile /
    # ssl_keyfile options; it has no ``ssl_context`` keyword.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8228,
        reload=True,
        ssl_certfile='/etc/letsencrypt/live/ark.cards/fullchain.pem',
        ssl_keyfile='/etc/letsencrypt/live/ark.cards/privkey.pem',
    )