import os import sys from datetime import datetime from time import sleep from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from threading import Thread from queue import Queue import argparse #chrome_driver_executable_path = '' #if getattr(sys, 'frozen', False): # # publish one file # chrome_driver_executable_path = sys._MEIPASS + r'\resource\chromedriver.exe' #else: # source # chrome_driver_executable_path = os.getcwd() + r'\resource\chromedriver.exe' capa = DesiredCapabilities.CHROME capa["pageLoadStrategy"] = "none" capa['loggingPrefs'] = {'driver': 'OFF', 'server': 'OFF', 'browser': 'OFF'} attempt = 0 successful_attempt = 0 final_sleep = 0 timeout_ex = 0 other_ex = 0 recaptcha_ex = 0 def main_script(keyword, site_url, max_successful_clicks, browser_visibility_flag): global attempt global successful_attempt global final_sleep global timeout_ex global other_ex global recaptcha_ex while successful_attempt < max_successful_clicks: options = Options() if not browser_visibility_flag: options.add_argument('--headless') options.add_argument("--js-flags=--expose-gc") options.add_argument("--enable-precise-memory-info") options.add_argument("--disable-popup-blocking") options.add_argument("--disable-default-apps") options.add_argument("disable-infobars") options.add_argument('--disable-contextual-search') options.add_argument("--disable-notifications") options.add_argument('--incognito') options.add_argument('--disable-application-cache') options.add_argument('--no-sandbox') options.add_argument('--disk-cache-size=0') options.add_argument('–-disable-restore-session-state') options.add_argument('--disable-extensions') options.add_argument('test-type') options.add_argument('--silent') options.add_argument('--log-level=3') options.add_argument("--proxy-server=socks5://127.0.0.1:9050") driver = None try: driver = webdriver.Chrome( # executable_path=chrome_driver_executable_path, chrome_options=options, desired_capabilities=capa) wait = WebDriverWait(driver, 40) driver.implicitly_wait(10) driver.set_page_load_timeout(80) # driver.get('http://google.com/search?q=' + urllib.parse.urlencode(keyword)) # does not work driver.get('http://google.com/') wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[name="q"]'))) driver.execute_script("window.stop();") driver.execute_script("window.stop();") attempt += 1 elm = driver.find_element_by_css_selector('input[name="q"]') elm.send_keys(keyword) elm.send_keys(Keys.RETURN) sleep(3) recaptcha = None try: recaptcha = driver.find_elements_by_css_selector('#recaptcha') except: pass if not recaptcha: wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '#res'))) driver.execute_script("window.stop();") site_links = driver.find_elements_by_css_selector('#res a') site_links_filtered = filter(lambda x: site_url in x.get_attribute('href'), site_links) site_link = next(site_links_filtered) site_link.click() wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'body'))) sleep(10) elm = driver.find_element_by_css_selector('body') elm.send_keys(Keys.RETURN) successful_attempt += 1 final_sleep = 240 # staying in the website else: driver.execute_script("window.stop();") recaptcha_ex += 1 final_sleep = 0 except TimeoutException as ex: final_sleep = 0 timeout_ex += 1 except Exception as ex: final_sleep = 0 other_ex += 1 else: info_message = '{}\t|\tprocess pid: {: <6}\t|\t{: >5} attempt\t|\t{: >5} successful click\t|\t{: >5} timeout\t|\t {: >5} recaptcha \t|\t{: >5} error'.format( datetime.now().strftime('%Y/%m/%d %H:%M:%S'), os.getpid(), attempt, successful_attempt, timeout_ex, recaptcha_ex, other_ex ) print(info_message) f = open('log.txt', 'a') f.write(info_message + '\n') f.close() finally: sleep(final_sleep) if driver: driver.close() del driver print() print('process pid: {} | finished {} successful clicks! \nprocess stopped...'.format(os.getpid(), max_successful_clicks)) f = open('log.txt', 'a') f.write('process pid: {} | finished {} successful clicks! \nprocess stopped...'.format(os.getpid(), max_successful_clicks) + '\n') f.close() print() class WorkerThread(Thread): def __init__(self, q, keyword, site_url, max_successful_clicks, browser_visibility_flag): super().__init__() self.setDaemon(True) self.q = q self.keyword = keyword self.site_url = site_url self.max_successful_clicks = max_successful_clicks self.browser_visibility_flag = browser_visibility_flag def run(self): while True: print() print('automated searching started...') print() main_script(self.keyword, self.site_url, self.max_successful_clicks, self.browser_visibility_flag) self.q.task_done() #parser = argparse.ArgumentParser(prog='python automate_search_google.py', # description='A script to automatically search keywords on google and click on your desired website link as much as you want.') #parser.add_argument('k', help='list of keywords to search in format of "[\'keyword 1\',\'keyword 2\']"') #parser.add_argument('u', help='Url of your desired website to be clicked without http and www. eg: example.com') #parser.add_argument('m', type=int, help='maximum successful click count you want. eg: 500') #parser.add_argument('-v', '--visible', action='store_true', # help='browser visibility flag. if present the browser become visible') #args = parser.parse_args() #keywords = eval(args.k) keywords=['幸福空間'] message = ' please start tor browser first '.upper() print() message_str = '#' * 10 + message + '#' * 10 print('#' * len(message_str)) print('#' * len(message_str)) print('#' * 10 + ' ' * len(message) + '#' * 10) print(message_str) print('#' * 10 + ' ' * len(message) + '#' * 10) print('#' * len(message_str)) print('#' * len(message_str)) print() my_queue = Queue() for key in keywords: worker = WorkerThread(my_queue, key, 'hhh.com.tw', 1, 1) worker.start() for i in keywords: my_queue.put(i) sleep(1) my_queue.join() c = input('Press Enter to exit program... ') exit(0)