简单概述
本系列可能是一个比较长的系列,主要是对《Python3网络爬虫开发实战》前七章的一个内容总结并且熟悉运用一下相关的结构与技能。
任务方针
爬取电影数据网站ssr1.scrape.center/, 此网站无反爬,数据经过服务端渲染,需求爬取的部分为列表页里边的电影数据概况。
任务方针解析
- 爬取ssr1.scrape.center/, 网站的列表页面,经过列表页面的内容获取到需求的URL
- 爬取ssr1.scrape.center/detail/{id}, 网站内的数据概况,需求获取的部分有:
- 电影标题
- 电影图片的url
- 电影上映时刻
- 电影分类
- 电影评分
- 剧情简介
- 将内容寄存到需求的数据库中
技能选型与爬取
怎么爬取
playwright库是微软开源的一个库,这个库的功用更加的强壮,除了能够完成同步操作,相同也能够完成异步的操作,这个库能够说是现在功用最强壮的库也不为过,由于其还支持xpath,css选择器等一些元素的选择操作,乃至能够经过点击鼠标进行操作,然后完成自动化构建代码,全体的功用真的十分强壮。
构建根底的爬取函数
# 抓取网页内容
def scrape_page(page, url):
logging.info('scraping %s ...', url)
try:
page.goto(url)
page.wait_for_load_state('networkidle')
except:
logging.error('error occured while scraping %s', url, exc_info=True)
对于网页内容的操作,咱们只需求对页面选项卡进行操作,传入页面选项卡对象和url链接完成咱们想要完结的页面恳求,在这种恳求下,咱们经过等待网络恳求的响应情况来判别页面是否完全响应。
构建列表页的爬取函数
这个部分只需求分分出最根底的URL的页码规矩就能够完结对页面内容的爬取,经过剖析咱们能够发现https://ssr1.scrape.center/page/{page}
能够发现变化的内容在{page}
部分,因而构建的抓取方法如下:
def scrape_index(page, page_index):
index_url = f'{BASE_URL}/page/{page_index}'
return scrape_page(page, index_url)
构建概况页的爬取函数
概况页的爬取是建立在解析列表的根底上取得的,因而概况页爬取函数只需求知道url就能够直接调用根底爬取函数,而这儿咱们只需求对列表页解析后就能够获取到咱们所需求的url,因而全体的构建方法如下:
def scrape_detail(page, url):
return scrape_page(page, url)
怎么解析
解析列表页后获取概况页的URL
快速快捷的选择器让咱们经过一行代码就获取到咱们所需求的标签与特点,十分便利的完结了咱们需求获取概况页的URL.
# 获取解析内容
def parse_index(page):
# 获取网页内容恳求
elements = page.query_selector_all('a.name')
# 获取元素信息
for element in elements:
part_of_url = element.get_attribute('href')
detail_url = urljoin(BASE_URL, part_of_url)
logging.info('get url: %s', detail_url)
yield detail_url
解析概况页获取需求的数据
当概况页数据获取到之后,对网页内的信息进行解析,完成对电影称号,电影类别,图片地址,剧情简介以及评分的内容获取:
def parse_detail(page):
# 获取标题
name = None
name_tag = page.query_selector('h2.m-b-sm')
if name_tag:
name = name_tag.text_content()
# 获取图片
cover = None
cover_tag = page.query_selector('img.cover')
if cover_tag:
cover = cover_tag.get_attribute('src')
# 获取分类
categories = []
category_tags = page.query_selector_all('div.categories > button > span')
if category_tags:
categories = [category.text_content() for category in category_tags]
# 获取评分
score = None
score_tag = page.query_selector('p.score')
if score_tag:
score = score_tag.text_content().strip()
# 剧情简介
drama = None
drama_tag = page.query_selector('div.drama > p')
if drama_tag:
drama = drama_tag.text_content().strip()
return {
# 标题
'name': name,
# 图片
'cover': cover,
# 分类
'categories': categories,
# 简介
'drama': drama,
# 评分
'score': score
}
怎么存储
本次存储运用txt文本进行文件内容的存储,直接将文件内容写入一个txt文件傍边。
# 数据内容存储
def save_data(data):
# 文件寄存地址
data_path = '{0}/movies.txt'.format(RESULT_DIR)
# 进行文件写入
with open(data_path, 'a+', encoding='utf-8') as file:
name = data.get('name', None)
cover = data.get('cover', None)
categories = data.get('categories', None)
drama = data.get('drama', None)
score = data.get('score', None)
file.write('name:'+name+'\n')
file.write('cover:'+cover+'\n')
file.write('categories:'+str(categories)+'\n')
file.write('drama:'+drama+'\n')
file.write('score:'+score+'\n')
file.write('='*50 + '\n')
源代码
import logging
from os import makedirs
from os.path import exists
from urllib.parse import urljoin
from playwright.sync_api import sync_playwright
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
#
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)
# 抓取网页内容
def scrape_page(page, url):
logging.info('scraping %s ...', url)
try:
page.goto(url)
page.wait_for_load_state('networkidle')
except:
logging.error('error occured while scraping %s', url, exc_info=True)
def scrape_index(page, page_index):
index_url = f'{BASE_URL}/page/{page_index}'
return scrape_page(page, index_url)
def scrape_detail(page, url):
return scrape_page(page, url)
# 获取解析内容
def parse_index(page):
# 获取网页内容恳求
elements = page.query_selector_all('a.name')
# 获取元素信息
for element in elements:
part_of_url = element.get_attribute('href')
detail_url = urljoin(BASE_URL, part_of_url)
logging.info('get url: %s', detail_url)
yield detail_url
def parse_detail(page):
# 获取标题
name = None
name_tag = page.query_selector('h2.m-b-sm')
if name_tag:
name = name_tag.text_content()
# 获取图片
cover = None
cover_tag = page.query_selector('img.cover')
if cover_tag:
cover = cover_tag.get_attribute('src')
# 获取分类
categories = []
category_tags = page.query_selector_all('div.categories > button > span')
if category_tags:
categories = [category.text_content() for category in category_tags]
# 获取评分
score = None
score_tag = page.query_selector('p.score')
if score_tag:
score = score_tag.text_content().strip()
# 剧情简介
drama = None
drama_tag = page.query_selector('div.drama > p')
if drama_tag:
drama = drama_tag.text_content().strip()
return {
# 标题
'name': name,
# 图片
'cover': cover,
# 分类
'categories': categories,
# 简介
'drama': drama,
# 评分
'score': score
}
# 数据内容存储
def save_data(data):
# 文件寄存地址
data_path = '{0}/movies.txt'.format(RESULT_DIR)
# 进行文件写入
with open(data_path, 'a+', encoding='utf-8') as file:
name = data.get('name', None)
cover = data.get('cover', None)
categories = data.get('categories', None)
drama = data.get('drama', None)
score = data.get('score', None)
file.write('name:'+name+'\n')
file.write('cover:'+cover+'\n')
file.write('categories:'+str(categories)+'\n')
file.write('drama:'+drama+'\n')
file.write('score:'+score+'\n')
file.write('='*50 + '\n')
def main():
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
for page_index in range(1, TOTAL_PAGE + 1):
scrape_index(page, page_index)
detail_urls = list(parse_index(page))
for detail_url in detail_urls:
scrape_detail(page, detail_url)
data = parse_detail(page)
logging.info('get data: %s', data)
save_data(data)
browser.close()
if __name__ == '__main__':
main()
版权信息
本文由PorterZhang整理或写作完结
本人的Github: PorterZhang2021
本人的博客地址:PorterZhang