V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Licsber
V2EX  ›  Python

如何配合 multiprocessing 使用 hashlib 来计算多种摘要?

  •  
  •   Licsber ·
    licsber · 2022-04-17 16:46:46 +08:00 · 2244 次点击
    这是一个创建于 933 天前的主题,其中的信息可能已经有所发展或是发生改变。
    1. 想获取一个大文件(>1GiB)的 md5 、sha1 、sha256 、crc32 、md4 等信息
    2. 只想要文件被完整读取一次

    使用 hashlib 获取摘要的时候明显代码瓶颈在单核 cpu 上
    profile 显示主要都在各个 hashobj 的 update()方法耗时最长

    我看官方文档里有这么一句话:

    Note For better multithreading performance, the Python GIL is released for data larger than 2047 bytes at object creation or on update.
    

    然而并没有发现实际起作用 即 GIL 没有被释放 占用率仍然是 100%cpu
    验证了不是 io 瓶颈 python3.9.2

    第 1 条附言  ·  2022-04-17 19:26:18 +08:00

    总算写完了 感谢#1回复
    是我把GIL想错了
    陷入了cpu占用率100%就想多进程的误区
    GIL是线程的概念 某线程释放了GIL之后其他线程也可以使用
    而不是说主线程释放GIL之后就会接着主线程之后的代码
    想成async类似的机制了

    由于附言超字数了 引用库和线程启动的代码就略去了

    BUF_SIZE = 4 * 1024 * 1024  # 4MiB
    BUF_PRE_FETCH = 64  # 最多消耗 256MiB 额外内存
    
    
    def cal_hashes(f: typing.IO):
        f.seek(0)
    
        res = {
            'finish': False,
        }
        queues = {
            'md5': Queue(),
            'sha1': Queue(),
            'sha256': Queue(),
        }
    
        def _producer(_f: typing.IO):
            while True:
                time.sleep(0.01)
                empty_flag = True
    
                for _, queue in queues.items():
                    if not queue.empty():
                        empty_flag = False
                        break
    
                if empty_flag:
                    count = BUF_PRE_FETCH
                    while count > 0:
                        count -= 1
                        content = _f.read(BUF_SIZE)
                        if not content:
                            res['finish'] = True
                            return
    
                        for _, queue in queues.items():
                            queue.put_nowait(content)
    
                    # 合理的相信算完256M的数据
                    # 至少需要这么久
                    # 树莓派4: 120MiB/s
                    # 8代i5:  370MiB/s
                    time.sleep(0.3)
    
        def _consumer(_algo: str):
            hash_obj = hashlib.new(_algo)
            while True:
                if res['finish'] and queues[_algo].empty():
                    break
    
                try:
                    content = queues[_algo].get(timeout=1)
                except Empty:
                    continue
    
                hash_obj.update(content)
                queues[_algo].task_done()
    
            res[_algo] = hash_obj.hexdigest().upper()
    
        return res
    
    5 条回复    2022-04-18 13:50:13 +08:00
    LeeReamond
        1
    LeeReamond  
       2022-04-17 17:29:48 +08:00   ❤️ 1
    hashlib 是通过 ffi 调用实现的,不需要多进程,直接使用多线程即可释放 GIL ,你说不能释放 GIL 我感觉是你哪里错了。
    Licsber
        2
    Licsber  
    OP
       2022-04-17 17:31:59 +08:00
    @LeeReamond #1 感谢 我刚思考了一会好像理解我哪里想错了
    我现在用 threading 库实现一下 过会贴代码
    Licsber
        3
    Licsber  
    OP
       2022-04-17 19:20:58 +08:00
    终于写完了 单测也测完了 至少我是很满意的
    ```python3
    BUF_SIZE = 4 * 1024 * 1024 # 4MiB
    BUF_PRE_FETCH = 64 # 最多消耗 256MiB 额外内存


    def cal_hashes(f: typing.IO):
    f.seek(0)

    res = {
    'finish': False,
    }
    queues = {
    'md5': Queue(),
    'sha1': Queue(),
    'sha256': Queue(),
    }

    def _producer(_f: typing.IO):
    while True:
    time.sleep(0.01)
    empty_flag = True

    for _, queue in queues.items():
    if not queue.empty():
    empty_flag = False
    break

    if empty_flag:
    count = BUF_PRE_FETCH
    while count > 0:
    count -= 1
    content = _f.read(BUF_SIZE)
    if not content:
    res['finish'] = True
    return

    for _, queue in queues.items():
    queue.put_nowait(content)

    # 合理的相信算完 256M 的数据
    # 至少需要这么久
    # 树莓派 4:120MiB/s
    # 8 代 i5: 370MiB/s
    time.sleep(0.3)

    def _consumer(_algo: str):
    hash_obj = hashlib.new(_algo)
    while True:
    if res['finish'] and queues[_algo].empty():
    break

    try:
    content = queues[_algo].get(timeout=1)
    except Empty:
    continue

    hash_obj.update(content)
    queues[_algo].task_done()

    res[_algo] = hash_obj.hexdigest().upper()

    ths = []
    producer = threading.Thread(target=_producer, args=(f,))
    producer.start()
    ths.append(producer)
    for algorithm in queues.keys():
    consumer = threading.Thread(target=_consumer, args=(algorithm,))
    consumer.start()
    ths.append(consumer)

    for th in ths:
    th.join()

    return res
    ```
    JSPIXiaoHei
        4
    JSPIXiaoHei  
       2022-04-18 10:12:52 +08:00
    写的是 Forensic 工具?
    Licsber
        5
    Licsber  
    OP
       2022-04-18 13:50:13 +08:00
    @JSPIXiaoHei #4 不是( 没这么高级
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5750 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 37ms · UTC 01:46 · PVG 09:46 · LAX 17:46 · JFK 20:46
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.