V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
yagamil
V2EX  ›  Python

请问下面异步嵌套的 await 为什么会导致 mongo 入库失败

  •  
  •   yagamil · 2020-11-25 16:16:27 +08:00 · 1447 次点击
    这是一个创建于 1503 天前的主题,其中的信息可能已经有所发展或是发生改变。

    请问下面异步嵌套的 await 为什么会导致 Task <Task pending coro=<fetch() running at xxxx.py:45> cb=[gather.<locals>._done_callback() 回调失败 ?

    加入了异步插入 mongo 后导致的,去掉入库那一段就没问题。

    
    mongo_client = AsyncIOMotorClient(connect_uri)
    collection = mongo_client['db_stock']['new_stock_ttjj']
    
    def parse_json(content):
       # 解析
        content += ';function getV(){return hsEnHLwG;}'
        ctx = execjs.compile(content)
        result = ctx.call('getV')
        return result
    
    async def update_data(data):
       # 异步 mongo 入库 
        code = data['securitycode']
        found =  await collection.find_one({'securitycode':code})
        if not found:
            await collection.insert_one(data)
    
    
    async def fetch(session,page):
    
        async with session.get(home_url.format(page),headers=headers) as resp:
            content = await resp.text()
    
            try:
                js_content = parse_json(content)
                for stock_info in js_content['data']:
                    securityshortname = stock_info['securityshortname']
    
                    print(securityshortname)
                    await update_data(stock_info)
            except Exception as e:
                print(e)
    
    
    async def main():
        async with aiohttp.ClientSession() as session:
            async with session.get(home_url.format(1), headers=headers) as resp:
    
                content = await resp.text()
                js_data = parse_json(content)
                pages = js_data['pages']
                tasks =[]
                for page in range(1,pages+1):
                    task = asyncio.ensure_future(fetch(session,page))
                    tasks.append(task)
    
                await asyncio.gather(*tasks)
    
    asyncio.run(main())
    
    
    4 条回复    2020-11-25 17:25:26 +08:00
    keepeye
        1
    keepeye  
       2020-11-25 16:26:50 +08:00
    task = asyncio.ensure_future(fetch(session,page))
    似乎可以改成
    task = fetch(session,page)
    yagamil
        2
    yagamil  
    OP
       2020-11-25 16:39:09 +08:00
    @keepeye 问题不在这里。
    yagamil
        3
    yagamil  
    OP
       2020-11-25 17:12:23 +08:00
    找到原因:
    def run(main, *, debug=False):
    if events._get_running_loop() is not None:
    raise RuntimeError(
    "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()

    asyncio.run 内部定义了一个 loop,
    moto 初始化的时候内部也用了这个。

    用 loop 定义,motor 的定义放在 loop 后面。
    n37r09u3
        4
    n37r09u3  
       2020-11-25 17:25:26 +08:00
    放后面是用同一个 loop 了 吧?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2199 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 16:12 · PVG 00:12 · LAX 08:12 · JFK 11:12
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.