Home Reference Source

src/utils/rxjs-operators.ts

import {OperatorFunction} from 'rxjs';
import {buffer, debounceTime, filter, share, throttleTime} from 'rxjs/operators';

type BufferDebounce = <T>(time: number) => OperatorFunction<T, T[]>;
type BufferAfterEvent = <T>(time: number) => OperatorFunction<T, T[]>;

/**
 * Emits an array of the values emitted by the source after t seconds without an emission.
 */
export const bufferDebounce: BufferDebounce = time => (source) => {
	const sharedSource = source.pipe(share());
	return sharedSource.pipe(
		buffer(sharedSource.pipe(debounceTime(time))),
		filter(arr => arr.length > 0),
	);
};

/**
 * Emits an array of the values emitted by the source for t seconds after an emission.
 */
export const bufferDuringTime: BufferAfterEvent = time => source =>
	source.pipe(
		buffer(source.pipe(throttleTime(time), debounceTime(time))),
		filter(arr => arr.length > 0),
	);