import type { Observable, SchedulerLike } from 'rxjs';
import { asyncScheduler } from 'rxjs';
import { filter, map, mergeAll } from 'rxjs/operators';
import type { RequestDataPayload } from '../../actions';
import type {
  RequestStatusHandler,
  RequestStatusWithoutRetry,
} from '../../request-status-handler.service';
import { stringifyReference } from '../../store.helpers';
import type { DatastoreCollectionType } from '../../store.model';

/**
 * Deduplicates requests by comparing them with requests received in the last
 * `windowTime` ms. Also keeps treack of deduped requests in the buffer and
 * maps them to the original request to check if the original request is ready.
 *
 * Requests with ids: (e.g. #objects and #object)
 * If a request asks for a particular object id which has been requested before
 * within the interval, it is removed from the request, and then emitted.
 * If this would result in no object ids remaining, the original request
 * is not emitted. Note that the original request is split such that each
 * output request only contains one id each, all emitted simultaneously.
 * i.e. U1:2:3 => U1,U2,U3
 *
 * Example: With a window size of 3 seconds, a request for user with id 1 (U1),
 * then for ids 2 and 1 (U2:1) two seconds later, then again two seconds later:
 *
 * 0     1     2     3     4     5s
 * U1----------U2:1--------U2:1--|
 *        dedupeRequests(3000)
 * U1----------U2----------U1----|
 *
 * Please visit the `.spec.ts` file for more details + diagrams
 *
 * Requests with queries: (e.g. #list)
 * If a request has the same query as another request within the interval,
 * it is not emitted, including empty query objects.
 *
 * IMPORTANT: Assumes that there is at most one clientRequestId for requests
 * in the input observable. This means deduping must occur BEFORE batching.
 *
 * @param windowTime window interval in milliseconds
 */
export function dedupeRequests<C extends DatastoreCollectionType>(
  windowTime: number,
  maxBufferTime: number,
  dedupeRequestBuffer: Map<
    string,
    {
      readonly payload: RequestDataPayload<any>;
      // The list of requests that were deduped on this matching request
      readonly dedupedPayloads: readonly RequestDataPayload<any>[];
      // The current status of the matching request
      readonly statusObject?: RequestStatusWithoutRetry<any>;
      // The time this matching request was buffered
      readonly bufferTime: number;
    }
  >,
  originalRequestBuffer: Map<
    string,
    {
      // The original request from the request effect payload before it was split by ids
      readonly originPayload: RequestDataPayload<any>;
      // The list of document ids it is waiting on to be returned from the API to be ready
      readonly readyIds: readonly number[];
      // The time this matching request was buffered
      readonly bufferTime: number;
    }
  >,
  scheduler: SchedulerLike = asyncScheduler,
  // Request status handler instance. Can be undefined for unit testing.
  requestStatusHandler?: RequestStatusHandler,
): (
  source$: Observable<RequestDataPayload<C>>,
) => Observable<RequestDataPayload<C>> {
  if (windowTime === 0) {
    return (source$: Observable<RequestDataPayload<C>>) => source$;
  }

  return (source$: Observable<RequestDataPayload<C>>) =>
    source$.pipe(
      map(req => {
        // all requests split into ONE id each
        const splitRequests = splitOnIds(req);
        return splitRequests.map(splitReq => ({ originalReq: req, splitReq }));
      }),
      mergeAll(),
      filter(({ originalReq, splitReq }) => {
        const currTime = scheduler.now();

        // Clean dedupe buffer of of expired requests and set them as errored
        for (const refString of dedupeRequestBuffer.keys()) {
          const bufferedRequestElement = dedupeRequestBuffer.get(refString);
          if (
            bufferedRequestElement &&
            bufferedRequestElement.bufferTime + maxBufferTime < currTime
          ) {
            for (const dedupedPayload of bufferedRequestElement.dedupedPayloads) {
              // We can assume there's only 1 client request id as this is before batching
              const originalRequest = originalRequestBuffer.get(
                dedupedPayload.clientRequestIds[0],
              );

              // Error the request and clean the original request buffer
              if (originalRequest) {
                // Reimplement once we find out how to disable partial returns
                /*
                requestStatusHandler?.update(originalRequest.originPayload, {
                  ready: false,
                  error: {
                    errorCode: ErrorCodeApi.INTERNAL_SERVER_ERROR,
                  },
                });
                */

                originalRequestBuffer.delete(
                  dedupedPayload.clientRequestIds[0],
                );
              }
            }

            // Clean the dedupe request buffer
            dedupeRequestBuffer.delete(refString);
          }
        }

        // Clean original request buffer of expired requests
        for (const clientRequestId of originalRequestBuffer.keys()) {
          const originalRequest = originalRequestBuffer.get(clientRequestId);
          if (
            originalRequest &&
            originalRequest.bufferTime + maxBufferTime < currTime
          ) {
            // Reimplement once we find out how to disable partial returns
            /*
            requestStatusHandler?.update(originalRequest.originPayload, {
              ready: false,
              error: {
                errorCode: ErrorCodeApi.INTERNAL_SERVER_ERROR,
              },
            });
            */

            originalRequestBuffer.delete(clientRequestId);
          }
        }

        const idsOrQueryString = splitReq.ref.path.ids
          ? splitReq.ref.path.ids[0]
          : stringifyReference(splitReq.ref);
        const refString = [
          splitReq.ref.path.collection,
          splitReq.ref.path.authUid,
          idsOrQueryString,
        ].join('~');

        const bufferedRequest = dedupeRequestBuffer.get(refString);

        // If no existing request exists or the existing request is errored, treat the request as new
        if (
          bufferedRequest === undefined ||
          bufferedRequest.statusObject?.error
        ) {
          dedupeRequestBuffer.set(refString, {
            payload: splitReq,
            dedupedPayloads: [],
            statusObject: undefined,
            bufferTime: currTime,
          });

          return true;
        }

        // If buffered request is outside dedupe window, override it instead of deduping
        // We effectively treat this as a new request but move the old deduped requests that were pending to this one.
        if (bufferedRequest.bufferTime + windowTime < currTime) {
          dedupeRequestBuffer.set(refString, {
            payload: splitReq,
            dedupedPayloads: bufferedRequest.dedupedPayloads,
            statusObject: undefined,
            bufferTime: currTime,
          });

          return true;
        }

        // Append the deduped requests to list of existing deduped requests waiting for the API
        // request to return from the leading request in the dedupe buffer
        const bufferedRequestCompleted = bufferedRequest.statusObject?.ready;
        if (!bufferedRequestCompleted) {
          dedupeRequestBuffer.set(refString, {
            ...bufferedRequest,
            dedupedPayloads: [...bufferedRequest.dedupedPayloads, splitReq],
          });
        }

        const originalBufferedRequest = originalRequestBuffer.get(
          splitReq.clientRequestIds[0],
        );
        const isIdQuery =
          !splitReq.ref.query &&
          splitReq.ref.path.ids &&
          splitReq.ref.path.ids[0];

        if (isIdQuery) {
          // If the original request doesn't not already exist in the buffer, add the new original request to the buffer.
          // Append the deduped requested ids to the readyIds if the leading request is already completed.
          let readyIds = originalBufferedRequest
            ? originalBufferedRequest.readyIds
            : [];
          if (bufferedRequestCompleted) {
            readyIds = [...readyIds, Number(splitReq.ref.path.ids[0])];
          }

          if (!originalBufferedRequest || bufferedRequestCompleted) {
            if (
              bufferedRequestCompleted &&
              readyIds.length === originalReq.ref.path.ids?.length
            ) {
              // If we see that all the requested ids are deduped and they are all already completed/ready, then we can just propagate the status now.
              requestStatusHandler?.update(
                originalReq,
                bufferedRequest.statusObject,
              );
            } else {
              // Otherwise, just update the readyIds / add the original request to buffer if not already set
              originalRequestBuffer.set(splitReq.clientRequestIds[0], {
                originPayload: originalReq,
                readyIds,
                bufferTime: currTime,
              });
            }
          }
        } else if (bufferedRequestCompleted) {
          // For query requests, there's no split requests, so if the deduped request is ready then we can just propagate the status
          requestStatusHandler?.update(
            originalReq,
            bufferedRequest.statusObject,
          );
        }

        return false;
      }),
      map(({ originalReq, splitReq }) => splitReq),
    );
}

/**
 * Splits a single request with more than one object id into multiple requests
 * each with one id in its path
 */
export function splitOnIds<C extends DatastoreCollectionType>(
  request: RequestDataPayload<C>,
): readonly RequestDataPayload<C>[] {
  const { path } = request.ref;
  const objectIds = path.ids || [];

  // Don't split requests that don't have `ids` or have a query
  // (Yes requests can have `ids` and other query parameters)
  if (objectIds.length === 0 || request.ref.query !== undefined) {
    return [request];
  }

  return objectIds.reduce((requests: RequestDataPayload<C>[], id) => {
    const splitReq: RequestDataPayload<C> = {
      type: path.collection,
      ref: {
        path: {
          collection: path.collection,
          authUid: path.authUid,
          ids: [id],
        },
      },
      clientRequestIds: request.clientRequestIds,
      resourceGroup: request.resourceGroup,
      isRefetch: request.isRefetch,
    };
    return [...requests, splitReq];
  }, []);
}
