V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
uti6770werty
V2EX  ›  Python

请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能?

  •  
  •   uti6770werty · 100 天前 · 1502 次点击
    这是一个创建于 100 天前的主题,其中的信息可能已经有所发展或是发生改变。

    情况: 在 main()

        from multiprocessing import Manager
        from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
        
        # 建立全局变量字典
        GOLVAR = Manager().dict()
        # SQL 处理队列
        SQLQueue = Manager().Queue()
         
        # 处理 SQL 队列功能,一个单独进程在运行
        ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
        # 启动
        ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR)
         
        def AA(someData,sqlqueue):
         	#略
         	XXX
         	sqlCommand = XXX
         	sqlqueue.put(sqlCommand)
         	retrun
        
        def BB(someData,sqlqueue):
         	#略,和 AA 结构一样,最后往队列里 put(sqlCommand)
         	sqlqueue.put(sqlCommand)
         	retrun
         		
        def CC(someData,sqlqueue):
         	#略
         	sqlqueue.put(sqlCommand)
         	retrun    
         		
       # 开动制造
        while True:
      	# AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句
         		
         	# 进去的 SQL 语句只有四种,
         	# INSERT INTO tblname (x) VALUE (x);
         	# INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层);
         	# UPDATE SET...
         	# DELETE FROM...
         		
         	# SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条
           	time.sleep(100)
    
    
    # 处理 SQL 队列
    def procSQLcmd(sqlinfo, sqlqueue, golvar):
        import time
        import datetime
        from dbutils.pooled_db import PooledDB
        import pymysql
        from concurrent.futures import ThreadPoolExecutor
        from MYFunc import SQLcmdData
        from myFunc import colrRedB
        from myFunc import TranDicttoSQLcmd
    
        from warnings import filterwarnings
        filterwarnings("error", category=pymysql.Warning)
    
        POOL = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=600,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
            mincached=5,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
            maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
            setsession=[],  # 开始会话前执行的命令列表。
            ping=1,  # ping MySQL 服务端,检查是否服务可用。
            host=sqlinfo['ip'],
            port=sqlinfo['port'],
            user=sqlinfo['user'],
            password=sqlinfo['password'],
            database=sqlinfo['database'],
            charset=sqlinfo['charset']
        )
    
        DBconn = POOL.connection()
        
        def exeCu(conn, sqltext):
            try:
                cur = conn.cursor()
                cur.execute(sqltext)
                # cur.commit()
                cur.close()
            except pymysql.Warning as e:
                # print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}"))
                sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"')
                resonsql = str(e).replace("'","\\'").replace('"','\\"')
                SQLErrorDict = {'sqlcmd': sqlsqlcmd,
                                'reson': resonsql,
                                'UpdateTime': datetime.datetime.now().replace(microsecond=0)}
                SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None)
                SQLcmdData(sqlinfo, SQLCmd)
            return
    
        while True:
            if sqlqueue.qsize() == 0:
                # 开关
                if golvar['stopsqlflag'] == False:
                    time.sleep(2)
                    break
    
            # SQL 语句执行,必须按队列 FIFO 顺序写入
            while not sqlqueue.empty():
                with ThreadPoolExecutor(1) as executor:
                    executor.submit(exeCu, DBconn, sqlqueue.get())
    
        DBconn.close()
        return		
    
    请教问题:
    1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环
    2 、从 MySQL 的服务器的性能判断来看,
    SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%');
    SHOW PROCESSLIST; 
     MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?)
    3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善
    4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆?
    5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。
    
    10 条回复    2021-08-30 10:16:35 +08:00
    BBCCBB
        1
    BBCCBB  
       100 天前
    看你这个貌似没用到批量写入? 可以尝试批量
    heyjei
        2
    heyjei  
       100 天前
    代码没细看,但思路其实很简单,攒一波数据,到 1 千条或者 1 千条没到但 1 秒钟到了,再批量输入。如果批量写入的方案还是不满足,可以把数据写入到文件里,然后再定时调用 load data infile,load data infile 的写入速度可以达到磁盘的最大 IO 速度(前提是使用 MyISAM,并且没有索引)
    heyjei
        3
    heyjei  
       100 天前
    还有一种改动最小的一种方式:

    我们的 SQL 语句是 insert into table_name (column1, column2) values (value1, value2)

    在下面的语句中,你不要把整个语句 put 进去,把 (value1, value2) put 进去
    sqlqueue.put(sqlCommand)

    在下面的语句,get 之后,不要立即执行,攒够 1000 个数据,或者 1 秒超时,然后拼接 SQL 成完整的语句并执行。
    # SQL 语句执行,必须按队列 FIFO 顺序写入
    while not sqlqueue.empty():
    with ThreadPoolExecutor(1) as executor:
    executor.submit(exeCu, DBconn, sqlqueue.get())
    uti6770werty
        4
    uti6770werty  
    OP
       100 天前
    @BBCCBB

    @heyjei

    队列里,不全是 INSERT INTO 。。,也许还有偶然一两个 ALERT 也不一定,要按 FIFO 顺序,所以就不好套批量模板了。。。

    by the way,有试过 sqlcmd + ";" + sqlcmd,这样操作过,但似乎 PooledDB.conn.cursor().excute 不支持这种多语句组装命令执行? 前几天有试过,当时没成功,没研究下去,后面去研究如何高并发去了,结果更迷糊,就这个场合用,现有的高并发非常折腾。。。
    liprais
        5
    liprais  
       100 天前
    你的 mysql 服务器有几个核心?
    600 个 connection 太多了
    uti6770werty
        6
    uti6770werty  
    OP
       100 天前
    上面忘了说一个事情,就是就算是峰值 17W 条数据里也好,平时 5,6 千条也好,很多时候都队列的数据,是表里已经有的了,表的索引机制已经避免了重复插入数据,所以存表里的数据量其实不多的。。。

    @liprais 8 核,16 线程,CentOS 6 + MySQL 5.5,按月份分表,最多的表数据不过 800 万
    uti6770werty
        7
    uti6770werty  
    OP
       100 天前
    @liprais PooledDB 的 connection 很奇怪的,它这里 600 只是最高允许 600 而已,我现在只是一台数据处理电脑向 MySQL 写数据而已,SHOW PROCESSLIST 看,也就 7,8 条连接
    noparking188
        8
    noparking188  
       99 天前
    所以这段代码真是生产上用的嘛?
    看这段代码是用线程做并发,线程开销比较大,这种 IO 密集任务不大行,可以换协成程测下效果,更轻量级,几年前写过类似脚本用的 gevent,ayncio 我没研究过,楼主也可以看看,个人感觉主要是想办法提高并发处理能力吧,和 MySQL 没啥关系
    说的不对还望指正
    noparking188
        9
    noparking188  
       99 天前
    个人感觉比较简单的做法就是换 redis 做队列,生成 SQL 单独一个程序跑,消费队列数据发 SQL 请求的一个程序,要提高并发起简单地多个进程就行了,多进程+多线程(协程)的方式,用 supervisor 托管更好
    当然这样的方案是建立在示例代码用线程并发造成网络 IO 请求瓶颈的猜测上
    todd7zhang
        10
    todd7zhang  
       98 天前
    没动啊,既然严格要求 FIFO, 为啥去执行 SQL 的时候,还要开 ThreadPoolExecutor ?这种情况,从 sqlqueue 拿出来虽然是顺序的,但是感觉执行过去就可能乱序啊,毕竟 while 里面在不停的开 executor 。

    我感觉都不用 POOL, 就一个 conn 不停的执行 sql 就好了?
    with conn.cursor() as cr:
    while not sqlqueue.empty():
    cr.execute(sqlqueue.get())
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1546 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 17:16 · PVG 01:16 · LAX 09:16 · JFK 12:16
    ♥ Do have faith in what you're doing.