"""RPyC service that loads a URL in a Selenium-driven Chrome and returns the page body as JSON.

A single Chrome instance is started when the service object is created.
Before each navigation a small script is registered (through the Chrome
DevTools command ``Page.addScriptToEvaluateOnNewDocument``) that makes
``navigator.webdriver`` report ``false``.
"""
import json
import os
import sys
import time
from datetime import datetime

import rpyc
from bs4 import BeautifulSoup
from rpyc.utils.server import ThreadedServer  # or ForkingServer
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait, Select

# JavaScript evaluated on every new document so that ``navigator.webdriver``
# reads as ``false``. Kept byte-identical to the literal originally inlined
# in the service methods.
_HIDE_WEBDRIVER_JS = 'const setProperty = () => {     Object.defineProperty(navigator, "webdriver", {       get: () => false,     }); }; setProperty();'


def send(driver, cmd, params=None):
    """POST a DevTools command to chromedriver and return the response value.

    Args:
        driver: Selenium WebDriver whose ``session_id`` and
            ``command_executor`` are used to build and send the request.
        cmd: DevTools command name, e.g.
            ``"Page.addScriptToEvaluateOnNewDocument"``.
        params: Mapping of command parameters; an empty mapping is sent
            when omitted.

    Returns:
        The ``'value'`` entry of the response (``None`` if absent).
    """
    # A fresh dict per call avoids the shared-mutable-default pitfall.
    if params is None:
        params = {}
    resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
    # NOTE: relies on private attributes of Selenium's remote connection
    # (``_url`` / ``_request``).
    url = driver.command_executor._url + resource
    body = json.dumps({'cmd': cmd, 'params': params})
    response = driver.command_executor._request('POST', url, body)
    # The response status is deliberately not checked (the check was
    # disabled in the original code); only the value is returned.
    return response.get('value')


def add_script(driver, script):
    """Register *script* to be evaluated on every new document of *driver*."""
    send(driver, "Page.addScriptToEvaluateOnNewDocument", {"source": script})


def init_webdriver():
    """Create and return a Chrome WebDriver configured for this service.

    Side effect: attaches ``add_script`` to the ``WebDriver`` class so that
    every driver instance exposes ``driver.add_script(script)``.
    """
    WebDriver.add_script = add_script
    options = webdriver.ChromeOptions()
    # Optional: reuse an existing Chrome profile.
    #    options.add_argument("--user-data-dir=/home/jared/.config/google-chrome/")
    #    options.add_argument('--profile-directory="Profile 1"')
    options.add_argument('--disable-web-security')
    options.add_argument('--allow-running-insecure-content')
    options.add_argument('--disable-blink-features=AutomationControlled')
    # ``options=`` replaces the deprecated ``chrome_options=`` keyword
    # (accepted since Selenium 3.8; the old keyword is gone in Selenium 4.10+).
    driver = webdriver.Chrome(options=options)
    return driver


class MyService(rpyc.Service):
    """RPyC service wrapping one Chrome WebDriver instance.

    NOTE(review): when a single instance of this class is handed to
    ``ThreadedServer`` the same ``self.driver`` is used by every
    connection thread; concurrent ``get_url`` calls would interleave on one
    browser. Confirm callers are serialized.
    """

    def __init__(self):
        """Start Chrome; any start-up exception propagates to the caller."""
        self.driver = None
        self.driver = init_webdriver()
        print(self.driver)

    def process(self, url):
        """Register the ``navigator.webdriver`` override and log the driver.

        Args:
            url: Accepted for interface compatibility; not used here.
        """
        # NOTE(review): this registers one more copy of the script on every
        # call; registering it once per browser session may be sufficient —
        # confirm before changing.
        self.driver.add_script(_HIDE_WEBDRIVER_JS)
        print('add url.............')
        print(self.driver)

    def exposed_get_url(self, url):
        """Load *url* and return the JSON found in the page ``<body>`` text.

        Args:
            url: Address to navigate to.

        Returns:
            The object produced by ``json.loads`` on the body text.

        Raises:
            ValueError: If the page has no ``<body>`` element, or (as
                ``json.JSONDecodeError``) if the body text is not valid JSON.
        """
        # Same preparation steps as ``process``; shared instead of duplicated.
        self.process(url)
        self.driver.get(url)
        # Read the source once so the logged text and the parsed text are
        # guaranteed to be the same snapshot (and only one driver round-trip).
        page_source = self.driver.page_source
        print(page_source)
        soup = BeautifulSoup(page_source, features="lxml")
        body = soup.find("body")
        if body is None:
            raise ValueError("page at %r has no <body> element" % (url,))
        return json.loads(body.text)


def main():
    """Run the RPyC server on port 12345 and close Chrome when it stops."""
    service = MyService()
    server = ThreadedServer(
        service,
        port=12345,
        protocol_config={
            'allow_public_attrs': True,
        },
    )
    try:
        server.start()
    finally:
        # Without this the Chrome and chromedriver processes outlive the server.
        if service.driver is not None:
            service.driver.quit()


if __name__ == "__main__":
    main()