第八章 高性能异步爬虫

第八章 高性能异步爬虫
性能异步爬虫
目的:在爬虫中使用异步实现高性能的数据爬取操作

同步爬虫:(阻塞)
import requestsheaders = {    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'}urls = [    'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar',    'http://zjlt.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10229.rar',    'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar']def get_content(url):    print('正在爬取:',url)    #get方法是一个阻塞的方法    response = requests.get(url=url,headers=headers)    if response.status_code == 200 :        return response.contentdef parse_content(content):    print('响应数据的长度为:',len(content))for url in urls:    content = get_content(url)    parse_content(content)

异步爬虫的方式:
- 1.多线程,多进程(不建议):
好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步执行
弊端:无法无限制的开启多线程或者多进程。(无法对大量的url开启进程或线程 会占用大量cpu资源 影响其他使用)
- 2.线程池、进程池(适当的使用):
好处:我们可以降低系统对进程或者线程创建和销毁的一个频率,从而很好的降低系统的开销。
弊端:池中线程或进程的数量是有上限。
# import time# #使用单线程串行方式执行## def get_page(str):#     print("正在下载 :",str)#     time.sleep(2)#     print('下载成功:',str)## name_list =['xiaozi','aa','bb','cc']## start_time = time.time()## for i in range(len(name_list)):#     get_page(name_list[i])## end_time = time.time()# print('%d second'% (end_time-start_time))import time#导入线程池模块对应的类from multiprocessing.dummy import Pool#使用线程池方式执行(开始计时)start_time = time.time()def get_page(str): # 模拟网络请求    print("正在下载 :",str)    time.sleep(2)    print('下载成功:',str)name_list =['xiaozi','aa','bb','cc'] # 模拟四个url#实例化一个线程池对象pool = Pool(4)#将列表中每一个列表元素传递给get_page进行处理# 将会发生阻塞的函数传递到第一个参数pool.map(get_page,name_list)pool.close()pool.join()end_time = time.time()print(end_time-start_time)
# 异步爬虫之线程池案例应用
import
requestsfrom lxml import etreeimport refrom multiprocessing.dummy import Pool#需求:爬取梨视频的视频数据headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}#原则:线程池处理的是阻塞且较为耗时的操作(不是处理所有的操作)#对下述url发起请求解析出视频详情页的url和视频的名称url = 'https://www.pearvideo.com/category_5'page_text = requests.get(url=url,headers=headers).text# 数据解析 解析详情页url和视频名称tree = etree.HTML(page_text)li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')urls = [] #存储所有视频的链接and名字# 在li标签中进行局部数据解析for li in li_list: detail_url = 'https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0] # 详情页url name = li.xpath('./div/a/div[2]/text()')[0]+'.mp4' # 视频名称 #对详情页的url发起请求(视频存储在详情页中的) detail_page_text = requests.get(url=detail_url,headers=headers).text #从详情页中解析出视频的地址(url)通过正则进行解析 因为是ajax请求的 ex = 'srcUrl="(.*?)",vdoUrl' video_url = re.findall(ex,detail_page_text)[0] # 视频链接 dic = { 'name':name, 'url':video_url } urls.append(dic)#对视频链接发起请求获取视频的二进制数据,然后将视频数据进行返回def get_video_data(dic): url = dic['url'] print(dic['name'],'正在下载......') data = requests.get(url=url,headers=headers).content #持久化存储操作 with open(dic['name'],'wb') as fp: fp.write(data) print(dic['name'],'下载成功!')#使用线程池对视频数据进行请求(较为耗时的阻塞操作)pool = Pool(4)pool.map(get_video_data,urls) # 将urls传入到get_video_data函数pool.close()pool.join()

- 3.单线程+异步协程(推荐):
event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,
当满足某些条件的时候,函数就会被循环执行。

coroutine:协程对象,我们可以将协程对象注册到事件循环中,它会被事件循环调用。
我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回
一个协程对象。

task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别

async 定义一个协程.

await 用来挂起阻塞方法的执行。
单任务协程相关操作:
import asyncioasync def request(url):    print('正在请求的url是',url)    print('请求成功,',url)    return url#async修饰的函数,调用之后返回的一个协程对象cc = request('www.baidu.com')# 一 event_loop使用:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,# 当满足某些条件的时候,函数就会被循环执行。# #创建一个事件循环对象# loop = asyncio.get_event_loop()## #将协程对象注册到loop中,然后启动loop# loop.run_until_complete(c)# 二 task的使用# 创建一个事件循环对象# loop = asyncio.get_event_loop()# #基于loop创建了一个task对象# task = loop.create_task(c)# print(task)# 此处的任务对象还没被执行## loop.run_until_complete(task) # 开启任务## print(task) # 此处的任务对象已经被执行# 三 future的使用# 创建一个事件循环对象# loop = asyncio.get_event_loop()# task = asyncio.ensure_future(c)# print(task) # 此处的任务对象还没被执行# loop.run_until_complete(task)# print(task)# 此处的任务对象已经被执行def callback_func(task): # 回调函数    #result返回的就是任务对象中封装的协程对象对应函数的返回值    print(task.result()) # 接收result对象的返回值url#绑定回调loop = asyncio.get_event_loop()task = asyncio.ensure_future(c)#将回调函数绑定到任务对象中 当任务函数执行后会执行回调函数task.add_done_callback(callback_func)loop.run_until_complete(task) # 任务函数
# 多任务异步协程

import
asyncioimport timeasync def request(url): print('正在下载',url) #在异步协程中如果出现了同步模块相关的代码,那么就无法实现异步,变成了同步。 # time.sleep(2) #基于异步模块 当在asyncio中遇到阻塞操作必须进行手动挂起 await asyncio.sleep(2) print('下载完毕',url)start = time.time()# 多个协程对象urls = [ 'www.baidu.com', 'www.sogou.com', 'www.goubanjia.com']#任务列表:存放多个任务对象stasks = []for url in urls: c = request(url) # 协程对象 task = asyncio.ensure_future(c) # 任务对象 stasks.append(task) # 任务列表存放多个任务对象# 注册到多任务循环对象loop = asyncio.get_event_loop()#需要将任务列表封装到wait中 固定的语法格式loop.run_until_complete(asyncio.wait(stasks))print(time.time()-start) # 多任务执行耗时
# aiohttp模块 基于网络异步请求模块 requests是基于同步的

import
requestsimport asyncioimport timestart = time.time()urls = [ 'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom']async def get_page(url): print('正在下载',url) #requests.get是基于同步,必须使用基于异步的网络请求模块进行指定url的请求发送
   response = requests.get(url=url) # 这段代码是基于同步模块的代码 所以此程序没有实现异步操作

#aiohttp:基于异步网络请求的模块 完成多任务异步操作
            print('下载完毕:',response.text)tasks = []for url in urls:    c = get_page(url)    task = asyncio.ensure_future(c)    tasks.append(task)loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))end = time.time()print('总耗时:',end-start) # 总耗时6秒
#环境安装:pip install aiohttp#使用该模块中的ClientSessionimport requestsimport asyncioimport timeimport aiohttpstart = time.time()# urls = [#     'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom',#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',## ]from multiprocessing.dummy import Poolpool = Pool(2)urls = []for i in range(10):    urls.append('http://127.0.0.1:5000/bobo')print(urls)async def get_page(url):    async with aiohttp.ClientSession() as session: # 返回session对象        #get()、post(): 可以使用post get请求        #添加参数  :ua伪装和请求参数:headers,params/data,proxy='http://ip:port'        async with await session.get(url) as response: # 使用session对象发送请求 返回一个响应对象response            #text()返回字符串形式的响应数据            #read()返回的二进制形式的响应数据            #json()返回的就是json对象            #注意:获取响应数据操作之前一定要使用await进行手动挂起            page_text = await response.text()            print(page_text)tasks = []for url in urls:    c = get_page(url)    task = asyncio.ensure_future(c)    tasks.append(task)loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))end = time.time()print('总耗时:',end-start) # 总耗时2秒




免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部