123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- # -*- coding: utf-8 -*-
- from selenium import webdriver
- from selenium.webdriver.common.action_chains import ActionChains
- from selenium.webdriver.common.keys import Keys
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.wait import WebDriverWait
- from selenium.webdriver.common.by import By
- from selenium.common.exceptions import TimeoutException
- from selenium.common.exceptions import WebDriverException
- import json
- from datetime import datetime
- from bs4 import BeautifulSoup
- import time
- from tqdm import tqdm
- import pandas as pd
- import dataset
- import argparse
- import configparser
# Account settings are read from config.ini in the current working directory;
# linkedin_login() looks up the 'user' and 'passwd' keys of the chosen section.
config = configparser.ConfigParser()
config.read('config.ini')

# Database connection URL. An optional "[database] url = ..." entry in
# config.ini takes precedence; when it is absent the original built-in URL is
# used, so existing deployments keep working unchanged.
# raw=True disables '%' interpolation so passwords containing '%' are accepted.
# NOTE(review): the fallback still embeds credentials in source control --
# move them into config.ini and rotate the password.
db = dataset.connect(
    config.get(
        'database',
        'url',
        raw=True,
        fallback='mysql://choozmo:pAssw0rd@db.ptt.cx:3306/linkedin?charset=utf8mb4',
    )
)
company_list_table = db['company']
user_list_table = db['user']
def brower_start(port):
    """Attach to a Selenium server listening on localhost and return the driver.

    Args:
        port: Port of the server; the session is opened against
            ``http://127.0.0.1:<port>/wd/hub``.

    Returns:
        A ``webdriver.Remote`` instance created with default Chrome options.
    """
    hub_url = f'http://127.0.0.1:{port}/wd/hub'
    capabilities = webdriver.ChromeOptions().to_capabilities()
    return webdriver.Remote(
        command_executor=hub_url,
        desired_capabilities=capabilities,
    )
def serive_create():
    """Start a local Chrome driver and return it.

    Chrome is launched with web security disabled and insecure content
    allowed. The new session id and the executor URL are printed, then the
    function sleeps for three seconds before returning the driver.
    """
    chrome_options = webdriver.ChromeOptions()
    for flag in ('--disable-web-security', '--allow-running-insecure-content'):
        chrome_options.add_argument(flag)

    driver = webdriver.Chrome(
        '../../driver/chromedriver_20230202/chromedriver',
        options=chrome_options,
    )

    executor_url = driver.command_executor._url
    session_id = driver.session_id
    print(session_id)
    print(executor_url)

    time.sleep(3)
    return driver
def string_check(x):
    """Return *x* with leading and trailing whitespace removed."""
    return x.strip()
def get_content_info(driver):
    """Parse the post shown on the driver's current page.

    The page source is parsed with BeautifulSoup and the first ``<article>``
    element is inspected. The author name, author headline and post text are
    printed as they are extracted.

    Args:
        driver: Object exposing the rendered HTML as ``page_source``.

    Returns:
        dict with the keys ``post_name``, ``post_position``, ``content`` and
        ``content_url``. ``content_url`` is ``''`` when the post has no
        article link.

    Raises:
        AttributeError, IndexError: If the page has no ``<article>`` or the
            author / text elements are missing.
    """
    page = BeautifulSoup(driver.page_source, 'html.parser')
    article = page.find('article')

    author_card = article.select("div[data-test-id='main-feed-activity-card__entity-lockup']")[0]
    post_name = string_check(
        author_card.find('a', class_='text-sm link-styled no-underline leading-open').text
    )
    post_position = string_check(author_card.find('p').text)
    print(post_name, ';', post_position)

    content = article.find('p', class_='attributed-text-segment-list__content').text
    print(content)

    # The article link is optional. Test for it explicitly instead of relying
    # on a bare ``except`` that would also hide unrelated failures.
    links = page.select("article a[data-tracking-control-name='public_post_feed-article-content']")
    content_url = links[0].get('href') if links else ''

    return {
        'post_name': post_name,
        'post_position': post_position,
        'content': content,
        'content_url': content_url,
    }
def linkedin_login(driver, config, user_choose='person2'):
    """Fill in and submit the login form on the driver's current page.

    Args:
        driver: Selenium driver already showing the login page.
        config: Mapping of sections; ``config[user_choose]`` must provide the
            ``user`` and ``passwd`` entries.
        user_choose: Name of the section holding the account to log in with.
    """
    account = config[user_choose]
    user = account['user']
    passwd = account['passwd']

    # Username: focus the field and type the account name.
    username_field = driver.find_element(By.ID, "username")
    driver.implicitly_wait(30)
    type_user = ActionChains(driver)
    type_user.move_to_element(username_field)
    type_user.click(username_field)
    type_user.send_keys(user)
    type_user.perform()

    # Password: type it and press ENTER to submit the form.
    password_field = driver.find_element(By.ID, "password")
    driver.implicitly_wait(30)
    type_password = ActionChains(driver)
    type_password.move_to_element(password_field)
    type_password.click(password_field)
    type_password.send_keys(passwd)
    type_password.send_keys(Keys.ENTER)
    type_password.perform()
def check_duplicate(table_name, column):
    """Return every value stored in *column* of *table_name* as a list.

    Args:
        table_name: Name of the table to read.
        column: Name of the column to read.

    Returns:
        A list with one entry per row; an empty list when the table has no
        rows. (Going through a DataFrame raised ``KeyError`` in that case,
        because an empty frame has no columns.)
    """
    # NOTE(review): both names are interpolated into the SQL text, so they
    # must never come from untrusted input.
    rows = db.query(f'SELECT {column} FROM {table_name}')
    return [dict(row)[column] for row in rows]
def check_page(driver):
    """Dismiss the e-mail warning page if the browser is currently showing it.

    The page is recognised by an ``<h2 class="headline-new">`` whose text
    contains '我們無法聯絡到您'. When it is present, 'email error' is printed
    and the ``button.secondary-action-new`` button is clicked. On any other
    page the function does nothing.

    Args:
        driver: Selenium driver to inspect.
    """
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    headline = soup.find('h2', class_='headline-new')
    if headline is None or '我們無法聯絡到您' not in headline.text:
        return

    print('email error')
    try:
        ignore_button = driver.find_element(By.CSS_SELECTOR, "button.secondary-action-new")
        driver.implicitly_wait(30)
        ActionChains(driver).move_to_element(ignore_button).click(ignore_button).perform()
    except WebDriverException as err:
        # Dismissing the prompt stays best-effort, but the failure is reported
        # instead of being swallowed silently.
        print(f'check_page: could not dismiss the e-mail prompt: {err}')
def show_more_result(driver, company_count):
    """Click the 'Show more results' button repeatedly to expand the list.

    The number of rounds is ``company_count // 25 + 1`` (presumably 25 items
    are loaded per click -- TODO confirm). In each round the first load
    button labelled 'Show more results' is clicked, followed by a one-second
    pause; a round in which no such button exists does nothing.

    Args:
        driver: Selenium driver showing the list page.
        company_count: Non-negative integer number of companies expected.
    """
    rounds = company_count // 25 + 1
    for _ in tqdm(range(rounds)):
        load_buttons = driver.find_elements(
            By.CSS_SELECTOR, 'button.scaffold-finite-scroll__load-button'
        )
        for button in load_buttons:
            if button.text != 'Show more results':
                continue
            try:
                ActionChains(driver).move_to_element(button).click(button).perform()
            except WebDriverException:
                # Best-effort: a failed click (e.g. the button went stale or
                # is off-screen) must not abort the remaining rounds.
                pass
            time.sleep(1)
            break
def get_company_from_first_page(interest_button):
    """Store the companies listed in the active tab panel and return their names.

    Starting from the third ancestor of *interest_button*, every list item of
    the active tab panel is read through Selenium. One row per company is
    inserted into ``company_list_table``.

    Args:
        interest_button: Selenium element located inside the interests
            section.

    Returns:
        List of company names in page order.
    """
    container = interest_button.find_element(By.XPATH, "../../..")
    entries = container.find_elements(
        By.CSS_SELECTOR,
        ' div.artdeco-tabpanel.active ul.pvs-list li.artdeco-list__item',
    )

    company_list = []
    for entry in entries:
        company_name = entry.find_element(By.CSS_SELECTOR, 'span.t-bold span').text
        link = entry.find_element(By.CSS_SELECTOR, 'a[data-field="active_tab_companies_interests"]')
        company_url = link.get_attribute('href')
        company_image = entry.find_element(By.CSS_SELECTOR, 'img').get_attribute('src')

        # Follower counts look like "12,345 followers"; keep only the digits.
        followers_text = entry.find_element(By.CSS_SELECTOR, 'span.t-black--light span').text
        company_followers = int(followers_text.replace(' followers', '').replace(',', ''))

        company_list.append(company_name)
        record = {
            'company_url': company_url,
            'company_name': company_name,
            'company_image': company_image,
            'company_followers': company_followers,
            'dt': datetime.now(),
        }
        company_list_table.insert(record)

    return company_list
def get_company_from_next_page(driver):
    """Store the companies on the expanded list page and return their names.

    The current page source is parsed with BeautifulSoup and every
    ``li.pvs-list__paged-list-item`` is read. One row per company is inserted
    into ``company_list_table``. Items that cannot be parsed or stored are
    skipped.

    Args:
        driver: Object exposing the rendered HTML as ``page_source``.

    Returns:
        List of company names, in page order, for the items that parsed.
    """
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    company_list = []
    for item in tqdm(soup.find_all('li', class_='pvs-list__paged-list-item')):
        try:
            company_url = item.select("a[data-field='active_tab_companies_interests']")[0]['href']
            company_name = item.find('span', class_='t-bold').find('span').text
            company_image = item.select('div img')[0]['src']
            # Follower counts look like "12,345 followers"; keep only the digits.
            followers_text = item.find('span', 't-black--light').find('span').text
            company_followers = int(followers_text.replace(' followers', '').replace(',', ''))
            company_list.append(company_name)
            company_list_table.insert({
                'company_url': company_url,
                'company_name': company_name,
                'company_image': company_image,
                'company_followers': company_followers,
                'dt': datetime.now(),
            })
        except Exception:
            # Skipping bad items is deliberate, but ``Exception`` (not a bare
            # ``except``) lets Ctrl-C / SystemExit stop a long run.
            continue
    return company_list
def move_to_interest_company_web(driver):
    """Open the 'Companies' tab of the interests section and expand it.

    Within the parent of the element with id ``interests``, the first
    ``button span`` whose text is exactly 'Companies' is clicked. Then the
    first list footer whose text contains 'companies' is clicked and the
    number in its text is parsed.

    Args:
        driver: Selenium driver showing a profile page.

    Returns:
        Tuple ``(company_count, interest_button)``:
        ``company_count`` is the integer parsed from the footer text, or
        ``''`` when no matching footer exists; ``interest_button`` is the
        clicked tab element, or ``None`` when no 'Companies' tab was found
        (previously this case raised ``UnboundLocalError``).
    """
    section = driver.find_element(By.ID, 'interests').find_element(By.XPATH, "..")

    # Switch to the companies tab.
    interest_button = None
    for label in section.find_elements(By.CSS_SELECTOR, 'button span'):
        if label.text == 'Companies':
            interest_button = label
            ActionChains(driver).move_to_element(label).click(label).perform()
            break

    # Follow the "Show all N companies" footer when there is one.
    company_count = ''  # '' is the "no footer" sentinel callers compare against
    for footer in section.find_elements(By.CSS_SELECTOR, 'div.pvs-list__footer-wrapper'):
        footer_text = footer.text
        if 'companies' in footer_text:
            company_count = int(
                footer_text.replace('Show all', '')
                .replace('companies', '')
                .replace(',', '')
            )
            ActionChains(driver).move_to_element(footer).click(footer).perform()
            break

    return company_count, interest_button
def argparse_setting():
    """Build and return the command-line parser for this script.

    Options (each takes an optional value; given without one it becomes 1):
        -l / --limit_count  int, default 20
        -u / --user         str, default 'person1'
        -p / --port         int, default 4446
        -e / --enviorment   str, default 'windows'
    """
    parser = argparse.ArgumentParser()
    option_table = (
        ('-l', '--limit_count', int, 20),
        ('-u', '--user', str, 'person1'),
        ('-p', '--port', int, '4446'),
        ('-e', '--enviorment', str, 'windows'),
    )
    for short_flag, long_flag, value_type, default_value in option_table:
        parser.add_argument(
            short_flag,
            long_flag,
            nargs='?',
            const=1,
            type=value_type,
            default=default_value,
        )
    return parser
def main():
    """Log in to LinkedIn and record followed companies for pending users.

    Steps:
        1. Parse the command line and create the driver: a local Chrome for
           the 'windows' environment, otherwise a remote session on
           ``--port``.
        2. Log in with the account section given by ``--user``.
        3. Pick up to ``--limit_count`` random rows of the ``user`` table
           whose ``company_list`` is NULL.
        4. For each user, open the profile, collect the followed companies
           and upsert the joined names back into the ``user`` table.

    The driver is always quit when the run ends, including on errors.
    """
    args = argparse_setting().parse_args()

    # The comparison used to be against the misspelling 'winodws', so the
    # default value 'windows' never selected the local driver. Both spellings
    # are accepted so callers passing the old literal keep working.
    if args.enviorment in ('windows', 'winodws'):
        driver = serive_create()
    else:
        driver = brower_start(args.port)

    try:
        driver.get('https://www.linkedin.com/login')
        linkedin_login(driver, config, user_choose=args.user)
        time.sleep(2)
        check_page(driver)

        # Materialise the rows first: the table is written to inside the loop.
        pending_users = list(db.query(
            f"SELECT * FROM user where company_list is null ORDER BY RAND() limit {args.limit_count}"
        ))

        # NOTE(review): existing company URLs used to be fetched here on every
        # iteration but were never used; de-duplication of company rows is
        # not implemented.
        for user_row in pending_users:
            url = user_row['url']
            driver.get(url)
            print(url)
            company_count, interest_button = move_to_interest_company_web(driver)
            print(f'company_count: {company_count}')

            if company_count == '':
                # No "show all" footer: the companies are on the profile page.
                company_list = get_company_from_first_page(interest_button)
            else:
                if company_count > 10:
                    # Longer lists need expanding; cap the effort at 2000.
                    show_more_result(driver, min(company_count, 2000))
                    time.sleep(1)
                company_list = get_company_from_next_page(driver)

            print(len(company_list))
            user_list_table.upsert(
                {'url': url, 'company_list': ' | '.join(company_list[:2000])},
                ['url'],
            )
            time.sleep(2)
    finally:
        # Release the browser / remote session even when scraping fails.
        driver.quit()


if __name__ == '__main__':
    main()
|