进程与线程

  • 程序 (Program):存放在磁盘上的静态指令和数据的集合,是“死”的、尚未运行的代码。
  • 进程 (Process):程序的一次执行实例。操作系统为其分配独立的内存和资源,是系统进行资源分配和调度的基本单位。
  • 线程 (Thread):进程内部的一条执行流,是 CPU 调度的最小单位。同一进程的多个线程共享其资源,并发执行不同任务
  • 进程与线程均可以处理一个任务

1 核心概念:并发与并行

在多任务处理中,并发和并行是两个易混淆但核心的概念,直接决定程序的执行效率和资源利用方式:

1.1 并发(Concurrency)指软件/逻辑层面的任务管理

1765940830753

  • 定义:单个 CPU 核心交替处理多个任务,通过快速切换营造 “同时执行” 的假象。

  • 本质:任务轮流占用 CPU 资源,同一时刻只有一个任务在执行。

  • 适用场景:I/O 密集型任务(如文件读写、网络请求),任务大部分时间在等待外部资源响应。

  • 示例:一个服务员同时接待多个顾客,轮流为每个顾客点单、上菜。

1.2 并行(Parallelism)指硬件层面的真实同时执行

1765940837986

  • 定义:多个 CPU 核心同时处理多个任务,每个任务独占一个核心,真正实现 “同时执行”。

  • 本质:任务并行占用不同 CPU 资源,同一时间多个任务同步推进。

  • 适用场景:CPU 密集型任务(如数据计算、图像处理),任务主要消耗 CPU 运算资源。

  • 示例:多个服务员同时接待不同顾客,各自独立完成服务流程。

1.3 关键区别对比

特性 并发 并行
CPU 核心需求 单个即可 多个核心
执行方式 任务交替执行 任务同时执行
资源消耗 低(主要消耗切换开销) 高(消耗多核心资源)
适用任务类型 I/O 密集型(等待时间长) CPU 密集型(运算时间长)
典型场景 多用户网络服务、文件批量处理 大数据计算、3D 渲染

2 多进程:独立资源的并行执行

2.1 进程核心概念

  • 程序与进程是静态与动态的关系

  • 进程是操作系统资源分配的基本单位,每个进程拥有独立的内存空间、文件描述符等资源。

  • 进程间相互隔离,一个进程崩溃不会影响其他进程(保护模式下)。

  • 进程创建和切换开销较大(需分配资源、保存上下文),但可充分利用多 CPU 核心。

2.2 进程创建

multiprocessing 是 Python 跨平台的多进程模块,Process 类是创建进程的核心。

核心参数与方法

1
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数 / 方法 说明
target 进程要执行的目标函数
args 目标函数的位置参数(元组形式)
kwargs 目标函数的关键字参数(字典形式)
name 进程名称(用于调试)
daemon 是否为守护进程(True 时主进程退出则子进程强制终止)
start() 启动进程(调用目标函数)
join([timeout]) 阻塞主进程,等待子进程结束(可选超时时间)
terminate() 强制终止子进程
is_alive() 检查进程是否正在运行
run() 默认调用传入 target 的对象,如果子类化了 Process,可以重写此方法来自定义行为
pid 获取进程 ID
group 无意义,用于与threading.Thread兼容,历史遗留问题,常设None
terminate()/kill() 强制结束进程
os.getpid()/getppid() 获取当前进程编号/当前进程的父进程编号

父进程:开启进程的进程,如main进程(即运行 if name == “main” 块的进程)

基础用法(文件读写并发示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import time
import multiprocessing


# 写文件任务
def write_file():
with open("test.txt", "a", encoding="utf-8") as f:
count = 0
while count < 5:
f.write(f"写入内容:{count}\n")
f.flush() # 强制刷新缓冲区
print(f"写进程:完成第 {count} 次写入")
count += 1
time.sleep(0.5) # 模拟 I/O 延迟


# 读文件任务
def read_file():
with open("test.txt", "r", encoding="utf-8") as f:
time.sleep(0.2) # 等待写进程先写入
while True:
line = f.readline()
if line:
print(f"读进程:读到内容 -> {line.strip()}")
else:
if multiprocessing.active_children(): # 检查写进程是否还在运行
time.sleep(0.1)
continue
break


if __name__ == "__main__":
# Windows 系统必须在 if __name__ == "__main__" 中创建进程
pw = multiprocessing.Process(target=write_file, name="写进程")
pr = multiprocessing.Process(target=read_file, name="读进程")

pw.start() # 启动进程
pr.start()

pw.join() # 等待写进程结束
pr.join() # 等待读进程结束
print("所有进程执行完毕")
  • Unix/Linux​:使用 ​fork()​​ 复制当前进程,子进程从 ​fork​ 之后的位置继续执行;模块顶层代码不会重复执行,因此即便在顶层创建进程通常也不会触发递归问题。
  • Windows:没有 fork,采用“重新导入模块 + spawn 新解释器”的方式;子进程会从头执行模块代码,若顶层存在创建进程语句,就会重复启动,导致崩溃或无限递归。

2.3 自定义进程类

通过继承 Process 类,可更灵活地封装进程逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os
import multiprocessing

class Worker(multiprocessing.Process):
def __init__(self, task_id):
super().__init__()
self.task_id = task_id

def run(self):
# 重写 run 方法,进程启动时自动执行
print(f"进程 {self.task_id} 启动 | 进程 ID: {os.getpid()} | 父进程 ID: {os.getppid()}")
time.sleep(1)
print(f"进程 {self.task_id} 执行完毕")

if __name__ == "__main__":
processes = [Worker(i) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
print("运行结束...")

2.4 进程池:高效管理多进程

当需要创建大量子进程时,通常使用,Pool 进程池管理进程

1765948813650

为什么要使用进程池

  • 降低创建与销毁开销:进程的启动、退出、地址空间与资源初始化代价高;进程池通过预先创建固定数量的工作进程并复用它们,显著减少频繁启停带来的系统开销。对于大量短任务尤为关键,可避免“启动成本 > 计算成本”的倒挂现象

  • 复用工作进程:任务完成后不退出,继续从池的任务队列取活干,减少重复初始化与销毁的系统调用与内存分配。

  • 内置限流与排队:当池满时,新任务在队列中等待;有进程空闲再分配执行,避免“进程海”拖垮系统。

  • 高层 API 简化开发:通过 multiprocessing.Poolmap/apply/apply_asyncconcurrent.futures.ProcessPoolExecutorsubmit/map/shutdown,可便捷地提交任务、收集结果、管理生命周期,减少样板代码与错误。

  • 结果收集与回调:支持异步提交 + 回调批量 map,便于在主进程中统一处理返回值、错误与汇总统计

进程池核心方法

1
multiprocessing.Pool([processes[,initializer[,initargs[,maxtasksperchild[,context]]]]])
  • processes:并发工作进程数;为 None 时默认使用 os.cpu_count()。用于限制同时运行的进程数量,避免系统过载。

  • initializer:每个工作进程启动前调用的初始化可调用对象;为 None 表示不做初始化。适合做一次性资源准备(如日志配置、数据库连接池等)。

  • initargs:传递给 initializer 的参数元组。

  • maxtasksperchild:每个工作进程在退出并被替换前可完成的任务数;为 None 表示工作进程会一直存活到进程池关闭。用于定期回收资源、降低长期运行导致的资源泄漏风险。

  • context:用于指定进程启动上下文(如 spawn/fork/forkserver);一般使用默认即可,特殊场景(如与 CUDA 配合)可显式指定。

方法 说明
apply(func, args) 同步执行任务,阻塞主进程直到任务完成
apply_async(func, args) 异步执行任务,返回 AsyncResult 对象(通过 get() 获取结果)
map(func, iterable) 批量异步执行任务,接收可迭代对象,返回结果列表
close() 关闭进程池,不接收新任务((不会影响已经提交的任务))
terminate() 立即终止所有进程(不管任务是否完成)
join() 等待所有任务完成(必须在 close()terminate() 后调用)

基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import time
import multiprocessing

# 打印10个数字,每次间隔0.5秒
def func():
    for i in range(10):
        print(os.getpid(), i)
        time.sleep(0.5)

if __name__ == "__main__":
    # 指定进程池大小
    process_num = 5
    pool = multiprocessing.Pool(process_num)
    for p in range(process_num):
        # 阻塞式
        # pool.apply(func)
        # 非阻塞式
        pool.apply_async(func)
    pool.close() # 告知进程池不再接收新任务
    pool.join() # 等待进程池结束
    print("end")

批量提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import os
import time
import multiprocessing

# 耗时计算任务(CPU 密集型)
def calculate(num):
print(f"进程 {os.getpid()} 处理任务:{num}")
time.sleep(0.5) # 模拟计算耗时
return num * num

if __name__ == "__main__":
start_time = time.time()
process_num = 4 # 进程池大小(建议等于 CPU 核心数)
pool = multiprocessing.Pool(process_num)

# 方式 1:同步调用(阻塞主进程,逐个处理)
# results = [pool.apply(calculate, (i,)) for i in range(10)]

# 方式 2:异步调用(非阻塞,批量提交任务)
results = pool.map(calculate, range(10)) # 自动分配任务给进程池

pool.close() # 关闭进程池,不再接收新任务
pool.join() # 等待所有任务完成

print(f"任务结果:{results}")
print(f"总耗时:{time.time() - start_time:.2f} 秒")

2.5 进程间通信(IPC(Inter-Process Communication))

进程间内存独立,无法直接共享变量,需通过专门的通信机制.

如以下案例:子进程向传入的列表中添加元素,最终发现主进程与子进程之间的列表结果不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os
import multiprocessing

# 向list1中添加10个元素
def func(list1):
    for i in range(10):
        list1.append(i)
        print(os.getpid(), list1)

if __name__ == "__main__":
    list1 = []
    p1 = multiprocessing.Process(target=func, args=(list1,))
    p2 = multiprocessing.Process(target=func, args=(list1,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(os.getpid(), list1)

1. 队列通信(multiprocessing.Queue

适用于生产者 - 消费者模型,安全且跨平台:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import random
import multiprocessing

# 生产者:生成随机数放入队列
def producer(queue):
while True:
num = random.randint(1, 100)
queue.put(num)
print(f"生产者放入:{num}")
time.sleep(random.random()) # 随机延迟

# 消费者:从队列获取数据并处理
def consumer(queue):
while True:
num = queue.get() # 无数据时阻塞
print(f"消费者处理:{num} -> 平方:{num*num}")
time.sleep(0.3)

if __name__ == "__main__":
queue = multiprocessing.Queue(maxsize=5) # 队列最大容量 5

p1 = multiprocessing.Process(target=producer, args=(queue,), daemon=True)
p2 = multiprocessing.Process(target=consumer, args=(queue,), daemon=True)

p1.start()
p2.start()

time.sleep(5) # 运行 5 秒后退出
print("主进程结束")

2. 管道通信(multiprocessing.Pipe

适用于两个进程间双向通信,速度比队列快:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import multiprocessing

def send_data(pipe):
pipe.send("Hello from 进程 A")
print(f"进程 A 发送:Hello from 进程 A")
print(f"进程 A 接收:{pipe.recv()}")

def recv_data(pipe):
print(f"进程 B 接收:{pipe.recv()}")
pipe.send("Hello from 进程 B")
print(f"进程 B 发送:Hello from 进程 B")

if __name__ == "__main__":
# 创建管道,返回两个连接对象(双向通信)
conn1, conn2 = multiprocessing.Pipe()

p1 = multiprocessing.Process(target=send_data, args=(conn1,))
p2 = multiprocessing.Process(target=recv_data, args=(conn2,))

p1.start()
p2.start()
p1.join()
p2.join()

3. 共享内存(multiprocessing.Value/Array

适用于高频数据共享,需手动加锁保证线程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import multiprocessing

def add_num(value, lock):
for _ in range(10):
with lock: # 加锁防止竞争条件
value.value += 1


if __name__ == "__main__":
# 创建共享整数(i 表示 int 类型)
shared_num = multiprocessing.Value("i", 0)
lock = multiprocessing.Lock() # 互斥锁

p1 = multiprocessing.Process(target=add_num, args=(shared_num, lock))
p2 = multiprocessing.Process(target=add_num, args=(shared_num, lock))

p1.start()
p2.start()
p1.join()
p2.join()

print(f"最终结果:{shared_num.value}") # 输出 20

3 多线程:共享资源的并发执行

3.1 线程的核心特性

  • 线程是操作系统任务调度的最小单位
  • 一个进程至少有一个线程,也可以运行多个线程
  • 多个线程之间可共享数据
  • 线程间相互依赖,一个线程崩溃可能导致整个进程终止(基本不会出现,线程本身也是隔离的)
  • 多线程是指在同一进程中同时执行多个任务

3.2 线程创建:threading.Thread

threading 是 Python 标准库的多线程模块,用法与 multiprocessing.Process 类似

1
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

基础用法(交替打印示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 多线程创建
# 交替打印示例
import threading
import time

# 线程执行目标函数,即任务
def print_num(start,end):
for i in range(start,end+1):
print(f"{threading.current_thread().name}号线程打印:{i}")
time.sleep(0.1)

if __name__ == '__main__':
# 创建线程对象
t1 = threading.Thread(target=print_num,args=(1,5),name="小美")
t2 = threading.Thread(target=print_num,args=(1,10),name="小帅")

t1.start()
t2.start()

t1.join()
t2.join()

print("主线程结束")

自定义线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading

class MyThread(threading.Thread):
def __init__(self, task_id):
super().__init__()
self.task_id = task_id

def run(self):
print(f"线程 {self.task_id} 启动 | 线程 ID: {self.ident}")
time.sleep(1)
print(f"线程 {self.task_id} 执行完毕")

if __name__ == "__main__":
threads = [MyThread(i) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()

3.3 线程池:concurrent.futures.ThreadPoolExecutor

ThreadPoolExecutor ,线程池管理类

1
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix="", initializer=None, initargs=())

基础用法(批量网络请求示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 多线程池操作
# concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix="", initializer=None, initargs=()) 线程池管理类
# 线程需求:网络请求
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor

import requests


# 模拟网络请求
def fetch_url(url):
print(f"{threading.current_thread().name}正在请求的url为:{url}")
try:
start_time = time.time()
response = requests.get(url, timeout=3)
return f"{threading.current_thread().name}访问的{url}返回的状态码:{response.status_code}"
except Exception as e:
return f"{threading.current_thread().name}访问的{url}发生异常:{e}"
finally:
end_time = time.time()
print(f"{threading.current_thread().name}访问{url},用时{end_time - start_time}线程结束")


if __name__ == '__main__':
url_list = [
"https://www.baidu.com",
"https://www.google.com",
"https://www.github.com",
"https://www.python.org"
]

start_time = time.time()

with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(fetch_url, url_list)
print( results)
print( type(results))
# 由于返回的数据是generator生成器,这里选择转成list列表
results_list = list(results)

for result in results_list:
print( result)

end_time = time.time()
print(f"访问{url_list}地址,共耗时:{end_time - start_time}秒")

3.4 线程安全:互斥锁与条件变量

线程共享资源,当多个线程修改同一资源时,会出现数据竞争(数据不一致),需通过同步机制保证线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#  线程安全问题
"""
线程安全出现前提:
1.多线程参与
2.多线程共享数据
3.多线程修改数据

出现线程安全最重要的原因:原子性问题
原子性:不可拆分的一个整体性质。每个进程在执行任务时,某些步骤是不能分割执行的,应该将所有该动作执行的代码看作一个原子性操作,操作过程中,不能让CPU执行操作同一个数据的代码
所以:多线程操作共享数据时,应该保持原子性操作不被插入打断,否则会出现线程安全问题
"""
import threading
from time import sleep

# 定义线程任务,累加1,执行N次
def add_num(value) :
for i in range(10) :
# value['value'] += 1 # 原始叠加1的操作,一句话完成

#后续使用多个步骤完成叠加1的操作,每步之间做短暂停顿
current_value = value['value']
sleep(0.001)
new_value = current_value + 1
sleep(0.001)
value['value'] = new_value
sleep(0.001)
print(f"{threading.current_thread().name}线程执行+1,此次结果为:{value['value']}")


if __name__ == '__main__':

# 多线程共享数据
shared_num = {"value": 0}

t1 = threading.Thread(target=add_num, args=(shared_num,),name="小美")
t2 = threading.Thread(target=add_num, args=(shared_num,),name="小帅")

t1.start()
t2.start()

t1.join()
t2.join()

expected = 20 # 预期值
actual = shared_num["value"] # 实际值

print(f"预期结果:{expected},实际结果:{actual}")
if expected == actual:
print("没有出现数据安全问题")
else:
print("出现了数据安全问题")

出现线程安全最重要的原因:原子性问题

原子性:不可拆分的一个整体性质。每个进程在执行任务时,某些步骤是不能分割执行的,应该将所有该动作执行的代码看作一个原子性操作,操作过程中,不能让CPU执行操作同一个数据的代码

所以:多线程操作共享数据时,应该保持原子性操作不被插入打断,否则会出现线程安全问题

1. 互斥锁(threading.Lock

某个线程要更改共享数据时,先将其锁定,此时其他线程不能更改。直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性

使用互斥锁达到了线程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 线程安全问题解决:使用锁
"""
线程:threading.Lock()
多个线程操作同一个数据,需要使用同一个锁对象
"""

import threading
from time import sleep

# 定义线程任务,累加1,执行N次
def add_num(value,lock) :
for i in range(1000) :
lock.acquire() # 上锁
try:
#后续使用多个步骤完成叠加1的操作,每步之间做短暂停顿
current_value = value['value']
sleep(0.001)
new_value = current_value + 1
sleep(0.001)
value['value'] = new_value
sleep(0.001)
print(f"{threading.current_thread().name}线程执行+1,此次结果为:{value['value']}")
finally:
lock.release() # 解锁
sleep(0.0001)

if __name__ == '__main__':

# 多线程共享数据
shared_num = {"value": 0}
# 创建互斥锁,被操作共享数据的多个线程使用
lock = threading.Lock()

t1 = threading.Thread(target=add_num, args=(shared_num,lock),name="小美")
t2 = threading.Thread(target=add_num, args=(shared_num,lock),name="小帅")

t1.start()
t2.start()

t1.join()
t2.join()

expected = 2000 # 预期值
actual = shared_num["value"] # 实际值

print(f"预期结果:{expected},实际结果:{actual}")
if expected == actual:
print("没有出现数据安全问题")
else:
print("出现了数据安全问题")

2. 条件变量(threading.Condition

用于线程间通信(如生产者 - 消费者模型的同步):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 线程安全问题解决:条件变量
import random
import threading
from time import sleep


# 生产者与消费者共同分享一个队列数据
# 队列是自定义的一个队列,其中要求最多有2个数据
# 生产者一直生产,队列中有两个数据时,不再生产,等待消费者消费
# 消费者一直消费,队列中没有数据时,不再消费,等待生产者生产

class Queue:
def __init__(self, max_size):
self.max_size = max_size
self.queue = []
self.cond = threading.Condition() # 条件变量
# p线程
def put(self, item):
with self.cond: # 上锁
while len(self.queue) >= self.max_size:
print("队列已满,不再生产,生产等待")
self.cond.wait() # 本线程等待,等待该锁作用上的其他线程执行
self.queue.append( item)
print(f"生产者: 生产了一个数据 -> {item} | 队列全部数据:{self.queue}")
self.cond.notify()

# g线程
def get(self):
with self.cond: # 上锁
while len(self.queue) <= 0:
print("队列已空,不再消费,消费等待")
self.cond.wait() # 本线程等待,等待该锁作用上的其他线程执行
item = self.queue.pop(0)
print(f"消费者: 消费了一个数据 -> {item} | 队列全部数据:{self.queue}")
self.cond.notify()
return item

# 生产者与消费者
def producer(queue):
for i in range(5):
queue.put(i)
sleep(random.random())
def consumer(queue):
for i in range(5):
queue.get()
sleep(random.random())

if __name__ == '__main__':
queue = Queue(max_size=2)
t_p = threading.Thread(target=producer, args=(queue,))
t_c = threading.Thread(target=consumer, args=(queue,))
t_p.start()
t_c.start()
t_p.join()
t_c.join()

print("主线程结束")

4 进程与线程使用对比

4.1 GIL:Python 线程的 “隐形限制”

GIL(Global Interpreter Lock,全局解释器锁)是 CPython 解释器的特性,同一时刻只允许一个线程保持 Python 解释器的控制权,这意味着在任何时间点都只能有一个线程处于执行状态,即使在多 CPU 核心上,多线程也无法实现真正的并行。

4.2 核心对比

特性 进程 线程
资源占用 高(独立内存、文件描述符) 低(共享进程资源)
创建 / 切换开销
通信复杂度 高(需 IPC 机制) 低(直接共享变量)
稳定性 高(进程隔离) 低(线程崩溃影响进程)
GIL 影响 无(多进程并行) 有(多线程无法并行 CPU 任务)

4.3 选择原则

  1. CPU/GPU 密集型任务(如计算、加密、图像处理):选多进程(充分利用多核心)。
  2. I/O 密集型任务(如文件读写、网络请求、数据库操作):选多线程(低开销,高并发)。
  3. 任务数量多且轻量:选线程池(减少线程创建开销)。
  4. 任务独立性强、需隔离:选多进程(避免一个任务崩溃影响整体)。

4.4 多进程安全问题与多线程安全问题的关系

  • 相似性
    资源共享冲突:都涉及多个执行单元访问共享资源
    竞态条件:都可能出现操作时序问题
    数据一致性:都需要保证数据的正确性
  • 主要区别
    • 内存模型
      多进程:拥有独立的地址空间,需特殊机制(如 multiprocessing.Value)共享数据
      多线程:共享同一进程的内存空间,直接访问全局变量
    • 同步机制
      多进程:使用进程间同步原语(如 multiprocessing.Lock)
      多线程:使用线程同步原语(如 threading.Lock)
    • 资源开销
      多进程:创建销毁开销大,隔离性好
      多线程:轻量级
    • 错误传播
      多进程:单个进程崩溃不影响其他进程
      多线程:线程错误可能导致整个进程崩溃(基本不会出现)
    • 安全保障
      虽然本质都是并发安全问题,但具体的实现细节和解决方案有所不同。两者都需要适当的同步机制来保证数据一致性。

4.5 py脚本模拟网络请求的多任务选择

  • 推荐方案:多线程
  • 原因分析
    I/O密集型任务:浏览器模拟请求主要涉及网络通信
    等待时间长:网络请求存在大量等待时间
    GIL释放:I/O等待期间Python会释放GIL,允许其他线程执行
  • 适用场景
    网络请求:HTTP请求、API调用
    页面加载:等待服务器响应
    数据获取:从远程服务器获取数据
  • 实现建议
    使用 threading 模块
    或考虑 asyncio + aiohttp 异步方案
    避免多进程的资源开销
  • 注意事项
    控制并发数量,避免对目标服务器造成压力
    处理好线程安全问题
    考虑请求失败的重试机制

4.6 AI模型数据处理的多任务选择

例如ai中Transformer架构(一种革命性的深度学习神经网络架构)提供的自注意力机制,会将一句话分词并转换为向量,然后通过一次矩阵运算为整句话生成 Q、K、V 三个矩阵。这些计算是高度相关的,它们共同构成了一个完整的自注意力层,在实际的 PyTorch/TensorFlow(用于搭建和训练深度学习模型的开源框架) 实现中,虽然 Python 解释器受 GIL 限制,但底层的矩阵运算由 C++/CUDA 等库在 GPU/CPU 上并行执行。因此,在服务端部署时,通常采用多线程来高效地调度这些并行计算任务(如处理多个请求或多头注意力),并共享模型参数和中间结果。

1769139057194

根据自注意力公式:

1769397534057

最终会将一句话生成Q/K/V矩阵(自注意力机制后续计算的基础),类似这样(每行代表一个分词):
1769399880080

Q (Query):代表“查询者”,用于去“询问”其他词。

K (Key) & V (Value):代表“被查询的内容”。每个词也都有自己的键值对 (k_i, v_i),供其他词的 Q来查询。

计算第 i个词的输出时,需要用到所有词KV,但仅使用第 i个词自己的 Q

LLM 对输入(比如一句话)在每一层 Transformer 中都会生成 Q、K、V 矩阵,并在该层用它们做自注意力计算,然后把结果传给下一层,形成多层堆叠

1. 数据局部性与共享状态

  • 在计算第 i个词的注意力时,你需要访问所有其他词的信息(即整个句子的 KV矩阵)。
  • 如果使用多进程,每个进程都有自己独立的内存空间。这意味着:
    • 你需要将整个 KV矩阵复制给每一个进程。
    • 进程间通信(IPC)会有显著的开销。
  • 如果使用多线程,所有线程都共享同一份 KV矩阵。当一个线程读取数据时,其他线程也可以直接读取,零拷贝,速度极快

2. 任务粒度与开销

  • 要处理的任务数量等于句子长度(比如20个词)。这是一个非常轻量级的任务集合。
  • 创建进程的开销非常大(毫秒级),而创建线程的开销非常小(微秒级)。为了处理20个词去创建20个进程,然后还要进行进程间通信,这在性能上是完全不可接受的。

3. 现代框架的实现方式

  • 很多框架,底层会自动调用高度优化的库。
  • 这些库本身就是多线程的。它会自动利用你电脑的所有CPU核心或GPU核心来进行并行计算。

举个例子:

这就像一家餐厅(你的程序)接到一个订单(一个句子)。

  • 多线程:厨房里的几个厨师(线程)共享同一个冰箱(内存),一起协作完成一道菜(一个句子)的不同步骤。他们之间沟通顺畅,效率极高。
  • 多进程:你为了切一根葱(一个词),专门开了一家新餐厅(进程),这家新餐厅还需要从老餐厅运过去所有的食材(数据拷贝)。

4.7 什么时候会考虑多进程?

虽然在这个具体任务上不适用,但多进程在AI领域有其不可替代的价值,主要体现在任务级别的并行

场景 原因
批量推理 你有1000个句子要处理。与其一个接一个地算,不如开4个进程,每个进程处理250个句子。它们各自独立,互不干扰,实现了批处理任务的并行。
服务高并发 一个Web API同时收到100个请求。为每个请求创建一个独立的进程(或使用进程池),可以避免某个请求的异常导致整个服务崩溃。
模型集成 你要同时使用BERT、RoBERTa、GPT等多个不同的模型做预测。每个模型可以在自己的进程中运行,避免相互阻塞。

继续上面的比喻:

  • 多进程:餐厅一下子来了100个订单。老板开了10个分店(进程),每个分店独立处理10个订单。这样总的处理速度就快了10倍。