"""Scrape Google Trends daily trending searches and store them in MySQL.

The script opens the Google Trends "daily trending searches" page in a
headless Chrome session driven through selenium-wire, looks through the
captured network requests for the one whose URL contains ``dailytrends?``,
decodes its JSON payload and inserts the trending-search list into the
``trending_search_json`` table.
"""
#import urllib.request
import urllib
import requests
import traceback
from bs4 import BeautifulSoup
import json
import os
import time
import sys
import random
from seleniumwire import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webdriver import WebDriver
import dataset
import docker
import datetime
import gzip
import zlib
#from fp.fp import FreeProxy

# Page whose background XHR carries the daily-trends JSON; the geo code is
# appended by SelGTrend.getpage().
TRENDS_URL = 'https://trends.google.com/trends/trendingsearches/daily?geo='

# Geo code used by the script at the bottom of this file; also the default
# for SelGTrend.search().
DEFAULT_GEO = 'TW'

# Number of leading characters dropped from the response text before it is
# parsed as JSON.
# NOTE(review): presumably this skips Google's ")]}'," + newline guard
# prefix -- confirm against a captured response.
JSON_PREFIX_LEN = 6

# NOTE(review): credentials are embedded in source; they should be moved to
# configuration or the environment. Kept verbatim as the default so existing
# behaviour is unchanged.
DB_URL = 'mysql://choozmo:pAssw0rd@db.ptt.cx:3306/gtrends?charset=utf8mb4'


def send(driver, cmd, params=None):
    """POST a Chromium command to the driver session and return its value.

    Args:
        driver: WebDriver whose ``session_id`` and ``command_executor`` are
            used to build and issue the request.
        cmd: Command name, e.g. ``"Page.addScriptToEvaluateOnNewDocument"``.
        params: Optional dict of command parameters; an empty dict is sent
            when omitted.

    Returns:
        The ``'value'`` entry of the response (``None`` if absent).
    """
    # A fresh dict per call avoids sharing one mutable default across calls.
    if params is None:
        params = {}
    resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
    # _url and _request are private members of the command executor.
    url = driver.command_executor._url + resource
    body = json.dumps({'cmd': cmd, 'params': params})
    response = driver.command_executor._request('POST', url, body)
    # if response['status']:
    #     raise Exception(response.get('value'))
    return response.get('value')


def add_script(driver, script):
    """Ask the browser to evaluate ``script`` on every new document."""
    send(driver, "Page.addScriptToEvaluateOnNewDocument", {"source": script})


def set_viewport_size(driver, width, height):
    """Resize the window so its inner viewport becomes ``width`` x ``height``.

    The outer/inner size difference is measured in the page and added to the
    requested viewport size to obtain the window size passed to the driver.
    """
    window_size = driver.execute_script(
        """
        return [window.outerWidth - window.innerWidth + arguments[0],
                window.outerHeight - window.innerHeight + arguments[1]];
        """,
        width,
        height,
    )
    driver.set_window_size(*window_size)


def init_webdriver():
    """Create a headless, incognito selenium-wire Chrome driver."""
    # client = docker.from_env()
    # ls=client.containers.list()
    # print(ls)
    # ls[0].restart()
    # time.sleep(11)
    options = webdriver.ChromeOptions()
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--headless")
    options.add_argument("--incognito")
    driver = webdriver.Chrome(options=options)
    # driver = webdriver.Remote(
    #     command_executor='http://127.0.0.1:4444/wd/hub',
    #     desired_capabilities=options.to_capabilities())
    return driver


def _decode_trends_body(body):
    """Turn a raw ``dailytrends`` response body (bytes) into a JSON object.

    The body is gunzipped when possible; if decompression fails the bytes
    are used as they are. The text is decoded as UTF-8 and the first
    ``JSON_PREFIX_LEN`` characters are dropped before JSON parsing.
    """
    try:
        data = gzip.decompress(body)
    except (OSError, EOFError, zlib.error):
        # Not (valid) gzip data: report it and fall back to the raw bytes.
        traceback.print_exc()
        data = body
    jstext = data.decode('utf-8')
    return json.loads(jstext[JSON_PREFIX_LEN:])


class SelGTrend:
    """Fetch the Google Trends daily trending-searches payload via Selenium."""

    def __init__(self):
        self.texts = []
        self.links = []
        self.results = []
        self.user_agent = 'Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0'
        self.headers = {'User-Agent': self.user_agent}
        # self.proxy = FreeProxy().get()

    def search(self, key, geo=DEFAULT_GEO):
        """Record ``key`` (spaces replaced by ``+``) and fetch the page.

        Previously this called ``getpage()`` without its required ``geo``
        argument and therefore always raised ``TypeError``; ``geo`` is now
        an optional parameter that is forwarded.

        Returns:
            Whatever :meth:`getpage` returns for ``geo``.
        """
        self.key = "+".join(key.split(" "))
        return self.getpage(geo)

    def getpage(self, geo):
        """Load the daily-trends page for ``geo`` and return its JSON data.

        Every captured request URL is printed (first 60 characters). The
        first request that has a response and whose URL contains
        ``dailytrends?`` is decoded and returned.

        Args:
            geo: Geo code appended to the trends URL (the script uses 'TW').

        Returns:
            The parsed JSON object, or an empty list when no matching
            response was captured or an error occurred (errors are printed,
            not raised).
        """
        result = []
        self.url = TRENDS_URL + geo
        driver = None
        try:
            print(self.url)
            driver = init_webdriver()
            # driver.add_script('const setProperty = () => { Object.defineProperty(navigator, "webdriver", { get: () => false, }); }; setProperty();')
            driver.get(self.url)
            # Fixed wait so the page's background requests can be captured.
            time.sleep(5)
            for request in driver.requests:
                print(request.url[0:60])
                if request.response and 'dailytrends?' in request.url:
                    print('*** parsing js:')
                    return _decode_trends_body(request.response.body)
        except Exception as e:
            # Best effort: report the failure and fall through to the
            # empty result instead of propagating.
            traceback.print_exc()
            print(e)
        finally:
            # Always release the browser; it used to be left running.
            if driver is not None:
                try:
                    driver.quit()
                except Exception:
                    traceback.print_exc()
        return result

    def result(self):
        """Return the ``results`` list."""
        return self.results

    def gettext(self):
        """Return the ``texts`` list."""
        return self.texts

    def getlinks(self):
        """Return the ``links`` list."""
        return self.links

    def clear(self):
        """Reset ``texts``, ``links`` and ``results`` to empty lists."""
        self.texts = []
        self.links = []
        self.results = []


def save_to_db(js, db_url=DB_URL):
    """Insert the first day's trending searches into ``trending_search_json``.

    Args:
        js: Parsed daily-trends payload; the list stored is
            ``js['default']['trendingSearchesDays'][0]['trendingSearches']``.
            An empty value (such as the ``[]`` that ``SelGTrend.getpage``
            returns on failure) is skipped with a message instead of raising.
        db_url: Connection URL handed to ``dataset.connect``.
    """
    if not js:
        print('no trends data to save')
        return
    searches = js['default']['trendingSearchesDays'][0]['trendingSearches']
    db = dataset.connect(db_url)
    table = db['trending_search_json']
    row = {
        'dt': datetime.datetime.now(),
        'json': json.dumps(searches, ensure_ascii=False).encode('utf8'),
    }
    try:
        table.insert(row)
    except Exception as exc:
        # NOTE(review): presumably the expected failure is a duplicate-key
        # violation; the error is printed too so other failures are not
        # mistaken for duplicates.
        print('dup')
        print(exc)
    # for j in json:
    #     print(j['title'])
    ##     print(j['formattedTraffic'])
    #     print(j['relatedQueries'])
    #     if j.get('source') is not None:
    #         print(j['source'])
    ##     print(json.dumps(j['image']))
    #     print(j['snippet'])
    #     print(j)
    #     table.insert(j)


geo = 'TW'
sgtrend = SelGTrend()
result = sgtrend.getpage(geo)
#print(result)
save_to_db(result)
#time.sleep(9999)