# -*- coding: utf-8 -*-
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import WebDriverException
from utility import *
from const import *
from datetime import datetime
from bs4 import BeautifulSoup
import time
from tqdm import tqdm
import dataset
import argparse
import configparser
config = configparser.ConfigParser()
config.read('config.ini')

db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/linkedin?charset=utf8mb4')
post_list_table = db[DB_POST]
user_list_table = db[DB_USER]

# URLs already stored in the DB, used below to skip duplicate inserts.
user_url_list = check_duplicate(DB_USER, 'url', db)
url_list = check_duplicate(DB_POST, 'url', db)

def get_content_info(driver):
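    """Parse the opened post page: author name and headline, post text,
    and any embedded article link (empty string if absent)."""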
    shop_soup = BeautifulSoup(driver.page_source, 'html.parser')

    # Author name and headline of the post.
    post_info = shop_soup.select("a.app-aware-link div.update-components-actor__meta")[0]
    post_name = string_check(post_info.find('span', class_='t-bold').text)
    post_position = string_check(post_info.find('span', class_='t-black--light').text)
    print(post_name, ';', post_position)

    content = shop_soup.find('div', class_='feed-shared-update-v2__description-wrapper').select("span[dir='ltr']")[0].text
    print(content)

    # A post may embed an external article link; fall back to an empty string.
    try:
        content_url = shop_soup.select('div.update-components-article__link-container')[0].find('a').get('href')
    except (IndexError, AttributeError):
        content_url = ''

    return {
        'post_name': post_name,
        'post_position': post_position,
        'content': content,
        'content_url': content_url
    }


def get_reaction_button(driver):
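    """Open the reactions modal via the facepile 'see more' button and return
    the total reaction count as an int (thousands separators stripped)."""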
    more_reaction_button = driver.find_element(By.CSS_SELECTOR, "button[data-jump-link-target='reactors-facepile-see-more-jump-target']")
    driver.implicitly_wait(30)
    ActionChains(driver).move_to_element(more_reaction_button).click(more_reaction_button).perform()
    time.sleep(1)

    # The 'ALL' tab only exists when the post has more than one reaction type;
    # otherwise fall back to the single 'LIKE' tab.
    try:
        all_reactions_num = driver.find_element(By.CSS_SELECTOR, "button[data-js-reaction-tab='ALL']").text
    except NoSuchElementException:
        all_reactions_num = driver.find_element(By.CSS_SELECTOR, "button[data-js-reaction-tab='LIKE']").text
    all_reactions_num = all_reactions_num.split('\n')[-1]
    print(f'reactions numbers: {all_reactions_num}')

    all_reactions_num = int(all_reactions_num.replace(',', ''))
    return all_reactions_num


def show_more_result(driver, all_reactions_num):
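    """Page through the reactions modal by repeatedly clicking 'Show more
    results' (~10 reactors per click), capping the count at 2,000."""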
    # LinkedIn only exposes roughly the first 2,000 reactors, so cap the paging.
    if all_reactions_num > 2000:
        all_reactions_num = 2000

    for i in tqdm(range(all_reactions_num // 10 + 1)):
        for button in driver.find_elements(By.CSS_SELECTOR, 'button'):
            if button.text == 'Show more results':
                try:
                    ActionChains(driver).move_to_element(button).click(button).perform()
                except WebDriverException:
                    pass
                break
        time.sleep(1)


def argparse_setting():
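    """CLI flags: login account (-u), browser port (-p), target OS (-e),
    and the post URL to crawl (--url, required)."""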
    p = argparse.ArgumentParser()
    p.add_argument('-u', '--user', nargs='?', const=1, type=str, default='person1')
    # argparse does not pass defaults through type=, so the default must be an int.
    p.add_argument('-p', '--port', nargs='?', const=1, type=int, default=4446)
    p.add_argument('-e', '--environment', nargs='?', const=1, type=str, default='windows', help="windows or linux")
    p.add_argument('-url', '--url', required=True)
    return p


def main():
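    """Log in, crawl one post's content and reaction count, then store the
    post and every visible reactor into the MySQL tables."""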
    p = argparse_setting()
    args = p.parse_args()
    print(args.environment, args.port, args.user)

    # serive_create / brower_start are browser-launch helpers from utility.
    if args.environment == 'windows':
        print('windows web start')
        driver = serive_create()
    else:
        print('linux web start')
        driver = brower_start(args.port)

    print(f'logging in as {args.user}')
    driver.get('https://www.linkedin.com/login')
    linkedin_login(driver, config, user_choose=args.user)
    time.sleep(2)
    check_page(driver)
    time.sleep(2)

    url = args.url
    print(url)

    # Post info: retry up to three times, since the post DOM can load slowly.
    print('starting to crawl...')
    all_reactions_num = 0
    for i in range(3):
        try:
            driver.get(url)
            time.sleep(2)
            post_output = get_content_info(driver)
            post_output['url'] = url
            post_output['dt'] = datetime.now()
            if url not in url_list:
                print('url not in post table')
                post_list_table.insert(post_output)
            all_reactions_num = get_reaction_button(driver)
            post_list_table.upsert({'url': url, 'all_reactions_num': all_reactions_num}, ['url'])
            print('upsert success')
            break
        except Exception as e:
            print(f'attempt {i + 1} failed: {e}')

    show_more_result(driver, all_reactions_num)
    a_list = driver.find_elements(By.CSS_SELECTOR, "ul.artdeco-list li a[rel='noopener noreferrer']")
    print(len(a_list))

    # Insert each reactor that has not been seen before.
    print('starting to insert...')
    for i in a_list:
        person_url = i.get_attribute("href")
        reaction_info = i.text.split('\n')
        person_name = reaction_info[0]
        person_position = reaction_info[-1]

        if person_url not in user_url_list:
            user_list_table.insert({
                'post_url': url,
                'url': person_url,
                'person_name': person_name,
                'person_position': person_position,
                'dt': datetime.now()
            })


if __name__ == '__main__':
    main()
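
# Usage sketch (the script name and post URL below are illustrative, not from
# the repo; credentials for the -u/--user account are read from config.ini):
#
#   python linkedin_post_crawler.py -u person1 -e linux -p 4446 \
#       --url 'https://www.linkedin.com/posts/...'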