V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
fjhh
V2EX  ›  Python

求问 Python 大神,多进程处理文本内数据

  •  
  •   fjhh · 2017-01-02 00:11:51 +08:00 · 6789 次点击
    这是一个创建于 2665 天前的主题,其中的信息可能已经有所发展或是发生改变。
    我的程序要实现的功能很简单,就是打开一个写有 1-500000 的 test.txt 的文档,数文档中奇数和偶数的个数,并写入一个字典
    用多进程的方法,就是把这 500000 个数分成 n 份,每个进程处理 500000/n 个数
    总之最后发现, num_of_process 越大,速度越慢,当等于 1 的时候,速度最快。这是为何?按理多进程,时间应变小。程序要做哪方面优化?

    受人之托,特来求问。惭愧了,妹子竟然写的如此一手好代码……感谢。

    from multiprocessing import pool
    import time
    import os
    import copy
    import multiprocessing


    labels= {'0': 0, '1': 0}
    train_set = 'test.txt'
    num_of_process = 2
    def statistics(file,label):
    num=int(file)
    if num%2==0:
    label['0']+=1
    else:
    label['1']+=1

    def union_dict(objs):
    _keys = set(sum([obj.keys() for obj in objs], []))
    _total = {}
    for _key in _keys:
    _total[_key] = sum([obj.get(_key, 0) for obj in objs])
    return _total

    def myprocess(data,i):
    labelnew=copy.deepcopy(labels)
    for afile in data[i*len(data)/num_of_process:(i+1)*len(data)/num_of_process]:
    statistics(afile, labelnew)
    return labelnew

    if __name__=='__main__':
    e1 = time.time()
    pool = multiprocessing.Pool(processes = multiprocessing.cpu_count())#processes=4 in my mac
    result_list=[]
    data_train=open(train_set,'r').readlines()
    for i in xrange(num_of_process):
    #multiprocessing.Process(myprocess(target=myprocess,args=[data_train,i]))
    result=pool.apply_async(myprocess,(data_train,i))
    result_list.append(result.get())

    print result_list

    pool.close()
    pool.join()
    e2 = time.time()
    print float(e2 - e1)
    18 条回复    2017-01-04 08:48:41 +08:00
    ipwx
        1
    ipwx  
       2017-01-02 00:17:22 +08:00
    机械硬盘随机读取需要寻道,速度远低于顺序读取。所以你进程分得越多越慢。

    话说现在的程序员都不知道硬盘 IO 是大部分程序的瓶颈了吗?
    ipwx
        2
    ipwx  
       2017-01-02 00:20:45 +08:00
    。。。抱歉没看你的程序就回复了。你的程序的问题不在于读写,大概在于进程间数据的拷贝。你的 data_train 在 pool 建立之后才读取的,目测数据要拷贝到子进程里面。

    如果是 *nix ,你可以试试把读取数据放到 pool 创建之前,也许有效。实在不行你用 os.fork 。
    ipwx
        3
    ipwx  
       2017-01-02 00:23:46 +08:00
    话说最后吐槽一句:你都把文件整个读到内存里面了还分进程个屁啊,统计奇数和偶数明明是 O(n) 的算法,你读取文件也是 O(n) 的操作,干嘛不一个进程直接边读边统计,还要 readlines 读到内存里面做啥。

    如果你想要每个进程分别读取部分数据(一开始我看你的题目就是这么猜想的,所以直接回答了一楼),那就会遇到机械硬盘的瓶颈,还是没有意义。

    所以结论就是,你这例子根本什么意义都没有,就算进程越多越慢也不能说明任何问题。
    likuku
        4
    likuku  
       2017-01-02 00:55:26 +08:00
    建议可以这样玩:
    1.探测当前系统 CPU 核心数 m ,
    2.将文件切分成 m 份存硬盘上,根据 m 份文件名生成任务列表,
    3.根据任务 /文件名列表创建最大进程数为 j=m 或者 j= m+2 的多进程池
    4.进程池发动任务,呼叫任务进程执行(每进程的结果写入独立某 db/某文件),记得一定得丢到后台去跑
    4.1 任务进程先检查当前系统里是否正在执行的任务进程 =j or > j (避免超载),若有空闲,则立即执行,否则等待 0.1 秒,循环检查
    5.任务进程结束,汇总结果数据
    #3 ( j 得实际测试,根据之前玩 freebsd / gentoo 时,编译软件包 编译器推荐使用 cpu 核数 +2 的并行进程数; 但去年按以上思路作过某实用小应用,实际却是在 4 核机器上最大并行开 40 个任务是刚能榨干算力)
    likuku
        5
    likuku  
       2017-01-02 00:56:53 +08:00
    @fjhh "num_of_process 越大,速度越慢,当等于 1 的时候,速度最快" 很怀疑这个运行环境是单核心 /CPU
    CosimoZi
        6
    CosimoZi  
       2017-01-02 01:16:41 +08:00
    日志统计的瓶颈一般都在 io ,多进程没有用
    appleorchard2000
        7
    appleorchard2000  
       2017-01-02 01:36:11 +08:00 via Android
    看了一下,估计是因为数据总量太小,所以分解的成本超过了带来的收益
    eyp82
        8
    eyp82  
       2017-01-02 07:36:51 +08:00
    @appleorchard2000 赞同, 50w 数据, 一个进程都不够吃的
    zmj1316
        9
    zmj1316  
       2017-01-02 08:44:22 +08:00 via Android
    这种 IO 瓶颈的加 CPU 也没用啊
    gdsagdada
        10
    gdsagdada  
       2017-01-02 09:24:33 +08:00
    妹子竟然写的如此一手好代码, no picture you say a egg
    gdsagdada
        11
    gdsagdada  
       2017-01-02 09:25:26 +08:00
    an
    yanzixuan
        12
    yanzixuan  
       2017-01-02 09:46:06 +08:00
    现成的 spark 能干的事情为啥还要自己造轮子?
    P0P
        13
    P0P  
       2017-01-02 13:44:33 +08:00
    python 的解释器还是有全局锁的 https://wiki.python.org/moin/GlobalInterpreterLock ,所以 python 的多线程效率一直不怎么样
    raiz
        14
    raiz  
       2017-01-02 16:08:01 +08:00
    @P0P 不要赖全局锁好不好,明明她用的是多进程。。
    最费时的 IO 操作在进程分支之前已经做了,而每个 worker 进程需要的运算又很少,就一个 M 个判断,所以时间主要都拿去做进程空间拷贝,数据拷贝之类的工作了。

    让妹子注册个账号好沟通一点哦 : p
    22too
        15
    22too  
       2017-01-02 16:28:14 +08:00
    你这个不是磁盘 io 问题啊。
    首先,你只打开了一次文件,所以这个耗时应该是一个定值。
    然后你要的结果是统计,所以你需要把数据分成均等的几份,交给多进程处理,这个也没有问题。
    多进程处理的结果,你放到一个 dict 中。这样相当于多个进程修改一个数据,必然会越处理越慢啊。因为一个修改的时候,另外一个必须等待,所以问题在这里。
    wind3110991
        16
    wind3110991  
       2017-01-02 18:13:46 +08:00
    和楼上说的一样,分布式情境下推荐用 spark 吧
    j5shi
        17
    j5shi  
       2017-01-02 21:45:33 +08:00 via iPhone
    @P0P 正解
    fjhh
        18
    fjhh  
    OP
       2017-01-04 08:48:41 +08:00
    感谢各位,朋友说换至单位 32 核服务器运行单位项目,速度有明显改善。
    我猜测大致如 ipwx 和 appleorchard2000 等朋友们解说,数据总量太小,所以分解的成本超过了带来的收益。
    再谢感谢大家。祝大家 2017 一切顺利。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1343 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 17:32 · PVG 01:32 · LAX 10:32 · JFK 13:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.