import { Observable, Observer } from 'rxjs';

/**
 * Flattens a stream of arrays into single values that are handed out one per
 * subscription, skipping every value that does not satisfy `emitCondition`.
 *
 * The values of the most recently received array are kept in a cache that is
 * shared by all subscribers of the returned observable:
 *
 * - If the cache holds a value that satisfies `emitCondition`, a new
 *   subscriber synchronously receives the first such value followed by a
 *   completion. That value and every rejected value in front of it are removed
 *   from the cache.
 * - Otherwise the cache is cleared and `stream` is subscribed to. Every array
 *   it emits replaces the cache and the first matching value of that array
 *   (if any) is emitted. Errors and completion of `stream` are forwarded, and
 *   unsubscribing from the returned observable unsubscribes from `stream`.
 *
 * Values are only ever tested with `emitCondition`, so falsy values such as
 * `0` or `''` are emitted like any other value. The arrays emitted by `stream`
 * are never mutated.
 *
 * @param stream The stream of array values.
 * @param emitCondition Predicate a value has to satisfy in order to be emitted.
 * @returns An observable that emits the next cached value accepted by
 * `emitCondition`, or the first accepted value of each array emitted by
 * `stream` when the cache has none.
 *
 * @example
 * const stream = of(['1', '2', '3', '4']);
 * const emitCondition = (value: string) => value === '2' || value === '4';
 * const example = flatAndShareReplay(stream, emitCondition);
 * example.subscribe((val) => console.log(val));
 * // 2 (cache was empty, so stream is subscribed to)
 * example.subscribe((val) => console.log(val));
 * // 4 (served from the cache)
 * example.subscribe((val) => console.log(val));
 * // 2 (cache is exhausted, so stream is subscribed to again)
 */
export function flatAndShareReplay<T>(
  stream: Observable<Array<T>>,
  emitCondition: (value: T) => boolean
): Observable<T> {
  // Values still to be handed out; shared by every subscriber of the result.
  let items: T[] = [];

  return new Observable((observer: Observer<T>) => {
    /**
     * Emits the first cached value accepted by emitCondition and drops it,
     * together with all rejected values before it, from the cache.
     * Returns false (and leaves the cache empty) when no value is accepted.
     */
    const emitNextMatch = (): boolean => {
      const matchIndex = items.findIndex((value) => emitCondition(value));
      if (matchIndex === -1) {
        items = [];
        return false;
      }
      const match = items[matchIndex] as T;
      // slice() builds a new array, so the array emitted by stream is never
      // mutated. The cache is updated before next() so that a subscription
      // made from inside the consumer's handler sees consistent state.
      items = items.slice(matchIndex + 1);
      observer.next(match);
      return true;
    };

    if (emitNextMatch()) {
      observer.complete();
      return undefined;
    }

    // Nothing usable is cached: pull fresh arrays from the source. Returning
    // the subscription makes it the teardown logic of the outer observable.
    return stream.subscribe({
      next: (res) => {
        items = res;
        emitNextMatch();
      },
      error: (error) => observer.error(error),
      complete: () => observer.complete(),
    });
  });
}
