目录
- 一、单线程爬虫
- 代码实现
- 二、 多线程爬虫
- 1、多线程的方法使用
- 2、队列模块的使用
- 3、多线程实现思路剖析
- 4、代码实现
- **注意点:**
- 三、多进程爬虫
- 1、多进程程的方法使用
- 2、多进程中队列的使用
- 3 代码实现
- **小结**
- 四、线程池实现爬虫
- 1、线程池使用方法介绍
- 2、使用线程池实现爬虫的具体实现
- **小结:**
- 五、协程池实现爬虫
- 1、协程池模块使用介绍
- 2、使用协程池实现爬虫的具体实现过程
- 总结
注意:以下代码去掉了关键信息和url只展示怎么操作,具体实现可采用这个框架自己修改
一、单线程爬虫
思路分析:
- 确定url地址
- 确定数据的位置
代码实现
import requests
from lxml import etreeclass Spider():def __init__(self):url = "http://www.baidu.com/page/{}" # 注意这里是假url,实际跑不通# 构建构建每页url地址self.url_list = [ url.format(i) for i in range(1,14)]self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36"}def get_html(self,html_url):# 向标题页发送请求,获取响应内容html_url_resp = requests.get(html_url, headers=self.headers)return html_url_resp.contentdef get_items(self,html_url_resp):# 获取每页的标题和标题url# 获取可以xpath的对象html_url_element = etree.HTML(html_url_resp)# 进行xpath提取标题和urlhtml_url_a = html_url_element.xpath('//a[@class="recmd-content"]')# print(len(html_url_a))for a in html_url_a:item = {}# print(type(a))# 通过xpath得到的是列表,[0]是为了将列表中的内容取出来if a.xpath('./text()') != []: #为空说明是广告item['title'] = a.xpath('./text()')[0]item['title_url'] = a.xpath('./@href')[0]self.save(item)def save(self,item):print(item)def run(self):for html_url in self.url_list:# 获取该页的标题详情html_url_resp = self.get_html(html_url)# 获取每页标题和标题urlself.get_items(html_url_resp)if __name__ == '__main__':start_time = datetime.datetime.now()spider = Sprider()spider.run()end_time = datetime.datetime.now()print('单线程消耗时间{}'.format(end_time-start_time))# 单线程消耗时间0:00:06.377732
二、 多线程爬虫
在前面爬虫基础知识案例中我们发现请求回来的总数据不是太多,时间性对来说还是比较快的,那么如果该网站有大量数据等待爬虫爬取,我们是不是需要使用多线程并发来操作爬虫的网络请求呢?
1、多线程的方法使用
在python3中,主线程主进程结束,子线程,子进程不会结束
为了能够让主线程回收子线程,可以把子线程设置为守护线程,即该线程不重要,主线程结束,子线程结束(爬虫不能搞守护线程我觉得,不然还没执行的子线程都不爬了)
t1 = threading.Thread(targe=func,args=(,))
t1.setDaemon(True) # 设置为守护线程
t1.start() #此时线程才会启动
2、队列模块的使用
from queue import Queue
q = Queue(maxsize=100) # maxsize为队列长度
item = {}
q.put_nowait(item) #不等待直接放,队列满的时候会报错
q.put(item) #放入数据,队列满的时候会阻塞等待
q.get_nowait() #不等待直接取,队列空的时候会报错
q.get() #取出数据,队列为空的时候会阻塞等待
q.qsize() #获取队列中现存数据的个数
q.join() # 队列中维持了一个计数(初始为0),计数不为0时候让主线程阻塞等待,队列计数为0的时候才会继续往后执行# q.join()实际作用就是阻塞主线程,与task_done()配合使用# put()操作会让计数+1,task_done()会让计数-1# 计数为0,才停止阻塞,让主线程继续执行
q.task_done() # put的时候计数+1,get不会-1,get需要和task_done 一起使用才会-1
3、多线程实现思路剖析
把爬虫中的每个步骤封装成函数,分别用线程去执行不同的函数通过队列相互通信,函数间解耦
4、代码实现
import requests
from lxml import etree
import datetime
from queue import Queue
from threading import Threadclass Spider():def __init__(self):self.base_url = "http://www.baidu.com/8hr/page/{}" # 注意这是假url,实际跑不通得换你自己的# 构建构建每页url地址self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36"}# 创建队列self.q_url = Queue(10)self.q_html = Queue(10)self.q_item = Queue(10)# 生产urldef get_url(self):for i in range(1, 14):url = self.base_url.format(i)self.q_url.put(url)# 获取对应url的response内容def get_html(self):while True:# 向标题页发送请求,获取响应内容url = self.q_url.get()html_url_resp = requests.get(url, headers=self.headers)self.q_html.put(html_url_resp.content)self.q_url.task_done() # 计数 -1# 获取每页的标题和urldef get_items(self):while True:# 获取每页的响应内容html_url_resp = self.q_html.get()# 获取可以xpath的对象html_url_element = etree.HTML(html_url_resp)# 进行xpath提取标题和urlhtml_url_a = html_url_element.xpath('//a[@class="recmd-content"]')# print(len(html_url_a))titles = []for a in html_url_a:# print(type(a))# 通过xpath得到的是列表,[0]是为了将列表中的内容取出来if a.xpath('./text()') != []: #为空说明是广告item = {}item['title'] = a.xpath('./text()')[0]item['title_url'] = a.xpath('./@href')[0]titles.append(item)self.q_item.put(titles)self.q_html.task_done()def save(self):while True:items = self.q_item.get()for item in items:print(item)self.q_item.task_done()def run(self):thread_list =[]t_url = Thread(target=self.get_url)thread_list.append(t_url)for i in range(3):t_html = Thread(target=self.get_html)thread_list.append(t_html)for i in range(3):t_items = Thread(target=self.get_items)thread_list.append(t_items)t_save = Thread(target=self.save)thread_list.append(t_save)for t in thread_list:# t.setDaemon(True) # 设置为守护线程t.start() #此时线程才会启动for q in [self.q_url, self.q_html, self.q_item]:q.join() # 主线程阻塞,直到每个q队列计数为0print('程序结束了')if __name__ == '__main__':start_time = datetime.datetime.now()spider = Spider()spider.run()end_time = datetime.datetime.now()print('多线程消耗时间{}'.format(end_time-start_time))# 单线程消耗时间0:00:06.377732# 多线程消耗时间0:00:02.270735
注意点:
- put会让队列的计数+1,但是单纯的使用get不会让其-1,需要和task_done同时使用才能够-1。
- task_done不能放在另一个队列的put之前,否则可能会出现数据没有处理完成,程序结束的情况。
三、多进程爬虫
使用和多线程差不多,不同库而已
在一个进程中无论开多少个线程都只能运行在一个CPU的核心之上,这是python的特点,不能说是缺点!
如果我们想利用计算机的多核心优势,就可以用多进程的方式实现,思路和多线程相似,只是对应的api不相同。
1、多进程程的方法使用
from multiprocessing import Process #导入模块
t1 = Process(targe=func,args=(,)) #使用一个进程来执行一个函数
t1.daemon = True #设置为守护进程
t1.start() #此时线程才会启动
2、多进程中队列的使用
多进程中使用普通的队列模块会发生阻塞,对应的需要使multiprocessing提供的JoinableQueue模块,其使用过程和在线程中使用的queue方法相同
3 代码实现
import requests
from lxml import etree
import datetime
from multiprocessing import Process
from multiprocessing import JoinableQueue as Queueclass Sprider():def __init__(self):self.base_url = "http://www.baidu.com/8hr/page/{}" # 注意这是假url,实际跑不通,得换你自己的# 构建构建每页url地址self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36"}# 创建队列self.q_url = Queue(10)self.q_html = Queue(10)self.q_item = Queue(10)# 生产urldef get_url(self):for i in range(1, 14):url = self.base_url.format(i)self.q_url.put(url)# 获取对应url的response内容def get_html(self):while True:# 向标题页发送请求,获取响应内容url = self.q_url.get()html_url_resp = requests.get(url, headers=self.headers)self.q_html.put(html_url_resp.content)self.q_url.task_done()# 获取每页的标题和urldef get_items(self):while True:# 获取每页的响应内容html_url_resp = self.q_html.get()# 获取可以xpath的对象html_url_element = etree.HTML(html_url_resp)# 进行xpath提取标题和urlhtml_url_a = html_url_element.xpath('//a[@class="recmd-content"]')# print(len(html_url_a))titles = []for a in html_url_a:# print(type(a))# 通过xpath得到的是列表,[0]是为了将列表中的内容取出来if a.xpath('./text()') != []: #为空说明是广告item = {}item['title'] = a.xpath('./text()')[0]item['title_url'] = a.xpath('./@href')[0]titles.append(item)self.q_item.put(titles)self.q_html.task_done()def save(self):while True:items = self.q_item.get()for item in items:print(item)self.q_item.task_done()def run(self):process_list =[]p_url = Process(target=self.get_url)process_list.append(p_url)for i in range(2):p_html = Process(target=self.get_html)process_list.append(p_html)for i in range(2):p_items = Process(target=self.get_items)process_list.append(p_items)p_save = Process(target=self.save)process_list.append(p_save)for t in process_list:# t.setDaemon(True) # 设置为守护线程t.start() #此时线程才会启动for q in [self.q_url, self.q_html, self.q_item]:q.join() # 主线程阻塞,直到每个q队列计数为0print('程序结束了')if __name__ == '__main__':start_time = datetime.datetime.now()spider = Sprider()spider.run()end_time = datetime.datetime.now()print('多进程消耗时间{}'.format(end_time-start_time))# 单线程消耗时间0:00:06.377732# 多线程消耗时间0:00:02.270735# 多进程消耗时间0: 00:03.296609
上述多进程实现的代码中,multiprocessing提供的JoinableQueue可以创建可连接的共享进程队列。和普通的Queue对象一样,队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 对应的该队列能够和普通队列一样能够调用task_done和join方法。
小结
- multiprocessing导包:from multiprocessing import Process
- 创建进程: Process(target=self.get_url_list)
- 添加入队列: put
- 从队列获取:get
- 守护线程:t.daemon=True
- 主线程阻塞: q.join()
- 跨进程通讯可以使用from multiprocessing import JoinableQueue as Queue
四、线程池实现爬虫
1、线程池使用方法介绍
- 实例化线程池对象
from multiprocessing.dummy import Poolpool = Pool(processes=3) # 默认大小是cpu的个数"""源码内容:if processes is None:processes = os.cpu_count() or 1 # 此处or的用法:默认选择or前边的值,如果or前边的值为False,就选择后边的值"""
- 把从发送请求,提取数据,到保存合并成一个函数,交给线程池异步执行
使用方法pool.apply_async(func)
def exetute_requests_item_save(self):url = self.queue.get()html_str = self.parse_url(url)content_list = self.get_content_list(html_str)self.save_content_list(content_list)self.total_response_num +=1pool.apply_async(self.exetute_requests_item_save)
- 添加回调函数
通过apply_async的方法能够让函数异步执行,但是只能够执行一次,为了让其能够被反复执行,通过添加回调函数的方式能够让_callback 递归的调用自己,同时需要指定递归退出的条件。
def _callback(self,temp):if self.is_running:pool.apply_async(self.exetute_requests_item_save,callback=self._callback)pool.apply_async(self.exetute_requests_item_save,callback=self._callback)
- 确定程序结束的条件 程序在获取的响应和url数量相同的时候可以结束
while True: #防止主线程结束time.sleep(0.0001) #避免cpu空转,浪费资源if self.total_response_num>=self.total_requests_num:self.is_running= Falsebreakself.pool.close() #关闭线程池,防止新的线程开启
# self.pool.join() #等待所有的子线程结束
2、使用线程池实现爬虫的具体实现
import requests
from lxml import etree
import datetime
from queue import Queue
from multiprocessing.dummy import Pool
import timeclass Spider():def __init__(self):self.url = "http://www.baidu.com/8hr/page/{}" # 注意这是假url,实际跑不通,得换你自己的self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36'}# 构建构建每页url地址# self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36"}# 创建队列self.q_url = Queue()# 创建进程池对象self.pool = Pool(5)# 请求url总数据self.total_request_num = 0# 获取响应总数self.total_response_num = 0# 停止回调标志self.is_running = True# 获取urldef get_url(self):for i in range(1, 14):url = self.url.format(i)# 将生成的url放入到队列中self.q_url.put(url)self.total_request_num += 1# 向标题页发送请求,获取响应内容def get_html(self,url):resp = requests.get(url, headers=self.headers)return resp.content# 获取每页的标题和标题urldef get_items(self,html_resp):# 获取可以xpath的对象html_url_element = etree.HTML(html_resp)# 进行xpath提取标题和urlhtml_url_a = html_url_element.xpath('//a[@class="recmd-content"]')# print(len(html_url_a))titles = []for a in html_url_a:item = {}# print(type(a))# 通过xpath得到的是列表,[0]是为了将列表中的内容取出来if a.xpath('./text()') != []: #为空说明是广告item['title'] = a.xpath('./text()')[0]item['title_url'] = a.xpath('./@href')[0]titles.append(item)return titles# 保存def save(self,titles):for t in titles:print(t)# 完整的执行流程def execute_request_items_save(self):url = self.q_url.get()html_resp = self.get_html(url)titles = self.get_items(html_resp)self.save(titles)self.total_response_num += 1return '完整执行'# 回调函数def _callback(self,xxx):# callback函数必须接收一个参数!# xxx参数是self.execute_request_item_save的返回值!!!# 哪怕用不上 也必须接收!print(xxx)if self.is_running:self.pool.apply_async(self.execute_request_items_save, callback=self._callback)def run(self):self.get_url()for i in range(5):self.pool.apply_async(self.execute_request_items_save, callback=self._callback)# 退出机制while True:#避免cpu空转,浪费资源time.sleep(0.0001)if self.total_response_num == self.total_request_num and self.total_request_num == 13:self.is_running = Falsebreakself.pool.close()print('程序执行结束')if __name__ == '__main__':start_time = datetime.datetime.now()spider = Spider()spider.run()end_time = datetime.datetime.now()print('多线程池消耗时间{}'.format(end_time-start_time))# 单线程消耗时间0:00:06.377732# 多进程消耗时间0: 00:03.296609# 多线程池消耗时间0:00:01.761484
小结:
- 线程池导包: from multiprocessing.dummy import Pool
- 线程池的创建:pool = Pool(process=3)
- 线程池异步方法:pool.apply_async(func)
五、协程池实现爬虫
1、协程池模块使用介绍
协程池模块
import gevent.monkeygevent.monkey.patch_all()from gevent.pool import Pool
2、使用协程池实现爬虫的具体实现过程
import gevent.monkey
# monkey补丁要打在发送请求之前
gevent.monkey.patch_all()
import requests
from lxml import etree
import datetime
from queue import Queue
from gevent.pool import Pool
import timeclass Sprider():def __init__(self):self.url = "http://www.baidu.com/8hr/page/{}" # 注意这是假url,实际跑不通,得换你自己的self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36'}# 构建构建每页url地址# self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36"}# 创建队列self.q_url = Queue()# 创建进程池对象self.pool = Pool(5)# 请求url总数据self.total_request_num = 0# 获取响应总数self.total_response_num = 0# 停止回调标志self.is_running = True# 获取urldef get_url(self):for i in range(1, 14):url = self.url.format(i)# 将生成的url放入到队列中self.q_url.put(url)self.total_request_num += 1# 向标题页发送请求,获取响应内容def get_html(self,url):resp = requests.get(url, headers=self.headers)return resp.content# 获取每页的标题和标题urldef get_items(self,html_resp):# 获取可以xpath的对象html_url_element = etree.HTML(html_resp)# 进行xpath提取标题和urlhtml_url_a = html_url_element.xpath('//a[@class="recmd-content"]')# print(len(html_url_a))titles = []for a in html_url_a:item = {}# print(type(a))# 通过xpath得到的是列表,[0]是为了将列表中的内容取出来if a.xpath('./text()') != []: #为空说明是广告item['title'] = a.xpath('./text()')[0]item['title_url'] = a.xpath('./@href')[0]titles.append(item)return titles# 保存def save(self,titles):for t in titles:print(t)# 完整的执行流程def execute_request_items_save(self):url = self.q_url.get()html_resp = self.get_html(url)titles = self.get_items(html_resp)self.save(titles)self.total_response_num += 1return '完整执行'# 回调函数def _callback(self,xxx):# callback函数必须接收一个参数!# xxx参数是self.execute_request_item_save的返回值!!!# 哪怕用不上 也必须接收!print(xxx)if self.is_running:self.pool.apply_async(self.execute_request_items_save, callback=self._callback)def run(self):self.get_url()for i in range(5):self.pool.apply_async(self.execute_request_items_save, callback=self._callback)# 退出机制while True:#避免cpu空转,浪费资源time.sleep(0.0001)if self.total_response_num == self.total_request_num and self.total_request_num == 13:self.is_running = Falsebreak# self.pool.close() # 协程没有过close函数print('程序执行结束')if __name__ == '__main__':start_time = datetime.datetime.now()spider = Sprider()spider.run()end_time = datetime.datetime.now()print('多协程池消耗时间{}'.format(end_time-start_time))# 单线程消耗时间0:00:06.377732# 多进程消耗时间0: 00:03.296609# 多线程池消耗时间0:00:01.761484# 多协程池消耗时间0:00:01.689546
总结
对单线程、多进程、多线程、线程池、进程池、协程池的使用进行了总结,但是感觉还是有些缺陷。参考我其他文章把