# -*- coding: utf-8 -*-
"""Collect the companies followed by LinkedIn users and store them in MySQL.

The script logs in to LinkedIn with credentials read from ``config.ini``,
picks users from the ``user`` table whose ``company_list`` is still NULL,
opens each profile, reads the "Companies" tab of the interests section, and
writes the companies to the ``company`` table and the joined company names
back to the ``user`` table.
"""
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import WebDriverException
import json
from datetime import datetime
from bs4 import BeautifulSoup
import time
from tqdm import tqdm
import pandas as pd
import dataset
import argparse
import configparser

config = configparser.ConfigParser()
config.read('config.ini')

# NOTE(review): database credentials are hard-coded in source; consider moving
# them to config.ini or an environment variable.
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/linkedin?charset=utf8mb4')
company_list_table = db['company']
user_list_table = db['user']


def brower_start(port):
    """Attach to a Selenium server on localhost at *port* and return the driver."""
    options = webdriver.ChromeOptions()
    browser = webdriver.Remote(
        command_executor='http://127.0.0.1:' + str(port) + '/wd/hub',
        desired_capabilities=options.to_capabilities()
    )
    return browser


def serive_create():
    """Start a local Chrome driver, print its session details and return it."""
    option = webdriver.ChromeOptions()
    option.add_argument('--disable-web-security')
    option.add_argument('--allow-running-insecure-content')
    driver = webdriver.Chrome('../../driver/chromedriver_20230202/chromedriver', options=option)
    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)
    time.sleep(3)
    return driver


def string_check(x):
    """Return *x* with leading and trailing whitespace removed."""
    return x.strip()


def get_content_info(driver):
    """Parse the post shown in the current page into a dict.

    Returns a dict with the keys ``post_name``, ``post_position``,
    ``content`` and ``content_url``; ``content_url`` is ``''`` when the
    article link is not present in the page.
    """
    shop_soup = BeautifulSoup(driver.page_source, 'html.parser')
    article = shop_soup.find('article')

    post_info = article.select("div[data-test-id='main-feed-activity-card__entity-lockup']")[0]
    post_name = string_check(
        post_info.find('a', class_='text-sm link-styled no-underline leading-open').text
    )
    post_position = string_check(post_info.find('p').text)
    print(post_name, ';', post_position)

    content = article.find('p', class_='attributed-text-segment-list__content').text
    print(content)

    try:
        content_url = shop_soup.select(
            "article a[data-tracking-control-name='public_post_feed-article-content']"
        )[0].get('href')
    except (IndexError, AttributeError):
        # The post has no linked article.
        content_url = ''

    return {
        'post_name': post_name,
        'post_position': post_position,
        'content': content,
        'content_url': content_url
    }


def linkedin_login(driver, config, user_choose='person2'):
    """Fill in and submit the LinkedIn login form.

    The credentials are read from ``config[user_choose]['user']`` and
    ``config[user_choose]['passwd']``. The driver must already be on the
    login page.
    """
    user = config[user_choose]['user']
    passwd = config[user_choose]['passwd']

    # Set the implicit wait before the first lookup so that it also applies
    # to locating the username field.
    driver.implicitly_wait(30)

    user_button = driver.find_element(By.ID, "username")
    ActionChains(driver).move_to_element(user_button).click(user_button).send_keys(user).perform()

    passwd_button = driver.find_element(By.ID, "password")
    ActionChains(driver).move_to_element(passwd_button).click(passwd_button)\
        .send_keys(passwd).send_keys(Keys.ENTER).perform()


def check_duplicate(table_name, column):
    """Return every value of *column* in *table_name* as a list.

    An empty table yields an empty list.
    """
    rows = [dict(i) for i in db.query(f'SELECT {column} FROM {table_name}')]
    if not rows:
        # pd.DataFrame([]) has no columns, so indexing it would raise KeyError.
        return []
    return pd.DataFrame(rows)[column].to_list()


def check_page(driver):
    """Dismiss the "we could not reach you" e-mail prompt if it is shown.

    This is best effort: when the headline is absent or the button cannot be
    clicked the function returns without raising.
    """
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    headline = soup.find('h2', class_='headline-new')
    if headline is None or headline.text.find('我們無法聯絡到您') == -1:
        return

    print('email error')
    try:
        driver.implicitly_wait(30)
        ignore_button = driver.find_element(By.CSS_SELECTOR, "button.secondary-action-new")
        ActionChains(driver).move_to_element(ignore_button).click(ignore_button).perform()
    except WebDriverException:
        # Best effort: continue even if the prompt cannot be dismissed.
        pass


def show_more_result(driver, company_count):
    """Click the "Show more results" button once per 25 companies, plus once."""
    for _ in tqdm(range(company_count // 25 + 1)):
        load_buttons = driver.find_elements(
            By.CSS_SELECTOR, 'button.scaffold-finite-scroll__load-button'
        )
        for button in load_buttons:
            if button.text != 'Show more results':
                continue
            try:
                ActionChains(driver).move_to_element(button).click(button).perform()
            except WebDriverException:
                # A failed click is ignored; the next round tries again.
                pass
            time.sleep(1)
            break


def _parse_followers(text):
    """Convert a string such as ``'1,234 followers'`` to an int."""
    return int(text.replace(' followers', '').replace(',', ''))


def _insert_company(company_url, company_name, company_image, company_followers):
    """Insert one company row, stamped with the current time."""
    company_list_table.insert({
        'company_url': company_url,
        'company_name': company_name,
        'company_image': company_image,
        'company_followers': company_followers,
        'dt': datetime.now()
    })


def get_company_from_first_page(interest_button):
    """Read the companies listed directly on the profile page.

    *interest_button* is the "Companies" tab element; the list items are
    looked up under its third ancestor. Each company is inserted into the
    ``company`` table. Returns the list of company names.
    """
    company_list = []
    container = interest_button.find_element(By.XPATH, "../../..")
    items = container.find_elements(
        By.CSS_SELECTOR,
        ' div.artdeco-tabpanel.active ul.pvs-list li.artdeco-list__item'
    )
    for item in items:
        company_name = item.find_element(By.CSS_SELECTOR, 'span.t-bold span').text
        company_url = item.find_element(
            By.CSS_SELECTOR, 'a[data-field="active_tab_companies_interests"]'
        ).get_attribute('href')
        company_image = item.find_element(By.CSS_SELECTOR, 'img').get_attribute('src')
        company_followers = _parse_followers(
            item.find_element(By.CSS_SELECTOR, 'span.t-black--light span').text
        )

        company_list.append(company_name)
        _insert_company(company_url, company_name, company_image, company_followers)
    return company_list


def get_company_from_next_page(driver):
    """Read the companies from the "show all companies" page.

    Parses the current page source, inserts each company into the
    ``company`` table and returns the list of company names. Items that
    cannot be parsed or stored are skipped.
    """
    shop_soup = BeautifulSoup(driver.page_source, 'html.parser')
    company_list = []
    for item in tqdm(shop_soup.find_all('li', class_='pvs-list__paged-list-item')):
        try:
            company_url = item.select("a[data-field='active_tab_companies_interests']")[0]['href']
            company_name = item.find('span', class_='t-bold').find('span').text
            company_image = item.select('div img')[0]['src']
            company_followers = _parse_followers(
                item.find('span', 't-black--light').find('span').text
            )

            company_list.append(company_name)
            _insert_company(company_url, company_name, company_image, company_followers)
        except Exception:
            # Best effort: skip this item, but let KeyboardInterrupt and
            # SystemExit propagate.
            continue
    return company_list


def move_to_interest_company_web(driver):
    """Open the "Companies" interests tab and, if offered, the full list.

    Returns ``(company_count, interest_button)``:

    * ``company_count`` is the int parsed from the "Show all ... companies"
      footer, or ``''`` when no such footer exists.
    * ``interest_button`` is the "Companies" tab element, or ``None`` when
      the profile has no such tab.
    """
    interest_div = driver.find_element(By.ID, 'interests')
    interest_div_parent = interest_div.find_element(By.XPATH, "..")

    # Switch to the "Companies" tab.
    interest_button = None
    for button in interest_div_parent.find_elements(By.CSS_SELECTOR, 'button span'):
        if button.text == 'Companies':
            interest_button = button
            ActionChains(driver).move_to_element(button).click(button).perform()
            break

    # Open the full company list when a "Show all N companies" footer exists.
    company_count = ''
    footers = interest_div_parent.find_elements(By.CSS_SELECTOR, 'div.pvs-list__footer-wrapper')
    for footer in footers:
        if footer.text.find('companies') != -1:
            company_count = int(
                footer.text.replace('Show all', '')
                .replace('companies', '')
                .replace(',', '')
            )
            ActionChains(driver).move_to_element(footer).click(footer).perform()
            break

    return company_count, interest_button


def argparse_setting():
    """Build the command-line parser (limit, user, port, environment)."""
    p = argparse.ArgumentParser()
    p.add_argument('-l', '--limit_count', nargs='?', const=1, type=int, default=20)
    p.add_argument('-u', '--user', nargs='?', const=1, type=str, default='person1')
    p.add_argument('-p', '--port', nargs='?', const=1, type=int, default=4446)
    p.add_argument('-e', '--enviorment', nargs='?', const=1, type=str, default='windows')
    return p


def main():
    """Log in and scrape followed companies for users without a company list."""
    args = argparse_setting().parse_args()

    # 'windows' starts a local Chrome; anything else attaches to the Selenium
    # server on args.port. (The comparison used to be against the misspelled
    # 'winodws', which made the local branch unreachable.)
    if args.enviorment == 'windows':
        driver = serive_create()
    else:
        driver = brower_start(args.port)

    driver.get('https://www.linkedin.com/login')
    linkedin_login(driver, config, user_choose=args.user)
    time.sleep(2)
    check_page(driver)

    # limit_count is converted to int by argparse before it is interpolated.
    result = db.query(
        f"SELECT * FROM user where company_list is null ORDER BY RAND() limit {args.limit_count}"
    )
    result = pd.DataFrame([dict(i) for i in result])

    for _, row in result.iterrows():
        url = row['url']
        driver.get(url)
        print(url)

        company_count, interest_button = move_to_interest_company_web(driver)
        print(f'company_count: {company_count}')

        if company_count == '':
            if interest_button is None:
                # No "Companies" tab on this profile: nothing to read.
                print('no Companies tab, skip')
                continue
            company_list = get_company_from_first_page(interest_button)
        else:
            if company_count > 10:
                # More than one page of results: load them, capped at 2000.
                if company_count > 2000:
                    company_count = 2000
                show_more_result(driver, company_count)
            time.sleep(1)
            company_list = get_company_from_next_page(driver)

        print(len(company_list))
        user_list_table.upsert(
            {'url': url, 'company_list': ' | '.join(company_list[:2000])},
            ['url']
        )
        time.sleep(2)


if __name__ == '__main__':
    main()