from selenium import webdriver
import random
import time
import csv
import re
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC  # wait conditions
from selenium.webdriver.support.wait import WebDriverWait  # explicit waits


class TestSpider(object):
    def __init__(self):
        self.url = 'target site URL goes here'
        self.error = []

    # Open the first-level (product list) page.
    def get_page(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')  # run Chrome without a window
        self.browser = webdriver.Chrome(options=options)
        self.browser.get(self.url)
        print('Browser 1 started')
        self.browser.maximize_window()
        self.browser.implicitly_wait(8)
        # A second browser handles the third-level (transaction history)
        # pages so this one can keep its place in the product list.
        self.browser2 = webdriver.Chrome(options=options)
        self.browser2.implicitly_wait(3)
        print('Browser 2 started')
        WebDriverWait(self.browser, 5).until(EC.element_to_be_clickable(
            (By.XPATH, '//*[@id="pageTWrap"]/div[4]/ul/li[2]/a'))).click()
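        # Note (an assumption, not in the original code): maximize_window()
        # often has no effect in headless mode; a fixed viewport such as
        # options.add_argument('--window-size=1920,1080') is the usual
        # substitute when the page layout depends on window size.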

    # Walk through every product node on the first-level page.
    def xunhuan_one_page(self):
        i = 1
        # TODO: after a crash, resume by seeding the page number and position
        # of the product that failed last time; that code is still missing.
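        # A minimal resume sketch (an assumption, not the original author's
        # code): rows written by save_error() read 'page {p} product {n}
        # failed to crawl', so the last row can seed a starting position;
        # paging ahead to start_page would still need to be written.
        # try:
        #     with open('error.csv', newline='') as f:
        #         last = [row for row in csv.reader(f) if row][-1][0]
        #         start_page, i = map(int, re.findall(r'\d+', last))
        # except (FileNotFoundError, IndexError):
        #     pass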
        while True:
            # Re-locate the product nodes on every pass: clicking through and
            # navigating back reloads the DOM and stales old references.
            products = self.browser.find_elements(
                By.XPATH, '//div[@id="proList"]//div[@index]')
            if i <= len(products):
                try:
                    # Existence check only: products without a sold count
                    # raise here and are skipped by the outer except below.
                    num = products[i - 1].find_element(
                        By.XPATH, './/div[@class="pro-sold"]//span[@class="num"]')
                    page_one = self.browser.find_element(
                        By.XPATH, '//div[@class="filter-page"]/span').text
                    page_one = re.split(r'\s|/', page_one)[1]
                    products[i - 1].find_element(
                        By.XPATH, './/div[@class="photo"]//img').click()
                    print('Entered product node {}'.format(i))
                    i = i + 1
                    time.sleep(random.randint(7, 8))
                    try:
                        self.two_page()
                    except Exception:
                        print('\033[1;35;0m ****************************** \033[0m')
                        a = ['page {} product {} failed to crawl'.format(page_one, i - 1)]
                        self.error.append(a)
                        print(a[0])
                        print('\033[1;35;0m ****************************** \033[0m')
                        self.save_error(a)
                        time.sleep(random.randint(2, 3))
                    # Keep navigating back until the product-list container
                    # reappears; the lookup is only an existence probe (a
                    # waiting-based alternative is sketched after this method).
                    while True:
                        try:
                            self.browser.find_element(
                                By.XPATH, '//div[@class="mainout"]').get_attribute('id')
                            break
                        except Exception:
                            self.browser.back()
                            time.sleep(random.randint(2, 3))
                except Exception:
                    # No sales record for this product: skip it.
                    i = i + 1
            else:
                # 'unnext' marks a disabled next button, i.e. the last page.
                if self.browser.page_source.find('unnext') == -1:
                    self.browser.find_element(
                        By.XPATH, '//div[@class="filter-page"]//a[@class="next"]').click()
                    i = 1  # new page, so the product counter restarts at 1
                    one_page = self.browser.find_element(
                        By.XPATH, '//div[@class="page"]/span/b').text
                    print('first-level page {}'.format(one_page))
                    time.sleep(random.randint(1, 2))
                else:
                    break
        print(self.error)
        self.browser.quit()
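
    # A waiting-based alternative to the back-navigation probe above (a
    # sketch under the same page assumptions, not part of the original
    # flow): wait explicitly for the list container instead of relying on
    # the implicit wait for each failed lookup.
    def back_to_list(self):
        while True:
            try:
                WebDriverWait(self.browser, 5).until(
                    EC.presence_of_element_located(
                        (By.XPATH, '//div[@class="mainout"]')))
                return
            except Exception:
                self.browser.back()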

    # Second-level (product detail) page: try to read the fields, retrying
    # at most 10 times before giving up.
    def two_page(self):
        m = 1
        while True:
            try:
                if m <= 10:
                    product = self.browser.find_element(
                        By.XPATH, '//*[@id="colLeft"]/div[1]/div[1]/div/div[1]/h2').text
                    page = self.browser.find_element(
                        By.XPATH,
                        '//*[@id="transactionHistory"]/div/div[5]/div/span/strong[2]').text
                    three_url = self.browser.find_element(
                        By.XPATH,
                        '//div[@id="transactionHistoryWarp"]').get_attribute('init-src')
                    time.sleep(random.randint(0, 1))
                    m = m + 1
                    if page is not None and three_url is not None and product is not None:
                        print(product)
                        print(three_url)
                        break
                    else:
                        continue
                else:
                    break
            except Exception:
                # The history tab is lazy-loaded: click it, then retry.
                self.browser.find_element(
                    By.XPATH, '//li[@id="ctab-transactionHistory"]/span').click()
                time.sleep(random.randint(0, 1))
        # The init-src link is incomplete and has to be stitched together.
        three_url = re.split(r'\?', three_url)
        r_three_url = ('site root URL goes here' + three_url[0] +
                       '?act=pageload&appid=dh&page={}&' + three_url[-1] + '&lang=en')
        info_list = [['buyer', 'product name', 'quantity', 'purchase date',
                      'buyer location']]
        self.three_parse_page(product, info_list, page, r_three_url)
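    # Worked example of the stitching above (the URL values are hypothetical,
    # not from the original): if init-src is '/getTransaction.do?itemcode=123',
    # re.split(r'\?', ...) yields ['/getTransaction.do', 'itemcode=123'], so
    # r_three_url becomes '<site root>/getTransaction.do?act=pageload&appid=dh
    # &page={}&itemcode=123&lang=en', ready for .format(j) below.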

    # Together with parse_page and save_page, crawl every history page of
    # one product's transaction records.
    def three_parse_page(self, product, info_list, page, r_three_url):
        j = 1
        while True:
            if j <= int(page):
                self.browser2.get(r_three_url.format(j))
                self.parse_page(info_list)
                print('third-level page {} crawled'.format(j))
                j = j + 1
                time.sleep(random.randint(1, 2))
            else:
                print('last third-level page crawled')
                try:
                    self.save_page(product, info_list)
                    print('product node data saved')
                except Exception:
                    # Fall back when the product name is not a legal filename.
                    self.save_page2(product, info_list)
                    print('product node data saved')
                time.sleep(random.randint(1, 2))
                break

    # Parse one third-level (transaction history) page.
    def parse_page(self, info_list):
        tran_list = self.browser2.find_elements(
            By.XPATH, '//div[@class="transaction-list"]//ul')
        for tran in tran_list:
            # Row text splits into buyer, product name, quantity and date;
            # the buyer's country code comes from the flag image filename.
            info_two = tran.text.split('\n')
            country = tran.find_element(
                By.XPATH, './/li[@class="col1"]//img').get_attribute('src')
            cou = re.split(r'\.|/', country)[-2]
            info_two.append(cou)
            info_list.append(info_two)
        time.sleep(random.randint(1, 2))
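    # Worked example of the split above (the image URL is hypothetical): for
    # src 'https://img.example.com/flags/us.png', re.split(r'\.|/', ...)
    # gives ['https:', '', 'img', 'example', 'com', 'flags', 'us', 'png'],
    # so index [-2] is the country code 'us'.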

    # Save the third-level data for one product.
    def save_page(self, product, info_list):
        with open('{}.csv'.format(product), 'a', newline='') as f:
            writer = csv.writer(f)
            for rt in info_list:
                writer.writerow([rt[0].strip(), rt[1].strip(), rt[2].strip(),
                                 rt[3].strip(), rt[4]])

    # Some product names contain '/', which is illegal in a filename, so
    # this variant keeps only the part before the first '/'.
    def save_page2(self, product, info_list):
        product = re.split(r'/', product)[0]
        with open('{}.csv'.format(product), 'a', newline='') as f:
            writer = csv.writer(f)
            for rt in info_list:
                writer.writerow([rt[0].strip(), rt[1].strip(), rt[2].strip(),
                                 rt[3].strip(), rt[4]])

    # Record every product that failed to crawl (page number and position),
    # appending so earlier failures are kept.
    def save_error(self, a):
        with open('error.csv', 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([a[0].strip()])
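
    # A more general filename sanitizer (a sketch, not the original author's
    # code; the character set is an assumption about common filesystem
    # rules). Routing product names through it would make save_page2
    # unnecessary.
    @staticmethod
    def safe_name(product):
        return re.sub(r'[\\/:*?"<>|]', '_', product)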

    def main(self):
        self.get_page()
        self.xunhuan_one_page()


if __name__ == '__main__':
    spider = TestSpider()
    spider.main()