|
- # -*- coding: utf-8 -*-
- from selenium import webdriver
- from selenium.webdriver.common.action_chains import ActionChains
- from selenium.webdriver.common.keys import Keys
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.wait import WebDriverWait
- from selenium.common.exceptions import NoSuchElementException
- from selenium.webdriver.common.by import By
- from bs4 import BeautifulSoup
- from utility import database_access as DA
- from utility.parseutils import *
- from utility.connect import *
- from datetime import datetime
- import traceback
- import dataset
- import pandas as pd
- import time
- import json
- import re
- import sys
- import os
- # import pyautogui as pag
def serive_create(profilepath):
    """Launch a local Chrome session that reuses an existing user profile.

    Args:
        profilepath: Name of the profile directory (for example
            ``'Profile 6'``) inside the hard-coded Windows user-data dir.

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    chrome_options = webdriver.ChromeOptions()
    for flag in (
        '--disable-web-security',
        '--allow-running-insecure-content',
        "--user-data-dir=C:\\Users\\user\\AppData\\Local\\Google\\Chrome\\User Data",
        "profile-directory=" + profilepath,
    ):
        chrome_options.add_argument(flag)

    driver = webdriver.Chrome('./utility/chromedriver_win32/chromedriver', options=chrome_options)

    # Print the session id and the executor URL of the new session.
    executor_url, session_id = driver.command_executor._url, driver.session_id
    print(session_id)
    print(executor_url)

    # Fixed pause after start-up before handing the driver back.
    time.sleep(3)
    return driver
def brower_start(port):
    """Start a local Chrome browser with default options.

    Args:
        port: Currently unused. It was only consumed by the disabled
            ``webdriver.Remote`` variant kept below for reference.

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    # Disabled remote variant:
    # browser = webdriver.Remote(
    #     command_executor='http://127.0.0.1:'+str(port)+'/wd/hub',
    #     desired_capabilities=options.to_capabilities()
    # )
    return webdriver.Chrome(options=webdriver.ChromeOptions())
def keyin_keyword(driver, keyword):
    """Type ``keyword`` into the element with id ``searchbox`` and press RETURN.

    Args:
        driver: Selenium WebDriver positioned on a page that has the search box.
        keyword: Text to send to the search box.
    """
    # ``find_element(By.ID, ...)`` replaces the removed ``find_element_by_id``
    # helper and matches the locator style used elsewhere in this file.
    search_box = driver.find_element(By.ID, "searchbox")
    driver.implicitly_wait(30)
    ActionChains(driver).move_to_element(search_box).send_keys(keyword).send_keys(Keys.RETURN).perform()
    # Fixed pause after submitting the search.
    time.sleep(3)
def open_time(driver):
    """Click the pane row at a fixed XPath unless its text contains '預訂'.

    Args:
        driver: Selenium WebDriver positioned on a shop page.

    Returns:
        ``1`` when the row was clicked, ``0`` when its text contains '預訂'
        and the click was skipped.
    """
    row_xpath = '//*[@id="pane"]/div/div[1]/div/div/div[9]/div[2]'
    # ``find_element(By.XPATH, ...)`` replaces the removed
    # ``find_element_by_xpath`` helper; the element is looked up once
    # instead of twice in a row with the same locator.
    element = driver.find_element(By.XPATH, row_xpath)
    if '預訂' in element.text:
        return 0

    driver.implicitly_wait(10)
    ActionChains(driver).move_to_element(element).click(element).perform()
    return 1
def get_shop_info(driver, output, shop_soup):
    """Fill ``output`` with location, address, phone and ``element_list`` fields.

    Args:
        driver: Unused here; kept so existing callers keep working.
        output: Dict that is updated in place and returned.
        shop_soup: Parsed page (BeautifulSoup) of the shop detail pane.

    Returns:
        The same ``output`` dict with ``city``, ``area``, ``addr``, ``tel`` and
        one entry per key of ``element_list``.

    Raises:
        TypeError / IndexError: when the ``oloc`` button is missing or its
            label has fewer than two space-separated parts (this lookup is
            intentionally left unguarded, as in the original).
    """
    location = shop_soup.find('button', {'data-item-id': 'oloc'})['aria-label'].split(' ')
    output['city'] = location[-1]
    output['area'] = location[-2]

    # Optional fields fall back to ''. ``except Exception`` (not a bare
    # ``except``) keeps Ctrl-C / SystemExit from being swallowed mid-crawl.
    try:
        output['addr'] = shop_soup.find('button', {'data-item-id': 'address'})['aria-label'].replace('地址:', '')
    except Exception:
        output['addr'] = ''

    try:
        output['tel'] = blank_check(
            shop_soup.find('button', {'data-tooltip': '複製電話號碼'})['aria-label'].split(':')[1]
        )
    except Exception:
        output['tel'] = ''
    print(output['addr'], ', ', output['tel'])

    for key in element_list:
        try:
            element = element_list[key]
            if len(element) == 3:
                # Three-item spec: (tag, attrs, attribute name to read).
                value = shop_soup.find(element[0], element[1])[element[2]]
            else:
                # Otherwise read the text of the matched tag, '' if absent.
                found = shop_soup.find(element[0], element[1])
                value = found.text if found else ''
            output[key] = value_check(key, value)
        except Exception:
            output[key] = ''
    return output
def get_intro_info(driver, output):
    """Open the '<name>簡介' panel and record the checked items per section.

    For every key of ``intro_list`` the matching ``div[aria-label=key]`` is
    searched for list items that carry the check-mark icon; the result is
    stored as ``str(list_of_dicts)`` under ``intro_list[key][0]``.

    Args:
        driver: Selenium WebDriver positioned on the shop page.
        output: Dict updated in place; ``output['name']`` selects the panel.

    Returns:
        The same ``output`` dict. On any failure every intro field is '[]'.
    """
    check_icon = "//www.gstatic.com/images/icons/material/system_gm/1x/check_black_18dp.png"
    entered_panel = False
    try:
        element = driver.find_element(By.CSS_SELECTOR, "div[aria-label='{}簡介']".format(output['name']))
        driver.implicitly_wait(5)
        ActionChains(driver).move_to_element(element).click(element).perform()
        entered_panel = True
        page_down_(driver, '//*[@id="pane"]/div/div[1]', 3)
        intro_soup = BeautifulSoup(driver.page_source, 'html.parser')
        for key in intro_list:
            section = intro_soup.find('div', {'aria-label': key})
            if not section:
                output[intro_list[key][0]] = '[]'
                continue
            checked = []
            for item in section.find_all('li', {'class': 'LQjNnc-p83tee-JNdkSc-ibnC6b'}):
                if item.find('img', {'src': check_icon}):
                    # ``id`` numbers only the checked items, starting at 0.
                    checked.append({
                        'id': len(checked),
                        intro_list[key][1]: blank_check(item.text)
                    })
            print(str(checked))
            output[intro_list[key][0]] = str(checked)
    except Exception:
        for key in intro_list:
            output[intro_list[key][0]] = '[]'

    # Leave the panel on the failure path too; previously an error after the
    # click left the browser on the intro panel for the following steps.
    if entered_panel:
        try:
            driver.back()
        except Exception:
            traceback.print_exc()
    return output
def _closes_next_day(start_hour, end_hour):
    """Return True when the closing hour is numerically before the opening hour."""
    try:
        return int(end_hour) < int(start_hour)
    except ValueError:
        # Non-numeric hour text: keep the original string comparison.
        return end_hour < start_hour


def get_time_list(shop_soup, output):
    """Parse the weekly opening-hours table into ``periods``/``weekday_text``.

    Args:
        shop_soup: Parsed page (BeautifulSoup) with the hours table expanded.
        output: Dict updated in place.

    Returns:
        The same ``output`` dict with ``open_now`` ('True'/'False'),
        ``periods`` and ``weekday_text`` (both stored as ``str(list)``).
    """
    periods = []
    weekday_text = []

    open_now = blank_check(
        shop_soup.find('span', {'class': 'LJKBpe-Tswv1b-hour-text'}).text.split('\xa0')[0]
    )
    if open_now in ('永久停業', '暫時關閉'):
        output['open_now'] = 'False'
    else:
        output['open_now'] = 'True'
        for tr_ in shop_soup.find_all('tr'):
            day_cell = tr_.find('div')
            if day_cell.text.replace(' ', '') == '':
                continue
            week = day_cell.text
            time_list = [blank_check(i.text) for i in tr_.find_all('li')]
            for time_ in time_list:
                if time_ == '24 小時營業':
                    # NOTE(review): the open time here is the integer 0 (the
                    # original literal was ``0000``) while other times are
                    # strings; kept as-is, confirm what consumers expect.
                    open_time_, close_day, close_time = 0, week_list[week], ''
                elif time_ == '休息':
                    open_time_, close_day, close_time = '', week_list[week], ''
                else:
                    start, end = time_.split('–')
                    end_hour, end_min = end.split(':')
                    start_hour, start_min = start.split(':')
                    # Compare hours as numbers: as strings '2' < '10' is
                    # False, so "10:00–2:00" was never treated as overnight.
                    if _closes_next_day(start_hour, end_hour):
                        # NOTE(review): not wrapped past the last weekday
                        # index; week_list's value range is not visible here.
                        close_day = week_list[week] + 1
                    else:
                        close_day = week_list[week]
                    open_time_ = start.replace(':', '')
                    close_time = end.replace(':', '')
                periods.append({
                    "open": {
                        "day": week_list[week],
                        "time": open_time_
                    },
                    "close": {
                        "day": close_day,
                        "time": close_time
                    }
                })
            weekday_text.append("{}: {}".format(week, ', '.join(time_list)))
    output['periods'] = str(periods)
    output['weekday_text'] = str(weekday_text)
    return output
def get_reviews(driver, output):
    """Open the reviews list, expand it, and store the parsed reviews.

    Args:
        driver: Selenium WebDriver positioned on the shop page.
        output: Dict updated in place.

    Returns:
        The same ``output`` dict with ``output['reviews']`` set to
        ``str(list_of_review_dicts)``. The browser is sent back one step
        before returning.
    """
    more_reviews_css = "button[jsaction='pane.rating.moreReviews']"
    WebDriverWait(driver, 30).until(
        EC.element_to_be_clickable((By.CSS_SELECTOR, more_reviews_css))
    )
    # ``find_element(By..., ...)`` replaces the removed ``find_element_by_*``
    # helpers and matches the locator style used elsewhere in this file.
    element = driver.find_element(By.CSS_SELECTOR, more_reviews_css)
    driver.implicitly_wait(10)
    ActionChains(driver).move_to_element(element).click(element).perform()
    time.sleep(0.5)
    page_down_(driver, '//div[@class="PPCwl"]', 5)

    # The former ``find_all(...) != 0`` / ``select(...) != 0`` guards compared
    # a list with 0 and were always true, so both groups are simply clicked;
    # an empty match list means no clicks.
    for button in driver.find_elements(By.CLASS_NAME, 'ODSEW-ShBeI-xJzy8c-bF1uUb'):
        button.click()
    for button in driver.find_elements(By.CSS_SELECTOR, 'button[aria-label="顯示更多"]'):
        button.click()

    comment_soup = BeautifulSoup(driver.page_source, 'html.parser')
    reviews = []
    for comment in comment_soup.find_all('div', {'class': 'ODSEW-ShBeI'}):
        links = comment.find_all('a')
        author_name = blank_check(links[1].find('div', class_='ODSEW-ShBeI-title').text)
        profile_photo_url = links[0].find('img')['src']
        rating = blank_check(comment.find('span', {'role': 'img'})['aria-label'].replace('顆星', ''))
        text = comment.find('div', class_='ODSEW-ShBeI-ShBeI-content').text
        created_at = comment.find('span', class_='ODSEW-ShBeI-RgZmSc-date').text
        # Photo URLs are taken from the ``url(...)`` part of each button's
        # first inline style declaration.
        photos = [
            i['style'].split(';')[0].split('url')[1].replace('\"', '').replace('(', '').replace(')', '')
            for i in comment.find_all('button', class_='ODSEW-ShBeI-xJzy8c')
        ]
        reviews.append({
            # Sixth '/'-separated segment of the first link's href.
            'id': links[0]['href'].split('/')[5],
            'author_name': author_name,
            'profile_photo_url': profile_photo_url,
            'rating': int(rating),
            'text': text,
            'created_at': created_at,
            'photos': photos
        })
    output['reviews'] = str(reviews)
    driver.back()
    return output
- # def get_photo(output, shop_soup):
- # shop_photo = {}
- # for i in shop_soup.find('div',{'aria-label':'{}的相片'.format(output['name'])}).find_all('button'):
- # try:
- # if i['aria-label'] == '街景服務和 360 度相片' or i['aria-label'] == '影片':
- # continue
-
- # shop_photo[i['aria-label']] = i.find('img')['src']
- # except:
- # pass
- # output['shop_photo'] = shop_photo
- # return output
def find_photo_list(driver):
    """Collect photo URLs from the first six photo anchors of the open tab.

    Args:
        driver: Selenium WebDriver with the photo gallery open.

    Returns:
        List of URL strings taken from the inline ``style`` of loaded
        thumbnails; anchors without a loaded thumbnail or without an
        ``https:`` URL in the style are skipped.
    """
    first_photo_xpath = '//*[@id="pane"]/div/div[1]/div/div/div[3]/div[1]/div[1]/div/a'
    time.sleep(0.5)
    WebDriverWait(driver, 60).until(
        EC.element_to_be_clickable((By.XPATH, first_photo_xpath))
    )
    page_down_(driver, first_photo_xpath, 10)
    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')

    photo_url = []
    # Only the first six anchors are inspected, whether or not they yield a URL.
    for anchor in photo_soup.find_all('a', class_='mWq4Rd-eEDwDf')[:6]:
        thumb = anchor.find('div', class_='mWq4Rd-HiaYvf-CNusmb-gevUs loaded')
        if not thumb:
            continue
        # The former ``thumb.find('width') != -1`` test searched for a child
        # <width> tag and was always true, so it filtered nothing; dropped.
        sentence = thumb.get('style') or ''
        photo = re.search(r'https:(.*)\"', sentence)
        if photo is None:
            # No URL in the style: skip instead of crashing on ``None.group``.
            continue
        photo_url.append(photo.group(0).replace('\"', ''))
    return photo_url
def find_big_photo(output, driver):
    """Open the photo gallery and store photo URLs for the '全部'/'菜單' tabs.

    Args:
        output: Dict updated in place; ``shop_photo`` and ``menu_photo`` are
            initialised to '[]' and overwritten per tab that is found.
        driver: Selenium WebDriver positioned on the shop page.

    Returns:
        The same ``output`` dict.
    """
    opener_xpath = '//*[@id="pane"]/div/div[1]/div/div/div[1]/div[1]/button'
    WebDriverWait(driver, 60).until(
        EC.element_to_be_clickable((By.XPATH, opener_xpath))
    )
    opener = driver.find_element(By.XPATH, opener_xpath)
    ActionChains(driver).move_to_element(opener).click(opener).perform()

    output['shop_photo'] = '[]'
    output['menu_photo'] = '[]'
    photo_map = {
        '全部': 'shop_photo',
        '菜單': 'menu_photo'
    }

    # Result intentionally unused: the lookup raises if tab 1 is not present.
    driver.find_element(By.CSS_SELECTOR, "button[data-tab-index='1']")
    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')

    # Map the wanted tab labels to the index they occupy on this page.
    tab_dict = {}
    for tab_index in (0, 1, 2):
        buttons = photo_soup.select("button[data-tab-index='{}']".format(tab_index))
        if not buttons:
            continue
        label = buttons[0].text
        if label in ('菜單', '全部'):
            tab_dict[label] = tab_index
    print(tab_dict)

    for label, tab_index in tab_dict.items():
        print(tab_index)
        tab_css = "button[data-tab-index='{}']".format(tab_index)
        WebDriverWait(driver, 60).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, tab_css))
        )
        tab = driver.find_element(By.CSS_SELECTOR, tab_css)
        ActionChains(driver).move_to_element(tab).click(tab).perform()
        output[photo_map[label]] = str(find_photo_list(driver))

    return output
def get_url_list(driver):
    """Scroll the result list and collect every ``maps/place`` link.

    Args:
        driver: Selenium WebDriver positioned on a search-result list.

    Returns:
        List of ``[href, aria_label]`` pairs for anchors whose ``href``
        contains 'maps/place' and that carry an ``aria-label``.
    """
    time.sleep(2)
    # Send DOWN to every second result row (div[5], div[7], ... div[41]).
    for i in range(5, 43, 2):
        driver.find_element(
            By.XPATH,
            '//*[@id="pane"]/div/div[1]/div/div/div[2]/div[1]/div[{}]/div/a'.format(i)
        ).send_keys(Keys.DOWN)
    url_soup = BeautifulSoup(driver.page_source, 'html.parser')

    url_list = []
    for anchor in url_soup.find_all('a'):
        # Explicit missing-attribute checks replace the former bare
        # ``except: pass``, which also hid unrelated errors and Ctrl-C.
        href = anchor.get('href')
        label = anchor.get('aria-label')
        if href is None or label is None:
            continue
        if 'maps/place' in href:
            url_list.append([href, label])

    return url_list
def data_select_insert(db, table_name, table_col, data):
    """Build an ``INSERT IGNORE`` statement from ``data`` and execute it.

    Args:
        db: Connection handle passed through to ``DA.mysql_insert_data``.
        table_name: Target table name.
        table_col: Ordered column names; ``'crawler_date'`` is filled with the
            current local time, ``'lon'``/``'lat'`` are cast to ``float``.
        data: Mapping that provides a value for every other column.

    Raises:
        KeyError: when ``data`` lacks one of the columns.
    """
    crawler_date = datetime.today().strftime("%Y/%m/%d %H:%M")
    values = []
    for name_ in table_col:
        if name_ == 'crawler_date':
            # Put the timestamp at the column's own position. It used to be
            # appended last, which only lined up when 'crawler_date' was the
            # final entry of table_col.
            values.append(crawler_date)
        elif name_ in ('lon', 'lat'):
            values.append(float(data[name_]))
        else:
            values.append(data[name_])

    # NOTE(review): the statement is assembled from the Python repr of the
    # values, so quoting relies on repr() and scraped text is not escaped by
    # the driver. Switch to bound parameters once the DA helper's API for
    # them is confirmed.
    columns = str(tuple(table_col)).replace('\'', '')
    insert_sql = """INSERT IGNORE INTO {}{} VALUES {}""".format(table_name, columns, tuple(values))
    DA.mysql_insert_data(db, insert_sql)
def time_click(driver):
    """Expand the weekly opening hours if available and report the state.

    Args:
        driver: Selenium WebDriver positioned on the shop page.

    Returns:
        '正常' when the weekly-hours toggle was found and clicked; the notice
        text when a '通知' icon is present; 'error' when only the
        "more detailed hours" button exists; '' when none match or any step
        raises.
    """
    shop_soup_tmp = BeautifulSoup(driver.page_source, 'html.parser')
    status = ''
    try:
        time_css = "span[aria-label='顯示本週營業時間']"
        if shop_soup_tmp.select(time_css):
            # ``find_element(By.CSS_SELECTOR, ...)`` replaces the removed
            # ``find_element_by_css_selector`` helper.
            element = driver.find_element(By.CSS_SELECTOR, time_css)
            driver.implicitly_wait(10)
            ActionChains(driver).move_to_element(element).click(element).perform()
            status = '正常'
        elif shop_soup_tmp.select("img[aria-label='通知']"):
            status = shop_soup_tmp.find('span', class_='LJKBpe-Tswv1b-text aSftqf').text
        elif shop_soup_tmp.select('button[aria-label*="查看更詳細的營業時間"]'):
            status = 'error'
        return status
    except Exception:
        # Best effort: any failure is reported as "no status".
        # ``Exception`` (not a bare except) lets Ctrl-C through.
        return ''
def get_new_keyword(db):
    """Return the first keyword that is not yet listed in ``progress_list2``.

    Args:
        db: Object whose ``query(sql)`` yields mapping-like rows.

    Returns:
        The first ``keyword`` of ``shop_item_list`` (in query order) whose
        value does not appear in the ``kw`` column of ``progress_list2``.

    Raises:
        IndexError: when no keyword is left to pick.
    """
    all_keywords = pd.DataFrame(
        list(db.query('select distinct(keyword) from shop_item_list order by keyword'))
    )
    done = pd.DataFrame(list(db.query('select distinct(kw) from progress_list2')))

    # Nothing processed yet: the very first keyword is the next one.
    if len(done) == 0:
        return all_keywords.iloc[0].values[0]

    already_done = all_keywords['keyword'].isin(done.kw.to_list())
    pending = all_keywords[~already_done]
    return pending.iloc[0].values[0]
def get_not_cralwer_url(keyword):
    """Return the not-yet-crawled item URLs of ``keyword`` as a DataFrame.

    Rows of ``shop_item_list3`` for the keyword are loaded and those whose
    ``item_url`` already appears in ``shop_list2`` or ``error_list2`` (same
    keyword) are removed.

    Args:
        keyword: Search keyword to filter on.

    Returns:
        ``pandas.DataFrame`` of the remaining rows; an empty frame with an
        ``item_url`` column when the keyword has no rows at all.
    """
    # NOTE(review): credentials are hard-coded in the connection URL; move
    # them to configuration / environment before sharing this file.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table = db['shop_item_list3']
    url_list = list(table.find(keyword=keyword))

    # Bind the keyword as a parameter instead of formatting it into the SQL
    # text, so quotes inside a keyword can neither break nor alter the query.
    shop_item = [
        i['item_url']
        for i in db.query('SELECT item_url FROM shop_list2 where keyword=:keyword', keyword=keyword)
    ]
    error_item = [
        i['item_url']
        for i in db.query('SELECT item_url FROM error_list2 where keyword=:keyword', keyword=keyword)
    ]

    if not url_list:
        # ``url_list[0]`` below would raise IndexError on an empty result.
        print('have {} URL list'.format(0))
        return pd.DataFrame(columns=['item_url'])

    url_pd = pd.DataFrame(url_list, columns=url_list[0].keys())
    url_pd = url_pd[~url_pd['item_url'].isin(shop_item)]
    url_pd = url_pd[~url_pd['item_url'].isin(error_item)]
    print('have {} URL list'.format(len(url_pd)))
    return url_pd
def serive_create_linux(profilepath):
    """Launch a headless Chrome session on Linux with an existing profile.

    Args:
        profilepath: Name of the profile directory inside the hard-coded
            ``/home/noodlesloves/.config/google-chrome/`` user-data dir.

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    chrome_options = webdriver.ChromeOptions()
    for flag in (
        '--headless',
        '--no-sandbox',
        '--disable-web-security',
        '--allow-running-insecure-content',
        '--incognito',
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0',
        "--user-data-dir=/home/noodlesloves/.config/google-chrome/",
        "profile-directory=" + profilepath,
    ):
        chrome_options.add_argument(flag)

    driver = webdriver.Chrome('utility/chromedriver', options=chrome_options)

    # Print the session id and the executor URL of the new session.
    executor_url, session_id = driver.command_executor._url, driver.session_id
    print(session_id)
    print(executor_url)
    return driver
def find_lon_lat(driver):
    """Right-click the visible map area and read the coordinates menu entry.

    Args:
        driver: Selenium WebDriver positioned on a map page.

    Returns:
        ``(lat, lon)`` as floats, parsed from the comma-separated text of
        the first item of the context menu.

    Raises:
        ValueError: when that menu text is not two comma-separated numbers.
    """
    # ``find_element(By.CSS_SELECTOR, ...)`` replaces the removed
    # ``find_element_by_css_selector`` helper.
    canvas = driver.find_element(By.CSS_SELECTOR, "#scene > div.widget-scene > canvas")
    canvas_size = canvas.size
    total_height = canvas_size['height']
    total_width = canvas_size['width']

    # The side pane is looked up once and reused for size and pointer move.
    pane = driver.find_element(By.CSS_SELECTOR, "#pane > div.Yr7JMd-pane")
    left_width = pane.size['width']
    print(total_height, total_width, left_width)

    # Target the middle of the canvas area that lies right of the side pane.
    x = (total_width - left_width) / 2 + left_width
    y = total_height / 2

    action = ActionChains(driver)
    action.move_to_element_with_offset(pane, x, y)
    action.context_click()
    action.perform()
    time.sleep(0.5)

    element = driver.find_element(By.CSS_SELECTOR, '#action-menu > ul > li:nth-child(1)')
    lat, lon = element.text.split(',')
    return float(lat), float(lon)
def get_unique_id(driver):
    """Open the '分享' dialog and return the last path segment of its URL.

    The first ``input`` element is polled up to five times (0.5 s apart)
    until the segment after the final '/' of its value is non-empty; the
    dialog is then closed via the '關閉' button.

    Args:
        driver: Selenium WebDriver positioned on the shop page.

    Returns:
        The last '/'-separated segment of the input's value (may be '' if
        it never filled in).
    """
    share_button = driver.find_element(By.CSS_SELECTOR, "button[data-value='分享']")
    driver.implicitly_wait(5)
    ActionChains(driver).move_to_element(share_button).click(share_button).perform()
    time.sleep(0.5)

    attempts_left = 5
    while attempts_left > 0:
        attempts_left -= 1
        short_url = driver.find_element(By.CSS_SELECTOR, "input").get_attribute('value')
        unique_id = short_url.split('/')[-1]
        if unique_id:
            break
        time.sleep(0.5)

    close_button = driver.find_element(By.CSS_SELECTOR, "button[aria-label='關閉']")
    driver.implicitly_wait(5)
    ActionChains(driver).move_to_element(close_button).click(close_button).perform()
    return unique_id
def page_down_(driver, xpath_css, time_):
    """Click an element matched by XPath, then press PAGE_DOWN ``time_`` times.

    Args:
        driver: Selenium WebDriver.
        xpath_css: XPath expression; when it matches several elements the
            second one is clicked, otherwise the only one.
        time_: Number of PAGE_DOWN key presses (0.5 s pause after each).

    Raises:
        NoSuchElementException: when the XPath matches nothing.
    """
    # ``find_elements(By.XPATH, ...)`` replaces the removed
    # ``find_elements_by_xpath`` helper.
    elmts = driver.find_elements(By.XPATH, xpath_css)
    print(elmts)
    if not elmts:
        # Previously surfaced as a bare IndexError without the locator.
        raise NoSuchElementException('no element matches xpath: {}'.format(xpath_css))
    elmt = elmts[1] if len(elmts) > 1 else elmts[0]

    ActionChains(driver).move_to_element(elmt).click().perform()
    for _ in range(time_):
        try:
            ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
        except Exception:
            # Best effort: log and keep scrolling; Ctrl-C is no longer eaten.
            traceback.print_exc()
        time.sleep(0.5)
def _scrape_shop(driver, row, item_url):
    """Load one store page and return the dict of all scraped fields."""
    print('start...')
    driver.get(item_url)
    page_down_(driver, "//div[@class='x3AX1-LfntMc-header-title-ij8cu']", 3)
    time_status = time_click(driver)
    time.sleep(0.5)
    shop_soup = BeautifulSoup(driver.page_source, 'html.parser')
    output = {
        'name': row['name'],
        'fid': row['fid']
    }
    print(output['name'])

    print('get_shop_info')
    output = get_shop_info(driver, output, shop_soup)

    print('get_intro_info')
    if len(shop_soup.select("div[aria-label='{}簡介']".format(output['name']))) != 0:
        output = get_intro_info(driver, output)
    else:
        for intro_key in intro_list:
            output[intro_list[intro_key][0]] = '[]'

    print('get_time_list')
    if time_status == '正常':
        output = get_time_list(shop_soup, output)
    else:
        # NOTE(review): boolean here, while get_time_list stores the strings
        # 'True'/'False'; kept as-is, confirm what the table expects.
        output['open_now'] = False
        output['periods'] = ''
        output['weekday_text'] = ''

    print('user_ratings_total')
    if output['user_ratings_total'] == '':
        output['reviews'] = ''
    else:
        output = get_reviews(driver, output)

    print('find_big_photo')
    output = find_big_photo(output, driver)

    output_name = output['name'].replace('(', '').replace(')', '')
    query_name = '{}+{}'.format(output_name, output['addr'])
    query_name = query_name.replace(' ', '')
    output['item_url'] = item_url
    output['keyword'] = row['keyword']
    output['google_url'] = 'https://www.google.com.tw/search?q={}'.format(query_name)
    return output


def main():
    """Crawl unchecked stores from ``swire_store_list`` in up to ten batches.

    Each batch takes up to 500 random rows whose ``check_`` is NULL, scrapes
    every store page, inserts the result into ``SHOP_LIST_TABLE`` and marks
    the row as checked. A failing store is recorded in ``error_list2`` and
    the crawl continues.

    Command line:
        argv[1] (optional): port number; when given, the docker container
        ``p<port>`` is restarted before crawling.
    """
    db = DA.mysql_connect(MYSQL_CONFIG, DB_NAME)
    # NOTE(review): credentials are hard-coded in the connection URL; move
    # them to configuration / environment before sharing this file.
    db2 = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table2 = db2['swire_store_list']

    # Default taken from the disabled "port=4444" line; without it, running
    # the script with no argument raised NameError at brower_start(port).
    port = 4444
    if len(sys.argv) > 1:
        port = int(sys.argv[1])
        print('restart docker p{}'.format(port))
        os.system('sudo docker container restart p' + str(port))
        time.sleep(8)

    for _batch in range(10):
        result = db2.query('select * from swire_store_list where check_ is null ORDER BY RAND() limit 500')
        url_pd = pd.DataFrame([dict(i) for i in result])
        if url_pd.empty:
            # Nothing left to crawl; an empty frame has no 'fid' column.
            break
        url_pd['item_url'] = url_pd['fid'].apply(lambda x: 'https://www.google.com.tw/maps/@24.1753633,120.6747136,15z/data=!4m5!3m4!1s{}!8m2!3d24.1760271!4d120.6705323'.format(x))

        driver = brower_start(port)
        try:
            for key, row in url_pd.iterrows():
                try:
                    name = row['name']
                    item_url = row['item_url']
                    print(key, name, ': ', item_url)

                    output = _scrape_shop(driver, row, item_url)
                    data_select_insert(db, SHOP_LIST_TABLE, SHOP_LIST_TABLE_COL, output)
                    table2.upsert({'place_id': row['place_id'], 'check_': 1}, ['place_id'])
                except Exception as e:
                    table3 = db2['error_list2']
                    table3.insert({'num': row['name'], 'keyword': row['keyword'], 'item_url': row['item_url'], 'crawler_date': datetime.today().strftime("%Y/%m/%d %H:%M")})
                    print(e)
                time.sleep(1)
        finally:
            # Close this batch's browser; previously every batch started a
            # new Chrome and none was ever quit.
            driver.quit()


if __name__ == '__main__':
    main()
|