Python multiprocessing: Process
The Process class represents a single process. To create a child process, you only need to pass in the function to execute and its arguments, and a Process instance is ready:
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target is the callable the child process will run.
args are the arguments for that callable, passed as a tuple.
start() starts the process.
join() blocks until that one process exits; to wait for several children, join each of them.
close() releases a finished Process object's resources (Python 3.7+); the similarly named Pool.close() stops further tasks from being submitted to a Pool, so extra work cannot pile up.
is_alive() reports whether the process is still running.
terminate() kills the process.
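A quick sketch of these lifecycle methods (note that Process.close() exists only since Python 3.7):

import multiprocessing
import time

def loop():
    while True:
        time.sleep(1)

if __name__ == '__main__':
    p = multiprocessing.Process(target=loop)
    p.start()
    print(p.is_alive())  # True: the child is running
    p.terminate()        # kill the child
    p.join()             # wait until it has actually exited
    print(p.is_alive())  # False
    p.close()            # release the Process object's resources (3.7+)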
multiprocessing code example
import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))

if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
    p.join()
    print('Process close')
Output:
Parent process 13478 is Running
process start
process start
Child process 0 13479 Running
process start
Child process 1 13480 Running
process start
process start
Child process 2 13481 Running
Child process 3 13482 Running
Child process 4 13483 Running
Process close
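Note that the loop above keeps only the last Process object, so p.join() actually waits only for child 4; that the other children also finished before "Process close" here is just timing. To genuinely wait for all of them, keep every Process and join each one (a sketch reusing run_proc from above):

if __name__ == '__main__':
    procs = []
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()  # now every child has exited
    print('Process close')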
Process management model
The model-prediction worker relies on a Pipe for communication between two processes; each process holds one end of the pipe.
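A minimal Pipe round-trip, as a sketch of the pattern the prediction server below builds on:

import multiprocessing

def echo(conn):
    msg = conn.recv()          # block until the other end sends something
    conn.send(f"echo: {msg}")  # reply through the same end of the pipe

if __name__ == '__main__':
    conn_a, conn_b = multiprocessing.Pipe()  # two connected ends
    p = multiprocessing.Process(target=echo, args=(conn_a,))
    p.start()
    conn_b.send("hello")
    print(conn_b.recv())  # -> echo: hello
    p.join()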
import json
import multiprocessing
import os


class PredictServer:
    def __init__(self):
        self.version = {}

    def func(self, conn):
        # Pin the GPU and import TensorFlow inside the child, so the GPU
        # context is created in this process (see the pitfalls below).
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'
        import tensorflow as tf
        has_model = False
        try:
            from predict_model import predict_batch
            a = predict_batch(["xxx"], ["xxx"])  # warm-up call
            has_model = True
            print(a)
        except Exception:
            print("model not found...")
        while True:
            temp = conn.recv()  # block until the parent sends a request
            try:
                json_data = json.loads(temp)
                if has_model:
                    res = predict_batch(json_data["text_as"], json_data["text_bs"])
                else:
                    res = "not found model"
            except Exception as e:
                print(repr(e))
                res = "sorry, error"
            conn.send(res)
            print(f"child {os.getpid()}, received: {temp}, returned: {res}")

    def predict_batch(self, text_as, text_bs):
        json_data = {
            "text_as": text_as,
            "text_bs": text_bs
        }
        key = "relation_model"
        json_data_string = json.dumps(json_data)
        # Drop entries whose worker process has died.
        pops = [k for k, v in self.version.items() if not v[2].is_alive()]
        for k in pops:
            self.version.pop(k)
        if self.version.get(key):
            print("has model")
            conn_a, conn_b, p, _ = self.version[key]
            if p.is_alive():
                conn_b.send(json_data_string)
                a = conn_b.recv()
            else:
                a = "model is close"
            return a
        else:
            print(f"init model {key}")
            conn_a, conn_b = multiprocessing.Pipe()
            p = multiprocessing.Process(target=self.func, args=(conn_a,))
            p.daemon = True  # the worker dies together with the parent
            self.version[key] = [conn_a, conn_b, p, True]
            p.start()
            conn_b.send(json_data_string)
            a = conn_b.recv()
            return a
p_model = PredictServer()
p_model.predict_batch(["xxx"], ["xxx"])
The model-management class automatically checks whether a worker process needs to be created; after that, every prediction goes through the long-lived background prediction service.
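For instance, called a second time the class reuses the live worker instead of spawning a new one; given the prints in predict_batch, the second call should log "has model" rather than "init model relation_model":

p_model = PredictServer()
p_model.predict_batch(["xxx"], ["xxx"])  # first call: spawns the worker ("init model relation_model")
p_model.predict_batch(["xxx"], ["xxx"])  # second call: reuses it ("has model")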
Multi-version switching
class PredictServer:
    def __init__(self):
        self.version = {}

    def func(self, conn, version):
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'
        import tensorflow as tf
        has_model = False
        key = f"{version}"
        try:
            bsm = Model(version=version)
            a = bsm.predict("xxx")  # warm-up call
            has_model = True
            print(a)
        except Exception:
            print("model not found...")
        while True:
            temp = conn.recv()
            try:
                json_data = json.loads(temp)
                if json_data["version"] == version:
                    if has_model:
                        res = bsm.predict(json_data["text"])
                    else:
                        res = "not found model"
                else:
                    res = "sorry, not match"
            except Exception as e:
                print(repr(e))
                res = "sorry, error"
            conn.send(res)
            # In the child, self.version is a copy-on-write snapshot taken at fork time.
            flag = self.version.get(key) is not None and self.version[key][3]
            print(f"status: {flag}, child {os.getpid()}, received: {temp}, returned: {res}")

    def predict(self, text, version):
        json_data = {
            "version": version,
            "text": text
        }
        json_data_string = json.dumps(json_data)
        key = f"{version}"
        # Drop entries whose worker process has died.
        pops = [k for k, v in self.version.items() if not v[2].is_alive()]
        for k in pops:
            self.version.pop(k)
        if self.version.get(key):
            print("has model")
            conn_a, conn_b, p, _ = self.version[key]
            if p.is_alive():
                conn_b.send(json_data_string)
                a = conn_b.recv()
            else:
                a = "model is close"
            return a
        else:
            print(f"init model {version}")
            conn_a, conn_b = multiprocessing.Pipe()  # one pipe, two ends
            p = multiprocessing.Process(target=self.func, args=(conn_a, version))
            p.daemon = True
            # Register before start() so the child's snapshot includes this entry.
            self.version[key] = [conn_a, conn_b, p, True]
            p.start()
            # Retire the workers serving any other version.
            pops = []
            for k, v in self.version.items():
                if key != k:
                    v[2].terminate()
                    print("stop process")
                    v[2].join()
                    pops.append(k)
            for k in pops:
                self.version.pop(k)
            conn_b.send(json_data_string)
            a = conn_b.recv()
            return a
p_model = PredictServer()
version = 1
a = p_model.predict("xxx", version)
The model-management class automatically checks whether a worker process needs to be created, and switches versions automatically (by default the other versions are retired when a new one starts; the policy can of course be changed to keep them alive).
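A sketch of the switching behavior (assuming the Model class above can load both versions): requesting a new version spawns a fresh worker and retires the old one.

p_model = PredictServer()
a = p_model.predict("xxx", 1)        # spawns a worker for version 1
b = p_model.predict("xxx", 2)        # spawns version 2 and terminates the version-1 worker
print(list(p_model.version.keys()))  # only ['2'] remains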
Starting and taking down model processes
Starting a model
version = 1
a = p_model.predict("xxx", version)
Taking a model offline
for k, v in p_model.version.items():
    print(k, v)
    if v[2].is_alive():
        v[2].terminate()
        print("stop process")
        v[2].join()
Model status
for k, v in p_model.version.items():
    print(k, v[2].is_alive())
Bug fix
# Best to run this cleanup after the send/receive round-trip,
# otherwise the worker can end up stuck.
pops = []
for k, v in self.version.items():
    if key != k:
        v[2].terminate()
        print("stop process")
        v[2].join()
        pops.append(k)
for k in pops:
    self.version.pop(k)
Pitfalls encountered with multiprocessing:
1. The GPU is not detected: import tensorflow and pin the card (CUDA_VISIBLE_DEVICES) inside the child process.
2. Be consistent about multiprocessing: mixing these worker processes with a model predictor loaded at program startup can hang (root cause not found yet).
3. Pipe deadlock: if the pipe is empty and the receiving end calls recv() anyway, it blocks forever, with no error or message of any kind; it just hangs (see the guard sketched below).
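One way to guard against pitfall 3 (not part of the original code, just a sketch): check the receiving end with poll() and a timeout before calling recv(), so the caller fails gracefully instead of blocking forever.

import multiprocessing

def silent_worker(conn):
    pass  # never sends anything, simulating a stuck or dead worker

if __name__ == '__main__':
    conn_a, conn_b = multiprocessing.Pipe()
    p = multiprocessing.Process(target=silent_worker, args=(conn_a,))
    p.start()
    # poll() returns False once the timeout expires instead of blocking forever.
    if conn_b.poll(5):
        print(conn_b.recv())
    else:
        print("no data within 5s; giving up instead of hanging")
    p.join()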