src/utils/rxjs-operators.ts
import {OperatorFunction} from 'rxjs';
import {buffer, debounceTime, filter, share, throttleTime} from 'rxjs/operators';
type BufferDebounce = <T>(time: number) => OperatorFunction<T, T[]>;
type BufferAfterEvent = <T>(time: number) => OperatorFunction<T, T[]>;
/**
* Emits an array of the values emitted by the source after t seconds without an emission.
*/
export const bufferDebounce: BufferDebounce = time => (source) => {
const sharedSource = source.pipe(share());
return sharedSource.pipe(
buffer(sharedSource.pipe(debounceTime(time))),
filter(arr => arr.length > 0),
);
};
/**
* Emits an array of the values emitted by the source for t seconds after an emission.
*/
export const bufferDuringTime: BufferAfterEvent = time => source =>
source.pipe(
buffer(source.pipe(throttleTime(time), debounceTime(time))),
filter(arr => arr.length > 0),
);