Skip to content
This repository has been archived by the owner on Aug 7, 2024. It is now read-only.

分析生产环境的 Promise 实现,完整理解 Promise 原理 #28

Open
fi3ework opened this issue Jul 13, 2018 · 0 comments
Open

Comments

@fi3ework
Copy link
Owner

fi3ework commented Jul 13, 2018

前言

本文适合有一定 Promise 使用基础的读者,如果你还没有使用过 Promise,建议先通过以下教程了解 Promise 的使用方法

  1. JavaScript Promise迷你书(中文版)
  2. Promise 对象 - ECMAScript 6 入门

本篇文章中我们要分析的 Promise 实现库的源码是 es6-promise,一个通过了 Promises/A+ Tests 的 polyfill,源码通过 ES6 书写比较简洁。为方便理解,放上我已经写了注释的仓库地址:es6-promise-annotated,可以配合阅读。

源码分析

思路

在正式进入源码分析之前,我们先明确 Promise 的一些基本准则:

  1. Promise 是一个类,那么每次调用 new Promise() 自然都会返回一个 Promise 实例对象,调用 .then() Promise.resolve() Promise.reject() 返回的也都是一个被包装好的 Promise 对象。
  2. Promise.prototype.thenPromise.prototype.catch 的参数中的回调函数虽然是异步执行的,但是向 promise 对象注册执行成功时和失败时相应的回调函数的行为是同步执行的。
  3. Promise 在完成所有的同步注册之后,会在下一个 event loop 启动链式调用注册的回调函数。
  4. 因为要注册的是回调事件,所以 then 的参数必须传递一个或两个函数,否则会发生值穿透,即这个 then 会被直接无视掉,就算是给 then 传递一个 Promise 对象也要通过一个函数来返回这个对象。
  5. then 中的回调函数在被执行时是同步的,想要通过 then 将多个异步函数串联起来要在每个 then 中传递一个返回 Promise 的函数。

Promise 的工作主要分为两个阶段,一是同步注册,二是异步触发链式调用

同步注册

一般来说在调用 Promise 时都是多个 Promise 对象串联起来的,then 中函数的调用是异步的, 但是 Promise 对象的创建是同步的,如下例子中:

const p1 = new Promise(function(resolve, reject){
    setTimeout(()=>{
        resolve('a', 'b')
        }, 1000)
})

p1.then((value1, value2)=>{
    console.log(value1, value2)
})

p1 和 p1 被 then 包装出来的 Promise 对象是会在整个 script 脚本的执行过程中同步完成,这个过程就是注册,每个子 promise 都会添加到父 promise 的属性上来完成注册,以便在接下来的链式调用中按照顺序执行。

链式调用

Promise 可以链式调用关键的一点就是确定好每个节点的调用接口(有点像递归的递归入口),这样每个 Promise 对象就可以通过相同的接口串联起来调用,Promise 对象的模型如下图:

img_0016

每个父 promise 传递给子 promise 们它的 settled 状态来指示该触发子 promise 成功或失败的回调函数,并传递给子 promise 它的 settled 值

构造函数

每个 Promise 对象都有三个状态:pending fulfilled rejected,所以需要有一个内部属性来标识状态。向 Promise 对象注册的回调函数也会被同步存在一个内部属性中,在执行子 promise 的时候取出进行异步调用。

Promise 的构造函数只接受一个参数,函数签名为 function(resolve, reject){},所以要对传入的参数进行判断,传入的参数非函数时直接报错(如果想传入非函数的参数要使用 Promise.resolve,会自动包装出一个 Promise 对象,后面也会讲到)。

  constructor(resolver) {
    this[PROMISE_ID] = nextId();
    // 初始化 promise
    this._result = this._state = undefined;
    // 向 promise 注册的回调函数 
    this._subscribers = [];

    if (noop !== resolver) {
      typeof resolver !== 'function' && needsResolver();
      // 确保使用 new 调用 Promise
      this instanceof Promise ? initializePromise(this, resolver) : needsNew();
    }
  }

Promise 构造函数中的函数是同步执行的,通过 initializePromise 来启动这个 Promise 对象

// 在 new Promise() 时同步执行 resolver
// 执行到 resolver 中的 resolve 时,实际执行的是 resolve(promise, value)
function initializePromise(promise, resolver) {
  try {
    resolver(function resolvePromise(value) {
      resolve(promise, value);
    }, function rejectPromise(reason) {
      reject(promise, reason);
    });
  } catch (e) {
    reject(promise, e);
  }
}

但我们知道 new Promise(function(resolve, reject)) 中 resolve 和 reject 的执行是异步的,value 这个参数就是在 Promise 的构造函数中传递给 resolve 和 reject 的参数值(可以看到,resolve 和 reject 都只能接受一个参数,后面的参数会被直接忽略)。

到这一步,回调的 resolve(value)reject(error) 实际上在内部被封装成了 (value) => resolve(promise, value)(reason) => reject(promise, reason),promise 就是当前的 Promise 对象,value 在这里就是 'a',reject 同理。表面看上去只是多了一个 promise 的参数,但是子 promise 事先已经向父 promise 通过 then 完成了同步的注册,启动之后,就能异步的链式调用了。

在这里要注意 resolve 和 reject 是异步的,后面会讲到。

注册

接下来我们来看下 thencatch 是如何在 Promise 上注册回调的。

  if (child[PROMISE_ID] === undefined) {
    makePromise(child);
  }

  const { _state } = parent;  
  // 如果当前 promise 已经 settled 了
  // 则可以直接执行 then 的 promise
  if (_state) {
    const callback = arguments[_state - 1];
    asap(() => invokeCallback(_state, child, callback, parent._result));
  } else {
    // then 前的 promise 作为 parent
    // then 后的 promise 作为 child
    // 将 then 后的 onFulfillment, onRejection 注册到 parent 的 _subscribers 上
    subscribe(parent, child, onFulfillment, onRejection);
  }

then 中传入的一个或两个函数都会被包装成 Promise 对象来满足 Promises/A+ 规定,并且只有返回 Promise 对象才可以符合调用的接口。每次 then 的 Promise 对象(子 promise)会连同它的 resolve 和 reject 回调函数一同存入被 then 的 Promise 对象(父 promise)的 _subscribers 中。

// 将 then 的 promise 的 onFulfillment, onRejection 注册到被 then 的 promise 的 _subscribers 上
// 并且调用尽快开始异步执行
// 每次注册时添加三个对象:下一个 promise,下一个 promise 的 onFulfillment,下一个 promise 的 onRejection
function subscribe(parent, child, onFulfillment, onRejection) {
  let { _subscribers } = parent;
  let { length } = _subscribers;

  parent._onerror = null;

  _subscribers[length] = child;
  _subscribers[length + FULFILLED] = onFulfillment;
  _subscribers[length + REJECTED] = onRejection;

  // promise 的启动
  // 在当前 promise 在 then 的时候,就在下一个 microtask 中注册要执行所有注册的回调函数
  if (length === 0 && parent._state) {
    asap(publish, parent);
  }
}

链式调用的实现

在上一节中,then 的 Promise 对象向被 then 的 Promise 对象注册了回调事件,至此完成了第一阶段 —— 同步注册。接下来是第二个阶段 —— 启动链式调用。

Promise 有几种创建的方法:1. 通过构造函数 new 出来的。2. 通过 then 或 reject 出来的 3. 通过 Promise.resolve 或 Promise.reject 创建的。这三种创建 Promise 对象的启动的方法相同,不同的是

  1. 构造函数 new 出来的 Promise 对象自身状态是由传入的函数的执行情况来确定。
  2. then 或 reject 出来的 Promise 执行 onFulfilled 或 onRejected 由父 promise 的状态决定,并在 onFulfilled 或 onRejected 中确定自己的状态。
  3. Promise.resolve 或 Promise.reject 返回的是一个已经 settled 的 Promise。

分别分情况来看:

构造函数 new 出来的 Promise

对于通过构造函数创建的 Promise 对象,没有父 promise 来给它 settled 状态及返回值,这两者都是在传给 Promise 的函数的逻辑中传递的。

// 在 new Promise() 时同步执行 resolver
// 但是 new Promise(function(resolve, reject)) 中 resolve 和 reject 的执行是异步的
// 执行到 resolver 中的 resolve 时,实际执行的是 resolve(promise, value)
// 借助闭包多传入了当前的 promise 对象
function initializePromise(promise, resolver) {
  try {
    resolver(function resolvePromise(value) {
      resolve(promise, value);
    }, function rejectPromise(reason) {
      reject(promise, reason);
    });
  } catch (e) {
    reject(promise, e);
  }
}

then 或 reject 出来的 Promise 对象

对于 then 或 reject 出来的 Promise 对象,要先包装出一个 Promise 对象

  if (child[PROMISE_ID] === undefined) {
    makePromise(child);
  }

然后根据父 proimse 的状态来确定自己将要执行 onFulfilled 或 onRejected,如果父 promise 已经 settled 了,则可以直接执行 then 的回调函数。

如果父 promise 还没有 settled,就默默进行注册,上面已经提到过,subscribe 函数会在第一个次注册回调函数时执行所有父 promise 的回调函数。

  const { _state } = parent;

  // 如果当前 promise 已经 settled 了
  // 则可以直接执行 then 的回调函数
  if (_state) {
    const callback = arguments[_state - 1];
    asap(() => invokeCallback(_state, child, callback, parent._result));
  } else {
    // then 前的 promise 作为 parent
    // then 后的 promise 作为 child
    // 将 then 后的 onFulfillment, onRejection 注册到 parent 的 _subscribers 上
    subscribe(parent, child, onFulfillment, onRejection);
  }

Promise.resolve 或 Promise.reject 返回的 Promise

拿 Promise.resolve 来举例即可,Promise.reject 同理。

// 返回一个 promise,并且这个 promise 即将被 resolve 了
export default function resolve(object) {
  /*jshint validthis:true */
  let Constructor = this;

  // 如果传入的就是一个 promise 那么就可以直接返回
  if (object && typeof object === 'object' && object.constructor === Constructor) {
    return object;
  }

  // 生成一个新的 promise
  let promise = new Constructor(noop);
  // 用传入的 value 去 resolve 它
  _resolve(promise, object);
  return promise;
}

这里的 _resolve 就是之前出现多次的 resolve,表示当前 Promise 已走 onFulfilled 的回调了。

确定 promise 的状态

当一个 Promise 对象接受父 promise 传入的状态或根据自身的回调函数确定要执行 onFulfilled 的回调函数时可能遇到三种情况:1. 如果传入 onFulfilled 的参数是自身,会导致递归爆栈,这时要 reject 掉。 2. 传入的回调函数是一个 thenable 对象,那么 3. 如果传入的是其他对象,则可以直接 fulfill 掉当前的 Promise 对象。

// promise: 当前 promise 对象
// value: 传入 resolve 的 value
function resolve(promise, value) {
  if (promise === value) {
    // 自己 resolve 自己会递归爆栈
    reject(promise, selfFulfillment());
  } else if (objectOrFunction(value)) {
    // 处理可能的 thenable 对象
    // 如果上一个 promise 返回的是对象或函数
    // 则可能有两种可能,一种是正常的返回结果
    // 另一个中是返回的是 thenable 对象
    handleMaybeThenable(promise, value, getThen(value));
  } else {
    // 传入基本类型可以直接 fulfill
    fulfill(promise, value);
  }
}

fulfill 函数的功能就是确定当前 promise 的 state 和 result。之后,就可以通过 asap 异步通知所有子 promise 开始执行。

// 确定状态为 fulfilled
// 在 fulfilled 自己之后,会 publish 下一个 promise
function fulfill(promise, value) {
  // 每个 promise 只能被 fulfill 或者 reject 一次
  if (promise._state !== PENDING) { return; }

  // 状态变为 fulfilled 并且保存结果
  promise._result = value;
  promise._state = FULFILLED;

  // 如果 promise 后面有 then 的函数,则尽快异步执行下一个 promise
  if (promise._subscribers.length !== 0) {
    asap(publish, promise);
  }
}

对于 publish,就是依次同步处理所有子 promise 们。如果子 promise 有注册了回调,那么触发子 promise。

// 执行 promise 之后所有 then 的函数
function publish(promise) {
  let subscribers = promise._subscribers;
  let settled = promise._state;

  // 如果后面没有 then 就直接返回
  if (subscribers.length === 0) { return; }

  let child, callback, detail = promise._result;

  // 每次注册 then 的时候,都是往 _subscribers 添加 promise 和两个回调函数,所以 +3:
  for (let i = 0; i < subscribers.length; i += 3) {
    child = subscribers[i];
    callback = subscribers[i + settled];

    if (child) {
      // 如果被 then 了,则执行子 promise 注册的回调函数
      invokeCallback(settled, child, callback, detail);
    } else {
      // 如果后面没有 then 了,则可以直接执行回调
      callback(detail);
    }
  }

  promise._subscribers.length = 0;
}

当到了 invokeCallback,实际上就是执行每个子 promise 的回调,这时会有几种情况:1. 如果注册的回调函数的确实是函数,则执行回调函数得到结果,如果没有返回错误则说明要 fulfill 这个 promise。2. 如果注册的回调函数却不是函数,则说明发生了值穿透,此时直接用父 promise 的 result 来作为子 promise 的 result 向下传递。

// 执行后续 then 中的回调函数
// 上一个 promise 的状态
// 下一个 promise
// 对应状态注册的回调函数
// 上一个 promise 的返回值
function invokeCallback(settled, promise, callback, detail) {
  // 如果 then 中传入的 callback 不是函数,则会发生值穿透
  let hasCallback = isFunction(callback),
    value, error, succeeded, failed;

  // then 中传入了函数,可以回调
  // 判断自身是 fulfill 还是 reject
  if (hasCallback) {
    value = tryCatch(callback, detail);

    if (value === TRY_CATCH_ERROR) {
      failed = true;
      error = value.error;
      value.error = null;
    } else {
      succeeded = true;
    }

    // 防止 promise resolve 自己导致递归爆栈
    if (promise === value) {
      reject(promise, cannotReturnOwn());
      return;
    }

  } else {
    // 发生值穿透,则直接使用之前 promise 传递的值
    value = detail;
    succeeded = true;
  }

  if (promise._state !== PENDING) {
    // 又重新来一轮,启发链式调用
  } else if (hasCallback && succeeded) {
    resolve(promise, value);
  } else if (failed) {
    reject(promise, error);
  } else if (settled === FULFILLED) {
    fulfill(promise, value);
  } else if (settled === REJECTED) {
    reject(promise, value);
  }
}

异步执行的实现

我们知道 Promise 处于的异步队列是 microTask,这里不再重复 task 和 mircoTask 的执行顺序,说一下 microTask 的意义,引用 顾轶灵 大神在这个问题下回答:

为啥要用 microtask?根据HTML Standard,在每个 task 运行完以后,UI 都会重渲染,那么在 microtask 中就完成数据更新,当前 task 结束就可以得到最新的 UI 了。反之如果新建一个 task 来做数据更新,那么渲染就会进行两次。

microTask 某种程度上来说就是 task 执行前的钩子函数,JS 引擎在执行完一个 task 后会更新 UI,有了 microTask 就能在改变 UI 改变前操作 DOM。

对于简易实现的 Promise,一般都是直接使用 setTimeout 来做延迟,但是 setTimeout 属于 marcotask,在 es6-promise 中按顺序使用以下方式来进行异步的延迟,优先使用可以使用的方法。

nextTick

这是 Node 中特有的 microTask 函数,在 Node 环境中直接使用它即可。

MutationObserver

MutationObserver 是 HMLT5 引入的能在某个范围内的DOM树发生变化时作出适当反应的能力.该API设计用来替换掉在DOM3事件规范中引入的Mutation事件.

在调用时创建一个 BrowserMutationObserver 来监视一个 node,回调函数为需要异步执行的回调函数。

  const observer = new BrowserMutationObserver(flush);
  const node = document.createTextNode('');
  observer.observe(node, { characterData: true });

  return () => {
    node.data = (iterations = ++iterations % 2);
  };

MessageChannel

Channel Messaging API的Channel Messaging接口允许我们创建一个新的消息通道,并通过它的两个MessagePort 属性发送数据。

同理,对其中一个通道随便发送一个消息,另一个通道执行回调即可。

另外

Note: 此特性在 Web Worker 中可用。

所以 MessageChannel 可以作为 MutationObserver 的替补及 Web Worker 中的异步方法。

// web worker
function useMessageChannel() {
  const channel = new MessageChannel();
  channel.port1.onmessage = flush;
  return () => channel.port2.postMessage(0);
}

vert.x

笔者之前从来没听说个这个东西,找到了官网,就不在这里过多研究了。

setTimeout

如果以上方法皆不行则最后采用 setTimeout 的方法来执行。但是使用 setTimeout 来进行异步操作的话就不再是 microTask 的 Promise 了。

race

race 的实现比较简单,因为 promise 只能被 settle 一次,所以直接对 race 中传递的 promise 们都 then 上 race 的回调函数即可,回调函数会被最先完成的 promise å给 settle,。

export default function race(entries) {
  /*jshint validthis:true */
  let Constructor = this;

  if (!isArray(entries)) {
    return new Constructor((_, reject) => reject(new TypeError('You must pass an array to race.')));
  } else {
    // 新建一个要返回的 promise,然后同步 resolve 要 race 的几个 promise
    // 要返回的 promise 都 then 这几个要 race 的 promise
    // 当最快的那个 promise settle 后,会 resolve 或 reject 要返回的 promise
    // 之后的再  settle 的 promise 就不再起作用了
    return new Constructor((resolve, reject) => {
      let length = entries.length;
      for (let i = 0; i < length; i++) {
        Constructor.resolve(entries[i]).then(resolve, reject);
      }
    });
  }
}

all

在内部维护一个保存结果的数组及记录未 settled 的 promise 的数量值,在每一次 settled 的时候都将这个值减1,当最后一个 promise settled 之后,记录结果的数组也都保存好了每个 promise 的返回值,可以触发包装的 promise 的回调了。

export default class Enumerator {
  constructor(Constructor, input) {
    this._instanceConstructor = Constructor;
    this.promise = new Constructor(noop);
    // 所有传入 promise 必须是本 promise 的实例
    if (!this.promise[PROMISE_ID]) {
      makePromise(this.promise);
    }

    if (isArray(input)) {
      // 一共 all 了几个 promise
      this.length = input.length;
      // 还剩几个没执行完的,初始是所有 promise 的数量
      this._remaining = input.length;
      // 保存结果的数组
      this._result = new Array(this.length);

      if (this.length === 0) {
        fulfill(this.promise, this._result);
      } else {
        this.length = this.length || 0;
        this._enumerate(input);
        // 如果传入的 input 都是同步执行,那么直接在这一轮 event-loop 中就结束了。
        if (this._remaining === 0) {
          fulfill(this.promise, this._result);
        }
      }
    } else {
      // 不是数组直接 reject
      reject(this.promise, validationError());
    }
  }
  _enumerate(input) {
    for (let i = 0; this._state === PENDING && i < input.length; i++) {
      this._eachEntry(input[i], i);
    }
  }

  _eachEntry(entry, i) {
    let c = this._instanceConstructor;
    let { resolve } = c;

    // 如果是本 promise 的 resolve
    if (resolve === originalResolve) {
      let then = getThen(entry);

      // 如果是一个已经 settled 的 promise,则 _remaining--,并记录其结果
      if (then === originalThen && entry._state !== PENDING) {
        this._settledAt(entry._state, i, entry._result);
      }
      // 如果不是一个函数,则 _remaining--,并直接将其作为结果
      else if (typeof then !== "function") {
        this._remaining--;
        this._result[i] = entry;
      }
      // 如果是本 promise 的实例,则设置回调
      else if (c === Promise) {
        let promise = new c(noop);
        handleMaybeThenable(promise, entry, then);
        this._willSettleAt(promise, i);
      }
      // 如果不是本 promise 的实例,则包装一下设置回调
      else {
        this._willSettleAt(new c(resolve => resolve(entry)), i);
      }
    }
    // 如果不是本 promise 的 resolve
    else {
      this._willSettleAt(resolve(entry), i);
    }
  }

  // 同步
  _settledAt(state, i, value) {
    let { promise } = this;

    if (promise._state === PENDING) {
      this._remaining--;
      // 如果有一个 reject,则直接 reject
      if (state === REJECTED) {
        reject(promise, value);
      }
      // 设置结果
      else {
        this._result[i] = value;
      }
    }

    // 如果所有 input 都 settled 了,可以执行 all 的回调了
    if (this._remaining === 0) {
      fulfill(promise, this._result);
    }
  }

  // 通过 then 给 promise 注册 _settledAt
  _willSettleAt(promise, i) {
    let enumerator = this;

    subscribe(
      promise,
      undefined,
      value => enumerator._settledAt(FULFILLED, i, value),
      reason => enumerator._settledAt(REJECTED, i, reason)
    );
  }
}

总结

可以看到,一个可以用于生产环境中的 Promise 库,对各种边界条件都有很好的处理,并且能兼容第三方的 thenable 对象,选用合适的 API 来实现将任务推到 microtask 中。不过,做的更好的 Promise 库还应该有很强的拓展性,比如 bluebird,但是,通过了解 es6-promise 对梳理 Promise 中的各种非常规操作还是大有好处。

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

1 participant