# -*- coding: utf-8 -*-
"""Crawl one LinkedIn post and store the post and its reactors in the database.

The script logs in, opens the post given by ``--url``, stores the post's
author/content in the post table, expands the reactions dialog and stores
every reactor that is not already present in the user table.
"""
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import WebDriverException

from utility import *
from const import *
import json
from datetime import datetime
from bs4 import BeautifulSoup
import time
from tqdm import tqdm
import pandas as pd
import dataset
import argparse
import configparser

# Upper bound on how many reactions are paged through for a single post.
_MAX_REACTIONS = 2000
# Divisor used to turn a reaction count into a number of "Show more results"
# clicks (one click per this many reactions, plus one).
_REACTIONS_PER_CLICK = 10
# How many times main() tries to load and parse the post before giving up.
_MAX_ATTEMPTS = 3

config = configparser.ConfigParser()
config.read('config.ini')

# NOTE(review): the database credentials are embedded in this URL; they should
# be moved to config.ini or the environment and rotated.
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/linkedin?charset=utf8mb4')
post_list_table = db[DB_POST]
user_list_table = db[DB_USER]
# Values of the 'url' column already stored, as returned by the project helper
# check_duplicate(); only membership tests (`in`) are performed on them here.
user_url_list = check_duplicate(DB_USER, 'url', db)
url_list = check_duplicate(DB_POST, 'url', db)


def get_content_info(driver):
    """Extract author and content information from the currently loaded post.

    Args:
        driver: Selenium WebDriver whose ``page_source`` is the post page.

    Returns:
        dict with keys ``post_name``, ``post_position``, ``content`` and
        ``content_url``. ``content_url`` is ``''`` when the post has no
        article link container.

    Raises:
        IndexError, AttributeError: if the author block or the post text
            cannot be found in the page (callers treat this as a failed load).
    """
    shop_soup = BeautifulSoup(driver.page_source, 'html.parser')

    post_info = shop_soup.select("a.app-aware-link div.update-components-actor__meta ")[0]
    # string_check() is a project helper from `utility`; presumably it
    # sanitises the text for storage -- confirm against its definition.
    post_name = string_check(post_info.find('span', class_='t-bold').text)
    post_position = string_check(post_info.find('span', class_='t-black--light').text)
    print(post_name, ';', post_position)

    description = shop_soup.find('div', class_='feed-shared-update-v2__description-wrapper')
    content = description.select("span[dir='ltr']")[0].text
    print(content)

    # The article link is optional: a missing container gives IndexError and a
    # container without an <a> gives AttributeError on None.
    try:
        content_url = shop_soup.select('div.update-components-article__link-container')[0].find('a').get('href')
    except (IndexError, AttributeError):
        content_url = ''

    return {
        'post_name': post_name,
        'post_position': post_position,
        'content': content,
        'content_url': content_url,
    }


def get_reaction_button(driver):
    """Open the reactions dialog and return the total number of reactions.

    Args:
        driver: Selenium WebDriver positioned on the post page.

    Returns:
        int: the count shown on the 'ALL' reactions tab, or on the 'LIKE' tab
        when the 'ALL' tab cannot be read.

    Raises:
        WebDriverException: if the "see more reactions" button or both tabs
            cannot be found.
        ValueError: if the tab label does not end in a number.
    """
    more_reaction_button = driver.find_element(
        By.CSS_SELECTOR,
        "button[data-jump-link-target='reactors-facepile-see-more-jump-target']",
    )
    # Applies to every later element lookup on this driver, including the tab
    # lookups below (the dialog is only present after the click).
    driver.implicitly_wait(30)
    ActionChains(driver).move_to_element(more_reaction_button).click(more_reaction_button).perform()
    time.sleep(1)

    try:
        tab_text = driver.find_element(By.CSS_SELECTOR, "button[data-js-reaction-tab='ALL']").text
    except WebDriverException:
        # Fall back to the 'LIKE' tab when the 'ALL' tab is not available.
        tab_text = driver.find_element(By.CSS_SELECTOR, "button[data-js-reaction-tab='LIKE']").text

    # The count is the last line of the tab label and may contain thousands
    # separators, e.g. "1,234".
    all_reactions_num = tab_text.split('\n')[-1]
    print(f'reactions numbers: {all_reactions_num}')
    return int(all_reactions_num.replace(',', ''))


def show_more_result(driver, all_reactions_num):
    """Click "Show more results" repeatedly to load the full reactor list.

    The number of rounds is ``min(all_reactions_num, 2000) // 10 + 1``; each
    round clicks the first button labelled 'Show more results' (if any) and
    then sleeps for one second.

    Args:
        driver: Selenium WebDriver with the reactions dialog open.
        all_reactions_num: total number of reactions reported for the post.
    """
    capped = max(0, min(int(all_reactions_num), _MAX_REACTIONS))
    for _ in tqdm(range(capped // _REACTIONS_PER_CLICK + 1)):
        for button in driver.find_elements(By.CSS_SELECTOR, 'button'):
            try:
                label = button.text
            except WebDriverException:
                # The button was removed from the page between lookup and
                # read; it cannot be the one we want any more.
                continue
            if label != 'Show more results':
                continue
            try:
                ActionChains(driver).move_to_element(button).click(button).perform()
            except WebDriverException as err:
                # Best effort: a failed click is retried on the next round.
                tqdm.write(f'show more results click failed: {err}')
            break
        time.sleep(1)


def argparse_setting():
    """Build the command-line parser for the crawler.

    Returns:
        argparse.ArgumentParser with ``--user``, ``--port``, ``--enviorment``
        and the required ``--url`` options.
    """
    p = argparse.ArgumentParser()
    p.add_argument('-u', '--user', nargs='?', const=1, type=str, default='person1',
                   help="account key passed to linkedin_login")
    p.add_argument('-p', '--port', nargs='?', const=1, type=int, default=4446,
                   help="port passed to brower_start when not on windows")
    # The option name keeps its historical spelling so existing invocations
    # continue to work.
    p.add_argument('-e', '--enviorment', nargs='?', const=1, type=str, default='windows',
                   help="windows or linux")
    p.add_argument('-url', '--url', required=True, help="URL of the post to crawl")
    return p


def main():
    """Log in, crawl the post given on the command line and store the results.

    Raises:
        RuntimeError: if the post could not be loaded and parsed after
            ``_MAX_ATTEMPTS`` attempts.
    """
    p = argparse_setting()
    args = p.parse_args()
    print(args.enviorment, args.port, args.user)

    # serive_create / brower_start / linkedin_login / check_page are project
    # helpers imported from `utility`.
    # NOTE(review): the driver is never quit; confirm whether the browser
    # session is meant to outlive this script before adding driver.quit().
    if args.enviorment == 'windows':
        print('windows web start')
        driver = serive_create()
    else:
        print('linux web start')
        driver = brower_start(args.port)

    print(f'login in with {args.user}')
    url = 'https://www.linkedin.com/login'
    driver.get(url)
    linkedin_login(driver, config, user_choose=args.user)
    time.sleep(2)
    check_page(driver)
    time.sleep(2)

    url = args.url
    print(url)

    # post info
    print('start to crawler...')
    all_reactions_num = None
    last_error = None
    # Remember a successful insert so a retry caused by a later step does not
    # insert the same post twice.
    post_inserted = False
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        try:
            driver.get(url)
            time.sleep(2)
            post_output = get_content_info(driver)
            post_output['url'] = url
            post_output['dt'] = datetime.now()

            if url not in url_list and not post_inserted:
                print('url not in post table')
                post_list_table.insert(post_output)
                post_inserted = True

            all_reactions_num = get_reaction_button(driver)
            post_list_table.upsert({'url': url, 'all_reactions_num': all_reactions_num}, ['url'])
            print('upsert success')
            break
        except Exception as err:
            last_error = err
            print(f'attempt {attempt}/{_MAX_ATTEMPTS} failed: {err!r}')

    if all_reactions_num is None:
        raise RuntimeError(
            f'could not crawl post after {_MAX_ATTEMPTS} attempts: {url}'
        ) from last_error

    show_more_result(driver, all_reactions_num)
    a_list = driver.find_elements(By.CSS_SELECTOR, "ul.artdeco-list li a[rel='noopener noreferrer']")
    print(len(a_list))

    print('start to insert...')
    # URLs inserted during this run; user_url_list only reflects the table as
    # it was at start-up.
    inserted_urls = set()
    for link in a_list:
        person_url = link.get_attribute("href")
        if person_url in user_url_list or person_url in inserted_urls:
            continue
        # First line of the link text is used as the name, last line as the
        # position.
        reaction_info = link.text.split('\n')
        user_list_table.insert({
            'post_url': url,
            'url': person_url,
            'person_name': reaction_info[0],
            'person_position': reaction_info[-1],
            'dt': datetime.now(),
        })
        inserted_urls.add(person_url)


if __name__ == '__main__':
    main()