|
@@ -0,0 +1,86 @@
|
|
|
+# Selenium
|
|
|
+
|
|
|
+```
|
|
|
+from datetime import datetime
|
|
|
+import os
|
|
|
+import sys
|
|
|
+from selenium import webdriver
|
|
|
+from selenium.webdriver.common.by import By
|
|
|
+from selenium.webdriver.support.ui import WebDriverWait, Select
|
|
|
+from selenium.webdriver.support import expected_conditions as EC
|
|
|
+from selenium.webdriver.common.keys import Keys
|
|
|
+from selenium.webdriver.remote.webdriver import WebDriver
|
|
|
+import time
|
|
|
+import rpyc
|
|
|
+from rpyc.utils.server import ThreadedServer # or ForkingServer
|
|
|
+from bs4 import BeautifulSoup
|
|
|
+import json
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def send(driver, cmd, params={}):
|
|
|
+ resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
|
|
|
+ url = driver.command_executor._url + resource
|
|
|
+ body = json.dumps({'cmd': cmd, 'params': params})
|
|
|
+ response = driver.command_executor._request('POST', url, body)
|
|
|
+# if response['status']:
|
|
|
+# raise Exception(response.get('value'))
|
|
|
+ return response.get('value')
|
|
|
+
|
|
|
+def add_script(driver, script):
|
|
|
+ send(driver, "Page.addScriptToEvaluateOnNewDocument", {"source": script})
|
|
|
+
|
|
|
+def init_webdriver():
|
|
|
+ WebDriver.add_script = add_script
|
|
|
+ dir_path = os.path.dirname(os.path.realpath(__file__))
|
|
|
+ options = webdriver.ChromeOptions()
|
|
|
+
|
|
|
+# options.add_argument("--user-data-dir=/home/jared/.config/google-chrome/")
|
|
|
+# options.add_argument('--profile-directory="Profile 1"')
|
|
|
+ options.add_argument('--disable-web-security')
|
|
|
+ options.add_argument('--allow-running-insecure-content')
|
|
|
+ options.add_argument('--disable-blink-features=AutomationControlled')
|
|
|
+
|
|
|
+ driver = webdriver.Chrome( chrome_options=options)
|
|
|
+ return driver
|
|
|
+
|
|
|
+#global driver
|
|
|
+class MyService(rpyc.Service):
|
|
|
+ def process(self,url):
|
|
|
+ self.driver.add_script('const setProperty = () => { Object.defineProperty(navigator, "webdriver", { get: () => false, }); }; setProperty();')
|
|
|
+ print('add url.............')
|
|
|
+ print(self.driver)
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.driver = None
|
|
|
+ try:
|
|
|
+ self.driver = init_webdriver()
|
|
|
+ print(self.driver )
|
|
|
+ except Exception as e:
|
|
|
+ raise e
|
|
|
+ finally:
|
|
|
+ True
|
|
|
+ pass
|
|
|
+
|
|
|
+ def exposed_get_url(self,url):
|
|
|
+ self.driver.add_script('const setProperty = () => { Object.defineProperty(navigator, "webdriver", { get: () => false, }); }; setProperty();')
|
|
|
+ print('add url.............')
|
|
|
+ print(self.driver)
|
|
|
+ self.driver.get(url)
|
|
|
+
|
|
|
+# txt=self.driver.find_element_by_xpath("//div[@id='json']").text
|
|
|
+ print(self.driver.page_source)
|
|
|
+ soup = BeautifulSoup(self.driver.page_source,features="lxml")
|
|
|
+ dict_from_json = json.loads(soup.find("body").text)
|
|
|
+ return dict_from_json
|
|
|
+# return json.loads(self.driver.page_source)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ server = ThreadedServer(MyService(), port = 12345,protocol_config={
|
|
|
+ 'allow_public_attrs': True,
|
|
|
+})
|
|
|
+ server.start()
|
|
|
+
|
|
|
+```
|