# -*- coding: utf-8 -*-
"""Selenium crawler for map place pages.

For every not-yet-crawled ``item_url`` of a keyword the script opens the page
in Chrome, scrapes basic shop data, the "intro" attribute lists, opening
hours, reviews and photo URLs, and inserts one row into MySQL.  Pages that
fail are recorded in the ``error_list`` table instead.

NOTE(review): this file was restored from a whitespace-collapsed copy, so the
block nesting was reconstructed from the logic.  Spots where more than one
nesting was plausible carry their own ``NOTE(review)`` comment.
"""
# NOTE: the import order below is intentionally unchanged.  The two star
# imports come *before* ``datetime``, ``pd``, ``time``, ``json`` and ``re`` so
# that the explicit imports win if the utility modules export the same names.
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
import traceback
from bs4 import BeautifulSoup
import pysnooper
from utility import database_access as DA
from utility.parseutils import *
from utility.connect import *
from datetime import datetime
import dataset
import pandas as pd
import time
import json
import re


def _move_and_click(driver, element):
    """Move the mouse over *element* and click it."""
    ActionChains(driver).move_to_element(element).click(element).perform()


def _move_to(driver, element):
    """Move the mouse over *element* (scrolls it into view) without clicking."""
    ActionChains(driver).move_to_element(element).perform()


def _quit_quietly(driver):
    """Quit *driver*, printing (not raising) any error from a dead session."""
    try:
        driver.quit()
    except Exception:
        traceback.print_exc()


def serive_create(profilepath):
    """Start a local Chrome driver and return it.

    ``profilepath`` is accepted for interface compatibility but is not used:
    the profile-directory options are disabled.  The session id and the
    executor URL are printed so a running session can be identified.
    """
    option = webdriver.ChromeOptions()
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    driver = webdriver.Chrome(options=option)
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)
    time.sleep(3)
    return driver


def brower_start():
    """Connect to the remote Selenium hub and return a Chrome session."""
    options = webdriver.ChromeOptions()
    browser = webdriver.Remote(
        command_executor='http://192.53.174.202:4444/wd/hub',
        desired_capabilities=options.to_capabilities()
    )
    return browser


def keyin_keyword(driver, keyword):
    """Type *keyword* into the ``searchbox`` element and press RETURN."""
    button = driver.find_element(By.ID, "searchbox")
    driver.implicitly_wait(30)
    ActionChains(driver).move_to_element(button).send_keys(keyword).send_keys(Keys.RETURN).perform()
    time.sleep(3)


def open_time(driver):
    """Click the ninth pane section unless its text contains '預訂'.

    Returns:
        1 if the element was clicked, 0 if it contained '預訂' and was skipped.
    """
    element = driver.find_element(
        By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[9]/div[2]')
    if element.text.find('預訂') != -1:
        return 0
    driver.implicitly_wait(20)
    _move_and_click(driver, element)
    return 1


def get_shop_info(driver, output, shop_soup):
    """Fill *output* with coordinates, city, area, address and phone number.

    Latitude/longitude are taken from the part of ``driver.current_url`` after
    the ``@``; the remaining fields come from ``aria-label`` attributes of
    buttons in *shop_soup*.

    Raises:
        IndexError, TypeError, KeyError: when the URL has no ``@`` part or an
        expected button is missing from the page.
    """
    current_url_split = driver.current_url.split('@')[1].split(',')
    output['lon'] = current_url_split[1]
    output['lat'] = current_url_split[0]

    location = shop_soup.find('button', {'data-item-id': 'oloc'})['aria-label'].split(' ')
    output['city'] = location[-1]
    output['area'] = location[-2]

    address_button = shop_soup.find('button', {'data-item-id': 'address'})
    output['addr'] = address_button['aria-label'].replace('地址:', '')

    phone_button = shop_soup.find('button', {'data-tooltip': '複製電話號碼'})
    output['tel'] = blank_check(phone_button['aria-label'].split(':')[1])
    print(output)
    return output


def get_intro_info(driver, output):
    """Open the intro panel, collect the checked attributes, then go back.

    For every key of ``intro_list`` the ``div`` with that ``aria-label`` is
    searched; list items that carry the "check" icon are stored (as the
    ``str()`` of a list of dicts) under ``intro_list[key][0]``.  Missing
    sections are stored as ``'[]'``.
    """
    print(datetime.now())
    element = driver.find_element(
        By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[6]')
    print(element)
    driver.implicitly_wait(20)
    print(datetime.now())
    _move_and_click(driver, element)

    # Hover over every third section so the whole panel gets rendered; stop
    # at the first section that does not exist.
    for i in range(5, 35, 3):
        try:
            element = driver.find_element(
                By.XPATH,
                '//*[@id="pane"]/div/div[1]/div/div/div[2]/div[{}]'.format(i))
            print(element)
            _move_to(driver, element)
        except Exception:
            break

    print(datetime.now())
    intro_soup = BeautifulSoup(driver.page_source, 'html.parser')
    check_icon = "//www.gstatic.com/images/icons/material/system_gm/1x/check_black_18dp.png"
    for key in intro_list:
        output_key, field_name = intro_list[key][0], intro_list[key][1]
        section = intro_soup.find('div', {'aria-label': key})
        if not section:
            output[output_key] = '[]'
            continue

        items = section.find_all('li', {'class': 'LQjNnc-p83tee-JNdkSc-ibnC6b'})
        print('element')
        print(items)
        count = 0
        tmp = []
        for ele in items:
            print(ele.prettify())
            if ele.find('img', {'src': check_icon}):
                tmp.append({
                    'id': count,
                    field_name: blank_check(ele.text)
                })
                # NOTE(review): ids number only the checked items.
                count += 1
        print(str(tmp))
        output[output_key] = str(tmp)

    print(datetime.now())
    driver.back()
    return output


def _closes_next_day(start, end):
    """Return True when the closing hour is numerically before the opening hour.

    Hours are compared as integers so that '9:00' sorts before '10:00'; text
    that is not purely numeric falls back to plain string comparison.
    """
    start_hour = start.split(':')[0]
    end_hour = end.split(':')[0]
    try:
        return int(end_hour) < int(start_hour)
    except ValueError:
        return end_hour < start_hour


def get_time_list(shop_soup, output):
    """Parse the opening-hours table into ``periods`` and ``weekday_text``.

    Sets ``output['open_now']`` to ``'False'`` when the status text is
    '永久停業' or '暫時關閉' and to ``'True'`` otherwise.  ``periods`` and
    ``weekday_text`` are stored as the ``str()`` of Python lists.
    """
    periods = []
    weekday_text = []

    hour_span = shop_soup.find('span', {'class': 'LJKBpe-Tswv1b-hour-text'})
    open_now = blank_check(hour_span.text.split('\xa0')[0])
    if open_now in ('永久停業', '暫時關閉'):
        output['open_now'] = 'False'
    else:
        output['open_now'] = 'True'
        # NOTE(review): the table is only parsed for shops that are not closed.
        for tr_ in shop_soup.find_all('tr'):
            week = tr_.find('div').text
            if week.replace(' ', '') == '':
                continue
            day = week_list[week]
            time_list = [blank_check(li.text) for li in tr_.find_all('li')]
            for time_ in time_list:
                if time_ == '24 小時營業':
                    # Was the int literal 0000 (== 0); use the same 'hhmm'
                    # string form as every other period.
                    open_time_, close_day, close_time_ = '0000', day, ''
                elif time_ == '休息':
                    open_time_, close_day, close_time_ = '', day, ''
                else:
                    start, end = time_.split('–')
                    # NOTE(review): day + 1 is not wrapped back to the first
                    # weekday; the value range of week_list is defined
                    # elsewhere, so confirm before adding a modulo.
                    close_day = day + 1 if _closes_next_day(start, end) else day
                    open_time_ = start.replace(':', '')
                    close_time_ = end.replace(':', '')
                periods.append({
                    "open": {"day": day, "time": open_time_},
                    "close": {"day": close_day, "time": close_time_}
                })
            weekday_text.append("{}: {}".format(week, ', '.join(time_list)))

    output['periods'] = str(periods)
    output['weekday_text'] = str(weekday_text)
    return output


@pysnooper.snoop()
def get_reviews(driver, output):
    """Open the review list, expand it, and store the parsed reviews.

    Every review becomes a dict with ``id``, ``author_name``,
    ``profile_photo_url``, ``rating`` (int), ``text``, ``created_at`` and
    ``photos``; the list is stored as ``str()`` in ``output['reviews']``.
    Navigates back before returning.
    """
    wait = WebDriverWait(driver, 30)
    more_reviews_css = "button[jsaction='pane.rating.moreReviews']"
    element = wait.until(
        EC.element_to_be_clickable((By.CSS_SELECTOR, more_reviews_css))
    )
    driver.implicitly_wait(20)
    _move_and_click(driver, element)
    time.sleep(1)

    for photo_button in driver.find_elements(By.CLASS_NAME, 'ODSEW-ShBeI-xJzy8c-bF1uUb'):
        photo_button.click()

    # Expand truncated review texts (selector was missing its closing ']').
    for more_button in driver.find_elements(By.CSS_SELECTOR, 'button[aria-label="顯示更多"]'):
        more_button.click()

    comment_soup = BeautifulSoup(driver.page_source, 'html.parser')
    reviews = []
    for comment in comment_soup.find_all('div', {'class': 'ODSEW-ShBeI'}):
        comment_a_tag = comment.find_all('a')
        author_name = blank_check(
            comment_a_tag[1].find('div', class_='ODSEW-ShBeI-title').text)
        profile_photo_url = comment_a_tag[0].find('img')['src']
        rating = blank_check(
            comment.find('span', {'role': 'img'})['aria-label'].replace('顆星', ''))
        text = comment.find('div', class_='ODSEW-ShBeI-ShBeI-content').text
        created_at = comment.find('span', class_='ODSEW-ShBeI-RgZmSc-date').text

        # The photo URL is the url(...) value of the first style declaration.
        photos = [
            button['style'].split(';')[0].split('url')[1]
            .replace('\"', '').replace('(', '').replace(')', '')
            for button in comment.find_all('button', class_='ODSEW-ShBeI-xJzy8c')
        ]

        reviews.append({
            'id': comment_a_tag[0]['href'].split('/')[5],
            'author_name': author_name,
            'profile_photo_url': profile_photo_url,
            'rating': int(rating),
            'text': text,
            'created_at': created_at,
            'photos': photos
        })

    output['reviews'] = str(reviews)
    driver.back()
    return output


def find_photo_list(driver):
    """Return the image URLs of up to five photos in the open photo tab.

    Photos whose anchor, ``style`` attribute or URL cannot be found in the
    page source are skipped instead of raising.
    """
    time.sleep(2)
    wait = WebDriverWait(driver, 60)
    wait.until(
        EC.element_to_be_clickable(
            (By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[3]/div[1]/div[1]/div/a'))
    )

    count_list = []
    for i in range(1, 6):
        try:
            element = driver.find_element(
                By.XPATH,
                '//*[@id="pane"]/div/div[1]/div/div/div[3]/div[1]/div[{}]/div/a'.format(i))
            count_list.append(element.get_attribute('data-photo-index'))
            _move_to(driver, element)
        except Exception:
            break
    # NOTE(review): a single pause after hovering all thumbnails.
    time.sleep(1)

    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')
    photo_url = []
    for photo_id in count_list:
        anchor = photo_soup.select_one('a[data-photo-index="{}"]'.format(photo_id))
        if anchor is None:
            continue
        for div in anchor.find_all('div'):
            sentence = div.get('style', '')
            if sentence.find('width') == -1:
                continue
            photo = re.search(r'https:(.*)\"', sentence)
            print(sentence)
            if photo:
                photo_url.append(photo.group(0).replace('\"', ''))
            # Only the first sized <div> of each anchor is considered.
            break
    return photo_url


def find_big_photo(output, driver):
    """Open the photo viewer and store photo URLs of the '全部' and '菜單' tabs.

    ``output['shop_photo']`` and ``output['menu_photo']`` default to ``'[]'``
    and are overwritten with ``str()`` of the URL list for each tab found
    among the first three tab buttons.
    """
    try:
        element = driver.find_element(
            By.CSS_SELECTOR,
            "div[class='F8J9Nb-LfntMc-header-HiaYvf-LfntMc-haAclf d8bJN-LfntMc-HiaYvf']")
        _move_and_click(driver, element)
    except Exception:
        # A leftover time.sleep(9999) used to stall the crawl here for hours;
        # the failure is logged and the tab lookup below decides what happens.
        traceback.print_exc()

    output['shop_photo'] = '[]'
    output['menu_photo'] = '[]'

    photo_map = {
        '全部': 'shop_photo',
        '菜單': 'menu_photo'
    }

    tab_dict = {}
    for tab_index in [0, 1, 2]:
        photo_name = driver.find_element(
            By.CSS_SELECTOR, "button[data-tab-index='{}']".format(tab_index)).text
        if photo_name in photo_map:
            tab_dict[photo_name] = tab_index
    print(tab_dict)

    for tab_, tab_index in tab_dict.items():
        print(tab_index)
        wait = WebDriverWait(driver, 60)
        element = wait.until(
            EC.element_to_be_clickable(
                (By.CSS_SELECTOR, "button[data-tab-index='{}']".format(tab_index)))
        )
        _move_and_click(driver, element)
        photo_list = find_photo_list(driver)
        output[photo_map[tab_]] = str(photo_list)

    return output


def get_url_list(driver):
    """Scroll the result list and return ``[href, aria-label]`` of place links.

    Only anchors whose ``href`` contains ``maps/place`` and that carry an
    ``aria-label`` attribute are returned.
    """
    time.sleep(2)
    for i in range(5, 43, 2):
        driver.find_element(
            By.XPATH,
            '//*[@id="pane"]/div/div[1]/div/div/div[2]/div[1]/div[{}]/div/a'.format(i)
        ).send_keys(Keys.DOWN)

    url_soup = BeautifulSoup(driver.page_source, 'html.parser')
    url_list = []
    for anchor in url_soup.find_all('a'):
        href = anchor.get('href')
        label = anchor.get('aria-label')
        if href is None or label is None:
            continue
        if href.find('maps/place') != -1:
            url_list.append([href, label])
    return url_list


def data_select_insert(db, table_name, table_col, data):
    """Insert one row built from *data* into *table_name* (``INSERT IGNORE``).

    Args:
        db: connection handed to ``DA.mysql_insert_data``.
        table_name: target table.
        table_col: column names, in insert order.
        data: mapping (dict or pandas row) providing a value for every column
            except ``crawler_date``.

    ``lon``/``lat`` are converted to float.  ``crawler_date`` is filled with
    the current time at its own position in *table_col* (it used to be
    appended last regardless of where the column was listed).
    """
    crawl_time = datetime.today().strftime("%Y/%m/%d %H:%M")
    values = []
    for name_ in table_col:
        if name_ == 'crawler_date':
            values.append(crawl_time)
        elif name_ == 'lon' or name_ == 'lat':
            values.append(float(data[name_]))
        else:
            values.append(data[name_])

    # NOTE(review): values are embedded through Python's tuple repr, not
    # through bound parameters; switching needs a parameter-aware helper in
    # the database_access module.
    insert_sql = """INSERT IGNORE INTO {}{} VALUES {}"""\
        .format(table_name, str(tuple(table_col)).replace('\'', ''), tuple(values))

    DA.mysql_insert_data(db, insert_sql)


def time_click(driver):
    """Expand the weekly opening hours and report the shop status.

    Returns:
        '正常' when the opening-hours toggle was found and clicked,
        '暫時關閉' when it is absent but a collapsed
        ``div[aria-expanded='false']`` exists.

    Raises:
        NoSuchElementException: when neither element is on the page.
    """
    status = ''
    try:
        print('calling time_click')
        time_css = "span[aria-label='顯示本週營業時間']"
        element = driver.find_element(By.CSS_SELECTOR, time_css)
        driver.implicitly_wait(30)
        _move_and_click(driver, element)
        status = '正常'
        print('status ok')
    except NoSuchElementException:
        time_css = "div[aria-expanded='false']"
        # find_element raises when nothing matches, so reaching the next
        # line means the collapsed element exists.
        driver.find_element(By.CSS_SELECTOR, time_css)
        status = '暫時關閉'
    return status


def get_not_cralwer_url(keyword):
    """Return the rows of ``shop_item_list`` for *keyword* still to be crawled.

    Rows are dropped when their ``item_url`` is exactly 1000 characters long
    or already present in ``shop_list`` or ``error_list`` for the keyword.
    An empty DataFrame is returned when the keyword has no rows at all.
    """
    # NOTE(review): credentials are hard-coded here; consider loading them
    # from configuration.  The two queries below interpolate *keyword* into
    # the SQL text.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table = db['shop_item_list']
    url_list = list(table.find(keyword=keyword))
    shop_item = [i['item_url'] for i in db.query('SELECT item_url FROM shop_list where keyword="{}"'.format(keyword))]
    error_item = [i['item_url'] for i in db.query('SELECT item_url FROM error_list where keyword="{}"'.format(keyword))]

    if not url_list:
        # url_list[0] below would raise IndexError on an empty result.
        print('have {} URL list'.format(0))
        return pd.DataFrame()

    url_pd = pd.DataFrame(url_list, columns=url_list[0].keys())
    url_pd['item_url_length'] = url_pd.item_url.apply(len)
    url_pd = url_pd[url_pd['item_url_length'] != 1000]
    url_pd = url_pd[~url_pd['item_url'].isin(shop_item)]
    url_pd = url_pd[~url_pd['item_url'].isin(error_item)]

    print('have {} URL list'.format(len(url_pd)))
    return url_pd


def serive_create_linux(profilepath):
    """Start a local Chrome driver with the Linux option set and return it.

    ``profilepath`` is accepted for interface compatibility but is not used.
    """
    option = webdriver.ChromeOptions()
    option.add_argument('--no-sandbox')
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    option.add_argument('--incognito')
    option.add_argument(
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0')
    driver = webdriver.Chrome(options=option)
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)

    return driver


def main():
    """Crawl every pending URL of the keyword and store the results.

    A page that raises anywhere in the pipeline is written to ``error_list``
    and the browser is replaced before the next URL is processed.
    """
    keyword = '咖啡'
    db = DA.mysql_connect(MYSQL_CONFIG, DB_NAME)
    url_pd = get_not_cralwer_url(keyword)

    print('drvier start...')
    driver = serive_create('Profile 1')

    try:
        for key, row in url_pd.iterrows():
            try:
                name = row['name']
                item_url = row['item_url']
                print(key, name, ': ', item_url)

                driver.get(item_url)
                # Hover down the pane so lazily rendered sections load.
                for i in range(4, 26, 2):
                    element = driver.find_element(
                        By.XPATH,
                        '//*[@id="pane"]/div/div[1]/div/div/div[{}]'.format(i))
                    _move_to(driver, element)
                    # NOTE(review): pause kept per section.
                    time.sleep(0.5)
                print('start...')
                time_click(driver)
                time.sleep(2)
                shop_soup = BeautifulSoup(driver.page_source, 'html.parser')
                print('after bs4')
                output = {
                    'name': blank_check(shop_soup.find('h1', class_='x3AX1-LfntMc-header-title-title').text)
                }
                print(output['name'])
                try:
                    output = get_shop_info(driver, output, shop_soup)
                except Exception:
                    # Best effort: a later missing key sends the page to
                    # error_list through the outer handler.
                    traceback.print_exc()
                print('get_shop_info')
                output = get_intro_info(driver, output)
                print('get_intro_info')
                output = get_time_list(shop_soup, output)
                print('get_time_list')
                output = get_reviews(driver, output)
                print('get_reviews')
                output = find_big_photo(output, driver)
                print('find_big_photo')

                output_name = output['name'].replace('(', '').replace(')', '')
                query_name = '{}+{}'.format(output_name, output['addr'])
                query_name = query_name.replace(' ', '')
                output['item_url'] = item_url
                output['keyword'] = keyword
                output['google_url'] = 'https://www.google.com.tw/search?q={}'.format(query_name)
                data_select_insert(db, SHOP_LIST_TABLE, SHOP_LIST_TABLE_COL, output)

            except Exception:
                # Log the cause (it used to be swallowed silently) and let
                # Ctrl-C through instead of catching it.
                traceback.print_exc()
                error_table_col = ['name', 'lon', 'lat', 'keyword', 'item_url', 'crawler_date']
                data_select_insert(db, 'error_list', error_table_col, row)
                # quit() ends the whole WebDriver session; close() only
                # closed the window and left the session running.
                _quit_quietly(driver)
                # NOTE(review): the replacement browser comes from the remote
                # hub although the first one is a local Chrome; confirm that
                # this switch is intended.
                driver = brower_start()
    finally:
        _quit_quietly(driver)


if __name__ == '__main__':
    main()