V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
4ever911
V2EX  ›  Python

Python 多线程 Websockets 退出出错

  •  
  •   4ever911 · 2016-11-12 20:58:28 +08:00 · 2481 次点击
    这是一个创建于 2726 天前的主题,其中的信息可能已经有所发展或是发生改变。
    我的程序创建了一个线程在后台接收消息,

    def threadFunc():

    ....dosomework

    while self._active:
    message = yield from wsclient.recv()
    ...dosomework

    .....


    我的程序要关闭, 所以想通知这个线程退出, 所以, 我在外面 设置 上面这个 while 的判断标志 self._active 为 False ,希望下个循环就退出线程。

    但是问题是, 如果一段时间, recv 没有数据读的时候, wsclient.recv() 这个地方会一直阻塞住了, 不会去调 while 的下一次循环。。。。

    有什么方法可以退出呢? 这个能设置一个 timeout 吗? 我尝试在创建得到 wsclient 的时候 wsclient.settimeout(3) 好像这个 recv 还是会阻塞住。

    而 Python 又不能像 c++那样,直接粗暴的 terminate thread , 请问, 有什么好的办法吗?
    11 条回复    2016-11-13 15:27:11 +08:00
    raysonx
        1
    raysonx  
       2016-11-12 21:10:28 +08:00
    你直接用 CTRL+C 终止的话, wsclient.recv()应该会触发中断异常才对。
    raysonx
        2
    raysonx  
       2016-11-12 21:19:31 +08:00
    我不是专门的 Python 程序员,查了一下, Python 里面的信号默认只会被主线程捕获,猜测应该是对子线程设置了 signal mask 。
    楼主或许可以尝试这两个方法:
    1. 主线程中处理 SIG_INT ,强制终止子线程。
    2. 或者在子线程中设置一个 bool 变量,每次循环前检查 bool 变量的值决定继续执行或退出。
    4ever911
        3
    4ever911  
    OP
       2016-11-12 21:50:01 +08:00
    @raysonx 你可能没仔细看上面的帖子, 也可能我没说清楚。

    1. python 不像 c++那样提供强制退出线程的函数。 有的时候,你 CTRL+C ,主程序退出了, 你用 Process Monitor 海能看到有代码在跑。

    2. 我设置了这样的变量让他退出, 现在关键是 recv 那个函数阻塞住不返回, 也就不会去访问那个变量。我尝试设置超时,也可能是我设置有问题。 对 Python 的异步还没完全搞明白。
    guyskk
        4
    guyskk  
       2016-11-12 21:59:12 +08:00 via Android
    标准库里的 signal 模块可以添加处理函数
    raysonx
        5
    raysonx  
       2016-11-12 22:13:29 +08:00
    我去 Google 查了一下, Python 的线程实现有一些奇怪的行为。
    比如,主线程如果 block 在 thread.join()上,是不能捕获消息的(可能和全局解释器锁有关)。

    我拿 StackOverflow 上的一个不通的示例代码作了些修改,使 CTRL+C 可以成功工作了(这里在主线程中调用 signal.pause()等待 SIGINT , join()不行):

    import signal, sys, threading, time

    THREADS = []

    def handler(signal, frame):
    global THREADS
    print "Ctrl-C.... Exiting"
    for t in THREADS:
    t.alive = False
    sys.exit(0)

    class thread(threading.Thread):
    def __init__(self):
    self.alive = True
    threading.Thread.__init__(self)


    def run(self):
    n = 0
    while self.alive:
    n = n + 1
    print("round %s" %n)
    time.sleep(1)
    pass

    def main():
    global THREADS
    t = thread()
    t.start()
    THREADS.append(t)
    signal.pause()
    for t in THREADS:
    t.join()

    if __name__ == '__main__':
    signal.signal(signal.SIGINT, handler)
    main()


    一些资料:
    http://stackoverflow.com/questions/1635080/terminate-a-multi-thread-python-program
    http://stackoverflow.com/questions/19652446/python-program-with-thread-cant-catch-ctrlc
    http://stackoverflow.com/questions/631441/interruptible-thread-join-in-python
    raysonx
        7
    raysonx  
       2016-11-12 22:21:08 +08:00
    关于 socket 的 setTimeout(),我没有测试,应该是不会有问题的。怀疑主线程没有捕获到 SIGINT 。
    4ever911
        8
    4ever911  
    OP
       2016-11-13 00:55:38 +08:00 via iPhone
    好像还是用抛异常来解决超时杀线程比较方便


    import threading
    import time
    import inspect
    import ctypes

    def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
    exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
    raise ValueError("invalid thread id")
    elif res != 1:
    # """if it returns a number greater than one, you're in trouble,
    # and you should call it again with exc=NULL to revert the effect"""
    ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
    raise SystemError("PyThreadState_SetAsyncExc failed")

    def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)

    class TestThread(threading.Thread):
    def run(self):
    print "begin"
    while True:
    time.sleep(0.1)
    print "end"
    if __name__ == "__main__":
    t = TestThread()
    t.start()
    time.sleep(1)
    stop_thread(t)
    print "stoped"
    dsphper
        9
    dsphper  
       2016-11-13 01:24:19 +08:00
    将线程所处理的 socket 句柄关闭即可(也就是通知客户端我将你的连接关闭了。), recv 将自动抛异常关闭了。
    dsphper
        10
    dsphper  
       2016-11-13 01:24:46 +08:00
    哦,对注意捕获 recv 异常。
    4ever911
        11
    4ever911  
    OP
       2016-11-13 15:27:11 +08:00
    算了,算了, 不麻烦了, 我还是 setDaemon 方式退出拉倒。。。后事不料理了。。。。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2273 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 06:04 · PVG 14:04 · LAX 23:04 · JFK 02:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.