# -*- coding: utf-8 -*- from selenium import webdriver from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.common.by import By from selenium.common.exceptions import TimeoutException from selenium.common.exceptions import WebDriverException from utility import * from const import * import json from datetime import datetime from bs4 import BeautifulSoup import time, os from tqdm import tqdm import pandas as pd import dataset import argparse import configparser config = configparser.ConfigParser() config.read('config.ini') db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/linkedin?charset=utf8mb4') company_list_table = db[DB_COMPANY] user_list_table = db[DB_USER] def show_more_result(driver, company_count): for i in tqdm(range(int(company_count/25)+1)): for button in driver.find_elements(By.CSS_SELECTOR,'button.scaffold-finite-scroll__load-button'): if button.text == 'Show more results': # print(button) try: ActionChains(driver).move_to_element(button).click(button).perform() except: pass time.sleep(1) break def get_company_from_first_page(interest_button, company_url_list): company_list = [] for i in interest_button.find_element(By.XPATH,"../../..").find_elements(By.CSS_SELECTOR,' div.artdeco-tabpanel.active ul.pvs-list li.artdeco-list__item'): company_url = i.find_element(By.CSS_SELECTOR,'a[data-field="active_tab_companies_interests"]').get_attribute('href') if company_url in company_url_list: continue company_name = i.find_element(By.CSS_SELECTOR,'span.t-bold span').text company_image = i.find_element(By.CSS_SELECTOR,'img').get_attribute('src') company_followers = int(i.find_element(By.CSS_SELECTOR,'span.t-black--light span').text.replace(' followers','').replace(',','')) company_list += [company_name] company_list_table.insert({ 'company_url': company_url, 'company_name': company_name, 'company_image': company_image, 'company_followers': company_followers, 'dt': datetime.now() }) return company_list def get_company_from_next_page(driver, company_url_list): shop_soup = BeautifulSoup(driver.page_source, 'html.parser') company_list = [] for item in tqdm(shop_soup.find_all('li', class_='pvs-list__paged-list-item')): try: company_url = item.select("a[data-field='active_tab_companies_interests']")[0]['href'] if company_url in company_url_list: continue company_name = item.find('span', class_= 't-bold').find('span').text company_image = item.select('div img')[0]['src'] company_followers = item.find('span', 't-black--light').find('span').text company_followers = int(company_followers.replace(' followers','').replace(',','')) company_list += [company_name] company_list_table.insert({ 'company_url': company_url, 'company_name': company_name, 'company_image': company_image, 'company_followers': company_followers, 'dt': datetime.now() }) except: pass return company_list def move_to_interest_company_web(driver): interest_div = driver.find_element(By.ID,'interests') interest_div_parent = interest_div.find_element(By.XPATH,"..") # move to company tag for button in interest_div_parent.find_elements(By.CSS_SELECTOR,'button span'): if button.text == 'Companies': interest_button = button ActionChains(driver).move_to_element(button).click(button).perform() break # show all company company_count = '' # interest_div = driver.find_element(By.ID,'interests') # interest_div_parent = interest_div.find_element(By.XPATH,"..") show_all_company_button = interest_div_parent.find_elements(By.CSS_SELECTOR, 'div.pvs-list__footer-wrapper') for button in show_all_company_button: if button.text.find('companies') != -1: company_count = int(button.text.replace('Show all','')\ .replace('companies','')\ .replace(',', '')) ActionChains(driver).move_to_element(button).click(button).perform() break return company_count, interest_button def argparse_setting(): p = argparse.ArgumentParser() p.add_argument('-l', '--limit_count', nargs='?', const=1, type=int, default=20) p.add_argument('-u', '--user', nargs='?', const=1, type=str, default='person1') p.add_argument('-p', '--port', nargs='?', const=1, type=int, default='4446') p.add_argument('-e', '--enviorment', nargs='?', const=1, type=str, default='windows', help="windows or linux") return p def main(): p = argparse_setting() args = p.parse_args() if args.enviorment == 'windows': print('windows web start') driver = serive_create() else: print('linux web start') print('restart docker p{}'.format(args.port)) os.system('sudo docker container restart p'+str(args.port)) time.sleep(8) driver = brower_start(args.port, 8787, False) url = 'https://www.linkedin.com/login' driver.get(url) print(f'login in with {args.user}') linkedin_login(driver, config, user_choose=args.user) time.sleep(2) check_page(driver) result = db.query(f"SELECT * FROM {DB_USER} where company_list is null ORDER BY RAND() limit {args.limit_count}") result = pd.DataFrame([dict(i) for i in result]) # try: print('start to crawler...') for k, r in result.iterrows(): company_url_list = check_duplicate(DB_COMPANY, 'company_url', db) url = r['url'] driver.get(url) print(url) company_count, interest_button = move_to_interest_company_web(driver) print(f'company_count: {company_count}') if company_count == '': company_list = get_company_from_first_page(interest_button, company_url_list) else: if company_count > 10: # more results if company_count > 2000: company_count = 2000 show_more_result(driver, company_count) time.sleep(1) company_list = get_company_from_next_page(driver, company_url_list) print(len(company_list)) user_list_table.upsert({'url':url,'company_list':' | '.join(company_list[:2000])},['url']) time.sleep(2) # except: # pass if __name__ == '__main__': main()