共计 5515 个字符,预计需要花费 14 分钟才能阅读完成。
一、多任务原理
-
概念
现代操作系统比如 Mac OS X,UNIX,Linux,Windows 等,都是支持“多任务”的操作系统
-
什么叫多任务?
就是操作系统可以同时运行多个任务
-
单核 CPU 实现多任务原理
- 多核 CPU 实现多任务原理
-
实现多任务的方式
-
多进程模式
启动多个进程,每个进程虽然只有一个线程,但是多个进程可以一起执行多个任务
-
多线程模式
启动一个进程,在一个进程的内部启动多个线程,这样多个线程也可以一起执行多个任务
-
多进程 + 多线程
启动多个进程,每个进程再启动多个线程
-
协程
-
多进程 + 协程
-
二、进程
1、概念
-
什么是进程?
是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体
-
对于操作系统
一个任务就是一个进程。比方说打开浏览器就是启动一个浏览器的进程,在打开一个记事本就启动一个记事本进程,如果打开两个记事本就启动两个记事本进程
-
注意
进程是系统中程序执行和资源分配的基本单位。每个进程有自己的数据段、代码段和堆栈段
2、使用进程
-
单进程现象
import time def run1(): while 1: print("zutuanxue_com is a good man") time.sleep(1) def run2(): while 1: print("zutuanxue_com is a nice man") time.sleep(1) if __name__ == "__main__": run1() # 不会执行 run2() 函数,只有上面的 run1() 结束才能执行 run2() run2() -
启动进程实现多任务
-
multiprocessing 模块
跨平台的多进程模块,提供了一个 Process 类用来示例化一个进程对象
-
Process 类
作用:创建进程 (子进程)
参数 说明 target 指定进程执行的任务 args 给进程函数传递的参数,是一个元组 注意:此时进程被创建,但是不会启动进程执行
启动进程:Process 对象.start()
-
示例代码
import time from multiprocessing import Process def run1(name): while 1: print("%s is a good man"%name) time.sleep(1) def run2(): while 1: print("zutuanxue_com is a nice man") time.sleep(1) if __name__ == "__main__": # 程序启动时的进程称为主进程 (父进程) # 创建进程并启动 p = Process(target=run1, args=("kaige",)) p.start() # 主进程执行 run2() 函数 run2()
-
-
主进程负责调度
主进程主要做的是调度相关的工作,一般不负责具体业务逻辑
import time from multiprocessing import Process def run1(): for i in range(7): print("zutuanxue_com is a good man") time.sleep(1) def run2(name, word): for i in range(5): print("%s is a %s man"%(name, word)) time.sleep(1) if __name__ == "__main__": t1 = time.time() # 创建两个进程分别执行 run1、run2 p1 = Process(target=run1) p2 = Process(target=run2, args=("kaige", "cool")) # 启动两个进程 p1.start() p2.start() # 查看耗时 t2 = time.time() print("耗时:%.2f"%(t2-t1)) -
父子进程的先后顺序
主进程的结束不能影响子进程,所以可以等待子进程的结束再结束主进程,等待子进程结束,才能继续运行主进程
import time from multiprocessing import Process def run1(): for i in range(7): print("zutuanxue_com is a good man") time.sleep(1) def run2(name, word): for i in range(5): print("%s is a %s man"%(name, word)) time.sleep(1) if __name__ == "__main__": t1 = time.time() p1 = Process(target=run1) p2 = Process(target=run2, args=("kaige", "cool")) p1.start() p2.start() # 主进程的结束不能影响子进程,所以可以等待子进程的结束再结束主进程 # 等待子进程结束,才能继续运行主进程 p1.join() p2.join() t2 = time.time() print("耗时:%.2f"%(t2-t1))
3、全局变量在多个子进程中不能共享
import time | |
from multiprocessing import Process | |
money = 100 | |
def run1(): | |
# global money # 在进程中无法使用全局变量 | |
money = 200 | |
print("run1----------------", money, id(money)) | |
for i in range(5): | |
print("zutuanxue_com is a good man") | |
time.sleep(1) | |
def run2(name, word): | |
print("run2----------------", money, id(money)) | |
for i in range(5): | |
print("%s is a %s man"%(name, word)) | |
time.sleep(1) | |
if __name__ == "__main__": | |
t1 = time.time() | |
# 在创建进程时会将主进程的资源拷贝到子进程中,子进程单独有一份主进程中的数据,相互不影响 | |
p1 = Process(target=run1) | |
p2 = Process(target=run2, args=("kaige", "cool")) | |
p1.start() | |
p2.start() | |
p1.join() | |
p2.join() | |
print("main-------------", money) | |
t2 = time.time() | |
print("耗时:%.2f"%(t2-t1)) |
4、启动大量子进程
# Pool 类:进程池类 | |
from multiprocessing import Pool | |
import time | |
import random | |
def run(index): | |
print("子进程 %d 启动"%(index)) | |
t1 = time.time() | |
time.sleep(random.random()* 5+2) | |
t2 = time.time() | |
print("子进程 %d 结束,耗时:%.2f" % (index, t2-t1)) | |
if __name__ == "__main__": | |
print("启动主进程……") | |
# 创建 10 个子进程执行 run 功能 | |
# 创建进程池对象 | |
# 由于 pool 的默认值为 CPU 的核心数,假设有 4 核心,至少需要 5 个子进程才能看到效果 | |
# Pool() 中的值表示可以同时执行进程的数量 | |
pool = Pool() | |
for i in range(1, 7): | |
# 创建子进程,并将子进程放到进程池中统一管理 | |
pool.apply_async(run, args=(i,)) | |
# 等待子进程结束 | |
# 关闭进程池:在关闭后就不能再向进程池中添加进程了 | |
# 进程池对象在调用 join 之前必须先关闭进程池 | |
pool.close() | |
#pool 对象调用 join,主进程会等待进程池中的所有子进程结束才会继续执行主进程 | |
pool.join() | |
print("结束主进程……") |
5、单进程与多进程复制文件对比
-
单进程复制文件
import time def copy_file(path, toPath): with open(path, "rb") as fp1: with open(toPath, "wb") as fp2: while 1: info = fp1.read(1024) if not info: break else: fp2.write(info) fp2.flush() if __name__ == "__main__": t1 = time.time() for i in range(1, 5): path = r"/Users/zutuanxue_com/Desktop/file/%d.mp4"%i toPath = r"/Users/zutuanxue_com/Desktop/file2/%d.mp4"%i copy_file(path, toPath) t2 = time.time() print("单进程耗时:%.2f"%(t2-t1)) -
多进程复制文件
import time from multiprocessing import Pool def copy_file(path, toPath): with open(path, "rb") as fp1: with open(toPath, "wb") as fp2: while 1: info = fp1.read(1024) if not info: break else: fp2.write(info) fp2.flush() if __name__ == "__main__": t1 = time.time() pool = Pool(4) for i in range(1, 5): path = r"/Users/zutuanxue_com/Desktop/file/%d.mp4" % i toPath = r"/Users/zutuanxue_com/Desktop/file3/%d.mp4" % i pool.apply_async(copy_file, args=(path, toPath)) pool.close() pool.join() t2 = time.time() print("单进程耗时:%.2f"%(t2-t1
6、进程间通信
-
方式
- 有名管道
- 无名管道
- 队列
- 共享内存
- 信号
- 信号量
-
代码
from multiprocessing import Process from multiprocessing import Queue import time def product(q): print("启动生产子进程……") for data in ["good", "nice", "cool", "handsome"]: time.sleep(2) print("生产出:%s"%data) # 将生产的数据写入队列 q.put(data) print("结束生产子进程……") def customer(q): print("启动消费子进程……") while 1: print("等待生产者生产数据") # 获取生产者生产的数据,如果队列中没有数据会阻塞,等待队列中有数据再获取 value = q.get() print("消费者消费了 %s 数据"%(value)) print("结束消费子进程……") if __name__ == "__main__": q = Queue() p1 = Process(target=product, args=(q,)) p2 = Process(target=customer, args=(q,)) p1.start() p2.start() p1.join() # p2 子进程里面是死循环,无法等待它的结束 # p2.join() # 强制结束子进程 p2.terminate() print("主进程结束")
7、封装进程
zutuanxue_comProcess.py
from multiprocessing import Process | |
import time | |
class zutuanxue_comProcess(Process): | |
def __init__(self, name, word): | |
super().__init__() | |
self.name = name | |
self.word = word | |
# 必须叫 run,当对象调用 start() 方法时,默认调用 run 方法 | |
def run(self): | |
for i in range(5): | |
print("%s is a %s man" % (self.name, self.word)) | |
time.sleep(1) |
kaige.Process.py
from multiprocessing import Process | |
import time | |
class KaigeProcess(Process): | |
def __init__(self, name): | |
super().__init__() | |
self.name = name | |
# 必须叫 run,当对象调用 start() 方法时,默认调用 run 方法 | |
def run(self): | |
for i in range(2): | |
print("%s is a hansome man" % (self.name)) | |
time.sleep(1) |
main.py
import time | |
from zutuanxue_comProcess import Zutuanxue_comProcess | |
from kaigeProcess import KaigeProcess | |
if __name__ == "__main__": | |
t1 = time.time() | |
p = Zutuanxue_comProcess("zutuanxue_com", "good") | |
p.start() | |
p.join() | |
p1 = KaigeProcess("kaige") | |
p1.start() | |
p1.join() | |
t2 = time.time() | |
print("耗时:%.2f"%(t2-t1)) |
