import type { OnDestroy } from '@angular/core';
import { ApplicationRef, Inject, Injectable } from '@angular/core';
import { Auth } from '@freelancer/auth';
import type { AuthState } from '@freelancer/auth/interface';
import { duplicateFirst, reemitWhen, select } from '@freelancer/operators';
import type { Interface } from '@freelancer/types';
import { QueueSubject, isDefined, toObservable } from '@freelancer/utils';
import { UntilDestroy, untilDestroyed } from '@ngneat/until-destroy';
import { Actions } from '@ngrx/effects';
import { Store } from '@ngrx/store';
import { ErrorCodeApi } from 'api-typings/errors/errors';
import type { Observable } from 'rxjs';
import {
  Subscription,
  combineLatest,
  firstValueFrom,
  from,
  isObservable,
  of,
} from 'rxjs';
import {
  distinctUntilChanged,
  filter,
  map,
  pairwise,
  share,
  shareReplay,
  startWith,
  switchMap,
  take,
  takeWhile,
  tap,
  withLatestFrom,
} from 'rxjs/operators';
import type {
  RequestDataAction,
  RequestDataPayload,
  TypedAction,
} from './actions';
import { isApiFetchSuccessAction } from './actions';
import { StoreBackend } from './backend';
import { DatastoreCollection } from './datastore-collection';
import { DatastoreDocument } from './datastore-document';
import { REFETCH_CONFIG } from './datastore.config';
import { LOGGED_OUT_KEY, RefetchConfig } from './datastore.interface';
import { arrayIsShallowEqual, setDiff } from './helpers';
import { DatastoreMissingModuleError } from './missing-module-error';
import type {
  DocumentOptionsObject,
  DocumentQuery,
  NullQuery,
  Query,
} from './query';
import { emptyQueryObject, isDocumentOptionsObject } from './query';
import { REQUEST_DATA_CONFIG, RequestDataConfig } from './request-data';
import type { RequestError } from './request-status-handler.service';
import {
  RequestStatusHandler,
  requestStatusesEqual,
} from './request-status-handler.service';
import type { BackendPushResponse } from './store-backend.model';
import {
  flattenQuery,
  getEqualOrInIdsFromQuery,
  isIdQuery,
  isNullRef,
  removeEqualOrInIdsFromQuery,
  selectDocumentsForReference,
  stringifyReference,
} from './store.helpers';
import type {
  ApproximateTotalCountType,
  DatastoreCollectionType,
  DatastoreFetchCollectionType,
  DatastorePushCollectionType,
  Path,
  PushDocumentType,
  QueryParam,
  QueryParams,
  Reference,
  StoreState,
  UserCollectionStateSlice,
} from './store.model';
import { WebSocketService } from './websocket';

@UntilDestroy({ className: 'Datastore' })
@Injectable()
export class Datastore implements OnDestroy {
  private isInitiallyStable$: Observable<boolean>;
  private subscriptions = new Subscription();
  private readonly refetchCollectionsQueue$ = new QueueSubject<
    readonly DatastoreCollectionType['Name'][]
  >();
  private readonly refetchCollections$: Observable<
    readonly DatastoreCollectionType['Name'][]
  > = this.refetchCollectionsQueue$.asObservable().pipe(share());

  constructor(
    private store$: Store<StoreState>,
    private action$: Actions<TypedAction>,
    private storeBackend: StoreBackend,
    private webSocketService: WebSocketService,
    private requestStatusHandler: RequestStatusHandler,
    private auth: Auth,
    private appRef: ApplicationRef,
    @Inject(REQUEST_DATA_CONFIG) private requestDataConfig: RequestDataConfig,
    @Inject(REFETCH_CONFIG) private refetchConfig: RefetchConfig,
  ) {
    // This takes values until the first true, emitting the true
    this.isInitiallyStable$ = this.appRef.isStable.pipe(
      takeWhile(val => !val, true),
      startWith(false),
      shareReplay({ bufferSize: 1, refCount: true }),
    );
    // We need to make this observable hot so we get the initial version rather
    // than the current version when we subscribe in the code afterwards.
    this.subscriptions.add(this.isInitiallyStable$.subscribe());
  }

  ngOnDestroy(): void {
    this.subscriptions.unsubscribe();
  }

  isCollectionWhitelistedForRefetch(collectionName: string): boolean {
    return this.refetchConfig.supportedCollections.includes(collectionName);
  }

  /**
   * Returns a collection of documents, customised by query.
   *
   * @param collectionName Collection name
   * @param queryFn Query to refine results
   */
  collection<C extends DatastoreCollectionType>(
    collectionName: C['Name'],
    queryFn?: (q: Query<C>) => Query<C> | Observable<Query<C> | NullQuery>,
  ): DatastoreCollection<C>;
  collection<C extends DatastoreCollectionType>(
    collectionName: C['Name'],
    ids$: Observable<readonly number[]> | Observable<readonly string[]>,
  ): DatastoreCollection<C>;
  collection<C extends DatastoreCollectionType>(
    collectionName: C['Name'],
    queryFnOrIds$?:
      | ((q: Query<C>) => Query<C> | Observable<Query<C> | NullQuery>)
      | Observable<readonly number[]>
      | Observable<readonly string[]>,
  ): DatastoreCollection<C> {
    const flattenedQuery$ = flattenQuery<C>(
      isObservable(queryFnOrIds$)
        ? query => query.where('id', 'in', queryFnOrIds$)
        : queryFnOrIds$,
    );

    const refStream$: Observable<Reference<C>> = combineLatest([
      toObservable(collectionName),
      toObservable(this.auth.authState$),
      flattenedQuery$,
    ]).pipe(
      map(
        ([
          collection,
          authState,
          { limit, queryParams, searchQueryParams, order },
        ]) => ({
          path: {
            collection,
            authUid: authState ? authState.userId : LOGGED_OUT_KEY,
          },
          query: {
            limit,
            queryParams,
            searchQueryParams,
            isDocumentQuery: false,
          },
          order,
        }),
      ),
    );

    const refetchFromReconnect$ = this.refetchFromReconnect(collectionName);
    const manualRefetch$ = this.manualRefetch(collectionName);

    const refSteamDelta$ = refStream$.pipe(
      tap(({ path: { collection } }) => {
        if (!this.storeBackend.isFeatureLoaded(collection)) {
          throw new DatastoreMissingModuleError(collection);
        }
      }),
      /*
       * To fetch the delta, we need to compare the current query with the previous query.
       * To achieve that, we append the request stream with an empty query using `duplicateFirst`,
       * then comparing with `pairwise`, before finally returning just current request.
       */
      duplicateFirst(ref => ({
        ...ref,
        query: { ...emptyQueryObject, isDocumentQuery: false },
      })),
      pairwise(), // Use pairwise to calculate and fetch new objects only
      switchMap(refs => {
        const [
          {
            path: { collection, authUid },
          },
        ] = refs;
        return this.store$.pipe(
          select(collection, authUid),
          take(1),
          withLatestFrom(this.isInitiallyStable$),
          map(([data, appIsStable]) => ({ refs, data, appIsStable })),
        );
      }),
    );

    /**
     * Turns references into requests and sends actions for those requests
     * which are processed in `request-data.effect.ts`.
     *
     * This has special logic for queries that are only searching by `id`.
     * We turn these requests into `document` requests which means:
     *  - they can be batched
     *  - they can return partial results
     *  - they don't need to be stored in the queries list in the NgRx store.
     * For these `id` queries we also only fetch the new `ids` (the delta).
     *
     * Note we always move the `ids` from the query to the `ids`
     * to simplify writing backend files.
     */
    const requestStream$: Observable<RequestDataPayload<C> | undefined> =
      refSteamDelta$.pipe(
        reemitWhen(refetchFromReconnect$, manualRefetch$),
        map(([{ refs, data, appIsStable }, fromReconnect]) => {
          const [origRef, ref] = refs;

          // Skip dispatching an action to request data on a non-null query
          if (isNullRef(ref)) {
            return undefined;
          }

          const clientRequestIds = [this.generateClientRequestId()];

          // If the new query isn't just filtering by ids, move any IDs asked for
          // into the `ids` in the `path` and out of the `query`
          if (!isIdQuery(ref.query)) {
            const request: RequestDataPayload<C> = {
              type: ref.path.collection,
              ref: {
                ...ref,
                query: removeEqualOrInIdsFromQuery(ref.query),
                path: {
                  ...ref.path,
                  ids: getEqualOrInIdsFromQuery(ref.query),
                },
              },
              clientRequestIds,
              isRefetch: fromReconnect,
            };

            // Only dispatch request if either:
            // - App is already stable
            // - Data for query is not in store
            // - Data is not transferred from SSR
            if (
              appIsStable ||
              !data?.queries[stringifyReference(request.ref)]
                ?.isTransferredFromSSR
            ) {
              this.store$.dispatch({ type: 'REQUEST_DATA', payload: request });
            }

            return request;
          }

          const idsNew = getEqualOrInIdsFromQuery(ref.query);

          // If the new query is just filtering by ids, but the original query wasn't,
          // then move the `ids` into the path and fetch all the ids
          if (!isIdQuery(origRef.query)) {
            const request: RequestDataPayload<C> = {
              type: ref.path.collection,
              ref: {
                ...ref,
                path: { ...ref.path, ids: idsNew },
                query: undefined, // don't merge in the new query, just use `path.ids`
              },
              clientRequestIds,
              isRefetch: fromReconnect,
            };

            // Only dispatch request if either:
            // - App is already stable
            // - Not all store data came from SSR hydration
            if (appIsStable || !this.ssrDataAlreadyInStore(idsNew, data)) {
              this.store$.dispatch({ type: 'REQUEST_DATA', payload: request });
            }

            return request;
          }

          /*
         If both are just fetching by ids, then only fetch:
         - The delta. If a new bid comes in there's no need to fetch all bidders,
           instead just fetch the new bidder.
         - Documents that have not been recently fetched. This is useful for
           projections where multiple collections are populated with one
           datastore call, e.g. `threads_get` with a `user_details` projection.
           This is different to the dedupe in RequestDataEffect, which is
           collection-specific and dedupes requests that are still in flight.
         */
          const idsOrig = getEqualOrInIdsFromQuery(origRef.query);

          const { dedupeWindowTime } = this.requestDataConfig;

          const idsDelta = setDiff(idsNew ?? [], idsOrig ?? []);

          const idsToFetch = (fromReconnect ? idsNew ?? [] : idsDelta)
            .filter(
              // Fetch old or missing documents
              id =>
                !data ||
                !data.documents[id] ||
                dedupeWindowTime <= 0 ||
                data.documents[id].timeFetched + dedupeWindowTime < Date.now(),
            )
            .filter(
              // Filter out SSR transferred objects if app is not stable yet
              id => appIsStable || !data?.documents[id]?.isTransferredFromSSR,
            );

          // Only ask the backend for the ids that need fetching (the delta/not recently fetched)
          this.store$.dispatch<RequestDataAction<C>>({
            type: 'REQUEST_DATA',
            payload: {
              type: ref.path.collection,
              ref: {
                ...ref,
                path: { ...ref.path, ids: idsToFetch },
                query: undefined,
              },
              clientRequestIds,
              isRefetch: fromReconnect,
            },
          });

          // But the request is for **all** the ids in the query
          return {
            type: ref.path.collection,
            ref: {
              ...ref,
              path: { ...ref.path, ids: idsNew },
              query: undefined,
            },
            clientRequestIds,
            isRefetch: fromReconnect,
          };
        }),
        shareReplay({ bufferSize: 1, refCount: true }),
      );

    const sourceStream$ = requestStream$.pipe(
      tap(request => {
        if (request) {
          this.requestStatusHandler.update(request, { ready: false });
        }
      }),
      switchMap(request => {
        if (!request) {
          // immediately emit on null query
          return of({ documentsWithMetadata: [], request });
        }
        const {
          ref,
          ref: {
            path: { collection, authUid },
          },
        } = request;

        return this.store$.pipe(
          // Grab the collection items from the store
          select(collection, authUid),
          filter(isDefined),
          map(storeSlice =>
            selectDocumentsForReference<C>(
              // This needs double cast because the store is a map of string to generic collection
              // and there's no type guarantee StoreState[collectionName] actually has a collection C
              storeSlice as unknown as UserCollectionStateSlice<C>,
              ref,
              this.storeBackend.defaultOrder(collection),
            ),
          ),
          filter(isDefined),
          distinctUntilChanged((queryResult1, queryResult2) =>
            arrayIsShallowEqual(
              queryResult1.documentsWithMetadata,
              queryResult2.documentsWithMetadata,
            ),
          ),
          tap(() => {
            if (request) {
              // Update request status here on fetching from either network or cache
              this.requestStatusHandler.update(request, { ready: true });
            }
          }),
          map(({ documentsWithMetadata, timeFetched, timeUpdated }) => ({
            documentsWithMetadata,
            request,
            timeFetched,
            timeUpdated,
          })),
        );
      }),
    );

    const idsToSubscribe$ = refStream$.pipe(
      map(ref => isIdQuery(ref.query)),
      switchMap(isQueryById => {
        if (isQueryById) {
          // When fetching by id, we subscribe to the web socket using the ids requested (not the ids in the response)
          return requestStream$.pipe(
            filter(isDefined),
            map(request => request.ref.path.ids),
            filter(isDefined),
          );
        }
        // When *not* fetching by id, we subscribe to the web socket using the ids in the response (as we don't know the ids when requesting)
        return sourceStream$.pipe(
          map(storeSlice =>
            storeSlice.documentsWithMetadata.map(doc =>
              doc.rawDocument.id.toString(),
            ),
          ),
        );
      }),
    );

    const subscribedSourceStream$ = sourceStream$.pipe(
      withLatestFrom(
        // Subscribe to the output of `subscribeToIds()` while `sourceStream$` is subscribed to
        this.storeBackend.isSubscribable(collectionName)
          ? this.webSocketService
              .subscribeToIds(collectionName, idsToSubscribe$)
              .pipe(startWith(undefined))
          : of(undefined),
      ),
      map(([storeSlice]) => storeSlice),
      shareReplay({ bufferSize: 1, refCount: true }),
    );

    const approximateTotalCountStream$: Observable<
      ApproximateTotalCountType<C> | undefined
    > = requestStream$.pipe(
      switchMap(request => {
        if (!request) {
          return of(undefined); // immediately emit on null query
        }

        const {
          ref,
          ref: {
            path: { collection, authUid },
          },
        } = request;

        return this.store$.pipe(
          select(collection, authUid),
          filter(isDefined),
          map(storeSlice => {
            const queryString = stringifyReference(ref);
            return storeSlice.queries[queryString]
              ? storeSlice.queries[queryString].approximateTotalCount
              : undefined;
          }),
        );
      }),
      shareReplay({ bufferSize: 1, refCount: true }),
    );

    const statusStream$ = combineLatest([
      requestStream$.pipe(
        switchMap(
          request =>
            request
              ? this.requestStatusHandler.get$(request)
              : from([{ ready: false }, { ready: true }]), // Null requests are always ready
        ),
      ),
      // combining with sourceStream to set ready flag to false on initial subscribe
      subscribedSourceStream$.pipe(startWith(undefined)),
    ]).pipe(
      map(([status]) => status),
      distinctUntilChanged(requestStatusesEqual),
      shareReplay({ bufferSize: 1, refCount: true }),
    );

    return new DatastoreCollection(
      this.requestDataConfig,
      refStream$,
      this.storeBackend,
      statusStream$,
      combineLatest([
        statusStream$.pipe(startWith(undefined)),
        subscribedSourceStream$,
      ]).pipe(
        map(([, source]) => source),
        distinctUntilChanged(),
      ),
      approximateTotalCountStream$,
    );
  }

  /**
   * Returns a single document from a collection specified by id.
   * If `documentId` is an observable, new documents may be fetched from the
   * network and emitted as they arrive. Note that if it emits too quickly,
   * such that the network request has not completed before the id changes,
   * that document with the previous id will not be emitted.
   *
   * @param collectionName Collection name
   * @param documentId$ If not provided, defaults to the first document of the collection.
   * Omitting this is useful for logged-out requests when document ID is not applicable
   * @param query Only required if querying by a unique secondary ID
   */
  document<C extends DatastoreCollectionType>(
    collectionName: C['Name'],
    documentId$?: string | number | Observable<string> | Observable<number>,
  ): DatastoreDocument<C>;
  document<
    C extends DatastoreCollectionType,
    OtherId extends keyof C['DocumentType'] = keyof C['DocumentType'],
  >(
    collectionName: C['Name'],
    documentSecondaryId$:
      | C['DocumentType'][OtherId]
      | Observable<C['DocumentType'][OtherId]>,
    documentQueryOrOptionObject$:
      | DocumentQuery<C, OtherId>
      | Observable<DocumentQuery<C, OtherId> | undefined>
      | DocumentOptionsObject<DocumentQuery<C, OtherId>, C['ResourceGroup']>,
  ): DatastoreDocument<C>;

  document<
    C extends DatastoreCollectionType,
    OtherId extends keyof C['DocumentType'],
  >(
    collectionName: C['Name'],
    documentId$?: string | number | Observable<string> | Observable<number>,
    documentQueryOrOptionObject$?:
      | DocumentQuery<C, OtherId>
      | Observable<DocumentQuery<C, OtherId> | undefined>
      | DocumentOptionsObject<DocumentQuery<C, OtherId>, C['ResourceGroup']>,
  ): DatastoreDocument<C> {
    // Extract query$ and resourceGroup from documentQueryOrOptionObject$
    // depending on what is given.
    const { query$, resourceGroup$ } =
      documentQueryOrOptionObject$ &&
      isDocumentOptionsObject<DocumentQuery<C, OtherId>, C['ResourceGroup']>(
        documentQueryOrOptionObject$,
      )
        ? documentQueryOrOptionObject$
        : {
            query$: documentQueryOrOptionObject$,
            resourceGroup$: undefined,
          };

    const refStream$: Observable<Reference<C>> = combineLatest([
      toObservable(collectionName),
      toObservable(this.auth.authState$),
      toObservable<number | string | undefined>(documentId$).pipe(
        distinctUntilChanged(),
      ),
      toObservable(query$).pipe(
        distinctUntilChanged(
          (a, b) =>
            !!a &&
            !!b &&
            a.caseInsensitive === b.caseInsensitive &&
            a.index === b.index,
        ),
      ),
    ]).pipe(
      map(([collection, authState, id, documentQuery]) => {
        const path = {
          collection,
          authUid: authState ? authState.userId : LOGGED_OUT_KEY,
        };

        if (!id) {
          // Treat as a document query with no params
          return {
            path,
            query: {
              isDocumentQuery: true,
              limit: 1,
              queryParams: {},
              searchQueryParams: {},
            },
          };
        }

        return documentQuery
          ? {
              path,
              query: {
                isDocumentQuery: true,
                limit: 1,
                queryParams: {
                  [documentQuery.index]: [
                    {
                      name: documentQuery.index,
                      condition: documentQuery.caseInsensitive
                        ? 'equalsIgnoreCase'
                        : '==',
                      value: id,
                    } as QueryParam<C['DocumentType'], OtherId>,
                  ],
                  // FIXME: T267853 - shouldn't need the double cast
                } as unknown as QueryParams<C['DocumentType']>,
                searchQueryParams: {},
              },
            }
          : {
              path: { ...path, ids: [id.toString()] },
            };
      }),
    );

    const refetchFromReconnect$ = this.refetchFromReconnect(collectionName);
    const manualRefetch$ = this.manualRefetch(collectionName);

    const requestStream$ = combineLatest([
      refStream$,
      toObservable(resourceGroup$),
    ]).pipe(
      reemitWhen(refetchFromReconnect$, manualRefetch$),
      withLatestFrom(this.isInitiallyStable$),
      switchMap(([[[ref, resourceGroup], fromReconnect], appIsStable]) =>
        // We need store slice here to get current document
        this.store$.pipe(
          select(
            collectionName,
            ref.path.authUid ? ref.path.authUid : LOGGED_OUT_KEY,
          ),
          take(1),
          map(store => ({
            appIsStable,
            currentDocument: ref.path.ids
              ? store?.documents[ref.path.ids[0]]
              : undefined,
            request: {
              type: ref.path.collection,
              ref,
              clientRequestIds: [this.generateClientRequestId()],
              resourceGroup,
              isRefetch: fromReconnect,
            },
          })),
        ),
      ),
      tap(({ appIsStable, currentDocument, request }) => {
        if (!this.storeBackend.isFeatureLoaded(collectionName)) {
          throw new DatastoreMissingModuleError(collectionName);
        }

        const action = {
          type: 'REQUEST_DATA',
          payload: request,
        };

        // Only dispatch request if either:
        // - App is already stable
        // - Data is not in store
        // - Data is not transferred from SSR
        if (
          appIsStable ||
          !currentDocument ||
          !currentDocument.isTransferredFromSSR
        ) {
          this.store$.dispatch(action);
        }
      }),
      map(({ request }) => request),
      shareReplay({ bufferSize: 1, refCount: true }),
    );

    const sourceStream$ = requestStream$.pipe(
      tap(request =>
        this.requestStatusHandler.update(request, {
          ready: false,
        }),
      ),
      switchMap(request => {
        const {
          ref,
          ref: {
            path: { collection, authUid },
          },
        } = request;
        return this.store$.pipe(
          select(collection, authUid),
          filter(isDefined),
          map(storeSlice =>
            // FIXME: T267853 - needs double cast because the store is a map of string to generic collection
            // and there's no type guarantee StoreState[collectionName] actually has a collection C
            this.fetchSingleDocument(
              storeSlice as unknown as UserCollectionStateSlice<C>,
              ref,
            ),
          ),

          // Returns an observable that switches between emitting the input document
          // object and another observable.
          //  - If the document object exists, the observable will emit the document
          //    object using the `of` operator.
          //  - Otherwise, it will emit an observable created by piping the action$
          //    observable through a chain of operators that filters for successful
          //    fetch actions and then checks if the requested items were found
          //    in the response payload. If the requested items are not found, it
          //    will update the request status with an error code and emit an undefined
          //    using the `map` operator. The observable then filters out any emitted values
          //    that are undefined using the `filter` operator.
          switchMap(document =>
            document
              ? of(document)
              : this.action$.pipe(
                  // Because the missing items check in RequestDataEffect can fail
                  // when requests are batched, this is a hacky way to check that the
                  // item asked for in the request does not exist even after a
                  // "successful" fetch.
                  // TODO: T267853 - If the document is cached but then deleted this check is
                  // not sufficient - need to address other actions that mutate store
                  filter(
                    action =>
                      isApiFetchSuccessAction(action) &&
                      request.clientRequestIds.every(id =>
                        action.payload.clientRequestIds.some(
                          actionId => actionId === id,
                        ),
                      ),
                  ),
                  tap(action => {
                    // TODO: T267853 - replace with ErrorHandler
                    console.warn(
                      `Fetch succeeded but document(s) not found from document call to '${request.type}'. Result was`,
                      action.payload.result,
                    );
                    this.requestStatusHandler.update(request, {
                      ready: false,
                      error: {
                        errorCode: ErrorCodeApi.NOT_FOUND,
                      } as unknown as C extends DatastoreFetchCollectionType
                        ? RequestError<C>
                        : never,
                    });
                  }),
                  map(() => undefined),
                  filter(isDefined),
                ),
          ),

          // Emissions must be distinct beforehand the status handler
          // gets called and updates the request status, otherwise statusStream$
          // receives emissions from the `RequestStatusHandler::get$(request)`.
          distinctUntilChanged(),
          tap(() => {
            this.requestStatusHandler.update(request, {
              ready: true,
            });
          }),
        );
      }),
    );

    const idsToSubscribe$ = refStream$.pipe(
      map(ref => !!ref.path.ids || isIdQuery(ref.query)),
      switchMap(isQueryById => {
        if (isQueryById) {
          // When fetching by id, we subscribe to the web socket using the ids requested (not the ids in the response)
          return requestStream$.pipe(
            map(
              request =>
                request.ref.path.ids ||
                getEqualOrInIdsFromQuery(request.ref.query),
            ),
            filter(isDefined),
          );
        }
        // When *not* fetching by id, we subscribe to the web socket using the ids in the response (as we don't know the ids when requesting)
        return sourceStream$.pipe(map(document => [document.id.toString()]));
      }),
    );

    const subscribedSourceStream$ = sourceStream$.pipe(
      withLatestFrom(
        // Subscribe to the output of `subscribeToIds()` while `sourceStream$` is subscribed to
        this.storeBackend.isSubscribable(collectionName)
          ? this.webSocketService
              .subscribeToIds(collectionName, idsToSubscribe$)
              .pipe(startWith(undefined))
          : of(undefined),
      ),
      map(([document]) => document),
      shareReplay({ bufferSize: 1, refCount: true }),
    );

    const statusStream$ = combineLatest([
      requestStream$.pipe(
        switchMap(request => this.requestStatusHandler.get$(request)),
      ),
      // combining with sourceStream to set ready flag to false on initial subscribe
      subscribedSourceStream$.pipe(startWith(undefined)),
    ]).pipe(
      map(([status]) => status),
      distinctUntilChanged(requestStatusesEqual),
      shareReplay({ bufferSize: 1, refCount: true }),
    );

    return new DatastoreDocument(
      refStream$,
      this.storeBackend,
      statusStream$,
      combineLatest([
        statusStream$.pipe(startWith(undefined)),
        subscribedSourceStream$,
      ]).pipe(
        map(([, source]) => source),
        distinctUntilChanged(),
      ),
    );
  }

  /**
   * Triggers a refetch of the specified collection names from the backend datastore.
   *
   * @deprecated Only whitelisted services can use refetch. Contact UI eng or FEI for more info.
   */
  // TODO: T293115 - Create a lint rule to whitelist services instead of using `@deprecated`
  refetch(collectionNames: readonly DatastoreCollectionType['Name'][]): void {
    this.refetchCollectionsQueue$.next(collectionNames);
  }

  private fetchSingleDocument<C extends DatastoreCollectionType>(
    storeSlice: UserCollectionStateSlice<C>,
    ref: Reference<C>,
  ): C['DocumentType'] | undefined {
    const defaultOrder = this.storeBackend.defaultOrder<C>(ref.path.collection);
    const queryResult = selectDocumentsForReference(
      storeSlice,
      ref,
      defaultOrder,
    );
    return queryResult?.documentsWithMetadata[0]?.rawDocument;
  }

  private ssrDataAlreadyInStore(
    ids: readonly (number | string)[] = [],
    data: UserCollectionStateSlice<DatastoreCollectionType> | undefined,
  ): boolean {
    return ids.every(id => data?.documents[id]?.isTransferredFromSSR);
  }

  /**
   * Creates a single document.
   */
  createDocument<
    C extends DatastoreCollectionType & DatastorePushCollectionType,
  >(
    collectionName: C['Name'],
    document: PushDocumentType<C> & {
      readonly id?: number | string;
    },
    extra?: { readonly [index: string]: string | number | object },
  ): Promise<BackendPushResponse<C>> {
    return firstValueFrom(
      combineLatest([toObservable(collectionName), this.auth.authState$]).pipe(
        map(([collection, authState]: [C['Name'], AuthState | undefined]) => {
          const path: Path<C> = {
            collection,
            authUid: authState ? authState.userId : LOGGED_OUT_KEY,
          };
          return { path };
        }),
        take(1),
        switchMap(ref => this.storeBackend.push(ref, document, extra)),
        untilDestroyed(this),
      ),
    );
  }

  private generateClientRequestId(): string {
    return Math.random()
      .toString(36)
      .substring(2, 2 + 16);
  }

  /**
   * Returns a stream that emits if we should refetch when the websocket reconnects.
   */
  private refetchFromReconnect(
    collectionName: DatastoreCollectionType['Name'],
  ): Observable<void> {
    return this.webSocketService.reconnected$.pipe(
      filter(() => this.isCollectionWhitelistedForRefetch(collectionName)),
      shareReplay({ bufferSize: 1, refCount: true }),
    );
  }

  /**
   * Returns a stream that emits if a manual refetch has been triggered on the collection.
   */
  private manualRefetch(
    collectionName: DatastoreCollectionType['Name'],
  ): Observable<void> {
    return this.refetchCollections$.pipe(
      filter(collections => collections.includes(collectionName)),
      map(() => undefined),
    );
  }
}

export type DatastoreInterface = Interface<Datastore>;
