# -*- coding: utf-8 -*-
"""Google Maps POI crawler.

Pulls shop URLs queued in MySQL (``swire_store_list``), drives Chrome via
Selenium to each place page, scrapes details / opening hours / reviews /
photos with BeautifulSoup, and writes results back to MySQL.
"""
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from utility import database_access as DA
from utility.parseutils import *  # element_list, intro_list, week_list, blank_check, value_check, ...
from utility.connect import *     # MYSQL_CONFIG, DB_NAME, SHOP_LIST_TABLE, SHOP_LIST_TABLE_COL, ...
from datetime import datetime
import traceback
import dataset
import pandas as pd
import time
import json
import re
import sys  # (duplicate `import sys` removed)
import os
import logging
from logging.handlers import SysLogHandler
import socket

# Remote syslog endpoint used for coarse progress logging.  FATAL level is
# used so the messages pass any configured handler threshold.
_LOG_SERVER = ('hhh.ptt.cx', 514)
logger = logging.getLogger('poibot')
handler1 = SysLogHandler(address=_LOG_SERVER, socktype=socket.SOCK_DGRAM)
logger.addHandler(handler1)
hname = socket.gethostname()
pid = str(os.getpid())
logger.fatal('[poibot][' + hname + '][' + pid + ']begin')


def serive_create(profilepath):
    """Start a (Windows) Chrome session bound to the given profile directory.

    BUG FIX: the original read the unassigned local name ``driver`` before
    assignment, raising UnboundLocalError on every call; the previous session
    is now looked up via ``globals()`` instead.
    """
    old_driver = globals().get('driver')
    if old_driver is not None:
        old_driver.quit()
        os.system('killall chrome')
    option = webdriver.ChromeOptions()
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    option.add_argument("--user-data-dir=C:\\Users\\user\\AppData\\Local\\Google\\Chrome\\User Data")
    option.add_argument("profile-directory=" + profilepath)
    driver = webdriver.Chrome('./utility/chromedriver_win32/chromedriver', options=option)
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)
    time.sleep(3)
    return driver


def brower_start(port):
    """Launch a local Chrome browser.

    ``port`` is unused by the local launch; it is kept for the commented-out
    Remote WebDriver variant and for interface compatibility with callers.
    """
    logger.fatal('[poibot][' + hname + '][' + pid + ']browser start')
    options = webdriver.ChromeOptions()
    options.add_argument('--ignore-certificate-errors')
    options.add_argument("--no-sandbox")
    # options.add_argument("--headless")
    options.add_argument("--disable-gpu")
    options.add_argument("--disable-dev-shm-usage")
    browser = webdriver.Chrome(options=options)
    browser.set_window_size(1400, 1000)
    # browser = webdriver.Remote(
    #     command_executor='http://127.0.0.1:' + str(port) + '/wd/hub',
    #     desired_capabilities=options.to_capabilities()
    # )
    return browser


def keyin_keyword(driver, keyword):
    """Type ``keyword`` into the Maps search box and submit with ENTER."""
    button = driver.find_element_by_id("searchbox")
    driver.implicitly_wait(30)
    ActionChains(driver).move_to_element(button).send_keys(keyword).send_keys(Keys.RETURN).perform()
    time.sleep(3)


def open_time(driver):
    """Click the opening-hours widget unless it is a booking ('預訂') widget.

    Returns 1 when the hours pane was clicked, 0 otherwise.
    """
    element = driver.find_element_by_xpath('//*[@id="pane"]/div/div[1]/div/div/div[9]/div[2]')
    if element.text.find('預訂') == -1:
        element = driver.find_element_by_xpath('//*[@id="pane"]/div/div[1]/div/div/div[9]/div[2]')
        driver.implicitly_wait(10)
        ActionChains(driver).move_to_element(element).click(element).perform()
        return 1
    else:
        return 0


def get_shop_info(driver, output, shop_soup):
    """Scrape basic shop fields (city/area/addr/tel plus ``element_list``
    selectors) from the already-parsed place page into ``output``."""
    location = shop_soup.find('button', {'data-item-id': 'oloc'})['aria-label'].split(' ')
    output['city'] = location[-1]
    output['area'] = location[-2]
    try:
        output['addr'] = shop_soup.find('button', {'data-item-id': 'address'})['aria-label'].replace('地址:', '')
    except Exception:
        output['addr'] = ''
    try:
        output['tel'] = blank_check(shop_soup.find('button', {'data-tooltip': '複製電話號碼'})['aria-label'].split(':')[1])
    except Exception:
        output['tel'] = ''
    print(output['addr'], ', ', output['tel'])

    # element_list maps output keys to (tag, attrs[, attribute]) selectors.
    for key in element_list:
        try:
            element = element_list[key]
            if len(element) == 3:
                value = shop_soup.find(element[0], element[1])[element[2]]
            else:
                tmp_value = shop_soup.find(element[0], element[1])
                if tmp_value:
                    value = tmp_value.text
                else:
                    value = ''
            output[key] = value_check(key, value)
        except Exception:
            # Best-effort: any missing/odd markup yields an empty field.
            output[key] = ''
    return output


def get_intro_info(driver, output):
    """Open the '簡介' (About) pane and scrape the checked feature lists
    declared in ``intro_list``; each result is stored as a stringified list."""
    try:
        element = driver.find_element(By.CSS_SELECTOR, "div[aria-label='{}簡介']".format(output['name']))
        driver.implicitly_wait(5)
        ActionChains(driver).move_to_element(element).click(element).perform()

        page_down_(driver, '//*[@id="pane"]/div/div[1]', 3)

        intro_soup = BeautifulSoup(driver.page_source, 'html.parser')
        for key in intro_list:
            elements = intro_soup.find('div', {'aria-label': key})
            if elements:
                element = elements.find_all('li', {'class': 'LQjNnc-p83tee-JNdkSc-ibnC6b'})
                count = 0
                tmp = []
                for ele in element:
                    # Only rows carrying the "check" icon are enabled features.
                    if ele.find('img', {'src': "//www.gstatic.com/images/icons/material/system_gm/1x/check_black_18dp.png"}):
                        tmp += [{
                            'id': count,
                            intro_list[key][1]: blank_check(ele.text)
                        }]
                        count += 1
                print(str(tmp))
                output[intro_list[key][0]] = str(tmp)
            else:
                output[intro_list[key][0]] = '[]'
        driver.back()
        return output
    except Exception:
        # Best-effort: on any scraping failure fall back to empty lists.
        for key in intro_list:
            output[intro_list[key][0]] = '[]'
        return output


def get_time_list(shop_soup, output):
    """Parse the weekly opening-hours table into Google-Places-style
    ``periods`` plus human-readable ``weekday_text`` (both stringified)."""
    periods = []
    weekday_text = []
    open_now = blank_check(shop_soup.find('span', {'class': 'LJKBpe-Tswv1b-hour-text'}).text.split('\xa0')[0])
    if open_now == '永久停業' or open_now == '暫時關閉':
        output['open_now'] = 'False'
    else:
        output['open_now'] = 'True'
    for tr_ in shop_soup.find_all('tr'):
        if tr_.find('div').text.replace(' ', '') != '':
            week = tr_.find('div').text
            time_list = [blank_check(i.text) for i in tr_.find_all('li')]
            for time_ in time_list:
                if time_ == '24 小時營業':
                    # BUG FIX: was the int literal 0000 (== 0); every other
                    # branch stores the time as an "HHMM" string.
                    periods += [{
                        "open": {"day": week_list[week], "time": '0000'},
                        "close": {"day": week_list[week], "time": ''}
                    }]
                elif time_ == '休息':
                    periods += [{
                        "open": {"day": week_list[week], "time": ''},
                        "close": {"day": week_list[week], "time": ''}
                    }]
                else:
                    start, end = time_.split('–')
                    end_hour, end_min = end.split(':')
                    start_hour, start_min = start.split(':')
                    # BUG FIX: compare hours numerically; the original
                    # lexicographic string compare misorders '9' vs '21'.
                    if int(end_hour) < int(start_hour):
                        end_day = week_list[week] + 1  # closes past midnight
                    else:
                        end_day = week_list[week]
                    periods += [{
                        "open": {"day": week_list[week], "time": start.replace(':', '')},
                        "close": {"day": end_day, "time": end.replace(':', '')}
                    }]
            weekday_text += ["{}: {}".format(week, ', '.join(time_list))]
    output['periods'] = str(periods)
    output['weekday_text'] = str(weekday_text)
    return output


def get_reviews(driver, output):
    """Open the reviews pane, expand photos and truncated texts, and scrape
    every visible review into ``output['reviews']`` (stringified list)."""
    wait = WebDriverWait(driver, 30)
    more_reviews_css = "button[jsaction='pane.rating.moreReviews']"
    wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, more_reviews_css)))
    element = driver.find_element_by_css_selector(more_reviews_css)
    driver.implicitly_wait(10)
    ActionChains(driver).move_to_element(element).click(element).perform()
    time.sleep(0.5)

    page_down_(driver, '//div[@class="PPCwl"]', 5)

    comment_soup = BeautifulSoup(driver.page_source, 'html.parser')
    # BUG FIX: both checks below compared a list to 0 (always True); use the
    # list's truthiness, which is what the original clearly intended.
    if comment_soup.find_all('div', class_='ODSEW-ShBeI-xJzy8c-bF1uUb'):
        all_photo = driver.find_elements_by_class_name('ODSEW-ShBeI-xJzy8c-bF1uUb')
        for ap in all_photo:
            ap.click()
    if comment_soup.select('button[aria-label="顯示更多"]'):
        all_review = driver.find_elements_by_css_selector('button[aria-label="顯示更多"]')
        for ap in all_review:
            ap.click()

    comment_soup = BeautifulSoup(driver.page_source, 'html.parser')
    count = 0
    reviews = []
    for comment in comment_soup.find_all('div', {'class': 'ODSEW-ShBeI'}):
        comment_a_tag = comment.find_all('a')
        author_name = blank_check(comment_a_tag[1].find('div', class_='ODSEW-ShBeI-title').text)
        profile_photo_url = comment_a_tag[0].find('img')['src']
        rating = blank_check(comment.find('span', {'role': 'img'})['aria-label'].replace('顆星', ''))
        text = comment.find('div', class_='ODSEW-ShBeI-ShBeI-content').text
        created_at = comment.find('span', class_='ODSEW-ShBeI-RgZmSc-date').text
        photos = []
        c = 0
        for i in comment.find_all('button', class_='ODSEW-ShBeI-xJzy8c'):
            # The photo URL is embedded in the inline background-image style.
            path = i['style'].split(';')[0].split('url')[1].replace('\"', '').replace('(', '').replace(')', '')
            photos += [path]
            c += 1
        reviews += [{
            'id': comment.find('a')['href'].split('/')[5],
            'author_name': author_name,
            'profile_photo_url': profile_photo_url,
            'rating': int(rating),
            'text': text,
            'created_at': created_at,
            'photos': photos
        }]
        count += 1
    output['reviews'] = str(reviews)
    driver.back()
    return output


def find_photo_list(driver):
    """Scroll the photo gallery and return up to 6 photo URLs extracted from
    inline styles."""
    time.sleep(0.5)
    wait = WebDriverWait(driver, 60)
    wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[3]/div[1]/div[1]/div/a')))
    page_down_(driver, '//*[@id="pane"]/div/div[1]/div/div/div[3]/div[1]/div[1]/div/a', 10)
    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')
    photo_url = []
    count = 0
    for i in photo_soup.find_all('a', class_='mWq4Rd-eEDwDf'):
        if count > 5:
            break
        a_url = i.find('div', class_='mWq4Rd-HiaYvf-CNusmb-gevUs loaded')
        if a_url:
            # NOTE(review): Tag.find('width') returns None, so `!= -1` is
            # always True; the author likely meant to test the style string.
            # Preserved as-is to avoid changing which photos are collected.
            if a_url.find('width') != -1:
                sentence = a_url['style']
                photo = re.search(r'https:(.*)\"', sentence)
                photo_url += [photo.group(0).replace('\"', '')]
                count += 1
    return photo_url


def find_big_photo(output, driver):
    """Open the photo viewer and collect the '全部' (all) and '菜單' (menu)
    photo tabs into ``output['shop_photo']`` / ``output['menu_photo']``."""
    wait = WebDriverWait(driver, 60)
    wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[1]/div[1]/button')))
    element = driver.find_element(By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[1]/div[1]/button')
    ActionChains(driver).move_to_element(element).click(element).perform()
    output['shop_photo'] = '[]'
    output['menu_photo'] = '[]'
    photo_map = {
        '全部': 'shop_photo',
        '菜單': 'menu_photo'
    }
    driver.find_element(By.CSS_SELECTOR, "button[data-tab-index='1']")
    photo_soup = BeautifulSoup(driver.page_source, 'html.parser')
    tab_dict = {}
    for tab_index in [0, 1, 2]:
        selector = photo_soup.select("button[data-tab-index='{}']".format(tab_index))
        if len(selector) != 0:
            photo_name = selector[0].text
            if photo_name == '菜單':
                tab_dict[photo_name] = tab_index
            elif photo_name == '全部':
                tab_dict[photo_name] = tab_index
    print(tab_dict)
    for tab_ in tab_dict:
        tab_index = tab_dict[tab_]
        print(tab_index)
        wait = WebDriverWait(driver, 60)
        wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-tab-index='{}']".format(tab_index))))
        element = driver.find_element(By.CSS_SELECTOR, "button[data-tab-index='{}']".format(tab_index))
        ActionChains(driver).move_to_element(element).click(element).perform()
        photo_list = find_photo_list(driver)
        output[photo_map[tab_]] = str(photo_list)
    return output


def get_url_list(driver):
    """Scroll the search-result list and return [href, aria-label] pairs for
    every place link currently in the page."""
    time.sleep(2)
    for i in range(5, 43, 2):
        driver.find_element(By.XPATH, '//*[@id="pane"]/div/div[1]/div/div/div[2]/div[1]/div[{}]/div/a'.format(i)).send_keys(Keys.DOWN)
    url_soup = BeautifulSoup(driver.page_source, 'html.parser')
    url_list = []
    for i in url_soup.find_all('a'):
        try:
            if i['href'].find('maps/place') != -1:
                url_list += [[i['href'], i['aria-label']]]
        except Exception:
            # Anchors without href/aria-label are simply skipped.
            pass
    return url_list


def data_select_insert(db, table_name, table_col, data):
    """Build a row from ``data`` in ``table_col`` order (crawler_date is
    filled with "now") and INSERT IGNORE it.

    SECURITY NOTE(review): the SQL is assembled by string formatting, so any
    quote in the scraped values can break or inject into the statement —
    consider a parameterized insert via DA.
    """
    tmp = []
    for name_ in table_col:
        if name_ == 'crawler_date':
            continue
        if name_ == 'lon' or name_ == 'lat':
            tmp += [float(data[name_])]
        else:
            tmp += [data[name_]]
    tmp += [datetime.today().strftime("%Y/%m/%d %H:%M")]

    insert_sql = """INSERT IGNORE INTO {}{} VALUES {}"""\
        .format(table_name, str(tuple(table_col)).replace('\'', ''), tuple(tmp))
    DA.mysql_insert_data(db, insert_sql)


def time_click(driver):
    """Click the opening-hours toggle when present.

    Returns '正常' when hours were expanded, the notification text when the
    shop is closed (e.g. 永久停業/暫時關閉), 'error' for the detailed-hours
    button variant, and '' otherwise or on failure.
    """
    shop_soup_tmp = BeautifulSoup(driver.page_source, 'html.parser')
    status = ''
    try:
        if len(shop_soup_tmp.select("span[aria-label='顯示本週營業時間']")) != 0:
            time_css = "span[aria-label='顯示本週營業時間']"
            element = driver.find_element_by_css_selector(time_css)
            driver.implicitly_wait(10)
            ActionChains(driver).move_to_element(element).click(element).perform()
            status = '正常'
        elif len(shop_soup_tmp.select("img[aria-label='通知']")) != 0:
            status = shop_soup_tmp.find('span', class_='LJKBpe-Tswv1b-text aSftqf').text
        elif len(shop_soup_tmp.select('button[aria-label*="查看更詳細的營業時間"]')) != 0:
            status = 'error'
        return status
    except Exception:
        return ''


def get_new_keyword(db):
    """Return the first keyword from shop_item_list not yet present in
    progress_list2."""
    result = db.query('select distinct(keyword) from shop_item_list order by keyword')
    result = pd.DataFrame([i for i in result])
    progress = db.query('select distinct(kw) from progress_list2')
    progress = pd.DataFrame([i for i in progress])
    if len(progress) != 0:
        keyword = result[~result['keyword'].isin(progress.kw.to_list())].iloc[0].values[0]
    else:
        keyword = result.iloc[0].values[0]
    return keyword


def get_not_cralwer_url(keyword):
    """Return the item URLs for ``keyword`` that are neither crawled
    (shop_list2) nor failed (error_list2), as a DataFrame.

    SECURITY NOTE(review): ``keyword`` is interpolated into SQL directly.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table = db['shop_item_list3']
    url_list = list(table.find(keyword=keyword))
    shop_item = [i['item_url'] for i in db.query('SELECT item_url FROM shop_list2 where keyword="{}"'.format(keyword))]
    error_item = [i['item_url'] for i in db.query('SELECT item_url FROM error_list2 where keyword="{}"'.format(keyword))]

    url_pd = pd.DataFrame(url_list, columns=url_list[0].keys())
    url_pd = url_pd[~url_pd['item_url'].isin(shop_item)]
    url_pd = url_pd[~url_pd['item_url'].isin(error_item)]

    print('have {} URL list'.format(len(url_pd)))
    return url_pd


def serive_create_linux(profilepath):
    """Start a headless Chrome session on Linux bound to ``profilepath``."""
    option = webdriver.ChromeOptions()
    option.add_argument('--headless')
    option.add_argument('--no-sandbox')
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    option.add_argument('--incognito')
    option.add_argument(
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0')
    option.add_argument(
        "--user-data-dir=/home/noodlesloves/.config/google-chrome/")
    option.add_argument("profile-directory=" + profilepath)
    driver = webdriver.Chrome('utility/chromedriver', options=option)
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)
    return driver


def find_lon_lat(driver):
    """Right-click the visible map center and read (lat, lon) from the
    context menu's first entry."""
    e = driver.find_element_by_css_selector("#scene > div.widget-scene > canvas")
    size = e.size
    total_height = size['height']
    total_width = size['width']

    size2 = driver.find_element_by_css_selector("#pane > div.Yr7JMd-pane").size
    left_width = size2['width']
    print(total_height, total_width, left_width)

    # Center of the map area that is not covered by the left pane.
    x = (total_width - left_width) / 2 + left_width
    y = total_height / 2

    e = driver.find_element_by_css_selector("#pane > div.Yr7JMd-pane")
    action = webdriver.common.action_chains.ActionChains(driver)
    action.move_to_element_with_offset(e, x, y)
    action.context_click()
    action.perform()
    time.sleep(0.5)

    element = driver.find_element_by_css_selector('#action-menu > ul > li:nth-child(1)')
    lat, lon = element.text.split(',')
    return float(lat), float(lon)


def get_unique_id(driver):
    """Open the share ('分享') dialog, poll the short-URL input, and return
    the URL's trailing unique id."""
    element = driver.find_element(By.CSS_SELECTOR, "button[data-value='分享']")
    driver.implicitly_wait(5)
    ActionChains(driver).move_to_element(element).click(element).perform()
    time.sleep(0.5)
    for i in range(5):
        ele = driver.find_element(By.CSS_SELECTOR, "input")
        short_url = ele.get_attribute('value')
        unique_id = short_url.split('/')[-1]
        if len(unique_id) != 0:
            break
        time.sleep(0.5)

    element = driver.find_element(By.CSS_SELECTOR, "button[aria-label='關閉']")
    driver.implicitly_wait(5)
    ActionChains(driver).move_to_element(element).click(element).perform()
    return unique_id


def page_down_(driver, xpath_css, time_):
    """Click the element at ``xpath_css`` (second match preferred) and send
    PAGE_DOWN ``time_`` times to force lazy content to load."""
    elmts = driver.find_elements_by_xpath(xpath_css)
    print(elmts)
    if len(elmts) > 1:
        elmt = elmts[1]
    else:
        elmt = elmts[0]

    actions = ActionChains(driver)
    actions.move_to_element(elmt).click().perform()
    for i in range(time_):
        try:
            actions = ActionChains(driver)
            actions.send_keys(Keys.PAGE_DOWN).perform()
        except Exception:
            traceback.print_exc()
        time.sleep(0.5)


def main():
    """Crawl up to 10 batches of 500 random not-yet-errored stores, scraping
    each place page and inserting the result row; failures are recorded in
    error_list2."""
    db = DA.mysql_connect(MYSQL_CONFIG, DB_NAME)
    db2 = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/google_poi?charset=utf8mb4')
    table2 = db2['swire_store_list']

    if len(sys.argv) > 1:
        port = int(sys.argv[1])
    else:
        port = 2

    for i in range(10):
        result = db2.query('SELECT * FROM swire_store_list a WHERE not exists (select 1 from error_list2 tei where tei.fid = a.fid limit 1 ) ORDER BY RAND() limit 500')
        url_pd = pd.DataFrame([dict(i) for i in result])
        # Rebuild a place URL from the feature id; the fixed @lat,lon viewport
        # is irrelevant because the !1s{fid} token selects the place.
        url_pd['item_url'] = url_pd['fid'].apply(lambda x: 'https://www.google.com.tw/maps/@24.1753633,120.6747136,15z/data=!4m5!3m4!1s{}!8m2!3d24.1760271!4d120.6705323'.format(x))

        driver = brower_start(port)
        time.sleep(4)

        for key, row in url_pd.iterrows():
            try:
                name = row['name']
                logger.fatal('[poibot][' + hname + '][' + pid + '] processing: ' + name)
                item_url = row['item_url']
                print(key, name, ': ', item_url)
                print('start...')
                driver.get(item_url)
                # NOTE(review): a 9999-second sleep per item looks like a
                # debugging leftover — confirm before removing/shortening.
                time.sleep(9999)
                page_down_(driver, "//div[@class='x3AX1-LfntMc-header-title-ij8cu-haAclf']", 3)

                time_status = time_click(driver)
                time.sleep(0.5)
                shop_soup = BeautifulSoup(driver.page_source, 'html.parser')

                output = {
                    'name': name,
                    'fid': row['fid']
                }
                print(output['name'])

                print('get_shop_info')
                output = get_shop_info(driver, output, shop_soup)

                print('get_intro_info')
                if len(shop_soup.select("div[aria-label='{}簡介']".format(output['name']))) != 0:
                    output = get_intro_info(driver, output)
                else:
                    # Renamed from `key` to avoid shadowing the iterrows index.
                    for intro_key in intro_list:
                        output[intro_list[intro_key][0]] = '[]'

                print('get_time_list')
                if time_status == '正常':
                    output = get_time_list(shop_soup, output)
                else:
                    # NOTE(review): bool False here vs the string 'False'
                    # written by get_time_list — inconsistent DB values.
                    output['open_now'] = False
                    output['periods'] = ''
                    output['weekday_text'] = ''

                print('user_ratings_total')
                if output['user_ratings_total'] == '':
                    output['reviews'] = ''
                else:
                    output = get_reviews(driver, output)

                print('find_big_photo')
                output = find_big_photo(output, driver)

                output_name = output['name'].replace('(', '').replace(')', '')
                query_name = '{}+{}'.format(output_name, output['addr'])
                query_name = query_name.replace(' ', '')
                output['item_url'] = item_url
                output['keyword'] = row['keyword']
                output['google_url'] = 'https://www.google.com.tw/search?q={}'.format(query_name)

                data_select_insert(db, SHOP_LIST_TABLE, SHOP_LIST_TABLE_COL, output)
                table2.upsert({'place_id': row['place_id'], 'check_': 1}, ['place_id'])
            except Exception as e:
                traceback.print_exc()
                table3 = db2['error_list2']
                table3.insert({'fid': row['fid'], 'num': row['name'], 'keyword': row['keyword'], 'item_url': row['item_url'], 'crawler_date': datetime.today().strftime("%Y/%m/%d %H:%M")})
                print(e)
                time.sleep(1)


if __name__ == '__main__':
    main()