123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- #import urllib.request
- import urllib
- import requests
- import traceback
- from bs4 import BeautifulSoup
- import json
- import os
- import time
- import sys
- import random
- from seleniumwire import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support.ui import WebDriverWait, Select
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.common.keys import Keys
- from selenium.webdriver.remote.webdriver import WebDriver
- import dataset
- import docker
- import brotli
- import gzip
- import datetime
- import redis
- import argparse
- #from fp.fp import FreeProxy
# Flag for running against a local setup; no code visible in this file reads it.
localrun = False
# Region code interpolated into the ``geo=`` parameter of the Google Trends URL.
geo = 'TW'
def send(driver, cmd, params=None):
    """Send a Chromium DevTools command through the driver's command executor.

    Args:
        driver: WebDriver-like object exposing ``session_id`` and a
            ``command_executor`` that has ``_url`` and ``_request``.
        cmd: Name of the command, e.g.
            ``"Page.addScriptToEvaluateOnNewDocument"``.
        params: Mapping of command parameters. ``None`` (the default) sends
            an empty mapping.

    Returns:
        The ``'value'`` entry of the executor's response, or ``None`` when
        the response has no such entry.
    """
    # Build a fresh dict per call instead of sharing one mutable default.
    if params is None:
        params = {}
    resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
    url = driver.command_executor._url + resource
    body = json.dumps({'cmd': cmd, 'params': params})
    response = driver.command_executor._request('POST', url, body)
    # NOTE: the response status is intentionally not checked here; callers
    # only receive the 'value' payload.
    return response.get('value')
def add_script(driver, script):
    """Issue ``Page.addScriptToEvaluateOnNewDocument`` with ``script`` as source.

    Args:
        driver: Driver object forwarded unchanged to ``send``.
        script: JavaScript source text placed under the ``"source"`` key.
    """
    command_params = {"source": script}
    send(driver, "Page.addScriptToEvaluateOnNewDocument", command_params)
def set_viewport_size(driver, width, height):
    """Resize the window so that the inner viewport measures width x height.

    The browser is asked (via JavaScript) for the difference between its
    outer and inner dimensions; that difference is added to the requested
    size and the sum is used as the new window size.

    Args:
        driver: Object providing ``execute_script`` and ``set_window_size``.
        width: Desired inner width.
        height: Desired inner height.
    """
    measure_script = (
        "\n"
        "return [window.outerWidth - window.innerWidth + arguments[0],\n"
        "window.outerHeight - window.innerHeight + arguments[1]];\n"
    )
    outer_dimensions = driver.execute_script(measure_script, width, height)
    driver.set_window_size(*outer_dimensions)
# Example: start a standalone Selenium Chrome container, published on host port 4445:
# docker run -d -p 4445:4444 --name p4445 --add-host=host.docker.internal:172.17.0.1 -v /dev/shm:/dev/shm selenium/standalone-chrome
def init_webdriver():
    """Create a headless Chrome driver with a 1400x1000 window.

    Returns:
        The ``webdriver.Chrome`` instance, configured with the command-line
        flags listed below.
    """
    chrome_flags = (
        '--ignore-certificate-errors',
        "--no-sandbox",
        "--headless",
        "--disable-gpu",
        "--disable-dev-shm-usage",
    )
    chrome_options = webdriver.ChromeOptions()
    for flag in chrome_flags:
        chrome_options.add_argument(flag)
    browser = webdriver.Chrome(options=chrome_options)
    browser.set_window_size(1400, 1000)
    return browser
class SelGTrend:
    """Collect Google Trends "related searches" data with a headless browser.

    ``search`` / ``getpage`` open the Trends explore page for a keyword,
    scroll it, and inspect the requests captured by the driver. Every
    response whose URL contains ``relatedsearches?`` is parsed and its
    ``rankedList`` is inserted into the ``gtrend_jsraw`` table.

    Attributes set in ``__init__``:
        table: ``gtrend_jsraw`` table handle of the database connection.
        yt: When true, the YouTube property (``gprop=youtube``) is queried.
        texts, links, results: Lists exposed through the accessor methods;
            no code visible in this class fills them.
    """

    def __init__(self):
        """Connect to the database and reset all per-instance state."""
        # NOTE(review): credentials are hard-coded in the connection URL;
        # consider loading them from configuration or the environment.
        db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/gtrends?charset=utf8mb4')
        self.table = db['gtrend_jsraw']
        self.yt = False
        self.texts = []
        self.links = []
        self.results = []
        self.user_agent = 'Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0'
        self.headers = {'User-Agent': self.user_agent}

    def search(self, key):
        """Look up ``key`` on Google Trends and return what ``getpage`` returns.

        The unmodified keyword is remembered in ``self.original`` (it is
        what gets stored in the ``kw`` column); spaces are replaced by ``+``
        before the query is handed to ``getpage``.
        """
        self.original = key
        # NOTE(review): getpage() applies quote_plus() to this value, so the
        # '+' inserted here reaches the URL as a literal '%2B' rather than as
        # an encoded space — confirm that this is intended.
        self.key = "+".join(key.split(" "))
        return self.getpage(self.key)

    @staticmethod
    def _decode_body(raw):
        """Return ``raw`` bytes as UTF-8 text, gunzipping them when possible."""
        import zlib  # local import: not part of the module-level import block

        try:
            raw = gzip.decompress(raw)
        except (OSError, EOFError, zlib.error):
            # Not readable as gzip: report it and use the bytes as received.
            traceback.print_exc()
        return raw.decode('utf-8')

    @staticmethod
    def _extract_ranked_list(jstext):
        """Parse a related-searches payload and return its ``rankedList``."""
        # The first 6 characters are skipped before JSON parsing; presumably
        # they are Google's ")]}'," + newline response prefix — TODO confirm.
        return json.loads(jstext[6:])['default']['rankedList']

    def getpage(self, query):
        """Load the Trends explore page for ``query`` and store related searches.

        Args:
            query: Search term; it is URL-quoted with ``quote_plus`` here.

        Returns:
            A list with one parsed ``rankedList`` per captured
            ``relatedsearches?`` response. The list is empty when nothing
            was captured or when an error occurred (errors are printed, not
            raised).
        """
        # The module top imports only the bare ``urllib`` package.
        import urllib.parse

        driver = None
        result = []
        safe_string = urllib.parse.quote_plus(query)
        base_url = 'https://trends.google.com/trends/explore?date=now%207-d&geo=' + geo
        if self.yt:
            self.url = base_url + '&gprop=youtube&q=' + safe_string
        else:
            self.url = base_url + '&q=' + safe_string
        try:
            print(self.url)
            driver = init_webdriver()
            driver.get(self.url)
            time.sleep(3)
            driver.refresh()
            time.sleep(4)
            # find_element(By...) works on Selenium 3 and 4; the
            # find_element_by_* helpers were removed in Selenium 4.
            page_body = driver.find_element(By.CSS_SELECTOR, 'body')
            # Scroll down step by step before the captured requests are read.
            for _ in range(9):
                page_body.send_keys(Keys.PAGE_DOWN)
                time.sleep(0.5)
            for request in driver.requests:
                print(request.url[0:60])
                if not request.response or 'relatedsearches?' not in request.url:
                    continue
                print('*** parsing js:')
                jstext = self._decode_body(request.response.body)
                print(jstext)
                jsobj = self._extract_ranked_list(jstext)
                self.table.insert({
                    'kw': self.original,
                    'dt': datetime.datetime.now(),
                    'json': json.dumps(jsobj, ensure_ascii=False).encode('utf8'),
                })
                print(jsobj)
                result.append(jsobj)
        except Exception as e:
            # Best effort: report the failure and return what was collected.
            traceback.print_exc()
            print(e)
        finally:
            # Always release the browser, but only if it was actually created.
            if driver is not None:
                driver.quit()
        return result

    def result(self):
        """Return the ``results`` list."""
        return self.results

    def gettext(self):
        """Return the ``texts`` list."""
        return self.texts

    def getlinks(self):
        """Return the ``links`` list."""
        return self.links

    def clear(self):
        """Reset ``texts``, ``links`` and ``results`` to empty lists."""
        self.texts = []
        self.links = []
        self.results = []
- #sgtrend=SelGTrend()
- #data=sgtrend.search('居家')
- #data=sgtrend.search('7-11 當機')
|