123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- import uvicorn
- import fastapi
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import HTMLResponse
- from linebot import LineBotApi, WebhookHandler
- from linebot.models import (
- MessageEvent, TextMessage, TextSendMessage, FollowEvent, TemplateSendMessage, ButtonsTemplate, URITemplateAction,
- )
- import dataset
- import requests
- import json
- import qrcode
- # from PIL import Image
- # import base64, io
- from random import randrange
- import models
- import datetime as dt
- app = fastapi.FastAPI()
- app.add_middleware(
- CORSMiddleware,
- allow_origins=['*'],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # bot config
- line_bot_api = LineBotApi("SJT7VPT4RMQFLcS27jQBy3FcC24gtDrkcwJWZ5Xzqesr5T78LOKudHEJzt0k3b2S7n4KPwf27J7DVz2c8NQ4plSaaQylEeB1cYrfejaE/RPG/lCIQBYe4iBTzo26s4i2PcmT89837per/lTyvhVIKAdB04t89/1O/w1cDnyilFU=")
- handler = WebhookHandler("411ae3ef7e766739ed2c2c27b249d010")
- # callback event
- @app.post("/callback")
- async def callback(request: fastapi.Request):
- signature = request.headers['X-Line-Signature']
- body = await request.body()
- handler.handle(body.decode('utf-8'), signature)
- return 'OK'
- # follow event
- @handler.add(FollowEvent)
- def handle_follow(event):
- # get user id when follow
- real_user_id = event.source.user_id
- # db connect and search
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- table = db['users']
- result1 = table.find_one(userid=real_user_id)
- # 都存在db的話
- if result1:
- db.close()
- line_bot_api.reply_message(
- event.reply_token,
- TextSendMessage(text='很高興再見到您!'))
- # 建立全新使用者
- else:
- # create user account api
- url = 'https://nft-api-staging.joyso.io/api/v1/accounts'
- headers = {'Authorization': 'Basic bmZ0OmMxOTEzOWMzYjM3YjdjZWU3ZmY3OTFiZGU3NzdjZWNl'}
- # setup for temp use (unique id)
- rand_num = str(randrange(99999))
- user_id = event.source.user_id + rand_num
- data = 'uid=' + user_id
- r = requests.post(url=url, headers=headers, data=data)
- # extract the account address
- dict_str = json.loads(r.text)
- user_account = dict_str['account']
- user_address = user_account['address']
- # generate qr code from user id
- qr = qrcode.QRCode(
- version=1,
- box_size=10,
- border=5)
- qr.add_data(user_address)
- qr.make(fit=True)
- img_qr = qr.make_image(fill='black', back_color='white')
- filename = "/var/www/ArkCard-Linebot/ArkCard-web/qrcode/" + real_user_id + '.png'
- img_save = img_qr.save(filename)
- # add to db
- data = dict(userid=real_user_id, useraddress=user_address)
- table.insert(data)
- db.close()
- line_bot_api.reply_message(
- event.reply_token,
- TextSendMessage(text='歡迎加入好友'))
- # message handler
- @handler.add(MessageEvent, message=TextMessage)
- def message(event):
- if '我要發送' in event.message.text:
- button_template_message = ButtonsTemplate(
- title=' ',
- text='點擊並打開收藏的NFT,可以選擇想要發送的NFT給對方!',
- actions=[
- URITemplateAction(
- label='打開發送頁',
- uri='https://ark.cards/collect.html?' + event.source.user_id),])
- line_bot_api.reply_message(
- event.reply_token,
- TemplateSendMessage(
- alt_text="Receive",
- template=button_template_message))
- elif '我要接收' in event.message.text:
- button_template_message = ButtonsTemplate(
- title=' ',
- text='點擊並打開接收頁面,即可分享接收地址給對方!',
- actions=[
- URITemplateAction(
- label='打開接收頁',
- uri='https://ark.cards/qr-code.html?' + event.source.user_id),])
- line_bot_api.reply_message(
- event.reply_token,
- TemplateSendMessage(
- alt_text="Receive",
- template=button_template_message))
- elif 'NFT商店' in event.message.text:
- button_template_message = ButtonsTemplate(
- title=' ',
- text='點擊並打開NFT商品頁,就可以購買您所想要的NFT商品哦!',
- actions=[
- URITemplateAction(
- label='打開NFT商品頁',
- uri='https://ark.cards/shop.html?' + event.source.user_id),])
- line_bot_api.reply_message(
- event.reply_token,
- TemplateSendMessage(
- alt_text="Receive",
- template=button_template_message))
- elif 'NFT收藏' in event.message.text:
- button_template_message = ButtonsTemplate(
- title=' ',
- text='點擊並打開收藏的NFT,可以查看收到的NFT!',
- actions=[
- URITemplateAction(
- label='打開收藏頁',
- uri='https://ark.cards/collect.html?' + event.source.user_id), ])
- line_bot_api.reply_message(
- event.reply_token,
- TemplateSendMessage(
- alt_text="Receive",
- template=button_template_message))
- else:
- button_template_message = ButtonsTemplate(
- title=' ',
- text='更多的服務內容,歡迎請上我們的官網!',
- actions=[
- URITemplateAction(
- label='ArkCard的官網',
- uri='https://ark.cards'),])
- line_bot_api.reply_message(
- event.reply_token,
- TemplateSendMessage(
- alt_text="Receive",
- template=button_template_message))
- @app.post("/push/")
- def push_text(user, message):
- line_bot_api.push_message(user, TextSendMessage(text=message))
- # nft collection api
- @app.get("/collection/{userid}")
- def collection(userid):
- # db connect
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- table3 = db['nftdrops']
- table2 = db['nft']
- nftdrops = {}
- nft = {}
- nfts_all = {}
- i = 0
- j = 0
- if not table3.find_one(userid=userid) and not table2.find_one(userid=userid):
- db.close()
- return "error: user don't have any nft"
- else:
- results1 = table3.find(userid=userid)
- for item in results1:
- nft_id = item['nftid']
- nftdrops[i] = table2.find_one(id=nft_id)
- i += 1
- results2 = table2.find(userid=userid)
- for item in results2:
- nft[j] = item
- j += 1
- nfts_all[0] = nftdrops
- nfts_all[1] = nft
- return nfts_all
- db.close()
- # receive handler
- @app.get("/receive/{userid}")
- def receive(userid):
- # db connect
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- table = db['users']
- table.find_one(userid=userid)
- if not table.find_one(userid=userid):
- db.close()
- return "ERROR: User Not Found"
- else:
- result = table.find_one(userid=userid)
- return {"userid": result['userid'], "useraddress": result['useraddress']}
- db.close()
- # send handler
- @app.post("/send")
- async def receive(userModel : models.TransactionNft):
- # db connect
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- table = db['users']
- table2 = db['nft']
- table3 = db['nftdrops']
- # input and find userid
- nftid = userModel.nftid
- address = userModel.address
- result = table.find_one(useraddress=address)
- # first confirm if the user exist
- if not result:
- db.close()
- return {'msg': 'user address not found'}
- else:
- userid = result['userid']
- # update nft owner
- if table3.find_one(nftid=nftid):
- data = dict(nftid=nftid, userid=userid)
- table3.update(data, ['nftid'])
- # push訊息
- result3 = table2.find_one(id=nftid)
- title = result3['title']
- message = "您的NFT : " + title + ", 已劃轉成功!"
- push_text(userid, message)
- db.close()
- elif table2.find_one(id=nftid):
- result3 = table2.find_one(id=nftid)
- pre_own = result3['userid']
- data = dict(id=nftid, userid=userid)
- table2.update(data, ['id'])
- # push訊息
- title = result3['title']
- fr = "您的NFT : " + title + ", 已發送成功!"
- to = "您的NFT : "+title+", 已收到!"
- push_text(userid, to)
- push_text(pre_own, fr)
- db.close()
- else:
- db.close()
- return {'msg': 'nft not found'}
- return {'msg': 'OK'}
- # shop handler
- @app.get("/shop/{userid}")
- def shop(userid):
- # db connect
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- sql = 'SELECT DISTINCT(title), id, imgurl, userid FROM arkcard.nft WHERE id<1001 and userid IS NULL GROUP BY title LIMIT 5'
- result = db.query(sql)
- rows = {}
- i = 0
- for row in result:
- rows[i] = row
- i += 1
- return rows
- db.close()
- @app.post("/buy")
- async def buy(userModel: models.BuyNft):
- # db connect
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- table2 = db['nft']
- # input
- nftid = userModel.nftid
- userid = userModel.userid
- if not table2.find_one(id=nftid):
- db.close()
- print("error: nft not found")
- return "該NFT商品不存在!如果有疑問,請洽網站的服務信箱!"
- else:
- user_obj = table2.find_one(id=nftid)
- user_obj['userid'] = userid
- table2.update(dict(user_obj), ['id'])
- # push訊息
- result3 = table2.find_one(id=nftid)
- title = result3['title']
- message = "您的NFT : " + title + ", 已購買成功!"
- push_text(userid, message)
- db.close()
- return "您已購買成功!"
- @app.post("/event")
- async def nftdrops(userModel : models.NftDrops):
- # db connect
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/arkcard?charset=utf8mb4')
- table3 = db['nftdrops']
- # input對應
- eventid = '1'
- nftid = '1001'
- nftid2 = '1002'
- userid = userModel.userid
- email = userModel.email
- now = dt.datetime.now()
- # 如果userid不在db, 寫入到一個空值nft
- if not table3.find_one(userid=userid):
- # 新增資料
- table3.insert(dict(eventid=eventid, nftid=nftid, userid=userid, email=email, time=now))
- table3.insert(dict(eventid=eventid, nftid=nftid2, userid=userid, email=email, time=now))
- db.close()
- return "新增成功"
- # 如果userid存在,回傳通知
- else:
- db.close()
- return "已有資料"
- if __name__ == '__main__':
- uvicorn.run("main:app", host="0.0.0.0", port=8228, reload=True,ssl_context=('/etc/letsencrypt/live/ark.cards/fullchain.pem', '/etc/letsencrypt/live/ark.cards/privkey.pem'))
|