V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
dofine
V2EX  ›  Python

请教 Python 的 producer-consumer 模型与数据库操作问题

  •  
  •   dofine ·
    dofine · 2016-12-03 12:47:54 +08:00 · 2333 次点击
    这是一个创建于 2931 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近在学习生产者消费者模型,假如有如下需求:从某个网址读取 log ,提取出用户 id 然后发送邮件。在不涉及数据库操作的情况下,下面的示例代码(打不开 gist 只能贴在这里了。。)可以运行。

    但现在问题来了,如果需要将每一次读取的 log 内容和发送记录都存储在数据库里,保证同一个 id 只收到一次,这个数据库操作应该放在哪个线程里面呢? 用的是 kennethreitz 大神的 records + Python 自带的 Sqlite3 ,尝试了在 2 个 Thread __init__ 的时候分别连接同一个数据库文件,但是提示在一个线程内创建的连接不能在另一个线程内使用,这里有些迷惑。 Or 这个需求可以用另外的原理完成?

    import threading
    import time
    import Queue
    
    Q = Queue.Queue()
    class ProducerThread(threading.Thread):
        def __init__(self, out_q):
            threading.Thread.__init__(self, name='Producer')
            self.q = out_q
    
        def run(self):
            while True:
                # 这里读取 log
                log = getlog()
                if log:
                    for each in log:
                        self.q.put(each)
                else:
                    print('NO MORE LOG')
                    time.sleep(60)
    
    
    class ConsumerThread(threading.Thread):
        def __init__(self, out_q):
            threading.Thread.__init__(self, name='Consumer')
            self.q = out_q
    
        def run(self):
            while True:
                # consume the data.
                chunk = self.q.get()
                print(u'开始发送...{0}'.format(chunk))
                time.sleep(2)  # 模拟发送
                self.q.task_done()
    
    
    def main():
        t = ProducerThread(Q)
        t.start()
        c = ConsumerThread(Q)
        c.start()
        Q.join()
    
    
    if __name__ == "__main__":
        main()
    
    8 条回复    2016-12-03 14:19:04 +08:00
    lonelinsky
        1
    lonelinsky  
       2016-12-03 13:16:30 +08:00
    数据库本身就有读写锁,直接在两个线程里面各自连接数据库不就好了嘛
    dofine
        2
    dofine  
    OP
       2016-12-03 13:24:06 +08:00
    @lonelinsky 现在是有 3 个线程,如果我连接数据库写在 producer 的 `__init__` 里面,会提示这个 sqlite3 object 是在主线程里创建的呢。。我的理解是这样是在 producer 这个线程内创建的啊。。?

    ```python
    def __init__(self):
    self.db = sqlite3.connect('db.db')
    ```
    lonelinsky
        3
    lonelinsky  
       2016-12-03 13:34:57 +08:00   ❤️ 1
    @dofine
    t = ProducerThread(Q) 这个时候就会调用了__init__了,所以是在主线程的,
    t.start() 会调用 run 方法,你把连接的代码挪到 run 方法里面去应该就好了
    dofine
        4
    dofine  
    OP
       2016-12-03 13:48:40 +08:00
    @lonelinsky en ,测试了一下目前是好的。。现在要处理两个线程同时写入这个数据库的问题了。。是不是又要开一个队列执行数据库的写操作呀。
    lonelinsky
        5
    lonelinsky  
       2016-12-03 13:53:19 +08:00
    @dofine 不需要,你直接写就好,数据库层有读写锁保护的,没有问题的
    lonelinsky
        6
    lonelinsky  
       2016-12-03 13:55:44 +08:00
    当然,如果你的写并发量大的话,用队列,然后起一个专门的数据库写线程,性能会好一点
    dofine
        7
    dofine  
    OP
       2016-12-03 14:03:01 +08:00
    @lonelinsky python + sqlite3 好像不行呀,万一我那个写入 log 和写入发送记录同时写入就会报错== (其实这两个放在两个数据库文件里会不会更好?
    lonelinsky
        8
    lonelinsky  
       2016-12-03 14:19:04 +08:00
    @dofine
    看了下文档果然,不过你可以控制下超时时间 https://docs.python.org/2/library/sqlite3.html#sqlite3.connect
    针对你的需求来说,反正是完全独立的表,之间不需要建立关联的直接分两个数据库确实比较好
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2618 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 14:56 · PVG 22:56 · LAX 06:56 · JFK 09:56
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.