Python multi-process programming - multiprocessing

Multiprocessing - Introduction

Posted by Cong Yu on March 7, 2020

Python multiprocessing: the Process class

The Process class describes a process object. To create a child process, you only need to pass in the target function and its arguments to create a Process instance.

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

target is the function to be called in the child process.
args holds that function's arguments, passed in as a tuple.

start() starts the process.
join() blocks until the process exits, which is how the parent waits for its children.
close() closes the Process object and releases all resources associated with it; it raises ValueError if the underlying process is still running.
is_alive() reports whether the process is still running.
terminate() kills the process. A minimal sketch of these methods follows below.
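A short sketch exercising start/is_alive/terminate/join (the names are illustrative; the sleep only keeps the child alive long enough to observe it):

import multiprocessing
import time

def work():
    time.sleep(5)   # stand-in for real work

if __name__ == '__main__':
    p = multiprocessing.Process(target=work)
    p.start()               # spawn the child process
    print(p.is_alive())     # True while the child is running
    p.terminate()           # kill the child
    p.join()                # wait until it has actually exited
    print(p.is_alive())     # False once it has been reaped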

multiprocessing code example

import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))

if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    procs = []
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
        procs.append(p)
    for p in procs:   # join every child, not just the last one started
        p.join()
    print('Process close')

Sample output:

Parent process 13478 is Running
process start
process start
Child process 0 13479 Running 
process start
Child process 1 13480 Running 
process start
process start
Child process 2 13481 Running 
Child process 3 13482 Running 
Child process 4 13483 Running 
Process close

Process management model

The model-prediction worker uses a Pipe to communicate between two processes; the two processes sit at opposite ends of the pipe.
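Before the full class, here is a minimal sketch of a Pipe round trip (the names are illustrative, not from the project):

import multiprocessing

def worker(conn):
    # The child blocks until the parent sends a request, then replies.
    msg = conn.recv()
    conn.send('echo: ' + msg)
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()   # two ends of one duplex pipe
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send('hello')    # the parent talks through its own end
    print(parent_conn.recv())    # -> echo: hello
    p.join()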

import json
import multiprocessing
import os

class PredictServer:

    def __init__(self):
        # key -> [child_conn, parent_conn, process, active_flag]
        self.version = {}

    def func(self, conn):
        # Runs in the child: pin the GPU before importing tensorflow,
        # otherwise the child process may not detect the device (pitfall 1 below).
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'
        import tensorflow as tf
        has_model = False
        try:
            from predict_model import predict_batch
            a = predict_batch(["xxx"], ["xxx"])   # warm-up call
            has_model = True
            print(a)
        except Exception:
            print("model not found...")
        while True:
            temp = conn.recv()   # blocks until the parent sends a request
            try:
                json_data = json.loads(temp)
                if has_model:
                    res = predict_batch(json_data["text_as"], json_data["text_bs"])
                else:
                    res = "not found model"
            except Exception as e:
                print(repr(e))
                res = "sorry, error"
            conn.send(res)
            print(f"child process: {os.getpid()}, received: {temp}, returned: {res}")

    def predict_batch(self, text_as, text_bs):
        json_data = {
            "text_as": text_as,
            "text_bs": text_bs
        }
        key = "relation_model"
        json_data_string = json.dumps(json_data)
        # Drop entries whose worker process has died.
        pops = []
        for k, v in self.version.items():
            if not v[2].is_alive():
                pops.append(k)
        for k in pops:
            self.version.pop(k)
        if self.version.get(key, None):
            print("has model")
            [conn_a, conn_b, p, _] = self.version[key]
            if p.is_alive():
                conn_b.send(json_data_string)
                a = conn_b.recv()
            else:
                a = "model is close"
            return a
        else:
            print(f"init model {key}")
            conn_a, conn_b = multiprocessing.Pipe()   # one pipe, two ends
            print("ok1")
            p = multiprocessing.Process(target=self.func, args=(conn_a,))
            p.daemon = True   # the worker dies together with the parent
            self.version[key] = [conn_a, conn_b, p, True]
            p.start()
            print("ok2")
            conn_b.send(json_data_string)
            print("ok3")
            a = conn_b.recv()
            print("ok4")
            return a
            
p_model = PredictServer()
p_model.predict_batch(["xxx"], ["xxx"])

The model-management class automatically checks whether the worker process exists, creating it on first use; from then on every prediction is served by the background model process.
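A second call then goes straight through the existing pipe instead of forking again; with the code above you would see the "has model" branch instead of "init model" (the strings are placeholders):

p_model.predict_batch(["xxx"], ["xxx"])   # reuses the live worker via the pipe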

Multi-version switching

import json
import multiprocessing
import os

class PredictServer:

    def __init__(self):
        # version key -> [child_conn, parent_conn, process, active_flag]
        self.version = {}

    def func(self, conn, version):
        # Runs in the child: pin the GPU before importing tensorflow.
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'
        import tensorflow as tf
        has_model = False
        key = f"{version}"
        try:
            bsm = Model(version=version)   # Model is the project's model class, imported elsewhere
            a = bsm.predict("xxx")         # warm-up call
            has_model = True
            print(a)
        except Exception:
            print("model not found...")
        while True:
            temp = conn.recv()   # blocks until the parent sends a request
            try:
                json_data = json.loads(temp)
                if json_data["version"] == version:
                    if has_model:
                        res = bsm.predict(json_data["text"])
                    else:
                        res = "not found model"
                else:
                    res = "sorry, not match"
            except Exception as e:
                print(repr(e))
                res = "sorry, error"
            conn.send(res)
            # Note: self.version here is the fork-time snapshot, not the parent's live dict.
            flag = self.version.get(key, None) is not None and self.version[key][3]
            print(f"worker flag: {flag}, child process: {os.getpid()}, received: {temp}, returned: {res}")

    def predict(self, text, version):
        json_data = {
            "version": version,
            "text": text
        }
        json_data_string = json.dumps(json_data)
        key = f"{version}"
        # Drop entries whose worker process has died.
        pops = []
        for k, v in self.version.items():
            if not v[2].is_alive():
                pops.append(k)
        for k in pops:
            self.version.pop(k)
        if self.version.get(key, None):
            print("has model")
            [conn_a, conn_b, p, _] = self.version[key]
            if p.is_alive():
                conn_b.send(json_data_string)
                a = conn_b.recv()
            else:
                a = "model is close"
            return a
        else:
            print(f"init model {version}")
            conn_a, conn_b = multiprocessing.Pipe()  # create one pipe with two ends
            print("ok1")
            p = multiprocessing.Process(target=self.func, args=(conn_a, version))
            p.daemon = True
            self.version[key] = [conn_a, conn_b, p, True]
            p.start()
            # Terminate the workers of every other version.
            # NOTE: see the bug fix below - running this before the send/recv can hang;
            # it is safer after the exchange.
            pops = []
            for k, v in self.version.items():
                if key != k:
                    v[2].terminate()
                    print("stop process")
                    v[2].join()
                    pops.append(k)
            for k in pops:
                self.version.pop(k)
            print("ok2")
            conn_b.send(json_data_string)
            print("ok3")
            a = conn_b.recv()
            print("ok4")
            return a
            
p_model = PredictServer()
version = 1
a = p_model.predict("xxx", version)

The model-management class automatically checks whether a worker process needs to be created, and it switches versions automatically (other versions are torn down by default; you can of course change that policy and keep them).
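For example, requesting a different version after the call above forks a fresh worker and terminates the old one (the input string is a placeholder):

a = p_model.predict("xxx", 2)   # starts the version-2 worker; the version-1 worker is terminated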

Starting and shutting down model processes

Starting a model

version = 1
a = p_model.predict("xxx", version)

Shutting down a model

for k, v in p_model.version.items():
    print(k, v)
    if v[2].is_alive():
        v[2].terminate()   # kill the worker
        print("stop process")
        v[2].join()        # reap it; the dead entry is popped on the next predict()

Model status

for k, v in p_model.version.items():
    print(k, v[2].is_alive())   # True means that version's worker is still running

Bug fix

# This cleanup is best placed after the send/receive exchange,
# otherwise the process can hang.
pops = []
for k, v in self.version.items():
    if key != k:
        v[2].terminate()
        print("stop process")
        v[2].join()
        pops.append(k)

for k in pops:
    self.version.pop(k)


Pitfalls encountered with multiprocessing:

1. The GPU is not detected in the child process (set CUDA_VISIBLE_DEVICES and import tf inside the child, as func() does above).

2. Use multiprocessing for everything or for nothing: mixing child-process prediction with a model loaded at program startup can deadlock (root cause not found yet).

3. Multiprocessing deadlock: if the pipe is empty and the receiving end still calls recv(), it blocks forever, with no error and no output; the program just hangs (see the sketch below).
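A guarded-receive sketch for pitfall 3 (illustrative names, not from the project): poll(timeout) lets the reader give up instead of blocking forever on an empty pipe.

import multiprocessing

def silent_worker(conn):
    # Simulates a worker that consumes the request and dies without replying.
    conn.recv()

if __name__ == '__main__':
    conn_a, conn_b = multiprocessing.Pipe()
    p = multiprocessing.Process(target=silent_worker, args=(conn_a,))
    p.start()
    conn_b.send("request")
    # A bare conn_b.recv() would hang here forever.
    if conn_b.poll(timeout=5):   # wait at most 5 seconds for a reply
        print(conn_b.recv())
    else:
        print("timeout: no reply from the child")
    p.join()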

