目录
任务:在 win 环境下,利用 Python、webdriver、JavaScript等,获取 xiaohongshu 某个博主的全部作品。
本文仅做学习和交流使用。
section 代表每一项作品,但即使博主作品有很多,在未登录状态下,只会显示 20 项左右。向下滚动页面,section 发生改变(个数不变),标签中的 index 会递增。
向下滚动页面时,到一定的范围时,会发送一个获取作品数据的请求,该请求每次只请求 30 项作品数据。该请求携带了 cookies 以及其他不确定值的参数。
为了获取博主的全部作品数据,在登录的状态下访问目标博主页面,在控制台中注入JavaScript 脚本。该脚本不断滚动页面到最底部,每次滚动一段距离后,都获取每一个作品的信息(通过a标签的 href 获取到作品页URL;通过判断a标签是否有一个 class='play-icon' 的后代元素来判断是图文还是视频类型的作品,如果有该标签就是视频作品,反之则是图文作品)。
将作品页 URL 作为键,作品是视频作品还是图文作品作为值,添加到 js 对象中。
显示完所有的作品,即滚动到最底部时,将所有获取到的作品信息导出为 txt 文件。
在控制台中执行:
- // 页面高度
- const vh = window.innerHeight
- let work_obj = {}
-
- // 延迟
- function delay(ms){
- return new Promise(resolve => setTimeout(resolve, ms));
- }
-
- async function action() {
- let last_height = document.body.offsetHeight;
- window.scrollTo(0, window.scrollY + vh * 1.5)
-
- ul = document.querySelector('#userPostedFeeds').querySelectorAll('.cover')
-
- ul.forEach((e,index)=>{
- // length 为 0 时是图片,为 1 时为视频
- work_obj[e.href] = ul[index].querySelector('.play-icon') ? 1 : 0
- })
- // 延迟500ms
- await delay(500);
- // console.log(last_height, document.body.offsetHeight)
-
- // 判断是否滚动到底部
- if(document.body.offsetHeight > last_height){
- action()
- }else{
- console.log('end')
- // 作品的数量
- console.log(Object.keys(work_obj).length)
-
- // 转换格式,并下载为txt文件
- var content = JSON.stringify(work_obj);
- var blob = new Blob([content], {type: "text/plain;charset=utf-8"});
- var link = document.createElement("a");
- link.href = URL.createObjectURL(blob);
- link.download = "xhs_works.txt";
- link.click();
- }
- }
-
- action()
写出的 txt 文件内容如下:
- # 获取当前时间
- def get_current_time():
- now = datetime.now()
- format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
- return format_time
-
- # 下载的作品保存的路径,以作者主页的 id 号命名
- ABS_BASE_URL = f'G:\\639476c10000000026006023'
-
- # 检查作品是否已经下载过
- def check_download_or_not(work_id, is_pictures):
- end_str = 'pictures' if is_pictures else 'video'
- # work_id 是每一个作品的目录,检查目录是否存在并且是否有内容,则能判断对应的作品是否被下载过
- path = f'{ABS_BASE_URL}/{work_id}-{end_str}'
- if os.path.exists(path) and os.path.isdir(path):
- if os.listdir(path):
- return True
- return False
-
- # 下载资源
- def download_resource(url, save_path):
- response = requests.get(url, stream=True)
- if response.status_code == 200:
- with open(save_path, 'wb') as file:
- for chunk in response.iter_content(1024):
- file.write(chunk)
读入文件,判断作品数量然后进行任务分配:
- # 读入文件
- content = ''
- with open('./xhs_works.txt', mode='r', encoding='utf-8') as f:
- content = json.load(f)
-
- # 转换成 [[href, is_pictures],[href, is_pictures],...] 类型
- # 每一维中分别是作品页的URL、作品类型
- url_list = [list(pair) for pair in content.items()]
-
- # 有多少个作品
- length = len(url_list)
-
- if length > 3:
- ul = [url_list[0: int(length / 3) + 1], url_list[int(length / 3) + 1: int(length / 3) * 2 + 1],url_list[int(length / 3) * 2 + 1: length]]
- # 开启三个线程并分配任务
- for child_ul in ul:
- thread = threading.Thread(target=thread_task, args=(child_ul,))
- thread.start()
- else:
- thread_task(url_list)
若使用多线程,每一个线程处理自己被分配到的作品列表:
- # 每一个线程遍历自己分配到的作品列表,进行逐项处理
- def thread_task(ul):
- for item in ul:
- href = item[0]
- is_pictures = (True if item[1] == 0 else False)
- res = work_task(href, is_pictures)
- if res == 0: # 被阻止正常访问
- break
处理每一项作品:
- # 处理每一项作品
- def work_task(href, is_pictures):
- # href 中最后的一个路径参数就是博主的id
- work_id = href.split('/')[-1]
-
- # 判断是否已经下载过该作品
- has_downloaded = check_download_or_not(work_id, is_pictures)
-
- # 没有下载,则去下载
- if not has_downloaded:
- if not is_pictures:
- res = deal_video(work_id)
- else:
- res = deal_pictures(work_id)
- if res == 0:
- return 0 # 无法正常访问
- else:
- print('当前作品已被下载')
- return 2
- return 1
对于图文类型,每一张图片都作为 div 元素的背景图片进行展示,图片对应的 URL 在 div 元素的 style 中。 可以先获取到 style 的内容,然后根据圆括号进行分隔,最后得到图片的地址。
这里拿到的图片是没有水印的。
- # 处理图片类型作品的一系列操作
- def download_pictures_prepare(res_links, path, date):
- # 下载作品到目录
- index = 0
- for src in res_links:
- download_resource(src, f'{path}/{date}-{index}.webp')
- index += 1
-
- # 处理图片类型的作品
- def deal_pictures(work_id):
- # 直接 requests 请求回来,style 是空的,使用 webdriver 获取当前界面的源代码
- temp_driver = webdriver.Chrome()
- temp_driver.set_page_load_timeout(5)
- temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
- sleep(1)
- try:
- # 如果页面中有 class='feedback-btn' 这个元素,则表示不能正常访问
- temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
- except NoSuchElementException: # 没有该元素,则说明能正常访问到作品页面
- WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'swiper-wrapper')))
-
- # 获取页面的源代码
- source_code = temp_driver.page_source
- temp_driver.quit()
-
- html = BeautifulSoup(source_code, 'lxml')
- swiper_sliders = html.find_all(class_='swiper-slide')
- # 当前作品的发表日期
- date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
-
- # 图片路径
- res_links = []
- for item in swiper_sliders:
- # 在 style 中提取出图片的 url
- url = item['style'].split('url(')[1].split(')')[0].replace('"', '').replace('"', '')
- if url not in res_links:
- res_links.append(url)
-
- #为图片集创建目录
- path = f'{ABS_BASE_URL}/{work_id}-pictures'
- try:
- os.makedirs(path)
- except FileExistsError:
- # 目录已经存在,则直接下载到该目录下
- download_pictures_prepare(res_links, path, date)
- except Exception as err:
- print(f'deal_pictures 捕获到其他错误:{err}')
- else:
- download_pictures_prepare(res_links, path, date)
- finally:
- return 1
- except Exception as err:
- print(f'下载图片类型作品 捕获到错误:{err}')
- return 1
- else:
- print(f'访问作品页面被阻断,下次再试')
- return 0
获取到的视频有水印。
- # 处理视频类型的作品
- def deal_video(work_id):
- temp_driver = webdriver.Chrome()
- temp_driver.set_page_load_timeout(5)
- temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
- sleep(1)
- try:
- temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
- except NoSuchElementException:
- WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'player-container')))
- source_code = temp_driver.page_source
- temp_driver.quit()
-
- html = BeautifulSoup(source_code, 'lxml')
- video_src = html.find(class_='player-el').video['src']
- # 作品发布日期
- date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
-
- # 为视频作品创建目录,以 作品的id号 + video 命名目录
- path = f'{ABS_BASE_URL}/{work_id}-video'
- try:
- os.makedirs(path)
- except FileExistsError:
- download_resource(video_src, f'{path}/{date}.mp4')
- except Exception as err:
- print(f'deal_video 捕获到其他错误:{err}')
- else:
- download_resource(video_src, f'{path}/{date}.mp4')
- finally:
- return 1
- except Exception as err:
- print(f'下载视频类型作品 捕获到错误:{err}')
- return 1
- else:
- print(f'访问视频作品界面被阻断,下次再试')
- return 0
频繁的访问和下载资源会被重定向到如下的页面,可以通过获取到该页面的特殊标签来判断是否被重定向连接,如果是,则及时中断访问,稍后再继续。
使用 webdriver 访问页面,页面打开后,在 try 中查找是否有 class='feedback-btn' 元素(即下方的 我要反馈 的按钮)。如果有该元素,则在 else 中进行提示并返回错误码退出任务。如果找不到元素,则会触发 NoSuchElementException 的错误,在 except 中继续任务即可。
- try:
- temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
- except NoSuchElementException:
- # 正常访问到作品页面
- pass
- except Exception as err:
- # 其他的异常
- return 1
- else:
- # 不能访问到作品页面
- return 0
-
- import json
- import threading
- import requests,os
- from selenium.webdriver.common.by import By
- from selenium.common.exceptions import NoSuchElementException
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.wait import WebDriverWait
- from datetime import datetime
- from selenium import webdriver
- from time import sleep
- from bs4 import BeautifulSoup
-
- # 获取当前时间
- def get_current_time():
- now = datetime.now()
- format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
- return format_time
-
- # 下载的作品保存的路径,以作者主页的 id 号命名
- ABS_BASE_URL = f'G:\\639476c10000000026006023'
-
- # 检查作品是否已经下载过
- def check_download_or_not(work_id, is_pictures):
- end_str = 'pictures' if is_pictures else 'video'
- # work_id 是每一个作品的目录,检查目录是否存在并且是否有内容,则能判断对应的作品是否被下载过
- path = f'{ABS_BASE_URL}/{work_id}-{end_str}'
- if os.path.exists(path) and os.path.isdir(path):
- if os.listdir(path):
- return True
- return False
-
- # 下载资源
- def download_resource(url, save_path):
- response = requests.get(url, stream=True)
- if response.status_code == 200:
- with open(save_path, 'wb') as file:
- for chunk in response.iter_content(1024):
- file.write(chunk)
-
- # 处理图片类型作品的一系列操作
- def download_pictures_prepare(res_links, path, date):
- # 下载作品到目录
- index = 0
- for src in res_links:
- download_resource(src, f'{path}/{date}-{index}.webp')
- index += 1
-
- # 处理图片类型的作品
- def deal_pictures(work_id):
- # 直接 requests 请求回来,style 是空的,使用 webdriver 获取当前界面的源代码
- temp_driver = webdriver.Chrome()
- temp_driver.set_page_load_timeout(5)
- temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
- sleep(1)
- try:
- temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
- except NoSuchElementException:
- WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'swiper-wrapper')))
- source_code = temp_driver.page_source
- temp_driver.quit()
-
- html = BeautifulSoup(source_code, 'lxml')
- swiper_sliders = html.find_all(class_='swiper-slide')
- # 当前作品的发表日期
- date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
-
- # 图片路径
- res_links = []
- for item in swiper_sliders:
- url = item['style'].split('url(')[1].split(')')[0].replace('"', '').replace('"', '')
- if url not in res_links:
- res_links.append(url)
-
- #为图片集创建目录
- path = f'{ABS_BASE_URL}/{work_id}-pictures'
- try:
- os.makedirs(path)
- except FileExistsError:
- # 目录已经存在,则直接下载到该目录下
- download_pictures_prepare(res_links, path, date)
- except Exception as err:
- print(f'deal_pictures 捕获到其他错误:{err}')
- else:
- download_pictures_prepare(res_links, path, date)
- finally:
- return 1
- except Exception as err:
- print(f'下载图片类型作品 捕获到错误:{err}')
- return 1
- else:
- print(f'访问作品页面被阻断,下次再试')
- return 0
-
-
- # 处理视频类型的作品
- def deal_video(work_id):
- temp_driver = webdriver.Chrome()
- temp_driver.set_page_load_timeout(5)
- temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
- sleep(1)
- try:
- # 访问不到正常内容的标准元素
- temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
- except NoSuchElementException:
- WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'player-container')))
- source_code = temp_driver.page_source
- temp_driver.quit()
-
- html = BeautifulSoup(source_code, 'lxml')
- video_src = html.find(class_='player-el').video['src']
- # 作品发布日期
- date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
-
- # 为视频作品创建目录
- path = f'{ABS_BASE_URL}/{work_id}-video'
- try:
- os.makedirs(path)
- except FileExistsError:
- download_resource(video_src, f'{path}/{date}.mp4')
- except Exception as err:
- print(f'deal_video 捕获到其他错误:{err}')
- else:
- download_resource(video_src, f'{path}/{date}.mp4')
- finally:
- return 1
- except Exception as err:
- print(f'下载视频类型作品 捕获到错误:{err}')
- return 1
- else:
- print(f'访问视频作品界面被阻断,下次再试')
- return 0
-
- # 检查作品是否已经下载,如果没有下载则去下载
- def work_task(href, is_pictures):
- work_id = href.split('/')[-1]
- has_downloaded = check_download_or_not(work_id, is_pictures)
- # 没有下载,则去下载
- if not has_downloaded:
- if not is_pictures:
- res = deal_video(work_id)
- else:
- res = deal_pictures(work_id)
- if res == 0:
- return 0 # 受到阻断
- else:
- print('当前作品已被下载')
- return 2
- return 1
-
- def thread_task(ul):
- for item in ul:
- href = item[0]
- is_pictures = (True if item[1] == 0 else False)
- res = work_task(href, is_pictures)
- if res == 0: # 被阻止正常访问
- break
-
- if __name__ == '__main__':
- content = ''
- with open('xhs_works.txt', mode='r', encoding='utf-8') as f:
- content = json.load(f)
- url_list = [list(pair) for pair in content.items()]
- length = len(url_list)
- if length > 3:
- ul = [url_list[0: int(length / 3) + 1], url_list[int(length / 3) + 1: int(length / 3) * 2 + 1],
- url_list[int(length / 3) * 2 + 1: length]]
- for child_ul in ul:
- thread = threading.Thread(target=thread_task, args=(child_ul,))
- thread.start()
- else:
- thread_task(url_list)