爬虫基础篇之多线程实现

篇幅有限

完整内容及源码关注公众号:ReverseCode,发送

执行流程

python并不像java这类高级语言需要将文件编译为机器码交给虚拟机执行,而是由python虚拟机一条条地将py语句解释运行,故而称之为解释型语言。

python先将py文件编译成字节码,交给字节码虚拟机后,虚拟机从编译得到的PyCodeObject对象在当前的上下文环境逐条执行字节码指令,完成整个程序的执行流程。

字节码在python虚拟机程序里对应的是PyCodeObject对象, .pyc文件是字节码在磁盘上的表现形式。

例如:在python test.py过程中会将test.py进行编译成字节码并解释执行,当test.py中加载了其他模块,如import urllib2,python会将urllib2.py进行编译成字节码,生成urllib2.pyc,并重新对字节码解释执行。加载模块时,当.py和.pyc同时存在时,优先运行.pyc文件,若.pyc文件比.py编译时间早,执行流程也会优先重新编译.py文件并更新.pyc文件。

.pyc文件通过内置模块py_compile来编译生成test.pyc,或者通过python -m test.py生成test.pyc

GIL

由于物理上的科技发展,各CPU厂商在核心频率上已经被多核CPU所取代,为了更有效的利用多核处理器的性能,就出现了多线程的编程技巧,又因为Python GIL的存在让Python虚拟机在进行运算时无法有效利用多核心,几乎只能单线程处理任务,所以将任务并行化,分散到多个线程或多个进程的实现和GIL本身的存在是天生冲突的矛盾。

首先,GIL不是python的特性,是实现python解析器(CPython)时的语法标准,并不是python的特性,类似的还有JPython等就没有GIL,但是CPython是大部分环境下默认的Python执行环境,自然而然CPython==python,自然而然GIL成为的python天生的缺陷。

当多核CPU出现在市场的时候,python为了支持多线程,首要解决的就是线程之间的数据完整性和数据同步性,GIL应运而生,简单粗暴的为线程加上了一把大锁,后起之库们默认都接受了这种实现方案,省去考虑额外线程间的内存锁和同步操作,导致大量的三方库都极度依赖GIL来实现线程安全。GIL这把全局排他锁,是多线程处理的致命伤,效率极底,因为python的线程就是C的一个pthread,通过操作系统的调度算法调度,为在调度过程中各线程平均利用CPU的时间,python会计算已执行的代码量,达到阈值强制释放GIL锁,触发一次操作系统的调度。

任何一个线程在唤起时可以成功获取到GIL,但是多核CPU上,release和acquire几乎没有间隔,导致其他核上的线程被唤醒时,主线程已经再次拿到GIL,导致被唤醒的其他线程只能白白浪费CPU时间,眼睁睁看着另一个线程拿个GIL快乐的执行,当达到切换时间后进入待调度状态,再次唤醒再次等待,恶性循环。

多线程方案

multiprocessing

一定程度弥补了thread库由于GIL锁导致低效的缺陷,完整的实现了一套thread接口,但是只是使用了多进程而本身不是多进程,原理上每个进程有自己独立的GIL,不会出现进程间的GIL争抢,但是增加了线程间数据通讯和同步的麻烦,由于不同进程间无法看到对方的数据状态,只能在主线程声明一个队列,通过队列的实现解决数据本身的问题,无疑徒增的代码的开发与维护成本。

concurrent

提供了多线程ThreadPoolExecutor和多进程ProcessPoolExecutor两种并发模型。

  • 计算密集型任务
    永远最多只能榨干单核CPU,如果需要提升效率,必须通过ProcessPoolExecutor fork出多个子进程来分担计算任务。
  • IO密集型任务
    CPU使用效率极低,虽然使用多线程加倍CPU使用率,但是还远远到不了饱和(100%)的地步,在单核心可以应付整体计算的前提下,自然是应该选择资源占用少的模式,也就是多线程模式。主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。

concurrent的线程池有个重大的缺陷,那就是任务队列是无界的。如果队列的生产者任务生产的太快,而线程池消费太慢处理不过来,任务就会堆积。如果堆积一直持续下去,内存就会持续增长直到OOM,任务队列里堆积的所有任务全部彻底丢失。

concurrent原理

  1. 主线程将任务塞进TaskQueue(普通内存队列),拿到Future对象
  2. 唯一的管理线程从TaskQueue获取任务,塞进CallQueue(分布式跨进程队列)
  3. 子进程从CallQueue中争抢任务进行处理
  4. 子进程将处理结果塞进ResultQueue(分布式跨进程队列)
  5. 管理线程从ResultQueue中获取结果,塞进Future对象
  6. 主线程从Future对象中拿到结果

进程池模型中的跨进程队列是用multiprocessing.Queue实现的。它使用无名套接字sockerpair来完成的跨进程通信,socketpair和socket的区别就在于socketpair不需要端口,不需要走网络协议栈,通过内核的套接字读写缓冲区直接进行跨进程通信。multiprocessing.Queue是支持双工通信,数据流向是父子双向,只不过在concurrent的进程池实现中只用到了单工通信。CallQueue是从父到子,ResultQueue是从子到父。

当父进程要传递任务给子进程时,先使用pickle将任务对象进行序列化成字节数组,然后将字节数组通过socketpair的写描述符写入内核的buffer中。子进程接下来就可以从buffer中读取到字节数组,然后再使用pickle对字节数组进行反序列化来得到任务对象,这样总算可以执行任务了。同样子进程将结果传递给父进程走的也是一样的流程,只不过这里的socketpair是ResultQueue内部创建的无名套接字。

实战

doutula

上文讲到通过分析页面元素分页抓取doutula表情包并保存图片的的案例,由于网络请求结果返回往往比IO存储图片要快,接下来通过多线程的方式实现在IO相对较慢的前提下使用多线程处理存储图片。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def main(page):
"""函数入口"""
for page in range(1, page + 1):
print('==============正在爬去第{}页数据==================='.format(page))
# 定义最多 max_workers 个线程的线程池来异步执行调用。如果 max_workers 为 None 或没有指定,将默认为机器处理器的个数
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
res = send_request('https://www.doutula.com/photo/list/?page={}'.format(str(page)))
# 封装解析函数
src_url = parse_data(res.text)
for file, url in src_url:
image_response = send_request(url)
# 提交线程请求,save_data是封装保存函数,file和image_response.content是save_data的入参
thread_pool.submit(save_data, file, image_response.content)
# 关闭线程池
thread_pool.shutdown()

多线程doutula

guazi

同时执行多个线程的确可以提高程序效率,但并非线程越多越好,相对计算机而言,线程越多越吃资源,成百上千个线程可能直接瘫痪。故而多线程在运行时,设置最大线程锁,设置最大线程同时允许处理任务,多线程threading使用Semaphore(无上限)或BoundedSemaphore(初始设置最大值), 如果release调用检查计数器的值是否超过了计数器最大值则出ValueError 实现并发限制。

例如:通过semaphore信号量可以利用内置计数器来控制同时运行线程的数量,启动线程(消耗信号量)内置计数器会自动减一,线程结束(释放信号量)内置计数器会自动加一;内置计数器为零,启动线程会阻塞,直到有本线程结束或者其他线程结束为止;

以下我们对瓜子二手车数据进行多线程抓取,并保存到csv中。

分析

当我们访问第一页时,https://www.guazi.com/su/buy/o1

当我们访问第二页时,https://www.guazi.com/su/buy/o2

以此类推,不难得出分页数据以start_url = 'https://www.guazi.com/su/buy/o'+str(i)形式拼接,i为页码数,根据页面控制台的Elements分析所有的数据都存在于含有类carlist clearfix js-top的ul节点下的所有li节点下,我们可以通过解析得到html代码转化为bs4库的BeautifulSoup对象,利用BeautifulSoup的api获取这些li节点的元素属性,解析拿到类型,年份,里程,售价,具体BeautifulSoup语法出门右转见中文文档

guazi页面元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def get_data(html):
# 将之前抓取的html页面解析生成BeautifulSoup对象
soup = BeautifulSoup(html, 'html.parser')
# 调用api获取含有类carlist clearfix js-top的ul节点下的所有li节点
infos = soup.find('ul', {'class': 'carlist clearfix js-top'}).find_all('li')
# 打开文件开始写入
with open('./guazi.csv', 'a', encoding='utf-8') as f:
pic_urls = []
for info in infos:
leixing = info.find('h2').get_text()
nianfen1 = info.find('div', {'class': 't-i'}).get_text()
# 通过正则分割年份字符串
nianfen2 = re.sub(r'|', '', nianfen1).split('|')
nianfen = nianfen2[0]
licheng = nianfen2[1]
didian = '苏州'
shoujia = info.find('div', {'class': 't-price'}).find('p').get_text()
try:
yuanjia = info.find('div', {'class': 't-price'}).find('em').get_text()
except AttributeError:
yuanjia = ''
tupian = info.find('a').find('img')['src']
pic_urls.append(tupian)
f.write("{},{},{},{},{},{}\n".format(leixing, nianfen, licheng, didian, shoujia, yuanjia))
return pic_urls

get_data拿到所有的数据属性后,接下来通过含有信号量的多线程方式调用封装好的download_pics方法批量抓取图片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定义最多10个线程同时允许
thread_lock = threading.BoundedSemaphore(value=10)
def main():
n = 0
for i in range(1,51):
start_url = 'https://www.guazi.com/su/buy/o'+str(i)
html = getHtmlText(start_url)
pic_urls = get_data(html)
for url in pic_urls:
n+=1
print('正在下载第{}张图片'.format(n))
# 上锁 避免下载同一张
thread_lock.acquire()
t = threading.Thread(target=download_pics,args=(url,n))

guazi多线程

完整源码请关注微信公众号:ReverseCode,回复:爬虫基础

文章作者: J
文章链接: http://onejane.github.io/2021/03/28/爬虫基础篇之多线程实现/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 万物皆可逆向
支付宝打赏
微信打赏