• python爬虫实战——小红书


    目录

    1、博主页面分析

    2、在控制台预先获取所有作品页的URL 

    3、在 Python 中读入该文件并做准备工作 

    4、处理图文类型作品

    5、处理视频类型作品

    6、异常访问而被中断的现象

    7、完整参考代码 


            任务:在 win 环境下,利用 Python、webdriver、JavaScript等,获取 xiaohongshu 某个博主的全部作品。

            本文仅做学习和交流使用。

    1、博主页面分析

     

            section 代表每一项作品,但即使博主作品有很多,在未登录状态下,只会显示 20 项左右。向下滚动页面,section 发生改变(个数不变),标签中的 index 会递增。  

     

            向下滚动页面时,到一定的范围时,会发送一个获取作品数据的请求,该请求每次只请求 30 项作品数据。该请求携带了 cookies 以及其他不确定值的参数。 

     

    2、在控制台预先获取所有作品页的URL 

            为了获取博主的全部作品数据,在登录的状态下访问目标博主页面,在控制台中注入JavaScript 脚本。该脚本不断滚动页面到最底部,每次滚动一段距离后,都获取每一个作品的信息(通过a标签的 href 获取到作品页URL;通过判断a标签是否有一个 class='play-icon' 的后代元素来判断是图文还是视频类型的作品,如果有该标签就是视频作品,反之则是图文作品)。

            将作品页 URL 作为键,作品是视频作品还是图文作品作为值,添加到 js 对象中。

            显示完所有的作品,即滚动到最底部时,将所有获取到的作品信息导出为 txt 文件。

             在控制台中执行:

    1. // 页面高度
    2. const vh = window.innerHeight
    3. let work_obj = {}
    4. // 延迟
    5. function delay(ms){
    6. return new Promise(resolve => setTimeout(resolve, ms));
    7. }
    8. async function action() {
    9. let last_height = document.body.offsetHeight;
    10. window.scrollTo(0, window.scrollY + vh * 1.5)
    11. ul = document.querySelector('#userPostedFeeds').querySelectorAll('.cover')
    12. ul.forEach((e,index)=>{
    13. // length 为 0 时是图片,为 1 时为视频
    14. work_obj[e.href] = ul[index].querySelector('.play-icon') ? 1 : 0
    15. })
    16. // 延迟500ms
    17. await delay(500);
    18. // console.log(last_height, document.body.offsetHeight)
    19. // 判断是否滚动到底部
    20. if(document.body.offsetHeight > last_height){
    21. action()
    22. }else{
    23. console.log('end')
    24. // 作品的数量
    25. console.log(Object.keys(work_obj).length)
    26. // 转换格式,并下载为txt文件
    27. var content = JSON.stringify(work_obj);
    28. var blob = new Blob([content], {type: "text/plain;charset=utf-8"});
    29. var link = document.createElement("a");
    30. link.href = URL.createObjectURL(blob);
    31. link.download = "xhs_works.txt";
    32. link.click();
    33. }
    34. }
    35. action()

            写出的 txt 文件内容如下:

     

    3、在 Python 中读入该文件并做准备工作 

    1. # 获取当前时间
    2. def get_current_time():
    3. now = datetime.now()
    4. format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
    5. return format_time
    6. # 下载的作品保存的路径,以作者主页的 id 号命名
    7. ABS_BASE_URL = f'G:\\639476c10000000026006023'
    8. # 检查作品是否已经下载过
    9. def check_download_or_not(work_id, is_pictures):
    10. end_str = 'pictures' if is_pictures else 'video'
    11. # work_id 是每一个作品的目录,检查目录是否存在并且是否有内容,则能判断对应的作品是否被下载过
    12. path = f'{ABS_BASE_URL}/{work_id}-{end_str}'
    13. if os.path.exists(path) and os.path.isdir(path):
    14. if os.listdir(path):
    15. return True
    16. return False
    17. # 下载资源
    18. def download_resource(url, save_path):
    19. response = requests.get(url, stream=True)
    20. if response.status_code == 200:
    21. with open(save_path, 'wb') as file:
    22. for chunk in response.iter_content(1024):
    23. file.write(chunk)

    读入文件,判断作品数量然后进行任务分配: 

    1. # 读入文件
    2. content = ''
    3. with open('./xhs_works.txt', mode='r', encoding='utf-8') as f:
    4. content = json.load(f)
    5. # 转换成 [[href, is_pictures],[href, is_pictures],...] 类型
    6. # 每一维中分别是作品页的URL、作品类型
    7. url_list = [list(pair) for pair in content.items()]
    8. # 有多少个作品
    9. length = len(url_list)
    10. if length > 3:
    11. ul = [url_list[0: int(length / 3) + 1], url_list[int(length / 3) + 1: int(length / 3) * 2 + 1],url_list[int(length / 3) * 2 + 1: length]]
    12. # 开启三个线程并分配任务
    13. for child_ul in ul:
    14. thread = threading.Thread(target=thread_task, args=(child_ul,))
    15. thread.start()
    16. else:
    17. thread_task(url_list)

    若使用多线程,每一个线程处理自己被分配到的作品列表: 

    1. # 每一个线程遍历自己分配到的作品列表,进行逐项处理
    2. def thread_task(ul):
    3. for item in ul:
    4. href = item[0]
    5. is_pictures = (True if item[1] == 0 else False)
    6. res = work_task(href, is_pictures)
    7. if res == 0: # 被阻止正常访问
    8. break

    处理每一项作品: 

    1. # 处理每一项作品
    2. def work_task(href, is_pictures):
    3. # href 中最后的一个路径参数就是博主的id
    4. work_id = href.split('/')[-1]
    5. # 判断是否已经下载过该作品
    6. has_downloaded = check_download_or_not(work_id, is_pictures)
    7. # 没有下载,则去下载
    8. if not has_downloaded:
    9. if not is_pictures:
    10. res = deal_video(work_id)
    11. else:
    12. res = deal_pictures(work_id)
    13. if res == 0:
    14. return 0 # 无法正常访问
    15. else:
    16. print('当前作品已被下载')
    17. return 2
    18. return 1

     

    4、处理图文类型作品

            对于图文类型,每一张图片都作为 div 元素的背景图片进行展示,图片对应的 URL 在 div 元素的 style 中。 可以先获取到 style 的内容,然后根据圆括号进行分隔,最后得到图片的地址。

            这里拿到的图片是没有水印的。

    1. # 处理图片类型作品的一系列操作
    2. def download_pictures_prepare(res_links, path, date):
    3. # 下载作品到目录
    4. index = 0
    5. for src in res_links:
    6. download_resource(src, f'{path}/{date}-{index}.webp')
    7. index += 1
    8. # 处理图片类型的作品
    9. def deal_pictures(work_id):
    10. # 直接 requests 请求回来,style 是空的,使用 webdriver 获取当前界面的源代码
    11. temp_driver = webdriver.Chrome()
    12. temp_driver.set_page_load_timeout(5)
    13. temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
    14. sleep(1)
    15. try:
    16. # 如果页面中有 class='feedback-btn' 这个元素,则表示不能正常访问
    17. temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
    18. except NoSuchElementException: # 没有该元素,则说明能正常访问到作品页面
    19. WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'swiper-wrapper')))
    20. # 获取页面的源代码
    21. source_code = temp_driver.page_source
    22. temp_driver.quit()
    23. html = BeautifulSoup(source_code, 'lxml')
    24. swiper_sliders = html.find_all(class_='swiper-slide')
    25. # 当前作品的发表日期
    26. date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
    27. # 图片路径
    28. res_links = []
    29. for item in swiper_sliders:
    30. # 在 style 中提取出图片的 url
    31. url = item['style'].split('url(')[1].split(')')[0].replace('"', '').replace('"', '')
    32. if url not in res_links:
    33. res_links.append(url)
    34. #为图片集创建目录
    35. path = f'{ABS_BASE_URL}/{work_id}-pictures'
    36. try:
    37. os.makedirs(path)
    38. except FileExistsError:
    39. # 目录已经存在,则直接下载到该目录下
    40. download_pictures_prepare(res_links, path, date)
    41. except Exception as err:
    42. print(f'deal_pictures 捕获到其他错误:{err}')
    43. else:
    44. download_pictures_prepare(res_links, path, date)
    45. finally:
    46. return 1
    47. except Exception as err:
    48. print(f'下载图片类型作品 捕获到错误:{err}')
    49. return 1
    50. else:
    51. print(f'访问作品页面被阻断,下次再试')
    52. return 0

     

    5、处理视频类型作品

            获取到的视频有水印。 

    1. # 处理视频类型的作品
    2. def deal_video(work_id):
    3. temp_driver = webdriver.Chrome()
    4. temp_driver.set_page_load_timeout(5)
    5. temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
    6. sleep(1)
    7. try:
    8. temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
    9. except NoSuchElementException:
    10. WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'player-container')))
    11. source_code = temp_driver.page_source
    12. temp_driver.quit()
    13. html = BeautifulSoup(source_code, 'lxml')
    14. video_src = html.find(class_='player-el').video['src']
    15. # 作品发布日期
    16. date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
    17. # 为视频作品创建目录,以 作品的id号 + video 命名目录
    18. path = f'{ABS_BASE_URL}/{work_id}-video'
    19. try:
    20. os.makedirs(path)
    21. except FileExistsError:
    22. download_resource(video_src, f'{path}/{date}.mp4')
    23. except Exception as err:
    24. print(f'deal_video 捕获到其他错误:{err}')
    25. else:
    26. download_resource(video_src, f'{path}/{date}.mp4')
    27. finally:
    28. return 1
    29. except Exception as err:
    30. print(f'下载视频类型作品 捕获到错误:{err}')
    31. return 1
    32. else:
    33. print(f'访问视频作品界面被阻断,下次再试')
    34. return 0

     

     

    6、异常访问而被中断的现象

             频繁的访问和下载资源会被重定向到如下的页面,可以通过获取到该页面的特殊标签来判断是否被重定向连接,如果是,则及时中断访问,稍后再继续。

            使用 webdriver 访问页面,页面打开后,在 try 中查找是否有 class='feedback-btn' 元素(即下方的 我要反馈 的按钮)。如果有该元素,则在 else 中进行提示并返回错误码退出任务。如果找不到元素,则会触发 NoSuchElementException 的错误,在 except 中继续任务即可。

    1. try:
    2. temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
    3. except NoSuchElementException:
    4. # 正常访问到作品页面
    5. pass
    6. except Exception as err:
    7. # 其他的异常
    8. return 1
    9. else:
    10. # 不能访问到作品页面
    11. return 0

     

    7、完整参考代码 

    1. import json
    2. import threading
    3. import requests,os
    4. from selenium.webdriver.common.by import By
    5. from selenium.common.exceptions import NoSuchElementException
    6. from selenium.webdriver.support import expected_conditions as EC
    7. from selenium.webdriver.support.wait import WebDriverWait
    8. from datetime import datetime
    9. from selenium import webdriver
    10. from time import sleep
    11. from bs4 import BeautifulSoup
    12. # 获取当前时间
    13. def get_current_time():
    14. now = datetime.now()
    15. format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
    16. return format_time
    17. # 下载的作品保存的路径,以作者主页的 id 号命名
    18. ABS_BASE_URL = f'G:\\639476c10000000026006023'
    19. # 检查作品是否已经下载过
    20. def check_download_or_not(work_id, is_pictures):
    21. end_str = 'pictures' if is_pictures else 'video'
    22. # work_id 是每一个作品的目录,检查目录是否存在并且是否有内容,则能判断对应的作品是否被下载过
    23. path = f'{ABS_BASE_URL}/{work_id}-{end_str}'
    24. if os.path.exists(path) and os.path.isdir(path):
    25. if os.listdir(path):
    26. return True
    27. return False
    28. # 下载资源
    29. def download_resource(url, save_path):
    30. response = requests.get(url, stream=True)
    31. if response.status_code == 200:
    32. with open(save_path, 'wb') as file:
    33. for chunk in response.iter_content(1024):
    34. file.write(chunk)
    35. # 处理图片类型作品的一系列操作
    36. def download_pictures_prepare(res_links, path, date):
    37. # 下载作品到目录
    38. index = 0
    39. for src in res_links:
    40. download_resource(src, f'{path}/{date}-{index}.webp')
    41. index += 1
    42. # 处理图片类型的作品
    43. def deal_pictures(work_id):
    44. # 直接 requests 请求回来,style 是空的,使用 webdriver 获取当前界面的源代码
    45. temp_driver = webdriver.Chrome()
    46. temp_driver.set_page_load_timeout(5)
    47. temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
    48. sleep(1)
    49. try:
    50. temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
    51. except NoSuchElementException:
    52. WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'swiper-wrapper')))
    53. source_code = temp_driver.page_source
    54. temp_driver.quit()
    55. html = BeautifulSoup(source_code, 'lxml')
    56. swiper_sliders = html.find_all(class_='swiper-slide')
    57. # 当前作品的发表日期
    58. date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
    59. # 图片路径
    60. res_links = []
    61. for item in swiper_sliders:
    62. url = item['style'].split('url(')[1].split(')')[0].replace('"', '').replace('"', '')
    63. if url not in res_links:
    64. res_links.append(url)
    65. #为图片集创建目录
    66. path = f'{ABS_BASE_URL}/{work_id}-pictures'
    67. try:
    68. os.makedirs(path)
    69. except FileExistsError:
    70. # 目录已经存在,则直接下载到该目录下
    71. download_pictures_prepare(res_links, path, date)
    72. except Exception as err:
    73. print(f'deal_pictures 捕获到其他错误:{err}')
    74. else:
    75. download_pictures_prepare(res_links, path, date)
    76. finally:
    77. return 1
    78. except Exception as err:
    79. print(f'下载图片类型作品 捕获到错误:{err}')
    80. return 1
    81. else:
    82. print(f'访问作品页面被阻断,下次再试')
    83. return 0
    84. # 处理视频类型的作品
    85. def deal_video(work_id):
    86. temp_driver = webdriver.Chrome()
    87. temp_driver.set_page_load_timeout(5)
    88. temp_driver.get(f'https://www.xiaohongshu.com/explore/{work_id}')
    89. sleep(1)
    90. try:
    91. # 访问不到正常内容的标准元素
    92. temp_driver.find_element(By.CLASS_NAME, 'feedback-btn')
    93. except NoSuchElementException:
    94. WebDriverWait(temp_driver, 5).until(EC.presence_of_element_located((By.CLASS_NAME, 'player-container')))
    95. source_code = temp_driver.page_source
    96. temp_driver.quit()
    97. html = BeautifulSoup(source_code, 'lxml')
    98. video_src = html.find(class_='player-el').video['src']
    99. # 作品发布日期
    100. date = html.find(class_='bottom-container').span.string.split(' ')[0].strip()
    101. # 为视频作品创建目录
    102. path = f'{ABS_BASE_URL}/{work_id}-video'
    103. try:
    104. os.makedirs(path)
    105. except FileExistsError:
    106. download_resource(video_src, f'{path}/{date}.mp4')
    107. except Exception as err:
    108. print(f'deal_video 捕获到其他错误:{err}')
    109. else:
    110. download_resource(video_src, f'{path}/{date}.mp4')
    111. finally:
    112. return 1
    113. except Exception as err:
    114. print(f'下载视频类型作品 捕获到错误:{err}')
    115. return 1
    116. else:
    117. print(f'访问视频作品界面被阻断,下次再试')
    118. return 0
    119. # 检查作品是否已经下载,如果没有下载则去下载
    120. def work_task(href, is_pictures):
    121. work_id = href.split('/')[-1]
    122. has_downloaded = check_download_or_not(work_id, is_pictures)
    123. # 没有下载,则去下载
    124. if not has_downloaded:
    125. if not is_pictures:
    126. res = deal_video(work_id)
    127. else:
    128. res = deal_pictures(work_id)
    129. if res == 0:
    130. return 0 # 受到阻断
    131. else:
    132. print('当前作品已被下载')
    133. return 2
    134. return 1
    135. def thread_task(ul):
    136. for item in ul:
    137. href = item[0]
    138. is_pictures = (True if item[1] == 0 else False)
    139. res = work_task(href, is_pictures)
    140. if res == 0: # 被阻止正常访问
    141. break
    142. if __name__ == '__main__':
    143. content = ''
    144. with open('xhs_works.txt', mode='r', encoding='utf-8') as f:
    145. content = json.load(f)
    146. url_list = [list(pair) for pair in content.items()]
    147. length = len(url_list)
    148. if length > 3:
    149. ul = [url_list[0: int(length / 3) + 1], url_list[int(length / 3) + 1: int(length / 3) * 2 + 1],
    150. url_list[int(length / 3) * 2 + 1: length]]
    151. for child_ul in ul:
    152. thread = threading.Thread(target=thread_task, args=(child_ul,))
    153. thread.start()
    154. else:
    155. thread_task(url_list)

  • 相关阅读:
    Flink学习第八天——Flink核心的Sink Operator实战
    MASA MAUI Plugin 安卓蓝牙低功耗(一)蓝牙扫描
    【后端版】分布式医疗云平台【【统计模块】his-statistics 模块、【子父项目】statistics-api 、statistics-domain、statistics-mapper】(十五)
    有序序列判断
    模拟实现C语言--strcmp函数
    ASP.NET Core Web API入门之一:创建新项目
    Linux 入门及常见Shell命令
    java计算机毕业设计ssm基金分析系统的设计与实现(源码+系统+mysql数据库+Lw文档)
    LeetCode 739 每日温度(单调栈的初步了解)
    pg_dump执行流程简单记录
  • 原文地址:https://blog.csdn.net/hao_13/article/details/136687023