Guzzle 源码分析

Guzzle 是一个非常强大而且稳定的 HTTP client。不同于一般的 cURL 封装组件,Guzzle 内部使用了多种请求方式,来实现 HTTP 请求,cURL 只是最常用的方式,并且 Guzzle 提供了强大的异步、并发功能,使得构建一个 HTTP 请求十分容易而且易拓展。现在 Guzzle 已经被 Drupal 整合到核心模块中了,可靠性不言而喻。Guzzle 目前使用了 PSR-7 规范,拓展性和兼容性也更加优秀了。之前在一次重构记录中提到过,但是没有深入分析过,这次决定介绍一些使用例子并深入分析其底层实现原理,如果有问题,请留言指出,共同进步。

注意:为了尽量缩减阅读量,部分源码分析只列出了关键步骤。

环境

本文使用的 Guzzle 版本为 6.3.0,composer.json 文件内容为

{
    "require": {
        "guzzlehttp/guzzle": "^6.3"
    }
}

配置

Guzzle 的各种配置都和 HTTP 请求相关,比如是否跟踪 302 跳转,是否携带 cookies,是否使用 SSL、超时等等。

配置项是以数组形式在创建 client 对象的时候传入的,所有的配置都在这里。Guzzle 会提供一个默认配置,会和自定义配置进行合并,并优先自定义配置

public function __construct(array $config = [])
{
  $this->configureDefaults($config);
}

private function configureDefaults(array $config)
{
  // 自定义配置和默认配置,在这里合并,并赋值给了成员变量
  $this->config = $config + $defaults;
}

比如这样:

$config = [
  'allow_redirects' => [
    'max'     => 5,
    'referer' => true,
  ],
  'http_errors'     => false,
  'decode_content'  => true,
  'cookies'         => true,
  'connect_timeout' => 1.5,
  'timeout'         => 2.5,
  'headers'         => [
    'User-Agent' => 'test client for chengxiaobai.com',
  ],
];

$client = new \GuzzleHttp\Client($config);

你也可以在构建请求的时候传入配置,这个时候会和构造方法中传入的配置合并,并且只对当前请求有效

private function prepareDefaults($options)
{
  $defaults = $this->config;
  // 这里这是赋值给了局部变量,所以只对当前请求有效
  $result = $options + $defaults;
  return $result;
}

比如这样:

$client = new \GuzzleHttp\Client($config);

$client->request('GET', 'https://www.chengxiaobai.com/',
         [
           'allow_redirects' => [
             'max'     => 1,
             'referer' => false,
           ],
         ]);

特殊的 handler 参数

handler 参数比较特殊,它必须是闭包,并且参数为 Psr7\Http\Message\RequestInterface 和一个 array 类型的参数,并且必须返回 GuzzleHttp\Promise\PromiseInterface 或者在成功时满足 Psr7\Http\Message\ResponseInterface

如果按照面向对象的来描述的话,就是你必须得实现一个这样的接口,Chengxiaobai\handler

interface Chengxiaobai
{
    /**
     * handler interface
     *
     * @param RequestInterface $request
     * @param array            $options
     *
     * @return Psr\Http\Message\ResponseInterface | GuzzleHttp\Promise\PromiseInterface
     */
    public function handler(Psr\Http\Message\RequestInterface $request,array $options);
}

这样对 handler 结构就很明确了吧。我们看源码怎么解析 handler 配置的。

public function __construct(array $config = [])
{
  if (!isset($config['handler'])) {
    // 创建一个默认的 handler 栈
    $config['handler'] = HandlerStack::create();
  } elseif (!is_callable($config['handler'])) {
    throw new \InvalidArgumentException('handler must be a callable');
  }
}

很明显,如果自定义了 handler 就会放弃 Guzzle 默认提供的 handlerStack。除非你有足够的把握,请不要随意操作。

举个自定义 handler 操作的例子,比如对任意一个请求都返回 404。

$client = new \GuzzleHttp\Client($config);

$response = $client->request('GET', 'www.chengxiaobai.com/archives/',
               [
                 'handler' => function (\Psr\Http\Message\RequestInterface $request, array $options) {
                   return new \GuzzleHttp\Psr7\Response(404);
                 },
               ]);

echo $response->getStatusCode();// 404

上面我们说 Guzzle 本身自带了一些 handler,我们先看看默认创建的 handlerStack 都是些什么,先不管每个 handler 里面的实现,在请求处理阶段会详细说。

public static function create(callable $handler = null)
{
  // 这里定义了底层请求实现方法
  $stack = new self($handler ?: choose_handler());
  // 下面都会添加一些 Middleware 中间件
  $stack->push(Middleware::httpErrors(), 'http_errors');
  $stack->push(Middleware::redirect(), 'allow_redirects');
  $stack->push(Middleware::cookies(), 'cookies');
  $stack->push(Middleware::prepareBody(), 'prepare_body');

  return $stack;
}

注意 choose_handler 这个方法,这个方法决定了实现请求的底层方法,通过它能让我们对 Guzzle 请求的实现的底层方法有个初步了解,也就是,所有的请求都是通过它发送出去的。仔细看源码注释,很关键。

function choose_handler()
{
    $handler = null;
    // 判定 curl 方法,如果并发和常规 curl 同时存在
    if (function_exists('curl_multi_exec') && function_exists('curl_exec')) {
        // 注册并发 curl 为默认请求方式,常规 curl 为同步请求方式
        $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
    } elseif (function_exists('curl_exec')) {
        // 如果两种 curl 方法同时只有一个存在,则优先常规 curl
        $handler = new CurlHandler();
    } elseif (function_exists('curl_multi_exec')) {
        $handler = new CurlMultiHandler();
    }
  // 如果 allow_url_fopen 开启
    if (ini_get('allow_url_fopen')) {
        $handler = $handler
            // 已有 handler ? 再注册一个流处理 handler 
            ? Proxy::wrapStreaming($handler, new StreamHandler())
            // 否则只有流处理 handler
            : new StreamHandler();
    } elseif (!$handler) {
        throw new \RuntimeException('GuzzleHttp requires cURL, the '
            . 'allow_url_fopen ini setting, or a custom HTTP handler.');
    }

    return $handler;
}

创建完 handler 后,会往 stack 中添加一些 middleware,也就是中间件。简单介绍下,push 函数第一个参数是闭包,第二个参数是字符串,中间件的名字,middleware 主要由闭包组成,可能有的 middleware 嵌套有点多,显得有点复杂,但是无论结构如何复杂,本质上就是用来处理各种请求数据,其结构类型和 Chengxiaobai\handler 一样。

需要深入了解 Handlers 和 Middleware 的可以点击这里看官方文档,个人觉得需要对闭包掌握的比较好,才能很好的理解其设计思路。

根据上面的源码分析你可能注意到,系统默认提供的 handler 是以对象的形式存在的。但其真正使用的时候是当做闭包使用的,这里介绍的是真正发挥作用的闭包结构,而不是表面的 HandlerStack 对象。后面” 处理请求 “章节会详细介绍。

构建请求

其实所有的请求在处理上都是异步的,同步请求只不过是异步请求构建后立即要求返回结果,异步转同步。但是异步和同步的请求构建都是类似的,不同处我会说明。

public function request($method, $uri = '', array $options = [])
{
  $options[RequestOptions::SYNCHRONOUS] = true;
  // requestAsync 就是异步请求,不过直接调用了 wait 转同步
  return $this->requestAsync($method, $uri, $options)->wait();
}

请求的 uri 参数

如果你在配置中定义了 base_uri 参数,这个时候可以使用相对地址,如果没有,则不支持相对地址,Guzzle 并没有帮你校验最终 uri 参数是否正确,只有等到请求发出去了,才知道 uri 是否正确。

private function buildUri($uri, array $config)
{
  // for BC we accept null which would otherwise fail in uri_for
  $uri = Psr7\uri_for($uri === null ? '' : $uri);

  if (isset($config['base_uri'])) {
    $uri = Psr7\UriResolver::resolve(Psr7\uri_for($config['base_uri']), $uri);
  }

  // 这里使用了 PSR-7 规范,返回的是一个实现了 UriInterface 的对象
  return $uri->getScheme() === '' && $uri->getHost() !== '' ? $uri->withScheme('http') : $uri;
}

比如这样就会报错

$client = new \GuzzleHttp\Client();
$response = $client->request('GET', '/history.html');
/**
 * ountput :
 * Fatal error: Uncaught GuzzleHttp\Exception\RequestException:
 * cURL error 3: <url> malformed (see http://curl.haxx.se/libcurl/c/libcurl-errors.html)
 * in /app/vendor/guzzlehttp/guzzle/src/Handler/CurlFactory.php on line 187
 */

详细的规则可以参考 RFC 3986, section 2官方帮我们整理了一些快速了解的例子。我这里梳理出 4 种情况。

base_uri uri result
chengxiaobai.com/first/ /second chengxiaobai.com/second
chengxiaobai.com/first/ second chengxiaobai.com/first/second
chengxiaobai.com/first /second chengxiaobai.com/second
chengxiaobai.com/first second chengxiaobai.com/second

保险的情况就是每次都使用绝对路径就好了,但是有时候相对路径在做爬取的时候很有用,依据实际需求使用。

构建 reqsest

Guzzle 内部使用的 request 对象都是 Psr\Http\Message\RequestInterface 的实现,这样只要你能按照 PSR-7 的规范来就很容易拓展 Guzzle。

这里再次提醒大家,modern PHP 开发,应该遵循 PSR 规范,有利于社区更好的协作和稳健发展。

public function requestAsync($method, $uri = '', array $options = [])
{
  $request = new Psr7\Request($method, $uri, $headers, $body, $version); 
  return $this->transfer($request, $options);
}

丰富 request

transfer 的结构类型和 Chengxiaobai\handler 一样。

private function transfer(RequestInterface $request, array $options)
{
  // 这个方法会根据你的请求类型,构建更具体的请求对象
  $request = $this->applyOptions($request, $options);
  $handler = $options['handler'];
}

applyOptions 从名字能看出来,这个方法会根据你的配置,构建出相匹配的 request 对象。比如根据请求类型的不同,进行参数 encode,设置 body 比如 json、stream,设置 header 等请求细节。

注意配置传入的是一个引用,所以里面对配置的任何修改,都会影响后续操作。

private function applyOptions(RequestInterface $request, array &$options)
{
    // 各种判定,修改 $options,如果有没覆盖到的,会新生成一个 $modify 说明需要重新构建 $request
    
    // 构建新的对象方法
    $request = Psr7\modify_request($request, $modify);
      
  return $request;
}

如果没有需要修改的,就直接返回,如果有的话,会重新构建一个新的 request 对象。

注意需要的参数,有些构建参数是从 $changes 取的,但是有些是从原本的 $request 对象取的,本质上就是,有新的就用新的,没有就用老的保持不变。

function modify_request(RequestInterface $request, array $changes)
{
    if (!$changes) {
        return $request;
    }

  return new Request(
    isset($changes['method']) ? $changes['method'] : $request->getMethod(),
    $uri,
    $headers,
    isset($changes['body']) ? $changes['body'] : $request->getBody(),
    isset($changes['version'])
    ? $changes['version']
    : $request->getProtocolVersion()
  );
}

promise 简介

关于 promise,属于 guzzlehttp/promises 类库,是一个很值得学习的类库,有机会我会专门分析下它的实现原理,目前我们还是着重分析请求实现过程。

通看源码会发现,虽然 Guzzle 内部大量使用了 promise 并且夹杂着闭包很复杂,但是 promise 发挥的作用都是一样的。目前可以这么理解,就是 promise 是一个状态机,它有三种状态:等待、满足、拒绝。

下面的这个例子,只是示例会如何执行,promise 规范是有各种要求的,具体见 Promises/A + 规范,Guzzle 使用的 promise 也是该规范的一个实现。

$promise = new Promise(
    function () {
        echo 'wait';
    },
    function () {
        echo 'cancle';
    }
);
$promise->then(
    function () {
        echo 'onFulfilled';
    },
    function () {
        echo 'onRejected';
    }
)->then(
    function () {
        echo 'onFulfilled';
    },
    function () {
        echo 'onRejected';
    }
);

从等待状态开始执行,满足了就执行 onFulfilled,拒绝了就执行 onRejected,一连串下来,依靠不同的状态去执行不同的方法,配合 HTTP 请求要么成功要么失败,不会有第三种状态的场景,就可以很顺畅的理解了。

private function transfer(RequestInterface $request, array $options)
{
  $handler = $options['handler'];
  try {
    return Promise\promise_for($handler($request, $options));
  } catch (\Exception $e) {
    return Promise\rejection_for($e);
  }
}

成功就是 promise_for 失败就是 rejection_for。

promise_for

这个方法主要是用来保证返回的是一个 promise 对象,因为经过 $handler 处理后的值可能是一个 promise 对象 ($handler 如何处理紧接着会说),也能是一个 response 对象,也可能是一个异常,所以需要对对数据做一个 “清洗转换”,并返回一个满足状态的 promise。

function promise_for($value)
{
    // 如果是一个 promise 对象就直接返回
    if ($value instanceof PromiseInterface) {
        return $value;
    }
    // 如果是一个包含 then 方法的对象,会把它转换成一个 promise 对象
    if (method_exists($value, 'then')) {
        // 如果里面有 wait、cancel、resolve、reject 等方法,会把它添加进去作为默认方法,否则置为 null
        $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
        $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
        $promise = new Promise($wfn, $cfn);
        $value->then([$promise, 'resolve'], [$promise, 'reject']);
        return $promise;
    }
  // 前俩者都不满不足的情况下,直接返回一个满足状态的 promise。
    return new FulfilledPromise($value);
}

rejection_for

异常情况会走入到 rejection_for 方法。同理进行 “数据清洗”,并返回一个拒绝状态的 promise。

function rejection_for($reason)
{
    if ($reason instanceof PromiseInterface) {
        return $reason;
    }

    return new RejectedPromise($reason);
}

handler 处理

还是 transfer 方法,在传给 promise_for 之前,先调用了一个 $handler,也就是配置中的 handler 函数。接着就是返回一个 promise 对象,用于外层异步调用。

private function transfer(RequestInterface $request, array $options)
{
  $handler = $options['handler'];

  try {
    // 这里会先调用配置中的 handler 方法
    return Promise\promise_for($handler($request, $options));
  } catch (\Exception $e) {
    return Promise\rejection_for($e);
  }
}

处理请求

对于上面的 handler 处理小节,你可能会有疑惑,为什么就调用了 handler 函数,那不是直接开始处理请求了吗?

我们之前介绍过 handler 的数据结构,是一个是 handlerStack 对象,但是其调用本质是一系列组合闭包。但数据结构上是一个对象,怎么使用的时候就成了闭包呢?

当尝试以调用函数的方式调用一个对象时,__invoke() 方法会被自动调用。

有了这个前提,我们看下 handlerStack 源码。

从 handlerStack 的名字上,我们就能知道它是一个” 栈 “数据结构,其满足” 后进先出 “的特性。

public function __invoke(RequestInterface $request, array $options)
{
  // 这个函数主要是实现 Middleware 中间件操作
  $handler = $this->resolve();

  //这里下面紧接着会有分析
  return $handler($request, $options);
}

public function resolve()
{
  // 变量缓存,能优化部分性能
  if (!$this->cached) {
    // 这个 handler 就是之前选择的 实现请求的底层方法
    // 如果没有的话,请求都无法实现,就别折腾了,抛个异常终止吧
    if (!($prev = $this->handler)) {
      throw new \LogicException('No handler has been specified');
    }
    // 反转顺序,实现”后进先出“特性,调用每个中间件
    foreach (array_reverse($this->stack) as $fn) {
      // 中间件的注册是 [$middleware, $name] 形式的
      // 所以取第一个元素是其具体实现,第二个参数只是名字
      // 调用第一次传入的是 handler,后续传入的就是上一次处理的结果
      $prev = $fn[0]($prev);
    }
    // 所有的都处理完毕,缓存起来
    $this->cached = $prev;
  }
  return $this->cached;
}

上面就是很经典的中间件模型实现,laravel 中实现的略有区别,主要用到了 array_reduce 这个函数,但是原理上大同小异,知道其原理一通百通。

中间件流程

我们再继续看看源码。还是这个方法,不过我们分析其最终调用的实现。

根据 Middleware 流程图,我们知道最后一个调用的是 http_errors,我们就来分析它吧,没有任何特殊性,其他的 Middleware 结构都是一样的,只是有些中间件多次使用了 __invoke() 魔术方法而已。

Middleware 里面闭包结构复杂,好好理解下。

public function __invoke(RequestInterface $request, array $options)
{
  // 这个函数主要是实现 Middleware 中间件操作
  $handler = $this->resolve();
  // 现在我们分析这个
  return $handler($request, $options);
}

public static function httpErrors()
{
  // 第一次调用返回!传入一个 闭包-A
  return function (callable $handler) {
    // 第二次调用返回!传入 $request,$options
    return function ($request, array $options) use ($handler) {
      // Middleware 自己的逻辑判定返回什么样的闭包
      if (empty($options['http_errors'])) {
        // 第三次调用返回!返回 闭包-A 的处理结果
        // 这里根据配置 没有注册 then 函数,直接进行下一步处理
        return $handler($request, $options);
      }
      // 第三次调用返回!返回 闭包-A,附加 promise 
      // 根据上面我们说到的 promise 特性,这里用 then 
      // 附加了 闭包-A 处理完毕之后要调用的逻辑
      return $handler($request, $options)->then(
        function (ResponseInterface $response) use ($request, $handler) {
          $code = $response->getStatusCode();
          if ($code < 400) {
            return $response;
          }
          throw RequestException::create($request, $response);
        }
      );
    };
  };
}

关于返回层数,可以根据 return 来迅速定位,一个 return 就对应一次调用返回。

现在我们先梳理下到这步 handlerStack 被调用的次数,知道这三层闭包分别在哪里被调用了,有利于我们得出最终结果。

// 第一次
public static function create(callable $handler = null)
{
  $stack->push(Middleware::httpErrors(), 'http_errors');
}
// 第二次
public function resolve()
{
  $prev = $fn[0]($prev);
}
// 第三次
public function __invoke(RequestInterface $request, array $options)
{
  $handler($request, $options);
}

最后的结果应该是,如果按照 Middleware 结构应该是这样的:

$handler($request, $options)->then('http_errors')
                ->then('allow_redirects')
                ->then('cookies')
                ->then('prepare_body')

这个 $handler 就是最开始传入的请求实现底层方法。

整个 Middleware 就实现了,传入的时候先处理一遍请求数据,请求完了,通过 then 再处理一遍请求结果。

注意!!!由于 Middleware 的作用不同,可能有的 Middleware 并不会处理请求结果,就不会注册 then 函数。这里描述的是 Middleware 的整个流程,并没有对其中某个做特殊分析,因为其需求场景不同,逻辑处理会有细微变化。

这个 $handler 具体是哪个请求方法呢?还记得 choose_handler() 方法吗,它决定了到底使用哪种底层方法去实现请求,现在我们终于执行到发起请求的步骤了。

再回顾下 choose_handler() 方法。

function choose_handler()
{
  $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
    $handler = new CurlHandler();
    $handler = new CurlHandler();
    $handler = $handler
        ? Proxy::wrapStreaming($handler, new StreamHandler())
        : new StreamHandler();
    return $handler;
}

这两个方法都有源码分析,没有印象的可以再回去看看。

不同 $handler 都是在 __invoke() 方法上做文章。

我们分析第一个 $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());

public static function wrapSync(
  callable $default,
  callable $sync
) {
  return function (RequestInterface $request, array $options) use ($default, $sync) {   // 注意这里的三目运算符,判定同步请求选项是否为空
    return empty($options[RequestOptions::SYNCHRONOUS])
      // 默认是并发请求 new CurlMultiHandler()
      ? $default($request, $options)
      // 这里是同步请求 new CurlHandler()
      : $sync($request, $options);
  };
}

现在,异步和同步请求终于出现了区别。我们先看同步请求。

同步请求

我们回顾下 request() 方法,注意到一个步骤。

public function request($method, $uri = '', array $options = [])
{
  // 这里往配置中添加了一个选项,设置该请求为同步的
  $options[RequestOptions::SYNCHRONOUS] = true;
}

所以,这里走的是同步请求,我们来分析 CurlHandler()

public function __invoke(RequestInterface $request, array $options)
{
  // 如果设置了延迟请求,会在这里阻塞一会
  if (isset($options['delay'])) {
    usleep($options['delay'] * 1000);
  }
  // 创建一个 handler 抽象对象
  $easy = $this->factory->create($request, $options);
  // 执行
  curl_exec($easy->handle);
  $easy->errno = curl_errno($easy->handle);
  // 请求处理结束
  return CurlFactory::finish($this, $easy, $this->factory);
}

这里需要分析一下工厂类 CurlFactory,里面大都涉及到 cURL 的一些配置,有兴趣的可以看下源码学习,配置的含义官方文档有专门的介绍,我这里就不在分析它们了,主要流程的分析还是不会缺的。

public function create(RequestInterface $request, array $options)
{
  if (isset($options['curl']['body_as_string'])) {
    $options['_body_as_string'] = $options['curl']['body_as_string'];
    unset($options['curl']['body_as_string']);
  }
  // handle 的一个抽象对象
  $easy = new EasyHandle;
  $easy->request = $request;
  $easy->options = $options;
  // 获取默认配置
  $conf = $this->getDefaultConf($easy);
  // 解析请求方法
  $this->applyMethod($easy, $conf);
  // 解析配置
  $this->applyHandlerOptions($easy, $conf);
  // 解析头部
  $this->applyHeaders($easy, $conf);
  unset($conf['_headers']);
  // 解析自定义 curl 配置
  if (isset($options['curl'])) {
    $conf = array_replace($conf, $options['curl']);
  }
  // 设置回调函数用于处理返回头
  $conf[CURLOPT_HEADERFUNCTION] = $this->createHeaderFn($easy);
  // 从 handle 池 获取一个 handle,没有就新建一个
  $easy->handle = $this->handles
    ? array_pop($this->handles)
    : curl_init();
  curl_setopt_array($easy->handle, $conf);

  return $easy;
}

public static function finish(
  callable $handler,
  EasyHandle $easy,
  CurlFactoryInterface $factory
) {
  // 这里会调用配置用设置的 on_stats 函数
  if (isset($easy->options['on_stats'])) {
    self::invokeStats($easy);
  }
  // 有错误的话走错误处理流程
  if (!$easy->response || $easy->errno) {
    return self::finishError($handler, $easy, $factory);
  }
  // 释放资源,还到 handle 池
  $factory->release($easy);
  // 处理 流数据
  $body = $easy->response->getBody();
  if ($body->isSeekable()) {
    $body->rewind();
  }
  // 返回一个满足状态的 promise
  return new FulfilledPromise($easy->response);
}

根据源码分析,同步请求在这一步就已经发出了请求,并且回调了配置中的 on_stats 函数,拿到了未经处理的返回值原始返回值,并且同步请求 handler 池,也就是复用的请求句柄为 3 个,这个没有办法修改,写死在代码中的。

异步请求

public function __invoke(RequestInterface $request, array $options)
{
    $easy = $this->factory->create($request, $options);
    // 为每个请求生成一个 ID
    $id = (int) $easy->handle;
// 注册一个 promise,分别是调用执行和关闭方法
    $promise = new Promise(
        [$this, 'execute'],
        // 依据 ID 来关闭请求
        function () use ($id) { return $this->cancel($id); }
    );
// 添加请求 底层是 curl_multi_add_handle 方法
    $this->addRequest(['easy' => $easy, 'deferred' => $promise]);
    return $promise;
}

工厂类 CurlFactory 在上面已经分析过,这里不再赘述。但是异步请求这个时候并没有发起最终的请求,先是为每个请求生成一个 ID,然后将请求添加到批处理回话句柄 (curl_multi_add_handle) 中,最后返回了一个 promise 对象,里面注册了 execute 函数和 cancel 函数,用于后面发起和关闭请求。

需要注意的就是设定了延迟执行的请求,是在 addRequest() 方法中处理的。后面在” 返回结果 “章节会讲到延迟请求处理。

流处理

配置中如果 stream 选项不为空,就会启用它,如果你没有 cURL,那就只能用它了。

public function __invoke(RequestInterface $request, array $options)
{
  // 如果设置了延迟请求,会在这里阻塞一会
  if (isset($options['delay'])) {
    usleep($options['delay'] * 1000);
  }
  // 流处理本身信息较少,所以为了补全一些信息,这里记录处理开始时间
  $startTime = isset($options['on_stats']) ? microtime(true) : null;
  try {
    // 不支持 expect header.
    $request = $request->withoutHeader('Expect');
    // 当内容为 0 的时候,依然添加一个头信息
    if (0 === $request->getBody()->getSize()) {
      $request = $request->withHeader('Content-Length', 0);
    }
    // 发起请求,然后回调 on_stats 函数
    // 解析结果,同样返回一个满足状态的 promise
    return $this->createResponse(
      $request,
      $options,
      $this->createStream($request, $options),
      $startTime
    );
  } catch (\InvalidArgumentException $e) {
    throw $e;
  } catch (\Exception $e) {
    // Determine if the error was a networking error.
    $message = $e->getMessage();
    // This list can probably get more comprehensive.
    if (strpos($message, 'getaddrinfo') // DNS lookup failed
        || strpos($message, 'Connection refused')
        || strpos($message, "couldn't connect to host") // error on HHVM
       ) {
      $e = new ConnectException($e->getMessage(), $request, $e);
    }
    $e = RequestException::wrapException($request, $e);
    $this->invokeStats($options, $request, $startTime, null, $e);

    return \GuzzleHttp\Promise\rejection_for($e);
  }
}

关于流处理,因为其底层实现是 fopen() 函数,其支持的协议比较多,不止有 HTTP,它支持的协议和封装协议在这里可以看到,所以 Guzzle 对其做了一些特殊处理以满足业务需要。

返回结果

根据上面的分析我们已经知道 transfer 方法返回的结果是什么了,然后就是获取返回结果。

同步请求

同步请求因为在 transfer 方法中,实际的请求已经发出去,已经拿到了未经处理的原始返回结果。

public function send(RequestInterface $request, array $options = [])
{ 
  // 我们注意到最后调用的 wait 方法
  return $this->sendAsync($request, $options)->wait();
}

在同步请求方法中,直接调用了 wait() 方法,所以直接走 promise 对象的 wait() 方法及注册的 then() 方法。还记得之前的 Middleware 里面注册了一些 then() 方法吗?这里主要就是调用它们了,完成中间件 “处理返回结果” 的这一步骤,当然还有一些在逻辑处理中注册的 then() 方法,在此不再举例。

异步请求

异步请求在 transfer 方法中返回的是一个 Promise,此时实际请求并没有发送。我们从官方例子来分析发送请求并且获取返回结果的方式。

$promise = $client->requestAsync('GET', 'https://www.chengxiaobai.com');
$promise->then(
    function (ResponseInterface $res) {
        echo $res->getStatusCode() . "\n";
    },
    function (RequestException $e) {
        echo $e->getMessage() . "\n";
        echo $e->getRequest()->getMethod();
    }
);

这种方式是对每个异步请求单独注册 then() 方法,说明这个请求成功了怎么处理,失败了怎么处理。

$client = new Client(['base_uri' => 'https://www.chengxiaobai.com']);
// 注册多个异步请求,实现并发
$promises = [
    'image' => $client->getAsync('/image'),
    'png'   => $client->getAsync('/image/png'),
    'jpeg'  => $client->getAsync('/image/jpeg'),
    'webp'  => $client->getAsync('/image/webp')
];
// 有一个失败就终止
$results = Promise\unwrap($promises);
// 忽略某些请求的异常,保证所有请求都发送出去
$results = Promise\settle($promises)->wait();

这个是设定多个异步请求,实现并发,并选择对部分请求错误是否忽略进行处理。

$client = new Client();
$requests = function ($total) use ($client) {
    for ($i = 1; $i < $total; $i++) {
      $uri = 'https://www.chengxiaobai.com/page/' . $i;
        // 这里用到了协程
        yield function() use ($client, $uri) {
            return $client->getAsync($uri.$i);
        };
    }
};
$pool = new Pool($client, $requests(10), [
    // 并发数
    'concurrency' => 5,
    'fulfilled' => function ($response, $index) {
        echo $res->getStatusCode() . "\n";
    },
    'rejected' => function ($reason, $index) {
        echo $e->getMessage() . "\n";
    },
]);
// 初始化 Promise
$promise = $pool->promise();
// 发起请求处理
$promise->wait();

这个是对大批量请求做出一个批量处理,类似一个请求池的的概念,设定了出口速率 (concurrency),使用统一的处理逻辑,处理请求池当中的数据。

我们来分析下 Pool 的源码,主要是构造函数。

public function __construct(
  ClientInterface $client,
  $requests,
  array $config = []
) {
  // 设定请求池大小
  if (isset($config['pool_size'])) {
    $config['concurrency'] = $config['pool_size'];
  } elseif (!isset($config['concurrency'])) {
    // 默认并发数 25
    $config['concurrency'] = 25;
  }
  if (isset($config['options'])) {
    $opts = $config['options'];
    unset($config['options']);
  } else {
    $opts = [];
  }
  // 将请求列表转换为一个迭代器
  $iterable = \GuzzleHttp\Promise\iter_for($requests);
  $requests = function () use ($iterable, $client, $opts) {
    // 遍历请求列表
    foreach ($iterable as $key => $rfn) {
      // 如果是一个 request 的实现,转换为一个异步请求
      if ($rfn instanceof RequestInterface) {
        yield $key => $client->sendAsync($rfn, $opts);
      } elseif (is_callable($rfn)) {
        // 如过是一个闭包,直接调用 
        yield $key => $rfn($opts);
      } else {
        throw new \InvalidArgumentException('...');
      }
    }
  };
  // 支持迭代的 promise 对象
  $this->each = new EachPromise($requests(), $config);
}

我们可以看到,Pool 模式下,所有的请求配置 $opts 都是一样的,所以每个请求的处理逻辑都是一样的,如果每个请求都有有定制化需求,Pool 模式可能不太适合,当然可以使用修改源码的方式,不过这个已经不符合 Pool 模式设计的初衷了。

不管哪种形式,都可以发现触发最终调用的都是 wait() 方法。这个和 promise 的规范有关。

我们看下异步如何处理请求的。

还记得异步请求返回的 promise 吗?

$promise = new Promise(
            [$this, 'execute'],
            // 依据 ID 来关闭请求
            function () use ($id) { return $this->cancel($id); }
        );

wait() 方法调用的就是 [$this, 'execute'],我们来分析它的实现。在此之前,我们需要特别说明下延迟请求。

延迟时间

对于延迟请求,同步请求和流请求很好处理,直接阻塞就好了,如果是 20 个异步请求中包含 10 个延迟请求,每个延迟时间还不相等,这个时候延迟请求的处理就得好好考虑下了。

在” 请求处理 “章节我们说过,延迟请求是没有立即加到批处理请求句柄的,它被暂时存放在 $this->delays 队列中。直到你决定发起请求了,延迟请求才被拿出来计算其是否应该被加到批处理请求句柄中。计算逻辑我们从源码看看如何计算阻塞时间。

public function execute()
{
  $queue = P\queue();
  while ($this->handles || !$queue->isEmpty()) {
    // 如果没有在进行的请求,并且延迟请求队列不为空,就开始阻塞
    if (!$this->active && $this->delays) {
      usleep($this->timeToNext());
    }
    $this->tick();
  }
}
private function timeToNext()
{
  $currentTime = microtime(true);
  $nextTime = PHP_INT_MAX;
  // 找出现有延迟请求队列中最小的延迟时间
  foreach ($this->delays as $time) {
    if ($time < $nextTime) {
      $nextTime = $time;
    }
  }
  return max(0, $nextTime - $currentTime) * 1000000;
}

execute 主要是调用了 tick() 这个方法。

public function tick()
{
  // 如果延迟请求队列不为空,处理延迟请求
  if ($this->delays) {
    $currentTime = microtime(true);
    // $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
    foreach ($this->delays as $id => $delay) {
      // 延迟任务已经达到延迟预期时间,开始处理
      if ($currentTime >= $delay) {
        // 将它从延迟任务队列中删除
        unset($this->delays[$id]);
        // 添加到批量请求句柄中
        curl_multi_add_handle(
          $this->_mh,
          $this->handles[$id]['easy']->handle
        );
      }
    }
  }
  // 执行队列中的任务
  P\queue()->run();
  // 执行请求
  if ($this->active &&
      curl_multi_select($this->_mh, $this->selectTimeout) === -1
     ) {
    // See: https://bugs.php.net/bug.php?id=61141
    usleep(250);
  }
  while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
  // 获取请求结果信息,移除请求成功的请求
  $this->processMessages();
}

然后异步处理流程就很清晰了:

  1. 如果延迟请求队列不为空并且当前没有在执行的请求,先阻塞最小的延迟时间,以保证延迟请求队列在每次请求都至少被消耗一个。如果有正在执行的请求或者延迟请求队列不为空,直接执行 2。
  2. 发起一次批量请求。
  3. 获取请求信息,移除成功的请求。
  4. 如果请求队列不为空,执行 1-3。

从上面的流程,我们可以分析得出,即使你的并发数大于请求数,也并不意味着只请求一次,可能会有重试或者延迟请求造成多次请求。并且根据步骤 1 我们也可以知道,非延迟任务也会跟着一起被阻塞。

和同步请求一样,异步请求下每个请求处理完毕后,都会执行相应的 then() 方法完成返回结果处理。

流请求

因为流请求本质上是基于 fopen 的,发起请求逻辑比较简单。

public function __invoke(RequestInterface $request, array $options)
{
  // 延迟请求直接 delay 操作
  if (isset($options['delay'])) {
    usleep($options['delay'] * 1000);
  }
  // 重点 1:解析返回值
  return $this->createResponse(
    $request,
    $options,
    // 重点 2:发起请求
    $this->createStream($request, $options),
    $startTime
  );
}

先看如何发起请求的,重点在配置项的处理。

private function createStream(RequestInterface $request, array $options)
{
    $params = [];
    // 这里设置了默认请求参数
    $context = $this->getDefaultContext($request);
  // 这里方法主要是依据配置项调用了
    // add_proxy,add_timeout,add_verify,add_cert,add_progress,add_debug
    // 其实本质上就是用自定义配置覆盖默认请求参数
    if (!empty($options)) {
        foreach ($options as $key => $value) {
            $method = "add_{$key}";
            if (isset($methods[$method])) {
                $this->{$method}($request, $context, $value, $params);
            }
        }
    }
  // 这里也是用自定义配置覆盖默认请求参数
    if (isset($options['stream_context'])) {
        if (!is_array($options['stream_context'])) {
            throw new \InvalidArgumentException('stream_context must be an array');
        }
        $context = array_replace_recursive(
            $context,
            $options['stream_context']
        );
    }
    // 解析 host,支持强制 IP 解析,v4 和 v6 都支持
    $uri = $this->resolveHost($request, $options);
    $context = $this->createResource(
        function () use ($context, $params) {
            // 这里创建资源流
            return stream_context_create($context, $params);
        }
    );

    return $this->createResource(
        function () use ($uri, &$http_response_header, $context, $options) {
            // 这里发起请求
            $resource = fopen((string) $uri, 'r', null, $context);
            $this->lastHeaders = $http_response_header;
      // 设置超时时间
            if (isset($options['read_timeout'])) {
                $readTimeout = $options['read_timeout'];
                $sec = (int) $readTimeout;
                $usec = ($readTimeout - $sec) * 100000;
                stream_set_timeout($resource, $sec, $usec);
            }
            return $resource;
        }
    );
}

从代码看,默认启用了 HTTPS。

这里的自定义配置和默认配置合并,不再是之前简单的数组合并操作了,因为某个配置的修改,可能会涉及到其他配置项的变动,所以对几个主要选项 (proxy,timeout,verify,cert,progress,debug) 做了封装。

毕竟 fopen 这个功能强大的函数在设计之初其目标就是操作 resource,所以它的配置项也根据 resource 的不同而有差异,其针对 HTTP 的配置项在这里可以看到。

然后就是处理返回值,如果使用 cURL 这些都能方便的处理,但是在流处理中,就得自己去解析它,相当于要自己要完成一部分 cURL 的工作。

private function createResponse(
        RequestInterface $request,
        array $options,
        $stream,
        $startTime
    ) {
        $hdrs = $this->lastHeaders;
        $this->lastHeaders = [];
        $parts = explode(' ', array_shift($hdrs), 3);
        $ver = explode('/', $parts[0])[1];
        $status = $parts[1];
        $reason = isset($parts[2]) ? $parts[2] : null;
      // 解析 header 
        $headers = \GuzzleHttp\headers_from_lines($hdrs);
      // 解析返回类型
        list ($stream, $headers) = $this->checkDecode($options, $headers, $stream);
      // 构建一个 Psr7\StreamInterface 的 Stream 对象
        $stream = Psr7\stream_for($stream);
        $sink = $stream;
        $response = new Psr7\Response($status, $headers, $sink, $ver, $reason);
    // 回调 on_headers 函数
        if (isset($options['on_headers'])) {
            try {
                $options['on_headers']($response);
            } catch (\Exception $e) {
                $msg = 'An error was encountered during the on_headers event';
                $ex = new RequestException($msg, $request, $response, $e);
                return \GuzzleHttp\Promise\rejection_for($ex);
            }
        }
    // 回调 on_stats 函数
        $this->invokeStats($options, $request, $startTime, $response, null);
  
        return new FulfilledPromise($response);
    }

整个过程都是在解析数据,响应内容是通过 stream_get_contents 拿到的,在 Psr7\StreamInterface 实例中有体现。

这里单独说下 on_headers 这个函数。这个函数是在拿到返回头之后,依据返回头里面的信息,来判定如何响应后面的操作,在返回数据比较大的时候可以做到提前拦截,避免浪费资源。

这个设置在所有请求方式中都有效,只是用在流处理中意义更大。

$client->request('GET', 'http://httpbin.org/stream/1024', [
    'on_headers' => function (ResponseInterface $response) {
        if ($response->getHeaderLine('Content-Length') > 1024) {
            throw new \Exception('The file is too big!');
        }
    }
]);

彩蛋

在分析源码的过程中,发现了一个没有被使用的类 GuzzleHttp\Promise\Coroutine ,它也是对 promise 的一个实现,但是其是通过迭代器来实现的,会不会有协程版的 promise 呢?我们拭目以待。