@ncjamieson

RxJS: How to Write a delayUntil Operator

February 24, 2020 • 8 minute read


Recently, I was asked — on Twitter — if there is a built-in operator that will delay notifications from a source until a signal is received from a notifier.

There isn’t, but it’s possible to write one and that’s what this article is about. Let’s do it step-by-step, starting with the signature.

The operator is only going to delay the notifications that it receives from the source. It isn’t going to manipulate the values within the notifications, so the observable’s element type is not going to change. Knowing that, we can start by writing the operator’s signature like this:

import { OperatorFunction } from "rxjs";

function delayUntil<T>(notifier: /* todo */): OperatorFunction<T, T> {
  /* todo */
}

As with takeUntil, we’ll use an observable as the notifier. The operator doesn’t care about the value that it receives from the notifier, only that it has received one, so we should use Observable<any> as the type:

import { Observable, OperatorFunction } from "rxjs";

function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  /* todo */
}

The operator’s implementation is going to split the source notifications into two streams: one stream for the pre-signal behaviour; and another for the post-signal behaviour. That means it’s going to need to subscribe to the source observable twice.

The implementation cannot subscribe directly to the source more than once — for example, with HTTP sources, multiple subscriptions would effect multiple requests — so the source has to be shared.

We can do that using the publish operator and a selector:

import { Observable, OperatorFunction } from "rxjs";
import { publish } from "rxjs/operators";

function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source => source.pipe(
    publish(published => /* todo */)
  );
}

Inside the selector, we can subscribe to the published observable as many times as we like without effecting multiple subscriptions to the source.
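
As a rough illustration of that claim, here is a minimal sketch that counts subscriptions to a cold source. The names source, subscriptions and received are hypothetical and not part of the operator; the sketch assumes the RxJS 6-style publish operator used in this article.

```typescript
import { merge, Observable } from "rxjs";
import { map, publish } from "rxjs/operators";

// A cold source that counts how many times it is subscribed to.
let subscriptions = 0;
const source = new Observable<number>(subscriber => {
  subscriptions++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const received: string[] = [];
source
  .pipe(
    // Inside the selector, the published observable is subscribed to twice ...
    publish((published: Observable<number>) =>
      merge(
        published.pipe(map(value => `a${value}`)),
        published.pipe(map(value => `b${value}`))
      )
    )
  )
  .subscribe(value => received.push(value));
// ... but the subscriptions counter shows a single subscription to the source.
```

Both branches receive both values, yet the counter stays at one. Subscribing to source twice directly would have incremented it twice.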

After the source notifications are split into two streams, they will need to be joined together. We know that the first stream — the pre-signal, delayed notifications — ends when the signal is received and that the second stream — the post-signal notifications — just mirrors the source, so we can use concat to join the streams and we can use the published observable itself for the second stream:

import { concat, Observable, OperatorFunction } from "rxjs";
import { publish } from "rxjs/operators";

function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published =>
        concat(
          /* todo */,
          published
        )
      )
    );
}

Now we need to figure out how to implement the stream of delayed notifications.

We know that it’s going to be based on the source notifications, so we can start with:

published.pipe(/* todo */)

We want to accumulate values until the notifier’s signal is received. The buffer operator can do that. It buffers values received from its source until it receives the notifier’s signal. It then emits those values in an array and resumes buffering the source:

published.pipe(buffer(notifier), /* todo */)
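
To make the buffering behaviour concrete, here is a small sketch that drives buffer by hand. It uses two Subject instances as hypothetical stand-ins for the source and the notifier.

```typescript
import { Subject } from "rxjs";
import { buffer } from "rxjs/operators";

const source = new Subject<number>();
const notifier = new Subject<void>();

const emitted: number[][] = [];
source.pipe(buffer(notifier)).subscribe(values => emitted.push(values));

source.next(1);
source.next(2);
notifier.next(); // first signal: the buffered [1, 2] is emitted
source.next(3);
notifier.next(); // buffering resumed after the first signal, so [3] is emitted
```

The second array shows that buffer carries on after the first signal, which is the behaviour the next step needs to stop.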

That gets us part of the way there, but we don’t want it to resume buffering once the signal is received. We want the delayed stream to complete, so that concat subscribes to its second argument — the published observable — and starts mirroring the source.

We can get the behaviour we want using the take operator:

published.pipe(buffer(notifier), take(1), /* todo */)

The delayed stream now buffers source values until a signal is received from the notifier, at which time it emits an array of values and then completes. But we don’t want an array of values emitted into the stream; we want the values themselves emitted. That means we need to flatten the stream and we can use the mergeMap operator to do that:

published.pipe(buffer(notifier), take(1), mergeMap(values => values))

We can do this because an array is a valid ObservableInput and when treated as an observable source, an array’s elements are emitted into the stream.
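
A minimal sketch of that flattening, using of to stand in for a stream of buffered arrays (the flattened array is a hypothetical name used only to collect the results):

```typescript
import { of } from "rxjs";
import { mergeMap } from "rxjs/operators";

const flattened: number[] = [];
of([1, 2], [3])
  // Each array is treated as an observable input and its elements are emitted.
  .pipe(mergeMap(values => values))
  .subscribe(value => flattened.push(value));
// flattened now holds 1, 2 and 3 as individual values.
```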

We can make this a little less verbose using the mergeAll operator. When applied to a higher-order observable — an observable whose elements are themselves valid observable inputs — mergeAll flattens each element it receives:

published.pipe(buffer(notifier), take(1), mergeAll())
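
Here is a short sketch of the delayed stream on its own, assuming a Subject as a stand-in for the published observable. The log array and the beforeSignal snapshot are hypothetical names used only to record what happens.

```typescript
import { Subject } from "rxjs";
import { buffer, mergeAll, take } from "rxjs/operators";

const published = new Subject<number>(); // stands in for the published source
const notifier = new Subject<void>();

const log: (number | string)[] = [];
published.pipe(buffer(notifier), take(1), mergeAll()).subscribe({
  next: value => log.push(value),
  complete: () => log.push("complete")
});

published.next(1);
published.next(2);
const beforeSignal = log.length; // nothing has been emitted yet
notifier.next(); // flushes 1 and 2, then the stream completes
published.next(3); // ignored: the delayed stream has already completed
```

The completion that follows the flush is what allows concat to move on to its second argument.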

Finally, after adding our delayed stream to the concat call, the operator is finished:

import { concat, Observable, OperatorFunction } from "rxjs";
import { buffer, mergeAll, publish, take } from "rxjs/operators";

function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published =>
        concat(
          published.pipe(buffer(notifier), take(1), mergeAll()),
          published
        )
      )
    );
}
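
As a starting point for such tests, here is a minimal usage sketch. It repeats the operator so that it is self-contained, and it uses two Subject instances as hypothetical stand-ins for a real source and notifier.

```typescript
import { concat, Observable, OperatorFunction, Subject } from "rxjs";
import { buffer, mergeAll, publish, take } from "rxjs/operators";

// The buffer-based operator from above, repeated so the sketch is self-contained.
function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published =>
        concat(
          published.pipe(buffer(notifier), take(1), mergeAll()),
          published
        )
      )
    );
}

const source = new Subject<number>();
const notifier = new Subject<void>();

const log: number[] = [];
source.pipe(delayUntil(notifier)).subscribe(value => log.push(value));

source.next(1);
source.next(2);
const beforeSignal = log.slice(); // still empty: 1 and 2 are being delayed
notifier.next();
const afterSignal = log.slice(); // the delayed 1 and 2 have been emitted
source.next(3); // after the signal, values are mirrored immediately
```

This only exercises the happy path, in which the signal arrives while the source is still active.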

Of course, it’s a good idea to write some tests to ensure that the operator behaves as expected, too.


After publishing this, a bug was found — yes, I missed a test case and got the expectation wrong in another.

With the above implementation, if a source completes before the signal is received, any buffered notifications are emitted at the time of completion. That’s incorrect; they should be delayed until the signal is received — as that’s how the delay operator behaves.

To fix the bug, we need to prevent the source’s completion from closing the buffer. We can do this by concatenating NEVER to the published source — doing so ignores any completion notification received from published:

concat(published, NEVER).pipe(buffer(notifier), take(1), mergeAll())

With the bug fixed, the operator looks like this:

import { concat, NEVER, Observable, OperatorFunction } from "rxjs";
import { buffer, mergeAll, publish, take } from "rxjs/operators";

function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published =>
        concat(
          concat(published, NEVER).pipe(buffer(notifier), take(1), mergeAll()),
          published
        )
      )
    );
}
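
The case that the fix addresses can be sketched as follows. The example assumes a source built with of, which completes as soon as it is subscribed to, and a Subject as the notifier. The log array and the beforeSignal snapshot are hypothetical names.

```typescript
import { concat, NEVER, Observable, of, OperatorFunction, Subject } from "rxjs";
import { buffer, mergeAll, publish, take } from "rxjs/operators";

// The operator with the NEVER fix, repeated so the sketch is self-contained.
function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published =>
        concat(
          concat(published, NEVER).pipe(buffer(notifier), take(1), mergeAll()),
          published
        )
      )
    );
}

const notifier = new Subject<void>();
const log: (number | string)[] = [];

// of(1, 2) emits both values and completes synchronously on subscription.
of(1, 2)
  .pipe(delayUntil(notifier))
  .subscribe({
    next: value => log.push(value),
    complete: () => log.push("complete")
  });

const beforeSignal = log.slice(); // the source is done, but nothing was emitted
notifier.next(); // only now are 1 and 2 emitted, followed by completion
```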

Unfortunately, there is another bug in the above implementation. It was found when a marble test was added for an edge case: a notifier that completes without emitting.

The bug is caused by the built-in buffer operator incorrectly treating the completion of its notifier as a signal. buffer is not the only operator that treats complete notifications this way; delayWhen exhibits the same behaviour. Altering the behaviour will be a breaking change — to be made in the next major version of RxJS.

We could fix the problem by ignoring notifier completion — by concatenating notifier and NEVER — but that would result in values being buffered unnecessarily: if the notifier completes without signalling, we know that we are never going to emit delayed values.

Rather than complicate the implementation with an elaborate arrangement of built-in operators — to work around the buffer bug — let’s get rid of buffer and build our own pre-signal, delayed observable using new Observable, like this:

const delayed = new Observable<T>(subscriber => {
  let buffering = true;
  const buffer: T[] = [];
  const subscription = new Subscription();
  /* todo */
  return subscription;
});

We’ll subscribe to published and for as long as the observable should be buffering, we’ll store values in the buffer. And we’ll pass any received error notification to the subscriber:

const delayed = new Observable<T>(subscriber => {
  let buffering = true;
  const buffer: T[] = [];
  const subscription = new Subscription();
  subscription.add(
    published.subscribe(
      value => buffering && buffer.push(value),
      error => subscriber.error(error)
    )
  );
  /* todo */
  return subscription;
});

We also need to subscribe to the notifier.

When the notifier emits a signal, we want to emit each of the buffered values and then complete. We also want to pass any received error notification to the subscriber. And when the notifier completes, we want to prevent further buffering and clear any buffered values:

const delayed = new Observable<T>(subscriber => {
  let buffering = true;
  const buffer: T[] = [];
  const subscription = new Subscription();
  subscription.add(
    notifier.subscribe(
      () => {
        buffer.forEach(value => subscriber.next(value));
        subscriber.complete();
      },
      error => subscriber.error(error),
      () => {
        buffering = false;
        buffer.length = 0;
      }
    )
  );
  subscription.add(
    published.subscribe(
      value => buffering && buffer.push(value),
      error => subscriber.error(error)
    )
  );
  /* todo */
  return subscription;
});

There is one more thing that we need to do with our pre-signal observable: we need to make sure that the buffer is cleared if an explicit unsubscription occurs. We can do that by adding a teardown function to the subscription, like this:

const delayed = new Observable<T>(subscriber => {
  let buffering = true;
  const buffer: T[] = [];
  const subscription = new Subscription();
  subscription.add(
    notifier.subscribe(
      () => {
        buffer.forEach(value => subscriber.next(value));
        subscriber.complete();
      },
      error => subscriber.error(error),
      () => {
        buffering = false;
        buffer.length = 0;
      }
    )
  );
  subscription.add(
    published.subscribe(
      value => buffering && buffer.push(value),
      error => subscriber.error(error)
    )
  );
  subscription.add(() => {
    buffer.length = 0;
  });
  return subscription;
});

We’ve written more code, but this implementation makes the handling of the notifier and source notifications a little clearer — certainly clearer than working around the bug in buffer.

The finished delayUntil operator looks like this:

import { concat, Observable, OperatorFunction, Subscription } from "rxjs";
import { publish } from "rxjs/operators";

export function delayUntil<T>(
  notifier: Observable<any>
): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published => {
        const delayed = new Observable<T>(subscriber => {
          let buffering = true;
          const buffer: T[] = [];
          const subscription = new Subscription();
          subscription.add(
            notifier.subscribe(
              () => {
                buffer.forEach(value => subscriber.next(value));
                subscriber.complete();
              },
              error => subscriber.error(error),
              () => {
                buffering = false;
                buffer.length = 0;
              }
            )
          );
          subscription.add(
            published.subscribe(
              value => buffering && buffer.push(value),
              error => subscriber.error(error)
            )
          );
          subscription.add(() => {
            buffer.length = 0;
          });
          return subscription;
        });
        return concat(delayed, published);
      })
    );
}
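
To check the two behaviours discussed above, here is a hedged usage sketch. It restates the operator with observer objects instead of positional callbacks, which is an assumption made so that the sketch runs without deprecation warnings on newer RxJS versions. The names signalled, silent and silentSource are hypothetical, and EMPTY stands in for a notifier that completes without emitting.

```typescript
import { concat, EMPTY, Observable, OperatorFunction, Subject, Subscription } from "rxjs";
import { publish } from "rxjs/operators";

// The finished operator, restated so the sketch is self-contained.
function delayUntil<T>(notifier: Observable<any>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      publish(published => {
        const delayed = new Observable<T>(subscriber => {
          let buffering = true;
          const buffer: T[] = [];
          const subscription = new Subscription();
          subscription.add(
            notifier.subscribe({
              // Signal: flush the buffered values, then complete.
              next: () => {
                buffer.forEach(value => subscriber.next(value));
                subscriber.complete();
              },
              error: error => subscriber.error(error),
              // Completion without a signal: stop buffering and drop values.
              complete: () => {
                buffering = false;
                buffer.length = 0;
              }
            })
          );
          subscription.add(
            published.subscribe({
              next: value => {
                if (buffering) {
                  buffer.push(value);
                }
              },
              error: error => subscriber.error(error)
            })
          );
          subscription.add(() => {
            buffer.length = 0;
          });
          return subscription;
        });
        return concat(delayed, published);
      })
    );
}

// Scenario 1: the notifier signals.
const source = new Subject<number>();
const notifier = new Subject<void>();
const signalled: number[] = [];
source.pipe(delayUntil(notifier)).subscribe(value => signalled.push(value));
source.next(1);
source.next(2);
const beforeSignal = signalled.slice(); // nothing has been emitted yet
notifier.next(); // the buffered 1 and 2 are emitted
source.next(3); // mirrored straight through

// Scenario 2: the notifier completes without signalling.
const silentSource = new Subject<number>();
const silent: number[] = [];
silentSource.pipe(delayUntil(EMPTY)).subscribe(value => silent.push(value));
silentSource.next(1); // neither buffered nor emitted
```

In the second scenario nothing is ever emitted, which matches the reasoning above: if the notifier completes without signalling, the delayed values are never going to be released.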

Nicholas Jamieson’s personal blog.
Mostly articles about RxJS, TypeScript and React.

© 2020 Nicholas Jamieson. All Rights Reserved.