有幸用 yield（生成器）和 guzzlehttp/guzzle 的请求池（Pool）写过一个工具类：
/**
 * Sends one POST request per entry of $allParams concurrently through a Guzzle
 * request pool and blocks until every request has settled. (Use with care.)
 *
 * Each entry of $allParams is sent as the JSON body of its own request to $url.
 *
 * @param string        $url        Request URL shared by every request.
 * @param array         $allParams  One request payload per entry; the entry's key is handed
 *                                  back to the callbacks as $index.
 * @param array         $options    Request configuration:
 *                                  ['headers' => [...], 'concurrency' => int].
 *                                  'concurrency' is optional; when omitted the Pool's own
 *                                  default is used.
 * @param callable|null $onSuccess  Called per fulfilled request:
 *                                  function (ResponseInterface $response, int|string $index, array $params) {}
 * @param callable|null $onFailure  Called per rejected request:
 *                                  function (\Exception $exception, int|string $index, array $params) {}
 * @param callable|null $onComplete Called once after all requests have settled:
 *                                  function (array $results) {} where $results is keyed and
 *                                  ordered like $allParams and each value is either the
 *                                  response or the exception of that request.
 * @return void
 * @author LiaoYongjian
 * @date 2024-01-19 17:43
 */
public function postAsyncRequests(string $url, array $allParams, array $options = [], ?callable $onSuccess = null, ?callable $onFailure = null, ?callable $onComplete = null)
{
    $headers = $options['headers'] ?? [];
    $client = $this->getCli();

    // Pre-seed the result map so it keeps the order of $allParams no matter
    // in which order the requests actually finish.
    $results = array_fill_keys(array_keys($allParams), null);

    // Generator that lazily produces one promise factory per payload; the pool
    // only invokes a factory when it has a free slot.
    $generator = function () use ($client, $allParams, $url, $headers, $onSuccess, $onFailure, &$results) {
        foreach ($allParams as $index => $params) {
            yield function () use ($client, $url, $headers, $index, $params, $onSuccess, $onFailure, &$results) {
                return $client->postAsync($url, [
                    'headers' => $headers,
                    'json' => $params,
                ])->then(
                    function ($response) use ($onSuccess, $index, $params, &$results) {
                        // Record the outcome first so it is kept even if the callback throws.
                        $results[$index] = $response;
                        if ($onSuccess) {
                            $onSuccess($response, $index, $params);
                        }
                        return $response;
                    },
                    function (\Exception $e) use ($onFailure, $index, $params, &$results) {
                        $results[$index] = $e;
                        if ($onFailure) {
                            $onFailure($e, $index, $params);
                        }
                        // Re-throw so the promise stays rejected for the pool.
                        throw $e;
                    }
                );
            };
        }
    };

    // Only override the pool's concurrency when the caller asked for it.
    $poolConfig = [];
    if (isset($options['concurrency'])) {
        $poolConfig['concurrency'] = max(1, (int) $options['concurrency']);
    }

    $pool = new Pool($client, $generator(), $poolConfig);

    // The pool's aggregate promise resolves to null, so its return value is
    // useless as a result set; the outcomes were collected in $results above.
    $pool->promise()->wait();

    if ($onComplete) {
        $onComplete($results);
    }
}