celery worker 开启多进程来模拟进程池,每个子进程里面执行 requests.get()。大体代码如下:
celery task 定义
@ce.task(name='xxx', bind=True)
def period_external_nascan(self, *args, **kwargs):
wfd = WebFingerprintDiscern(kwargs)
wfd.run()
类定义
import billiard as multiprocessing
class WebFingerprintDiscernProcess(multiprocessing.Process):
def __init__(self, config, queue, lock): # rules, basic_infos,
multiprocessing.Process.__init__(self)
self.config = config
self.queue = queue
self.lock = lock
def run(self):
while True:
try:
url = self.queue.get(block=False)
except SoftTimeLimitExceeded as e:
raise
except queue.Empty as e:
# logging.debug(e, exc_info=True) # KLD 记录进程退出,多线程容易导致 logging 死锁
break
try:
try:
wa = requests.get(url, xxx)
...还有其他耗时操作,大概耗时 20 分钟。
except SoftTimeLimitExceeded as e:
raise
except Exception as e:
logging.error('update url(%s) state error, %s' % (url, e), exc_info=True)
except SoftTimeLimitExceeded as e:
raise
except Exception as e:
logging.error(e, exc_info=True)
finally:
self.queue.task_done()
class WebFingerprintDiscern:
def __init__(self, config):
self.config = config
self.scan_list = json.loads(self.config['task_target'])
self.queue = multiprocessing.JoinableQueue()
self.lock = multiprocessing.Lock()
self.process_count = 16 # 默认进程数
def run(self):
self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
for url in self.scan_list:
self.queue.put(url)
time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
self.scan_start()
def scan_start(self):
"""
进程池扫描启动
:return:
"""
for i in range(self.process_count):
t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
t.daemon = True
t.start()
self.queue.join()
启动第一个 celery worker
celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker1.state -n worker1@%h
启动第二个 celery worker
celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=/root/python/asset/worker2.state -n worker2@%h
每个 worker 里面接收到的 scan_list 大概 254 个域名,每个域名在子进程中 requests.get()之后还需要执行一段耗时约 20 分钟的分析操作,所以创建的 16 个子进程不可能很快就结束,需要跑一段时间。
现在的问题是同时运行两个 celery worker 之后的 2-3 分钟之内,部分进程就会意外结束(意外是指捕获不到任何异常或错误),变成僵尸进程,就像下面这种,有可能是因为 https://docs.python.org/zh-cn/3/library/multiprocessing.html,于是我在上面的代码中用了 time.sleep(3),但是仍然没有解决该问题。
[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+ 10503 10509 [celery] <defunct>
Z+ 10503 10510 [celery] <defunct>
等待较长一段时间后,发现 worker2 的 16 个子进程均变为僵尸进程,如下图,而 worker2 的任务迟迟没有结束,这个地方一直想不明白,16 个子进程已经全部变为僵尸进程,应该主进程就会结束了,没结束说明队列中还有值?因为最后有 self.queue.join(),但是如果队列中有值,子进程不可能全部结束,因为子进程里面调用了 self.queue.get(),可能是取出来之后没走到 task_done ?或者取的过程出现了问题?这个地方一直没想明白咋回事。
[root@VM_9_196_centos asset]# ps -ef | grep python
root 10485 21930 0 17:27 pts/4 00:00:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h
root 10503 10485 0 17:27 pts/4 00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker2.state -n worker2@%h
root 22192 28529 0 15:01 pts/6 00:00:44 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root 22238 22192 0 15:01 pts/6 00:00:00 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root 22245 22238 7 15:01 pts/6 00:30:23 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
root 22258 22238 7 15:01 pts/6 00:30:29 xxx/celery -A celery_worker.ce worker -Q tasks -l info --concurrency=1 --statedb=worker1.state -n worker1@%h
[root@VM_9_196_centos asset]# ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
Z+ 10503 10509 [celery] <defunct>
Z+ 10503 10510 [celery] <defunct>
Z+ 10503 10511 [celery] <defunct>
Z+ 10503 10512 [celery] <defunct>
Z+ 10503 10513 [celery] <defunct>
Z+ 10503 10514 [celery] <defunct>
Z+ 10503 10515 [celery] <defunct>
Z+ 10503 10516 [celery] <defunct>
Z+ 10503 10517 [celery] <defunct>
Z+ 10503 10518 [celery] <defunct>
Z+ 10503 10519 [celery] <defunct>
Z+ 10503 10520 [celery] <defunct>
Z+ 10503 10521 [celery] <defunct>
Z+ 10503 10522 [celery] <defunct>
Z+ 10503 10523 [celery] <defunct>
Z+ 10503 10524 [celery] <defunct>
Z+ 22238 22243 [celery] <defunct>
Z+ 22238 22244 [celery] <defunct>
Z+ 22238 22246 [celery] <defunct>
Z+ 22238 22247 [celery] <defunct>
Z+ 22238 22248 [celery] <defunct>
Z+ 22238 22249 [celery] <defunct>
Z+ 22238 22250 [celery] <defunct>
Z+ 22238 22251 [celery] <defunct>
Z+ 22238 22252 [celery] <defunct>
Z+ 22238 22253 [celery] <defunct>
Z+ 22238 22254 [celery] <defunct>
Z+ 22238 22255 [celery] <defunct>
Z+ 22238 22256 [celery] <defunct>
Z+ 22238 22257 [celery] <defunct>
[root@VM_9_196_centos asset]#
之前是使用的多线程,但是遇到了这个问题 https://www.v2ex.com/t/597143 ,而且不想去掉 logging,就改成了多进程。
问题:
目前实验结果: 如果只运行一个 celery worker 好像是没有问题的,刚开始子进程不会意外结束,持续观察中,猜测最后执行完也能够正常结束?
奖金发放规则: 每个问题优先解答者获奖金,如果对了一半,另外一个人对了一半,合力帮助解决,一人一半的奖金。
联系方式: 扣扣 9614 六二 392
非常感谢你的阅读,让我困扰了好久。
1
hhbcarl 2019-10-29 00:02:24 +08:00
没仔细分析你的问题的根因,但是有个建议是既然用了 Celery,就应该用好 Celery。它是能够自己管理并行任务的,没必要自己去创建和管理进程。
详见: http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups |
2
ClericPy 2019-10-29 00:53:58 +08:00
太长了... 挣钱机会留给学生吧...
celery 自从在公司里见识过内存泄漏的小坑以后, 基本不敢碰了 不过提到僵尸进程, 感觉有点像我以前碰到过的情况, 先确认下是僵尸进程还是孤儿进程, 两者不太一样. 我最后是用 psutil 粗暴查杀的... 不过之前用的方法可以给你参考下: 1. close_fds 参数 2. kill 子进程的时候一定要 wait, wait 超时(Python3 才有超时... Python2 自己用 timer 做)再去强杀 3. 考虑子进程里带上个 timer kill self 吧, 这个方法最蠢又最简单... 俗称蠢强蠢强的... |
3
lolizeppelin 2019-10-29 09:03:33 +08:00
都是 linux 编程基础,和 python 关系不大
老老实实去熟悉 fork exec wait 等 linux 编程基础吧,这些基础支持不懂,无论你用什么多进程库都会遇到类似问题。 顺便...对于不熟悉 linux 编程也不想读 multiprocessing 源码的同学, 稍微复杂一点的代码都不要使用 multiprocessing |
4
ytymf 2019-10-29 09:45:39 +08:00
multiprocessing 一定要确保父进程是单线程的,如果不能确保,可以尝试一下启动方式为 spawn 或者 forkserver 启动进程。
|
5
hanssx OP @hhbcarl 谢谢师父,你说的这个 canvas 之前有了解,但是现在项目代码不太容易改,周末尝试一下。
---------------------------------------------------------------------------------------------------------------------------------------- @ClericPy 是僵尸进程,Z 代表 Zombie,僵尸进程产生的原因是因为子进程结束之后,父进程仍然存在且没有 wait 或 communicate 子进程,产生原因我倒是清楚,只是不明白为什么一开始就有子进程结束,因为队列里面的东西很多,不可能一开始子进程就结束的。 ---------------------------------------------------------------------------------------------------------------------------------------- @ytymf “multiprocessing 一定要确保父进程是单线程的”,这个是啥意思呀,是确保父进程是单进程吧? celery worker 里面看确实是单进程。 |
6
ytymf 2019-10-29 13:18:43 +08:00 1
@hanssx 不,就是确保父进程是单线程的,也就是说你要 fork 的那个进程里不能有多个线程。可以了解一下 fork safe 这个概念,fork 出来的子进程并不会继承父进程的所有线程,会造成一些问题。
为了解决这个问题,multiprocessing 给出了 spwan 跟 forkserver 两个启动子进程的方式,这里摘抄一点官方文档: Depending on the platform, multiprocessing supports three ways to start a process. These start methods are spawn The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process objects run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver. Available on Unix and Windows. The default on Windows and macOS. fork The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic. Available on Unix only. The default on Unix. forkserver When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited. |
7
ytymf 2019-10-29 13:27:02 +08:00
@hanssx 看 fork 的介绍,Note that safely forking a multithreaded process is problematic. 我的经验是,后果就是 fork 出的子进程会成为僵尸进程
|
8
hanssx OP @ytymf 多谢指教,学到了。我这个主进程是 celery worker 产生的进程,我代码中并没有用线程,更没有用多线程。僵尸进程产生的原因应该子进程结束之后,父进程仍然存在且没有 wait 或 communicate 子进程,也就是没处理子进程发来的 SIGCHILD KILL 那个信号。
|
9
ytymf 2019-10-29 13:59:56 +08:00 1
@hanssx 很难说 celery worker 本身的实现里面没有线程,而且大概率是有线程的。你可以简单试试改下进程启动方法为 spawn,如果问题解决那就是这个问题。
|
11
lolizeppelin 2019-10-30 11:11:53 +08:00
|
12
lolizeppelin 2019-10-30 11:16:06 +08:00
都说了, 不熟悉 linux 编程也不想读 multiprocessing 源码的同学, 稍微复杂一点的代码都不要使用 multiprocessing
好好把 linux 相关的父子进程,信号处理学学也就半天时间 回头你再看 multiprocessing 自然知道到底是什么问题,原理不了解和你说也会变更多基础问题疑问 好好学习下 linux 这部分基础知识真不需要那多时间的,论坛反而是浪费时间。 |
13
hanssx OP @ytymf 试了一下,celery worker 直接启动不起来,考虑用 canvas 重构一下,还是感谢师父,学到了。
import billiard as multiprocessing multiprocessing.set_start_method('spawn') |
14
hanssx OP @lolizeppelin 那个是这样的,本身使用 multiprocessing 没啥问题,在 celery worker 里面使用就会有问题,celery 也给了一个 billiard,算是 multiprocessing 的 patch。
|
15
lolizeppelin 2019-10-30 11:39:56 +08:00
这么说吧, multiprocessing 没有问题,你知道原因根本不用 patch
spwan 也就是是 fork exec 了一个新 python 进程专门执行部分代码避免当前进程的污染 但是这种绕圈的方式是不合理的和混乱的. 当然你觉得能用就行也无所谓. |
16
hanssx OP @lolizeppelin 额,spawn 模式启动不起来 celery worker,师父你知道哪儿的问题吗,能说一下吗。
你说的基础知识,我理解可能是信号处理,你是说子进程结束的时候发送 SIGCHILD KILL 之类的信号,如果父进程不处理就会使子进程变为僵尸进程是吧,这个我是知道的,我也避免了,加上了子进程.join() |
17
lolizeppelin 2019-10-30 12:31:34 +08:00 1
好好学习下 linux 这部分基础知识真不需要那多时间的
先看信号 https://www.ibm.com/developerworks/cn/linux/l-ipc/part2/index1.html 再看僵尸进程 https://monkeysayhi.github.io/2018/12/05/%E6%B5%85%E8%B0%88Linux%E5%83%B5%E5%B0%B8%E8%BF%9B%E7%A8%8B%E4%B8%8E%E5%AD%A4%E5%84%BF%E8%BF%9B%E7%A8%8B/ 最好把 ibm 文档理进程间通信都好好看一便,认真看多看几遍,看明白了,想明白了,再回头看你刚才帖子说“我知道的”部分是哪里错了,好好了解下父子进程共享了什么东西会导致什么问题 基础打牢了很多问题就解决了.真的不难,没搞清楚只能瞎问 后面可以 subprocess 的源码熟悉进程间通信和 fd 关闭在 python 的写法和处理方式 然后把 https://github.com/openstack/oslo.service/blob/master/oslo_service/service.py 的代码读透 然后有需要可以读下 multiprocessing 的源码,不需要通读你有上面的知识大致接到 multiprocessing 如何工作的即可 之后你不需要论坛理问什么 celery 多进程的问题了,搞懂原理了才能真正解决问题,因为这些问题都不是 python 的问题都是系统原理性问题 |
18
ytymf 2019-10-30 13:21:45 +08:00
@lolizeppelin 只是给楼主一个快速尝试看能不能解决问题的方法,确实是不推荐直接 spawn 的。你说的没错,是要看好基础,可是也得解决具体的问题才行。
|
19
ytymf 2019-10-30 13:31:21 +08:00 1
@hanssx billiard 我没用过,可能跟原生的 multiprocessing 有区别了,最终可以尝试下 forkserver。其实楼上说的有道理,你的用法有点点奇特了,即使用 spwan 或者 forkserver 能够启动,也不是最优雅的办法,还是重构成正经的方法比较好
|
20
lolizeppelin 2019-10-30 13:47:37 +08:00
你的快速方法明显是不对的
学习的时候遇到深度问题如果超过能力或者没有必要的确需要跳过,但是非常基础的问题习惯性跳过是不行的 多进程问题是非常基础也常用的知识,只要你用到多进程就会遇到相关问题,这次绕圈解决了下次一样有问题解决不了 老老实实把坑填了用的时间比这次绕圈解决长一点,但是你淌过去了以后就解决起其他多进程问题就有底了 python 残疾的多线程是必须多进程的,不搞懂以后坑的是自己 |
21
ytymf 2019-10-30 13:55:27 +08:00
@lolizeppelin 楼主给的题目是有限定的,是个命题作文,那就是如何在 celery worker 下启动子进程,这个问题要怎么解决?即使通晓了你给的所有知识,遇到这个具体的问题不还是得看 celery 具体的实现,找出冲突的共享资源?如果父进程是楼主自己写的,我一定会推荐他搞明白。可面对的是一个庞杂的第三方库,又是具体的命题作文,那要怎么回答呢?
|
22
hanssx OP @ytymf 确实是的,这个父进程是 celery worker 的,如果不用 celery,这套东西是没问题的,我修改了如下代码:
``` def run(self): self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......] for url in self.scan_list: self.queue.put(url) time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html self.scan_start() ``` 改为 ``` def run(self): self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......] scan_list_len = len(self.scan_list) # self.total_domain_cnt = len(self.scan_list) for url in self.scan_list: self.queue.put(url) while self.queue.qsize() != scan_list_len: time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html self.scan_start() ``` ``` def scan_start(self): """ 进程池扫描启动 :return: """ for i in range(self.process_count): t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock) t.daemon = True t.start() self.queue.join() ``` 改为 ``` def scan_start(self): """ 进程池扫描启动 :return: """ process_list = [] for i in range(self.process_count): # As far as possible one should try to avoid shifting large amounts of data between processes. # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock) t.daemon = True t.start() process_list.append(t) for p in process_list: p.join() self.queue.join() ``` 不过我的用法确实太奇怪,celery 官方是不推荐在里面使用多进程的,billiard 看 issue 里面好像也要换的意思。 |
23
hanssx OP 这回复不支持 markdown 也有点难受。
|
24
lolizeppelin 2019-10-30 14:17:34 +08:00
无论什么扩展,最后都是要 fork
搞清楚多进程知识以后, 是不是 celery 都没关系,知道什么时候退出,什么时候回收就是 哪有那么麻烦, 只要你处理好异常,os._exit 的时候我管你是 celery 还是 flask 还是 dj |
25
ytymf 2019-10-30 14:32:03 +08:00
@lolizeppelin 现实中是麻烦的,如果父进程不是 fork safe 的,就是很难处理,不是任何父进程都可以很安全地被 fork 的(与 python 无关),参考
https://stackoverflow.com/questions/6078712/is-it-safe-to-fork-from-within-a-thread 不知道是不是这个原因,python3.8 中 macos 下的 multiprocessing 的默认启动方式也被改成了 spwan,之前是 fork。 |
26
lolizeppelin 2019-10-30 14:52:51 +08:00
没有区别, posix_spawn 就是 fork+exec,以前没有 posix_spawn 系统调用而已
搞清楚了原理,自然知道如何避免 celery 有可能带来的负面影响 multiprocessing 本来就是就是单次脚本中让你快速多进程跑代码的库,自己里面生线程还有管道 /socket 通信,里面不说多复杂但进程肯定干净,混合到复杂代码里基础不牢问题都不知道出哪,本来就不适合用到服务中。 特别是 multiprocessing 进程池中可复用的进程除非你能确定进程不被污染,否则跑起来就是自己找坑。 所以我早说了, 对于不熟悉 linux 编程也不想读 multiprocessing 源码的同学, 稍微复杂一点的代码都不要使用 multiprocessing。 |
27
lolizeppelin 2019-10-30 14:53:18 +08:00
是进程肯定不干净
|
28
ytymf 2019-10-30 17:19:48 +08:00
@lolizeppelin “搞清楚了原理,自然知道如何避免 celery 有可能带来的负面影响”, 这一点具体要怎么做呢,读 celery 的源代码么
|
29
hanssx OP 我之前写的用 join 的方法不对,join 方法虽然也有 wait 的功能,但是如果单个进程 join 还好,多个进程的话可能在 join 之前进程就已经结束变为僵尸进程,现在的方法是处理 SIGCHILD 信号,这样保证不会有僵尸进程了,至于还会不会有其他问题,下下周再跑程序时反馈,感谢各位的帮助。如果有更好的办法,请告知,红包以表谢意。
|