# -*- coding: utf-8 -*-
# NOTE: the import order is kept exactly as it was; the two star imports below
# can bind arbitrary names, so reordering could change which binding wins.
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

from bs4 import BeautifulSoup

from utility import database_access as DA
from utility.parseutils import *
from utility.connect import *

from datetime import datetime
import traceback
import dataset
import pandas as pd
import time
import json
import re
import sys
import os
# import pyautogui as pag


def serive_create(profilepath):
    """Start a local Chrome session that reuses an existing Windows profile.

    Args:
        profilepath: value passed to Chrome's ``profile-directory`` switch
            (for example ``'Profile 6'``).

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    option = webdriver.ChromeOptions()
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    option.add_argument("--user-data-dir=C:\\Users\\user\\AppData\\Local\\Google\\Chrome\\User Data")
    option.add_argument("profile-directory="+profilepath)
    driver = webdriver.Chrome('./utility/chromedriver_win32/chromedriver', options=option)

    # Print the session coordinates so the running browser can be identified.
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print (session_id)
    print (executor_url)
    time.sleep(3)
    return driver


def brower_start(port):
    """Start a Chrome session with default options.

    Args:
        port: currently unused; it is only referenced by the disabled
            ``webdriver.Remote`` variant kept below.

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    options = webdriver.ChromeOptions()
    browser = webdriver.Chrome(options=options)
    # Disabled remote (Selenium hub) variant:
    # browser = webdriver.Remote(
    #     command_executor='http://127.0.0.1:'+str(port)+'/wd/hub',
    #     # command_executor='http://192.53.174.202:'+str(port)+'/wd/hub',
    #     desired_capabilities=options.to_capabilities()
    # )
    return browser


def keyin_keyword(driver, keyword):
    """Type ``keyword`` into the element with id ``searchbox`` and press RETURN.

    Args:
        driver: Selenium WebDriver positioned on the page to search.
        keyword: text to send to the search box.
    """
    button = driver.find_element_by_id("searchbox")
    # The implicit wait is set after the lookup above, so it only affects
    # element lookups made later on this driver.
    driver.implicitly_wait(30)
    ActionChains(driver).move_to_element(button).send_keys(keyword).send_keys(Keys.RETURN).perform()
    time.sleep(3)
    # Disabled follow-up click:
    #     element = driver.find_element_by_class_name("V0h1Ob-haAclf")
    #     driver.implicitly_wait(30)
#     ActionChains(driver).move_to_element(element).click(element).perform()


def open_time(driver):
    """Click the pane row at a fixed XPath unless its text contains '預訂'.

    Args:
        driver: Selenium WebDriver positioned on a place page.

    Returns:
        1 when the row was clicked, 0 when its text contains '預訂'.
    """
    xpath = '//*[@id="pane"]/div/div[1]/div/div/div[9]/div[2]'
    element = driver.find_element_by_xpath(xpath)
    if '預訂' in element.text:
        return 0

    # The row is looked up a second time right before the click, as before.
    element = driver.find_element_by_xpath(xpath)
    driver.implicitly_wait(10)
    ActionChains(driver).move_to_element(element).click(element).perform()
    return 1


def get_shop_info(driver, output, shop_soup):
    """Fill ``output`` with location, address, phone and configured fields.

    Args:
        driver: unused here; kept so existing callers keep working.
        output: dict that is updated in place and returned.
        shop_soup: BeautifulSoup tree of the place page.

    Returns:
        The same ``output`` dict with 'city', 'area', 'addr', 'tel' and one
        entry per key of ``element_list``.

    Raises:
        TypeError / IndexError: when the ``data-item-id='oloc'`` button is
            missing or its label has fewer than two space-separated parts.
            This lookup is intentionally left unguarded, as it was.
    """
    # ``element_list``, ``blank_check`` and ``value_check`` are not defined in
    # this file; presumably they come from the ``utility`` star imports.
    # current_url_split = driver.current_url.split('@')[1].split(',')
    # output['lon'] = current_url_split[1]
    # output['lat'] = current_url_split[0]
    location = shop_soup.find('button', {'data-item-id': 'oloc'})['aria-label'].split(' ')
    output['city'] = location[-1]
    output['area'] = location[-2]

    try:
        output['addr'] = shop_soup.find('button', {'data-item-id': 'address'})['aria-label'].replace('地址:', '')
    except Exception:
        output['addr'] = ''

    try:
        output['tel'] = blank_check(
            shop_soup.find('button', {'data-tooltip': '複製電話號碼'})['aria-label'].split(':')[1])
    except Exception:
        output['tel'] = ''
    print(output['addr'], ', ', output['tel'])

    for key in element_list:
        try:
            element = element_list[key]
            if len(element) == 3:
                # Three-part spec: (tag, attrs, attribute name to read).
                value = shop_soup.find(element[0], element[1])[element[2]]
            else:
                # Two-part spec: (tag, attrs); use the tag text if present.
                tmp_value = shop_soup.find(element[0], element[1])
                value = tmp_value.text if tmp_value else ''
            output[key] = value_check(key, value)
        except Exception:
            output[key] = ''

    return output


def get_intro_info(driver, output):
    """Open the '<name>簡介' pane and collect the checked items per section.

    Every key of ``intro_list`` yields one ``output`` entry holding the
    ``str()`` of a list of ``{'id': n, <field>: text}`` dicts. On any error
    all of those entries are reset to ``'[]'``.

    Args:
        driver: Selenium WebDriver positioned on the place page.
        output: dict that is updated in place; ``output['name']`` is read.

    Returns:
        The same ``output`` dict.
    """
    # element = driver.find_element_by_xpath('//*[@id="pane"]/div/div[1]/div/div/div[6]')
    pane_opened = False
    try:
        element = driver.find_element(By.CSS_SELECTOR, "div[aria-label='{}簡介']".format(output['name']))
        driver.implicitly_wait(5)
        ActionChains(driver).move_to_element(element).click(element).perform()
        pane_opened = True

        # pageSource = driver.page_source
        # fileToWrite = open("page_source.html", "w")
        # fileToWrite.write(pageSource)
        # fileToWrite.close()

        page_down_(driver, '//*[@id="pane"]/div/div[1]', 3)

        intro_soup = BeautifulSoup(driver.page_source, 'html.parser')
        for key in intro_list:
            section = intro_soup.find('div', {'aria-label': key})
            if not section:
                output[intro_list[key][0]] = '[]'
                continue

            tmp = []
            for ele in section.find_all('li', {'class': 'LQjNnc-p83tee-JNdkSc-ibnC6b'}):
                # Only items carrying the "check" icon are kept.
                # if ele.find('img',{'src':"//www.gstatic.com/images/icons/material/system_gm/2x/check_black_18dp.png"}):
                if ele.find('img', {'src': "//www.gstatic.com/images/icons/material/system_gm/1x/check_black_18dp.png"}):
                    # ids number the kept items consecutively from 0.
                    tmp += [{
                        'id': len(tmp),
                        intro_list[key][1]: blank_check(ele.text)
                    }]
            print(str(tmp))
            output[intro_list[key][0]] = str(tmp)

        # Leaving the pane: clear the flag first so a failing back() is not
        # retried by the handler below.
        pane_opened = False
        driver.back()
        return output

    except Exception:
        for key in intro_list:
            output[intro_list[key][0]] = '[]'
        if pane_opened:
            # The failure happened inside the intro pane; go back so the
            # caller's next steps run against the place page again.
            try:
                driver.back()
            except Exception:
                traceback.print_exc()
        return output


def _closes_before_opening(start, end):
    """Return True when the 'H:MM' text ``end`` is earlier than ``start``."""
    end_hour, end_min = end.split(':')
    start_hour, start_min = start.split(':')
    try:
        # Compare numerically: as plain strings '2' < '17' is False, which
        # used to hide closing times that fall after midnight.
        return (int(end_hour), int(end_min)) < (int(start_hour), int(start_min))
    except ValueError:
        # Non-numeric text: keep the previous string comparison.
        return end_hour < start_hour


def get_time_list(shop_soup, output):
    """Parse the weekly opening-hours table into ``output``.

    Sets ``output['open_now']`` ('True'/'False' as strings),
    ``output['periods']`` and ``output['weekday_text']`` (both ``str()`` of a
    list).

    Args:
        shop_soup: BeautifulSoup tree of the place page with the hours table.
        output: dict that is updated in place and returned.

    Returns:
        The same ``output`` dict.
    """
    periods = []
    weekday_text = []

    open_now = blank_check(
        shop_soup.find('span', {'class': 'LJKBpe-Tswv1b-hour-text'}).text.split('\xa0')[0])
    if open_now == '永久停業' or open_now == '暫時關閉':
        output['open_now'] = 'False'
    else:
        output['open_now'] = 'True'

    for tr_ in shop_soup.find_all('tr'):
        if tr_.find('div').text.replace(' ', '') == '':
            continue

        week = tr_.find('div').text
        time_list = [blank_check(i.text) for i in tr_.find_all('li')]
        for time_ in time_list:
            if time_ == '24 小時營業':
                # NOTE(review): the open time here is the integer 0 while the
                # other branches store strings such as '1100'; kept as-is
                # because stored data may rely on it -- confirm.
                periods += [{
                    "open": {"day": week_list[week], "time": 0},
                    "close": {"day": week_list[week], "time": ''}
                }]
            elif time_ == '休息':
                periods += [{
                    "open": {"day": week_list[week], "time": ''},
                    "close": {"day": week_list[week], "time": ''}
                }]
            else:
                start, end = time_.split('–')
                if _closes_before_opening(start, end):
                    # NOTE(review): no wrap-around is applied; whether
                    # ``week_list`` values can exceed the last weekday here
                    # depends on its definition -- confirm.
                    end_day = week_list[week] + 1
                else:
                    end_day = week_list[week]
                periods += [{
                    "open": {"day": week_list[week], "time": start.replace(':', '')},
                    "close": {"day": end_day, "time": end.replace(':', '')}
                }]
        weekday_text += ["{}: {}".format(week, ', '.join(time_list))]

    output['periods'] = str(periods)
    output['weekday_text'] = str(weekday_text)

    return output


def get_reviews(driver, output):
    """Open the reviews pane, expand its entries and store them in ``output``.

    Args:
        driver: Selenium WebDriver positioned on the place page.
        output: dict that is updated in place and returned.

    Returns:
        The same ``output`` dict with ``output['reviews']`` set to the
        ``str()`` of a list of review dicts.
    """
    wait = WebDriverWait(driver, 30)
    more_reviews_css = "button[jsaction='pane.rating.moreReviews']"
    wait.until(
        EC.element_to_be_clickable((By.CSS_SELECTOR, more_reviews_css))
    )
    element = driver.find_element_by_css_selector(more_reviews_css)
    driver.implicitly_wait(10)
    ActionChains(driver).move_to_element(element).click(element).perform()
    time.sleep(0.5)

    # page_down_(driver, '//*[@id="pane"]/div/div[1]/div/div/div[2]/div[1]', 5)
    page_down_(driver, '//div[@class="PPCwl"]', 5)

    # The former ``soup_result != 0`` guards compared a list with 0 and were
    # therefore always true; the clicks below simply run unconditionally.
    for photo_toggle in driver.find_elements_by_class_name('ODSEW-ShBeI-xJzy8c-bF1uUb'):
        photo_toggle.click()
    for more_button in driver.find_elements_by_css_selector('button[aria-label="顯示更多"]'):
        more_button.click()

    comment_soup = BeautifulSoup(driver.page_source, 'html.parser')
    reviews = []
    for comment in comment_soup.find_all('div', {'class': 'ODSEW-ShBeI'}):
        comment_a_tag = comment.find_all('a')
        author_name = blank_check(comment_a_tag[1].find('div', class_='ODSEW-ShBeI-title').text)
        profile_photo_url = comment_a_tag[0].find('img')['src']
        rating = blank_check(comment.find('span', {'role': 'img'})['aria-label'].replace('顆星', ''))
        text = comment.find('div', class_='ODSEW-ShBeI-ShBeI-content').text
        created_at = comment.find('span', class_='ODSEW-ShBeI-RgZmSc-date').text

        # The photo URL is taken from the first declaration of each button's
        # inline style, i.e. the text inside url(...).
        photos = [
            i['style'].split(';')[0].split('url')[1].replace('\"', '').replace('(', '').replace(')', '')
            for i in comment.find_all('button', class_='ODSEW-ShBeI-xJzy8c')
        ]

        reviews += [{
            'id': comment.find('a')['href'].split('/')[5],
            'author_name': author_name,
            'profile_photo_url': profile_photo_url,
            'rating': int(rating),
            'text': text,
            'created_at': created_at,
            'photos': photos
        }]

    output['reviews'] = str(reviews)
    driver.back()
    return output


# def get_photo(output, shop_soup):
#     shop_photo = {}
#     for i in
# shop_soup.find('div',{'aria-label':'{}的相片'.format(output['name'])}).find_all('button'):
#         try:
#             if i['aria-label'] == '街景服務和 360 度相片' or i['aria-label'] == '影片':
#                 continue
#             shop_photo[i['aria-label']] = i.find('img')['src']
#         except:
#             pass
#     output['shop_photo'] = shop_photo
#     return output


def find_photo_list(driver):
    """Collect up to six photo URLs from the currently open photo tab.

    Args:
        driver: Selenium WebDriver positioned on the photo gallery.

    Returns:
        A list of URL strings taken from the inline style of loaded tiles.
    """
    first_tile_xpath = '//*[@id="pane"]/div/div[1]/div/div/div[3]/div[1]/div[1]/div/a'
    time.sleep(0.5)
    wait = WebDriverWait(driver, 60)
    wait.until(
        EC.element_to_be_clickable((By.XPATH, first_tile_xpath))
    )
    page_down_(driver, first_tile_xpath, 10)

    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')
    photo_url = []
    for anchor in photo_soup.find_all('a', class_='mWq4Rd-eEDwDf'):
        if len(photo_url) > 5:
            break
        tile = anchor.find('div', class_='mWq4Rd-HiaYvf-CNusmb-gevUs loaded')
        if not tile:
            continue
        # The old ``tile.find('width') != -1`` test was always true (Tag.find
        # returns a tag or None, never -1), so it has been dropped.
        photo = re.search(r'https:(.*)\"', tile.get('style', ''))
        if photo:
            photo_url += [photo.group(0).replace('\"', '')]
    return photo_url


def find_big_photo(output, driver):
    """Open the photo gallery and store the '全部' and '菜單' tab photos.

    Args:
        output: dict that is updated in place and returned.
        driver: Selenium WebDriver positioned on the place page.

    Returns:
        The same ``output`` dict with 'shop_photo' and 'menu_photo' set to
        the ``str()`` of a URL list ('[]' when the tab is absent).
    """
    # element = driver.find_element(By.CSS_SELECTOR, "div[aria-label='{}的相片']".format(output['name']))
    gallery_xpath = '//*[@id="pane"]/div/div[1]/div/div/div[1]/div[1]/button'
    wait = WebDriverWait(driver, 60)
    wait.until(
        EC.element_to_be_clickable((By.XPATH, gallery_xpath))
    )
    element = driver.find_element(By.XPATH, gallery_xpath)
    ActionChains(driver).move_to_element(element).click(element).perform()
    output['shop_photo'] = '[]'
    output['menu_photo'] = '[]'

    photo_map = {
        '全部': 'shop_photo',
        '菜單': 'menu_photo'
    }

    # Result unused: this lookup raises if the tab bar is not present, before
    # the page source is snapshotted.
    driver.find_element(By.CSS_SELECTOR, "button[data-tab-index='1']")
    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')

    tab_dict = {}
    for tab_index in [0, 1, 2]:
        selector = photo_soup.select("button[data-tab-index='{}']".format(tab_index))
        if selector and selector[0].text in photo_map:
            tab_dict[selector[0].text] = tab_index
    print(tab_dict)

    for tab_name, tab_index in tab_dict.items():
        print(tab_index)
        tab_css = "button[data-tab-index='{}']".format(tab_index)
        wait = WebDriverWait(driver, 60)
        wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, tab_css))
        )
        element = driver.find_element(By.CSS_SELECTOR, tab_css)
        ActionChains(driver).move_to_element(element).click(element).perform()
        photo_list = find_photo_list(driver)
        output[photo_map[tab_name]] = str(photo_list)

    return output


def get_url_list(driver):
    """Scroll the result list and return ``[href, aria-label]`` place links.

    Args:
        driver: Selenium WebDriver positioned on a search-result list.

    Returns:
        A list of two-item lists for every anchor whose href contains
        'maps/place'.
    """
    # wait = WebDriverWait(driver, 10)
    # wait.until(
    #     EC.element_to_be_clickable((By.XPATH, '//*[@id="sGi9mc-m5SR9c-bottom-pane"]/div/div[1]/div/div/div/div[1]/div[2]/div[2]'))
    # )
    # driver.back()

    time.sleep(2)
    # Send a DOWN key to every second child (5, 7, ..., 41) of the list.
    for i in range(5, 43, 2):
        driver.find_element(
            By.XPATH,
            '//*[@id="pane"]/div/div[1]/div/div/div[2]/div[1]/div[{}]/div/a'.format(i)
        ).send_keys(Keys.DOWN)

    url_soup = BeautifulSoup(driver.page_source, 'html.parser')
    url_list = []
    for anchor in url_soup.find_all('a'):
        try:
            if anchor['href'].find('maps/place') != -1:
                url_list += [[anchor['href'], anchor['aria-label']]]
        except KeyError:
            # Anchor without an href or aria-label attribute: not a place link.
            continue
    return url_list


def data_select_insert(db, table_name, table_col, data):
    """Insert one row built from ``data`` into ``table_name``.

    Args:
        db: connection handle passed through to ``DA.mysql_insert_data``.
        table_name: target table name.
        table_col: column names in insert order. 'crawler_date' receives the
            current timestamp; 'lon' and 'lat' are converted to float.
        data: mapping that provides a value for every other column.

    Raises:
        KeyError: when ``data`` lacks one of the requested columns.
    """
    timestamp = datetime.today().strftime("%Y/%m/%d %H:%M")
    values = []
    for name_ in table_col:
        if name_ == 'crawler_date':
            # Put the timestamp where the column actually sits instead of
            # assuming it is the last column.
            values.append(timestamp)
        elif name_ == 'lon' or name_ == 'lat':
            values.append(float(data[name_]))
        else:
            values.append(data[name_])

    # NOTE(review): the statement is assembled from Python reprs of scraped
    # values, which is not safe SQL quoting. Switching to bound parameters
    # needs a look at DA.mysql_insert_data, which is not visible here.
    columns = '({})'.format(', '.join(table_col))
    row_values = '({})'.format(', '.join(repr(v) for v in values))
    insert_sql = """INSERT IGNORE INTO {}{} VALUES {}""".format(table_name, columns, row_values)

    DA.mysql_insert_data(db, insert_sql)


def time_click(driver):
    """Expand the weekly opening hours if possible and report the status.

    Args:
        driver: Selenium WebDriver positioned on the place page.

    Returns:
        '正常' after the weekly-hours toggle was clicked, the notice text when
        a notice icon is shown, 'error' when only the detailed-hours button
        exists, and '' when none of these match or anything fails.
    """
    shop_soup_tmp = BeautifulSoup(driver.page_source, 'html.parser')
    status = ''
    try:
        time_css = "span[aria-label='顯示本週營業時間']"
        if shop_soup_tmp.select(time_css):
            element = driver.find_element_by_css_selector(time_css)
            driver.implicitly_wait(10)
            ActionChains(driver).move_to_element(element).click(element).perform()
            status = '正常'

        elif shop_soup_tmp.select("img[aria-label='通知']"):
            # status = '永久停業' or '暫時關閉'
            status = shop_soup_tmp.find('span', class_='LJKBpe-Tswv1b-text aSftqf').text

        elif shop_soup_tmp.select('button[aria-label*="查看更詳細的營業時間"]'):
            status = 'error'

        return status
    except Exception:
        return ''


def get_new_keyword(db):
    """Return the first keyword of ``shop_item_list`` not yet in progress.

    Args:
        db: object whose ``query(sql)`` yields mapping rows.

    Returns:
        The first keyword (ordered by keyword) that is missing from
        ``progress_list2``.

    Raises:
        IndexError: when every keyword already appears in ``progress_list2``.
    """
    result = pd.DataFrame(list(db.query('select distinct(keyword) from shop_item_list order by keyword')))
    progress = pd.DataFrame(list(db.query('select distinct(kw) from progress_list2')))

    if len(progress) == 0:
        return result.iloc[0].values[0]

    pending = result[~result['keyword'].isin(progress.kw.to_list())]
    return pending.iloc[0].values[0]


def get_not_cralwer_url(keyword):
    """Return the item URLs of ``keyword`` that were neither crawled nor failed.

    Args:
        keyword: search keyword whose URL list should be loaded.

    Returns:
        A DataFrame of ``shop_item_list3`` rows whose 'item_url' is absent
        from both ``shop_list2`` and ``error_list2``; an empty DataFrame with
        an 'item_url' column when the keyword has no rows.
    """
    # NOTE(review): database credentials are hard-coded in the source;
    # consider loading them from configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table = db['shop_item_list3']
    url_list = list(table.find(keyword=keyword))

    # Bound parameters replace string formatting so the keyword cannot break
    # or alter the statement.
    shop_item = [i['item_url'] for i in db.query(
        'SELECT item_url FROM shop_list2 where keyword=:keyword', keyword=keyword)]
    error_item = [i['item_url'] for i in db.query(
        'SELECT item_url FROM error_list2 where keyword=:keyword', keyword=keyword)]

    if url_list:
        url_pd = pd.DataFrame(url_list, columns=url_list[0].keys())
    else:
        url_pd = pd.DataFrame(columns=['item_url'])
    # url_pd['item_url_length'] = url_pd.item_url.apply(lambda x: len(x))
    # url_pd = url_pd[(url_pd['item_url_length']!=1000) & (url_pd['item_url_length']!=600)]
    url_pd = url_pd[~url_pd['item_url'].isin(shop_item)]
    url_pd = url_pd[~url_pd['item_url'].isin(error_item)]

    print('have {} URL list'.format(len(url_pd)))
    # url_list = pd.read_csv('result/shop_item_list_20211210.csv', index_col=0)
    return url_pd


def serive_create_linux(profilepath):
    """Start a headless Chrome session that reuses a Linux profile directory.

    Args:
        profilepath: value passed to Chrome's ``profile-directory`` switch.

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    option = webdriver.ChromeOptions()
    option.add_argument('--headless')
    option.add_argument('--no-sandbox')
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    option.add_argument('--incognito')
    option.add_argument(
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0')
    # option.add_argument("--user-data-dir=C:\\Users\\noodles\\AppData\\Local\\Google\\Chrome\\User Data")
    option.add_argument(
        "--user-data-dir=/home/noodlesloves/.config/google-chrome/")
    option.add_argument("profile-directory="+profilepath)
    driver = webdriver.Chrome('utility/chromedriver', options=option)
    # driver = webdriver.Chrome(executable_path='/usr/bin/chromedriver', chrome_options=option,
    #                           service_args=['--verbose', '--log-path=/tmp/chromedriver.log'])
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)
    return driver


def find_lon_lat(driver):
    """Right-click the centre of the visible map and read the coordinates.

    Args:
        driver: Selenium WebDriver positioned on a map view.

    Returns:
        ``(lat, lon)`` as floats, parsed from the first context-menu entry.
    """
    canvas = driver.find_element_by_css_selector("#scene > div.widget-scene > canvas")
    size = canvas.size
    total_height = size['height']
    total_width = size['width']
    pane = driver.find_element_by_css_selector("#pane > div.Yr7JMd-pane")
    left_width = pane.size['width']
    print(total_height, total_width, left_width)

    # Centre of the canvas area that is not covered by the left pane.
    x = (total_width - left_width) / 2 + left_width
    y = total_height / 2

    pane = driver.find_element_by_css_selector("#pane > div.Yr7JMd-pane")
    action = ActionChains(driver)
    action.move_to_element_with_offset(pane, x, y)
    action.context_click()
    action.perform()

    time.sleep(0.5)
    element = driver.find_element_by_css_selector('#action-menu > ul > li:nth-child(1)')
    lat, lon = element.text.split(',')
    return float(lat), float(lon)


def get_unique_id(driver):
    """Open the share dialog and return the last path segment of its URL.

    Args:
        driver: Selenium WebDriver positioned on the place page.

    Returns:
        The text after the final '/' of the first input's value; '' when the
        value stays empty after five attempts.
    """
    element = driver.find_element(By.CSS_SELECTOR, "button[data-value='分享']")
    driver.implicitly_wait(5)
    ActionChains(driver).move_to_element(element).click(element).perform()
    time.sleep(0.5)

    unique_id = ''
    for _ in range(5):
        ele = driver.find_element(By.CSS_SELECTOR, "input")
        # A missing value attribute comes back as None; treat it as empty.
        short_url = ele.get_attribute('value') or ''
        unique_id = short_url.split('/')[-1]
        if len(unique_id) != 0:
            break
        time.sleep(0.5)

    element = driver.find_element(By.CSS_SELECTOR, "button[aria-label='關閉']")
    driver.implicitly_wait(5)
    ActionChains(driver).move_to_element(element).click(element).perform()
    return unique_id


def page_down_(driver, xpath_css, time_):
    """Click an element to focus it, then press PAGE_DOWN ``time_`` times.

    Args:
        driver: Selenium WebDriver.
        xpath_css: XPath of the element(s); the second match is used when
            more than one exists, otherwise the first.
        time_: number of PAGE_DOWN key presses, 0.5 s apart.

    Raises:
        IndexError: when the XPath matches nothing.
    """
    elmts = driver.find_elements_by_xpath(xpath_css)
    print(elmts)
    elmt = elmts[1] if len(elmts) > 1 else elmts[0]

    ActionChains(driver).move_to_element(elmt).click().perform()
    for _ in range(time_):
        try:
            ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
        except Exception:
            traceback.print_exc()
        time.sleep(0.5)


def _scrape_shop(driver, key, row):
    """Load one store page and return its scraped ``output`` dict."""
    name = row['name']
    item_url = row['item_url']
    print(key, name, ': ', item_url)

    print('start...')
    driver.get(item_url)
    page_down_(driver, "//div[@class='x3AX1-LfntMc-header-title-ij8cu']", 3)

    # lat, lon = find_lon_lat(driver)
    # unique_id = get_unique_id(driver)
    time_status = time_click(driver)
    time.sleep(0.5)
    shop_soup = BeautifulSoup(driver.page_source, 'html.parser')

    output = {
        # 'name': blank_check(shop_soup.find('h1', class_='x3AX1-LfntMc-header-title-title').text),
        'name': name,
        'fid': row['fid']
    }
    print(output['name'])

    print('get_shop_info')
    output = get_shop_info(driver, output, shop_soup)

    print('get_intro_info')
    if len(shop_soup.select("div[aria-label='{}簡介']".format(output['name']))) != 0:
        output = get_intro_info(driver, output)
    else:
        for intro_key in intro_list:
            output[intro_list[intro_key][0]] = '[]'

    print('get_time_list')
    if time_status == '正常':
        output = get_time_list(shop_soup, output)
    else:
        # NOTE(review): get_time_list stores 'True'/'False' strings while
        # this branch stores the bool False -- confirm which one is wanted.
        output['open_now'] = False
        output['periods'] = ''
        output['weekday_text'] = ''

    print('user_ratings_total')
    if output['user_ratings_total'] == '':
        output['reviews'] = ''
    else:
        output = get_reviews(driver, output)

    print('find_big_photo')
    output = find_big_photo(output, driver)

    output_name = output['name'].replace('(', '').replace(')', '')
    query_name = '{}+{}'.format(output_name, output['addr'])
    query_name = query_name.replace(' ', '')
    output['item_url'] = item_url
    output['keyword'] = row['keyword']
    output['google_url'] = 'https://www.google.com.tw/search?q={}'.format(query_name)
    return output


def main():
    """Crawl unchecked stores from ``swire_store_list`` in up to ten rounds.

    An optional first command-line argument is a port number; when given,
    the docker container ``p<port>`` is restarted before crawling. Each round
    loads up to 500 random unchecked stores, scrapes them with a fresh
    browser, stores the results and records failures in ``error_list2``.
    """
    db = DA.mysql_connect(MYSQL_CONFIG, DB_NAME)
    # NOTE(review): database credentials are hard-coded in the source;
    # consider loading them from configuration.
    db2 = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table2 = db2['swire_store_list']

    # keyword = '麻辣火鍋'
    # if len(sys.argv) >1:
    #     keyword=sys.argv[1]
    # if len(sys.argv) >2:
    #     port=int(sys.argv[2])

    # Default taken from the disabled code above; without it ``port`` was
    # undefined (NameError) whenever no argument was given.
    port = 4444
    if len(sys.argv) > 1:
        port = int(sys.argv[1])
        print('restart docker p{}'.format(port))
        os.system('sudo docker container restart p'+str(port))
        time.sleep(8)

    for _ in range(10):
        result = db2.query('select * from swire_store_list where check_ is null ORDER BY RAND() limit 500')
        url_pd = pd.DataFrame([dict(i) for i in result])
        if url_pd.empty:
            # Nothing left to crawl; an empty frame has no 'fid' column.
            print('no unchecked store left')
            break
        url_pd['item_url'] = url_pd['fid'].apply(lambda x: 'https://www.google.com.tw/maps/@24.1753633,120.6747136,15z/data=!4m5!3m4!1s{}!8m2!3d24.1760271!4d120.6705323'.format(x))

        # keyword = get_new_keyword(db2)
        # table2.insert({'kw':keyword,'num':0})
        # url_pd = get_not_cralwer_url(keyword)
        # print('drvier start {}...'.format(keyword))
        driver = brower_start(port)
        # driver = serive_create('Profile 6')
        # profilepath = 'Profile 1'
        # driver = serive_create_linux(profilepath)

        try:
            for key, row in url_pd.iterrows():
                try:
                    output = _scrape_shop(driver, key, row)
                    data_select_insert(db, SHOP_LIST_TABLE, SHOP_LIST_TABLE_COL, output)
                    table2.upsert({'place_id': row['place_id'], 'check_': 1}, ['place_id'])

                except Exception as e:
                    table3 = db2['error_list2']
                    # NOTE(review): the store name is written to the 'num'
                    # column -- confirm against the error_list2 schema.
                    table3.insert({'num': row['name'], 'keyword': row['keyword'], 'item_url': row['item_url'], 'crawler_date': datetime.today().strftime("%Y/%m/%d %H:%M")})
                    print(e)
                    traceback.print_exc()
                    # error_table_col = ['name', 'keyword', 'item_url', 'crawler_date']
                    # db = DA.mysql_connect(MYSQL_CONFIG, DB_NAME)
                    # data_select_insert(db, 'error_list2', error_table_col, row)
                    time.sleep(1)
        finally:
            # Close this round's browser; previously every round leaked one.
            driver.quit()


if __name__ == '__main__':
    main()