import {
  defer,
  distinctUntilChanged,
  filter,
  firstValueFrom,
  Observable,
  shareReplay,
} from 'rxjs';

interface AbstractStore<TState> {
  getValue(): TState;
}

interface AbstractQuery<TState> {
  select<TValue>(selector: (state: TState) => TValue): Observable<TValue>;
}

/**
 * Create an Akita query, enhanced with `Reader` capabilities.
 * @param store
 * @param query
 * @param selector
 * @returns
 */
export function selectReader<TState, TValue>(
  store: AbstractStore<TState>,
  query: AbstractQuery<TState>,
  selector: (state: TState) => TValue
): Reader<TValue> {
  const stream$ = defer(() =>
    query
      .select((s) => selector(s))
      .pipe(
        distinctUntilChanged(),
        shareReplay({ bufferSize: 1, refCount: true })
      )
  ) as Reader<TValue>;

  stream$.getSnapshot = () => selector(store.getValue());
  stream$.untilNext = (options?: { ignoreUndefined?: boolean }) =>
    firstValueFrom(
      stream$.pipe(
        filter((value) => {
          if (options?.ignoreUndefined && value === undefined) {
            return false;
          } else {
            return true;
          }
        })
      )
    );

  return stream$;
}

/**
 * An `Observable` instance with enhanced capabilities.
 */
export interface Reader<T> extends Observable<T> {
  /**
   * Get the current representation of the store. When
   * never emitted results in `undefined`.
   */
  getSnapshot: () => T | undefined;
  /**
   * Get the next representation of the store. When
   * never emitted, waits for the next emission. When
   * replayed, emits the latest emission.
   */
  untilNext: (options?: { ignoreUndefined?: boolean }) => Promise<T>;
}
