123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- # -*- coding: utf-8 -*-
- from selenium import webdriver
- from selenium.webdriver.common.action_chains import ActionChains
- from selenium.webdriver.common.keys import Keys
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.wait import WebDriverWait
- from selenium.webdriver.common.by import By
- from selenium.common.exceptions import TimeoutException
- from selenium.common.exceptions import WebDriverException
- from utility import *
- from const import *
- import json
- from datetime import datetime
- from bs4 import BeautifulSoup
- import time, os
- from tqdm import tqdm
- import pandas as pd
- import dataset
- import argparse
- import configparser
- config = configparser.ConfigParser()
- config.read('config.ini')
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/linkedin?charset=utf8mb4')
- company_list_table = db[DB_COMPANY]
- user_list_table = db[DB_USER]
- def show_more_result(driver, company_count):
- for i in tqdm(range(int(company_count/25)+1)):
- for button in driver.find_elements(By.CSS_SELECTOR,'button.scaffold-finite-scroll__load-button'):
- if button.text == 'Show more results':
- # print(button)
- try:
- ActionChains(driver).move_to_element(button).click(button).perform()
- except:
- pass
- time.sleep(1)
- break
- def get_company_from_first_page(interest_button, company_url_list):
- company_list = []
- for i in interest_button.find_element(By.XPATH,"../../..").find_elements(By.CSS_SELECTOR,' div.artdeco-tabpanel.active ul.pvs-list li.artdeco-list__item'):
- company_url = i.find_element(By.CSS_SELECTOR,'a[data-field="active_tab_companies_interests"]').get_attribute('href')
- if company_url in company_url_list:
- continue
- company_name = i.find_element(By.CSS_SELECTOR,'span.t-bold span').text
- company_image = i.find_element(By.CSS_SELECTOR,'img').get_attribute('src')
- company_followers = int(i.find_element(By.CSS_SELECTOR,'span.t-black--light span').text.replace(' followers','').replace(',',''))
- company_list += [company_name]
- company_list_table.insert({
- 'company_url': company_url,
- 'company_name': company_name,
- 'company_image': company_image,
- 'company_followers': company_followers,
- 'dt': datetime.now()
- })
- return company_list
- def get_company_from_next_page(driver, company_url_list):
- shop_soup = BeautifulSoup(driver.page_source, 'html.parser')
- company_list = []
- for item in tqdm(shop_soup.find_all('li', class_='pvs-list__paged-list-item')):
- try:
- company_url = item.select("a[data-field='active_tab_companies_interests']")[0]['href']
- if company_url in company_url_list:
- continue
- company_name = item.find('span', class_= 't-bold').find('span').text
- company_image = item.select('div img')[0]['src']
- company_followers = item.find('span', 't-black--light').find('span').text
- company_followers = int(company_followers.replace(' followers','').replace(',',''))
- company_list += [company_name]
- company_list_table.insert({
- 'company_url': company_url,
- 'company_name': company_name,
- 'company_image': company_image,
- 'company_followers': company_followers,
- 'dt': datetime.now()
- })
- except:
- pass
- return company_list
- def move_to_interest_company_web(driver):
- interest_div = driver.find_element(By.ID,'interests')
- interest_div_parent = interest_div.find_element(By.XPATH,"..")
- # move to company tag
- for button in interest_div_parent.find_elements(By.CSS_SELECTOR,'button span'):
- if button.text == 'Companies':
- interest_button = button
- ActionChains(driver).move_to_element(button).click(button).perform()
- break
-
- # show all company
- company_count = ''
- # interest_div = driver.find_element(By.ID,'interests')
- # interest_div_parent = interest_div.find_element(By.XPATH,"..")
- show_all_company_button = interest_div_parent.find_elements(By.CSS_SELECTOR, 'div.pvs-list__footer-wrapper')
- for button in show_all_company_button:
- if button.text.find('companies') != -1:
- company_count = int(button.text.replace('Show all','')\
- .replace('companies','')\
- .replace(',', ''))
- ActionChains(driver).move_to_element(button).click(button).perform()
- break
- return company_count, interest_button
- def argparse_setting():
- p = argparse.ArgumentParser()
- p.add_argument('-l', '--limit_count', nargs='?', const=1, type=int, default=20)
- p.add_argument('-u', '--user', nargs='?', const=1, type=str, default='person1')
- p.add_argument('-p', '--port', nargs='?', const=1, type=int, default='4446')
- p.add_argument('-e', '--enviorment', nargs='?', const=1, type=str, default='windows', help="windows or linux")
- return p
- def main():
- p = argparse_setting()
- args = p.parse_args()
- if args.enviorment == 'windows':
- print('windows web start')
- driver = serive_create()
- else:
- print('linux web start')
- print('restart docker p{}'.format(args.port))
- os.system('sudo docker container restart p'+str(args.port))
- time.sleep(8)
- driver = brower_start(args.port, 8787, False)
- url = 'https://www.linkedin.com/login'
- driver.get(url)
- print(f'login in with {args.user}')
- linkedin_login(driver, config, user_choose=args.user)
- time.sleep(2)
- check_page(driver)
- result = db.query(f"SELECT * FROM {DB_USER} where company_list is null ORDER BY RAND() limit {args.limit_count}")
- result = pd.DataFrame([dict(i) for i in result])
- # try:
- print('start to crawler...')
- for k, r in result.iterrows():
- company_url_list = check_duplicate(DB_COMPANY, 'company_url', db)
-
- url = r['url']
- driver.get(url)
- print(url)
- company_count, interest_button = move_to_interest_company_web(driver)
- print(f'company_count: {company_count}')
-
- if company_count == '':
- company_list = get_company_from_first_page(interest_button, company_url_list)
- else:
- if company_count > 10:
- # more results
- if company_count > 2000:
- company_count = 2000
- show_more_result(driver, company_count)
- time.sleep(1)
- company_list = get_company_from_next_page(driver, company_url_list)
-
- print(len(company_list))
- user_list_table.upsert({'url':url,'company_list':' | '.join(company_list[:2000])},['url'])
- time.sleep(2)
- # except:
- # pass
-
-
- if __name__ == '__main__':
- main()
|