/**
 * A library for working with SockJS websockets.
 *
 * This is somewhat based on rxjs-websockets but with a lot of customisations
 * for freelancer.
 */
import { DOCUMENT } from '@angular/common';
import type { OnDestroy } from '@angular/core';
import { Inject, Injectable, NgZone, Optional } from '@angular/core';
import { Auth } from '@freelancer/auth';
import { startWithEmptyList } from '@freelancer/operators';
import type { Timer } from '@freelancer/time-utils';
import { TimeUtils } from '@freelancer/time-utils';
import { Tracking } from '@freelancer/tracking';
import type { TrackingExtraParams } from '@freelancer/tracking/interface';
import {
  QueueSubject,
  isDefined,
  jsonStableStringify,
  toNumber,
} from '@freelancer/utils';
import { UntilDestroy, untilDestroyed } from '@ngneat/until-destroy';
import { Store } from '@ngrx/store';
import { ResourceTypeApi } from 'api-typings/gotifications/gotifications';
import type { MessageApi } from 'api-typings/messages/messages_types';
import type { Observable, SchedulerLike } from 'rxjs';
import {
  BehaviorSubject,
  NEVER,
  ReplaySubject,
  Subject,
  Subscription,
  asyncScheduler,
  combineLatest,
  concat,
  firstValueFrom,
  fromEvent,
  iif,
  isObservable,
  of,
  race,
  throwError,
} from 'rxjs';
import {
  concatMap,
  delay,
  delayWhen,
  distinctUntilChanged,
  filter,
  map,
  pairwise,
  retry,
  share,
  skip,
  startWith,
  switchMap,
  take,
  tap,
  withLatestFrom,
} from 'rxjs/operators';
import type { TypedAction } from '../actions';
import { DATASTORE_CONFIG } from '../datastore.config';
import { DatastoreConfig } from '../datastore.interface';
import { setDiff } from '../helpers';
import type { StoreState } from '../store.model';
import type {
  ResourceSubMessage,
  ResourceUnsubMessage,
  SockMessageSend,
  WebsocketChannel,
  WebsocketResource,
} from './message-send-event.model';
import { MultiSet } from './multiset';
import {
  isWebsocketMessage,
  isWebsocketSubscriptionExpiring,
} from './server-event.helpers';
import type {
  ExpiringSubscription,
  WebsocketMessage,
  WebsocketServerEvent,
  WebsocketServerEventTParam,
} from './server-event.model';
import type { BaseServerMessage } from './server.types';
import {
  generateChannelSubMessage,
  generateHeader,
  generateOnlineOfflineSub,
  generateOnlineOfflineUnsub,
  generateResourcesSubMessage,
  generateResourcesUnsubMessage,
} from './sock-js';
import {
  ON_MESSAGE_TRACKING_CONFIG,
  OnMessageTrackedEventTypes,
  RANDOM_NUMBER_PROVIDER,
  RECONNECT_CONFIG,
  WEBSOCKET_BATCH_WINDOW,
  WEBSOCKET_TEST_SCHEDULER,
} from './websocket.config';
import {
  NonCleanDisconnectError,
  ObservableWebSocket,
} from './websocket.factory';
import type { WebSocketResponse } from './websocket.interface';
import { ReconnectConfig } from './websocket.interface';

/**
 * Connection state of the websocket as tracked by `WebSocketService`.
 *
 * The numeric values are spelled out because this is a numeric enum:
 * `CLOSED` is 0 and `OPEN` is 1.
 */
export enum ConnectionStatus {
  CLOSED = 0,
  OPEN = 1,
}

/**
 * Connection status in the shape that is reported to the user.
 *
 * Discriminated on `status`: the closed variant additionally carries the
 * retry delay in seconds, the open variant carries nothing else.
 */
export type ConnectionStatusDetail =
  | Readonly<{
      status: ConnectionStatus.CLOSED;
      retryDelayInSeconds: number;
    }>
  | Readonly<{
      status: ConnectionStatus.OPEN;
    }>;

@UntilDestroy({ className: 'WebSocketService' })
@Injectable({
  providedIn: 'root',
})
export class WebSocketService implements OnDestroy {
  /**
   * Emits a signal (`undefined`) when the websocket connection status changes from 0 to 1 (i.e. disconnected to connected).
   * Ignores the first status change that happens when the websocket connection is established after the webapp shell loads.
   *
   * Assigned in the constructor from `hasReconnected(connectionStatus$)`, which
   * may also delay or suppress the signal depending on the reconnection source.
   */
  reconnected$: Observable<void>;

  // Connection status as tracked by this service; starts out CLOSED and is
  // pushed to from the `serverResponse$` pipeline and its subscriber.
  private connectionStatusSubject$ = new BehaviorSubject<ConnectionStatus>(
    ConnectionStatus.CLOSED,
  );
  // Read-only view of the status subject above.
  private connectionStatus$ = this.connectionStatusSubject$.asObservable();

  // Receives each websocket message after it has been dispatched to the store
  // (see `processServerMessage`); completed when `serverResponse$` completes.
  private fromServerStreamSubject$ = new Subject<
    WebsocketServerEvent<WebsocketServerEventTParam>
  >();
  // Public, read-only view of the server event stream.
  fromServerStream$ = this.fromServerStreamSubject$.asObservable();

  // Custom channels currently subscribed to (see `setChannels`).
  private channelSubscriptions = new Set<WebsocketChannel>();
  // Resource subscriptions, keyed by the stable JSON string of the resource.
  // A multiset, so the same resource can be requested by several subscribers.
  private resourcesSubscriptions = new MultiSet<string>();
  // Online/offline subscriptions by user id, counted per subscriber.
  private onlineOfflineSubscriptions = new MultiSet<number>();
  // Pending debounce timer for the reconnect that follows a channel unsubscription.
  private unsubDebounceTime?: Timer; // TODO: T35544
  // Number of times the websocket has been opened; 0 until the first open.
  private reconnectionCount = 0;
  // Interval that periodically resets `reconnectFromDisconnectCount`.
  private reconnectFromDisconnectResetInterval?: Timer;
  // Identifier attached to connection tracking events; set to `Date.now()` in the constructor.
  private tabId: number;

  // Retry attempts since the last successful (re)connection or `online` event.
  connectRetryCount = 0;
  // Reconnections whose source was 'disconnect' since the last periodic reset.
  reconnectFromDisconnectCount = 0;

  /**
   * Lazily created wrapper around the underlying web socket connection.
   * When subscribed to, it:
   *  - opens the web socket connection,
   *  - emits an observable that will emit server responses,
   *  - closes the connection when the observable is unsubscribed from.
   */
  private _websocket$: ObservableWebSocket;
  get websocket$(): ObservableWebSocket {
    if (this._websocket$) {
      return this._websocket$;
    }

    // Callback handed to the socket wrapper. It tracks a missing-heartbeat
    // event and requests a reconnection flagged as a disconnect.
    const onHeartbeatMissing = (): void => {
      this.track({ name: 'WS.ON_CLOSE.HEARTBEAT_MISSING' });
      this.currentReconnectionSource = 'disconnect';
      this.requestReconnectionSubject$.next();
    };

    this._websocket$ = new ObservableWebSocket(
      this.datastoreConfig.webSocketUrl,
      this.ngZone,
      this.timeUtils,
      onHeartbeatMissing,
    );
    return this._websocket$;
  }

  /**
   * Queue of messages to be sent to the server once the web socket is established.
   * It is handed to the socket each time the connection opens.
   */
  private messagesQueueSubject$ = new QueueSubject<SockMessageSend>();
  // Resource subscription requests awaiting batching; the constructor merges
  // each batch into one message before forwarding it to `messagesQueueSubject$`.
  private resourceSubQueueSubject$ = new QueueSubject<ResourceSubMessage>();
  // Resource unsubscription requests, batched the same way as the subscriptions above.
  private resourceUnsubQueueSubject$ = new QueueSubject<ResourceUnsubMessage>();

  /**
   * Lazily created behavior subject used to request a reconnection of the
   * web socket. Within this service it is triggered:
   *  - when the document becomes visible again while the connection is closed,
   *  - after unsubscribing from channels,
   *  - on a missing heartbeat.
   */
  private _requestReconnectionSubject$: BehaviorSubject<void>;
  // eslint-disable-next-line rxjs/no-exposed-subjects
  get requestReconnectionSubject$(): BehaviorSubject<void> {
    let subject$ = this._requestReconnectionSubject$;
    if (!subject$) {
      subject$ = new BehaviorSubject<void>(undefined);
      this._requestReconnectionSubject$ = subject$;
    }
    return subject$;
  }

  /**
   * The source of the current reconnection request.
   *
   * Different reconnection sources handle reconnection delay differently.
   * We delay datastore refetches after disconnection-reconnection events,
   * to avoid DoSing our own backend if the root cause is an outage.
   * We don't do that for app activation or unsubscription/resubscription events.
   *
   * Reset to 'disconnect' every time the socket opens.
   */
  currentReconnectionSource: 'disconnect' | 'appActive' | 'unsubscription';
  /**
   * The source of the reconnection request that triggered the last successful reconnection.
   * Read by `hasReconnected` to decide whether to delay or suppress the signal.
   */
  reconnectedSource: 'disconnect' | 'appActive' | 'unsubscription';

  /**
   * This doesn't necessarily reflect the real connection status of the websocket.
   * It is used to show messages to the user.
   * Replays the latest value to new subscribers.
   */
  private websocketConnectStatusForUserSubject$ =
    new ReplaySubject<ConnectionStatusDetail>(1);

  // Container for the subscriptions created in the constructor; released in `ngOnDestroy`.
  private subscriptions = new Subscription();

  // Below this retry count every `WS.ON_CLOSE*` event is tracked.
  private readonly TRACKING_RETRY_COUNT_THRESHOLD = 5;
  // From the threshold onwards, only retry counts that are a multiple of this are tracked.
  private readonly TRACKING_RETRY_COUNT_DEBOUNCE = 10;

  /**
   * Wires up the service:
   *  - subscribes to `serverResponse$`, which is what opens the websocket,
   *  - batches resource subscription and unsubscription requests,
   *  - requests a reconnection when the document becomes visible while the
   *    connection is closed,
   *  - defaults the random number provider and starts the interval that
   *    resets `reconnectFromDisconnectCount`,
   *  - builds `reconnected$`.
   *
   * `scheduler` and `randomNumberProvider` are optional injections and are
   * `null` when no provider is registered.
   */
  constructor(
    @Inject(DATASTORE_CONFIG) private datastoreConfig: DatastoreConfig,
    @Inject(RECONNECT_CONFIG) private reconnectConfig: ReconnectConfig,
    @Inject(DOCUMENT) private document: Document,
    private auth: Auth,
    private tracking: Tracking,
    private store$: Store<StoreState>,
    private timeUtils: TimeUtils,
    private ngZone: NgZone,
    @Optional()
    @Inject(WEBSOCKET_TEST_SCHEDULER)
    private scheduler: SchedulerLike | null,
    @Optional()
    @Inject(RANDOM_NUMBER_PROVIDER)
    private randomNumberProvider: () => number,
  ) {
    // Identifier attached to every connection tracking event sent by `track`.
    this.tabId = Date.now();

    this.subscriptions.add(
      this.serverResponse$.subscribe({
        // Process server response
        next: ({ serverResponse, authState }) => {
          // The payload is parsed without validation; a malformed frame makes
          // `JSON.parse` throw inside this handler.
          const serverResponseData = JSON.parse(serverResponse.data);
          this.processServerResponse(serverResponseData, authState.userId);
        },

        // Handle websocket errors
        error: error => {
          // NOTE(review): `JSON.stringify` of an `Error` instance yields '{}',
          // so `ws_error` may carry little information — confirm what is thrown here.
          this.track({
            name: 'WS.ON_ERROR',
            extraParams: {
              ws_error: JSON.stringify(error),
            },
          });
        },

        // Connection has been closed
        complete: () => {
          this.fromServerStreamSubject$.complete();
          this.connectionStatusSubject$.next(ConnectionStatus.CLOSED);
        },
      }),
    );

    // Batch all subscription requests made within the WEBSOCKET_BATCH_WINDOW.
    // After batching, pass this as a single batched request to the messagesQueueSubject$
    // rather than passing in each individual sub message to the stream.
    this.subscriptions.add(
      this.resourceSubQueueSubject$
        .pipe(
          this.timeUtils.rxBufferTime(WEBSOCKET_BATCH_WINDOW),
          // Skip windows in which nothing was requested; this also guarantees
          // the seedless `reduce` below never sees an empty array.
          filter(batchedRequests => batchedRequests.length > 0),
          map(batchedRequests =>
            batchedRequests.reduce((acc, curr) => {
              // Pass into a set first to remove duplicates
              // NOTE(review): this merges `body.channels` under channel
              // 'channels', while the unsubscription batching below merges
              // `body.resources` under channel 'resource' — confirm this
              // matches the shape of `ResourceSubMessage`.
              const uniqueChannels = new Set([
                ...acc.body.channels,
                ...curr.body.channels,
              ]);
              return {
                channel: 'channels',
                body: {
                  channels: Array.from(uniqueChannels),
                },
              } as ResourceSubMessage;
            }),
          ),
        )
        .subscribe(batchedRequest => {
          // Only pass the sub request to the queue after batching it
          this.messagesQueueSubject$.next(batchedRequest);
        }),
    );

    // Same batching for unsubscription requests.
    this.subscriptions.add(
      this.resourceUnsubQueueSubject$
        .pipe(
          this.timeUtils.rxBufferTime(WEBSOCKET_BATCH_WINDOW),
          filter(batchedRequests => batchedRequests.length > 0),
          map(batchedRequests =>
            batchedRequests.reduce((acc, curr) => {
              // Pass into a set first to remove duplicates
              const uniqueResources = new Set([
                ...acc.body.resources,
                ...curr.body.resources,
              ]);
              return {
                channel: 'resource',
                body: {
                  route: 'unsub',
                  resources: Array.from(uniqueResources),
                },
              } as ResourceUnsubMessage;
            }),
          ),
        )
        .subscribe(batchedRequest =>
          // Only pass the unsub request to the queue after batching it
          this.messagesQueueSubject$.next(batchedRequest),
        ),
    );

    // Request a reconnection when the document becomes visible again while
    // the connection is closed.
    this.subscriptions.add(
      fromEvent(this.document, 'visibilitychange').subscribe(async () => {
        // Don't do anything if the app is hidden.
        if (this.document.hidden) {
          return;
        }
        // Reads the current value of the status stream, which is backed by a
        // BehaviorSubject.
        const connectionStatus = await firstValueFrom(
          this.connectionStatus$.pipe(untilDestroyed(this)),
        );
        // If the websocket is already open, don't attempt to reconnect.
        if (connectionStatus === ConnectionStatus.OPEN) {
          return;
        }
        console.warn(
          'Attempting to reconnect to websocket after app becoming active.',
        );
        this.track({ name: 'WS.ON_CLOSE.AFTER_APP_ACTIVE' });
        this.currentReconnectionSource = 'appActive';
        this.requestReconnectionSubject$.next();
      }),
    );

    // Fall back to `Math.random` when no provider was injected.
    if (this.randomNumberProvider === null) {
      this.randomNumberProvider = Math.random;
    }

    // Periodically reset the count of disconnect-triggered reconnections,
    // which `hasReconnected` compares against `reconnectEmitLimit`.
    this.reconnectFromDisconnectResetInterval = this.timeUtils.setInterval(
      () => {
        this.reconnectFromDisconnectCount = 0;
      },
      this.reconnectConfig.reconnectCountResetMilliseconds,
    );

    this.reconnected$ = this.hasReconnected(this.connectionStatus$);
  }

  /**
   * Builds the pipeline that opens the websocket and streams server responses.
   *
   * This is a getter, so every access constructs a new pipeline; nothing
   * happens until the result is subscribed to.
   *
   * Steps, in order:
   *  1. Combine `auth.authState$` with `requestReconnectionSubject$`, so that
   *     either a new auth state or a reconnection request restarts everything
   *     downstream.
   *  2. Mark the connection as CLOSED, then drop auth states for which
   *     `isDefined` is false.
   *  3. Switch to `websocket$`, which emits an observable of server responses
   *     once the socket has been opened.
   *  4. On open: put the authorization message at the front of the message
   *     queue, re-enqueue the existing subscriptions, track the open event,
   *     update the reconnection counters and sources, mark the connection as
   *     OPEN and hand the message queue to the socket.
   *  5. Emit every server response paired with the auth state in effect.
   *
   * If a NonCleanDisconnectError is thrown, the sequence is retried after
   * `retryDelay` milliseconds (which grows with the number of previous retry
   * attempts and is capped), or as soon as the browser fires an `online`
   * event, whichever happens first. Any other error is rethrown.
   *
   * @returns An observable of websocket responses paired with the auth state.
   */
  get serverResponse$(): Observable<WebSocketResponse> {
    return combineLatest([
      this.auth.authState$,
      this.requestReconnectionSubject$,
    ]).pipe(
      // Only the auth state is needed; the reconnection request is just a trigger.
      map(([authState]) => authState),
      // Mark the connection as closed
      // (this runs before the filter below, so also for undefined auth states)
      tap(() => {
        this.connectionStatusSubject$.next(ConnectionStatus.CLOSED);
      }),
      filter(isDefined),
      // Hook onto the websocket
      // (switching unsubscribes from the previous socket observable)
      switchMap(authState =>
        this.websocket$.pipe(
          map(serverResponse$ => ({ serverResponse$, authState })),
        ),
      ),
      // the observable produces a value once the websocket has been opened
      switchMap(({ serverResponse$, authState }) => {
        // The authorization message goes to the front of the queue so it is
        // sent ahead of anything already queued.
        const authMessage = generateHeader(authState);
        this.messagesQueueSubject$.addFirst(authMessage);
        this.subscribeToEvents();

        // `reconnectionCount` is 0 only for the very first open.
        if (this.reconnectionCount > 0) {
          this.track({
            name: 'WS.ON_OPEN_RECONNECT',
            extraParams: {
              reconnectType: this.currentReconnectionSource,
            },
          });
          if (this.currentReconnectionSource === 'disconnect') {
            this.reconnectFromDisconnectCount++;
          }
        } else {
          this.track({
            name: 'WS.ON_OPEN_INITIAL',
          });
        }
        this.reconnectionCount++;

        /**
         * Reset the reconnection source to 'disconnect' as the default after a reconnection
         * because it's possible that a disconnection/reconnection can happen before it is marked
         * as a disconnect through missing heartbeats.
         *
         * Retain the source of the current reconnection as it affects the reconnected$ emission
         */
        this.reconnectedSource = this.currentReconnectionSource;
        this.currentReconnectionSource = 'disconnect';

        this.connectionStatusSubject$.next(ConnectionStatus.OPEN);

        // Send the messages after the connection has been established
        this.websocket$.send?.(this.messagesQueueSubject$);

        // Subscribe to the serverResponse$ observable to get the response from the server
        return serverResponse$.pipe(
          map(serverResponse => ({ serverResponse, authState })),
        );
      }),
      // Reconnect websocket when NonCleanDisconnectError thrown
      retry({
        delay: error$ =>
          // The argument is wrapped with `of` unless it already is an observable.
          (isObservable(error$) ? error$ : of(error$)).pipe(
            // Use concat map to keep the errors in order and make sure they
            // aren't executed in parallel
            concatMap(error =>
              // Executes a conditional Observable depending on the result
              // of the first argument
              iif(
                () => error instanceof NonCleanDisconnectError,
                // If the condition is true we pipe this back into our stream and delay the retry
                of(error).pipe(
                  delayWhen(() => {
                    const retryThrottling = this.retryDelay;
                    this.notifyUser(retryThrottling);

                    this.track({
                      name: 'WS.ON_CLOSE',
                      extraParams: {
                        retryDelay: retryThrottling,
                        retryCount: this.connectRetryCount,
                      },
                    });

                    // Whichever fires first wins: the backoff timer or the
                    // browser coming back online.
                    return race(
                      this.timeUtils.rxTimer(retryThrottling).pipe(
                        tap(() => {
                          // The timer elapsed: count the attempt so the next
                          // delay is longer.
                          this.connectRetryCount++;
                          console.warn('reconnecting (timeout)...');
                          this.track({
                            name: 'WS.ON_CLOSE.RETRY_DELAY_ELAPSED',
                          });
                        }),
                        map(() => true),
                      ),
                      this.onlineEvent$.pipe(
                        tap(() => {
                          // Back online: start the backoff over.
                          this.connectRetryCount = 0;
                          console.warn('reconnecting (online event)...');
                          this.track({ name: 'WS.ON_CLOSE.WINDOW_ONLINE' });
                        }),
                        map(() => true),
                      ),
                    ).pipe(take(1));
                  }),
                  tap(() => {
                    this.track({ name: 'WS.ON_CLOSE.RECONNECTING' });
                  }),
                ),
                // Otherwise we throw the error (the last error)
                throwError(() => error),
              ),
            ),
          ),
      }),
    );
  }

  /** A stream of the `online` events fired on `window`; created anew on every access. */
  get onlineEvent$(): Observable<Event> {
    const online$ = fromEvent(window, 'online');
    return online$;
  }

  /**
   * Derives the reconnection signal from a stream of connection statuses.
   *
   * Every CLOSED -> OPEN transition resets `connectRetryCount` and reports
   * OPEN to the user-facing status subject. The first such transition (the
   * initial connection) emits nothing. Later ones emit `undefined`:
   *  - after a random delay of up to `signalDelayMaxMilliseconds` when the
   *    reconnection was caused by a disconnect, otherwise without delay;
   *  - only while `reconnectFromDisconnectCount` is within
   *    `reconnectEmitLimit`, if the reconnection was caused by a disconnect.
   *
   * The resulting stream is shared between subscribers.
   *
   * @param connectionStatus$ Stream of `ConnectionStatus` values.
   * @returns A stream that emits each time the websocket has reconnected.
   */
  hasReconnected(connectionStatus$: Observable<number>): Observable<void> {
    const isOpening = ([before, after]: [number, number]): boolean =>
      before === ConnectionStatus.CLOSED && after === ConnectionStatus.OPEN;

    const mayEmit = (): boolean => {
      if (this.reconnectedSource !== 'disconnect') {
        return true;
      }
      return (
        this.reconnectFromDisconnectCount <=
        this.reconnectConfig.reconnectEmitLimit
      );
    };

    return connectionStatus$.pipe(
      pairwise(),
      filter(isOpening),
      tap(() => {
        this.connectRetryCount = 0;
        this.websocketConnectStatusForUserSubject$.next({
          status: ConnectionStatus.OPEN,
        });
      }),
      // The very first transition is the initial connection, not a reconnection.
      skip(1),
      switchMap(() => {
        // Only disconnect-triggered reconnections get a random delay.
        const delayCeiling =
          this.reconnectedSource === 'disconnect'
            ? this.reconnectConfig.signalDelayMaxMilliseconds
            : 0;
        const signalDelay = this.randomNumberProvider() * delayCeiling;
        return of(undefined).pipe(
          // eslint-disable-next-line local-rules/validate-timers
          delay(signalDelay, this.scheduler ?? asyncScheduler),
        );
      }),
      filter(mayEmit),
      share(),
    );
  }

  /**
   * Returns the user-facing connection status stream, which replays the
   * latest status to new subscribers.
   */
  getConnectStatusForUser(): Observable<ConnectionStatusDetail> {
    const statusForUser$: Observable<ConnectionStatusDetail> =
      this.websocketConnectStatusForUserSubject$;
    return statusForUser$;
  }

  /**
   * Sends a connection tracking event tagged with this tab's id.
   *
   * Events whose name starts with `WS.ON_CLOSE` are throttled according to
   * the current retry count; all other events are always sent.
   *
   * @param name Event name.
   * @param section Tracking section; defaults to `WS_CONN_EVENT`.
   * @param extraParams Additional parameters merged over the tab id.
   */
  private track({
    name,
    section = 'WS_CONN_EVENT',
    extraParams,
  }: {
    name: string;
    section?: string;
    extraParams?: TrackingExtraParams;
  }): void {
    const isCloseEvent = name.startsWith('WS.ON_CLOSE');
    if (
      isCloseEvent &&
      !this.shouldTrackWebsocketDisconnectionEvent(this.connectRetryCount)
    ) {
      return;
    }

    this.tracking.trackCustomEvent(name, section, {
      tabId: this.tabId,
      ...extraParams,
    });
  }

  /**
   * Angular destroy hook: releases the subscriptions and timers owned by the
   * service.
   */
  ngOnDestroy(): void {
    this.subscriptions.unsubscribe();
    clearInterval(this.reconnectFromDisconnectResetInterval);

    // A channel unsubscription may still have its debounced reconnect pending;
    // cancel it so the callback cannot run against a destroyed service.
    if (this.unsubDebounceTime) {
      clearTimeout(this.unsubDebounceTime);
      this.unsubDebounceTime = undefined;
    }
  }

  /**
   * Routes a parsed server event: websocket messages are processed for the
   * given user, and subscription expiration notices trigger a resubscription.
   * The two checks are independent of each other.
   *
   * @param event The parsed server event.
   * @param userId Id of the user the connection was authenticated as.
   */
  processServerResponse(
    event: WebsocketServerEvent<WebsocketServerEventTParam>,
    userId: string,
  ): void {
    if (isWebsocketMessage<BaseServerMessage>(event)) {
      // Regular websocket message
      this.processServerMessage(event, userId);
    }

    if (!isWebsocketSubscriptionExpiring(event)) {
      return;
    }
    // Expiration notice: subscribe to the listed resources again
    this.resourceResubscribe(event);
  }

  /**
   * Adds a message to the outgoing queue. The queue is handed to the socket
   * once the connection is established, so it is safe to call this before
   * the websocket is open.
   *
   * @param message The message to send.
   */
  enqueue(message: SockMessageSend): void {
    const outgoing$ = this.messagesQueueSubject$;
    outgoing$.next(message);
  }

  /**
   * Queues a resource subscription request. Requests are batched before they
   * reach the outgoing message queue.
   */
  enqueueResourceSub(message: ResourceSubMessage): void {
    const pendingSubs$ = this.resourceSubQueueSubject$;
    pendingSubs$.next(message);
  }

  /**
   * Queues a resource unsubscription request. Requests are batched before
   * they reach the outgoing message queue.
   */
  enqueueResourceUnsub(message: ResourceUnsubMessage): void {
    const pendingUnsubs$ = this.resourceUnsubQueueSubject$;
    pendingUnsubs$.next(message);
  }

  /** Subscribes again to the resources listed in an expiration notice. */
  private resourceResubscribe(event: ExpiringSubscription): void {
    const renewal = generateResourcesSubMessage(event.body.resources);
    this.enqueueResourceSub(renewal);
  }

  /**
   * Handles a websocket message: dispatches it to the store as a
   * `WS_MESSAGE` action, forwards it on `fromServerStream$` and tracks its
   * receipt when tracking is configured for that kind of message.
   *
   * @param event The websocket message.
   * @param toUserId Id of the current user, attached to the action payload.
   */
  processServerMessage(
    event: WebsocketMessage<BaseServerMessage>,
    toUserId: string,
  ): void {
    const action: TypedAction = {
      type: 'WS_MESSAGE',
      no_persist: event.body.no_persist,
      payload: {
        ...event.body,
        toUserId, // all WebSocket messages are tied to the current user
      },
    };
    this.store$.dispatch(action);
    // Listeners are only notified after the action has been dispatched.
    this.fromServerStreamSubject$.next(event);

    // TODO: T267853 - If received message tracking is extend beyond the messaging product, tracking could be abstracted to another
    // class (e.g. websocket-tracking.ts). We could actually have a class for each product, injected dynamically based on the message type.
    // This allows each product team to choose when to track the received message, and how to format the QTS event.
    const recipientId = toNumber(toUserId);
    if (!this.shouldTrackWebsocketReceiveEvent(event, recipientId)) {
      return;
    }
    this.trackWebsocketReceiveEvent(event);
  }

  /**
   * Re-issues every subscription the service currently tracks. Called each
   * time the websocket has been opened, since a new connection starts
   * without any subscriptions.
   *
   * Each kind of subscription is only sent when there is something to
   * subscribe to. Resource subscriptions go through the batching queue like
   * any other resource subscription request.
   */
  private subscribeToEvents(): void {
    // Online/offline status of users. Sent exactly once: a second,
    // unconditional enqueue would duplicate the request and would send an
    // empty id list when nothing is subscribed.
    const onlineOfflineUserIds = this.onlineOfflineSubscriptions.values();
    if (onlineOfflineUserIds.length) {
      this.enqueue(generateOnlineOfflineSub(onlineOfflineUserIds));
    }

    // Custom channels
    if (this.channelSubscriptions.size) {
      this.enqueue(
        generateChannelSubMessage(Array.from(this.channelSubscriptions)),
      );
    }

    // Resources are stored by their stable JSON key; turn them back into
    // resource objects before subscribing.
    const existingResourcesSubscriptions = this.resourcesSubscriptions
      .values()
      .map(resourceKey => JSON.parse(resourceKey) as WebsocketResource);
    if (existingResourcesSubscriptions.length) {
      this.enqueueResourceSub(
        generateResourcesSubMessage(existingResourcesSubscriptions),
      );
    }
  }

  /**
   * Subscribes and unsubscribes to a list of resources by id.
   *
   * It is passed an observable of `ids$` and returns an observable to be
   * subscribed to. While the result is subscribed, changes to `ids$` cause
   * new ids to be subscribed to on the websocket and ids that disappeared to
   * be unsubscribed from. The result never emits.
   *
   * Supported collections are `onlineOffline` (ids are user ids) and
   * `groups` / `groupsSelf` (ids are group ids).
   *
   * @param collectionName Name of the collection the ids belong to.
   * @param ids$ Stream of the ids that should currently be subscribed to.
   * @throws Error if the collection does not support subscriptions.
   */
  subscribeToIds(
    collectionName: string,
    ids$: Observable<readonly string[]>,
  ): Observable<unknown> {
    if (collectionName === 'onlineOffline') {
      return this.subscribeOnlineOffline(ids$);
    }

    // Both group collections subscribe to the same kind of resource.
    if (collectionName === 'groups' || collectionName === 'groupsSelf') {
      const groupResources$ = ids$.pipe(
        map(ids =>
          ids.map(id => ({
            id: toNumber(id),
            type: ResourceTypeApi.GROUP,
          })),
        ),
      );
      return this.subscribeToResources(groupResources$);
    }

    throw new Error(`Cannot subscribe to collection '${collectionName}'`);
  }

  /**
   * Keeps the online/offline subscriptions in sync with a stream of user ids.
   *
   * Each emission is compared with the previous one. Ids that appeared are
   * added to the subscription counts, ids that disappeared are removed. A
   * websocket message is only sent for ids whose count went from 0 to 1
   * (subscribe) or dropped back to 0 (unsubscribe).
   *
   * The returned observable never emits; it only exists to be subscribed to.
   */
  private subscribeOnlineOffline(
    userIds$: Observable<readonly string[]>,
  ): Observable<unknown> {
    // Pairs of [previous ids, current ids]. The stream starts from an empty
    // list, and an empty list is appended once `userIds$` completes so that
    // the last set of ids is seen as removed.
    const idPairs$ = concat(
      userIds$.pipe(
        distinctUntilChanged(),
        map(ids => ids.map(id => toNumber(id))),
        startWithEmptyList(),
      ),
      of([]),
    ).pipe(pairwise());

    // Ids that just appeared: count them, and subscribe on the websocket to
    // the ones nobody else was subscribed to yet.
    const subscribed$ = idPairs$.pipe(
      map(([before, after]) => setDiff(after, before)),
      map(appearedIds =>
        appearedIds.filter(id => {
          this.onlineOfflineSubscriptions.add(id);
          return this.onlineOfflineSubscriptions.multiplicity(id) === 1;
        }),
      ),
      tap(firstTimeIds => {
        if (firstTimeIds.length) {
          this.enqueue(generateOnlineOfflineSub(firstTimeIds));
        }
      }),
    );

    // Ids that just disappeared: lower their count, and unsubscribe on the
    // websocket from the ones nobody is subscribed to any more.
    const unsubscribed$ = idPairs$.pipe(
      map(([before, after]) => setDiff(before, after)),
      map(disappearedIds =>
        disappearedIds.filter(id => {
          if (this.onlineOfflineSubscriptions.multiplicity(id) === 0) {
            console.warn(`Tried removing user ${id} who is already removed.`);
          }
          this.onlineOfflineSubscriptions.remove(id);
          return this.onlineOfflineSubscriptions.multiplicity(id) === 0;
        }),
      ),
      tap(orphanedIds => {
        if (orphanedIds.length) {
          this.enqueue(generateOnlineOfflineUnsub(orphanedIds));
        }
      }),
    );

    // NEVER keeps the result silent while `withLatestFrom` keeps both
    // streams above subscribed.
    return NEVER.pipe(withLatestFrom(subscribed$, unsubscribed$));
  }

  /**
   * Keeps the resource subscriptions in sync with a stream of resources.
   * To be used ONLY in this service as an automatic collection subscription
   * mechanism.
   *
   * Each emission is compared with the previous one, starting from an empty
   * list. Resources are counted by their stable JSON key, and a request is
   * only queued for resources whose count went from 0 to 1 (subscribe) or
   * dropped back to 0 (unsubscribe).
   *
   * The returned observable never emits; it only exists to be subscribed to.
   *
   * @throws Error (inside the stream) if a resource cannot be stringified.
   */
  private subscribeToResources(
    resources$: Observable<readonly WebsocketResource[]>,
  ): Observable<unknown> {
    // Key under which a resource is counted in `resourcesSubscriptions`.
    const keyOf = (resource: WebsocketResource): string => {
      const key = jsonStableStringify(resource);
      if (!key) {
        throw new Error(
          `Could not JSON stringify websocket resource id: ${resource.id} type: ${resource.type}`,
        );
      }
      return key;
    };

    // Pairs of [previous resources, current resources].
    const resourcePairs$ = resources$.pipe(startWith([]), pairwise());

    // Resources that just appeared: count them, and queue a subscription
    // for the ones that were not subscribed to yet.
    const subscribed$ = resourcePairs$.pipe(
      map(([before, after]) => setDiff(after, before)),
      map(appeared =>
        appeared.filter(resource => {
          const key = keyOf(resource);
          this.resourcesSubscriptions.add(key);
          return this.resourcesSubscriptions.multiplicity(key) === 1;
        }),
      ),
      tap(firstTime => {
        if (firstTime.length) {
          this.enqueueResourceSub(generateResourcesSubMessage(firstTime));
        }
      }),
    );

    // Resources that just disappeared: lower their count, and queue an
    // unsubscription for the ones nobody is subscribed to any more.
    const unsubscribed$ = resourcePairs$.pipe(
      map(([before, after]) => setDiff(before, after)),
      map(disappeared =>
        disappeared.filter(resource => {
          const key = keyOf(resource);
          this.resourcesSubscriptions.remove(key);
          return this.resourcesSubscriptions.multiplicity(key) === 0;
        }),
      ),
      tap(orphaned => {
        if (orphaned.length) {
          this.enqueueResourceUnsub(generateResourcesUnsubMessage(orphaned));
        }
      }),
    );

    // NEVER keeps the result silent while `withLatestFrom` keeps both
    // streams above subscribed.
    return NEVER.pipe(withLatestFrom(subscribed$, unsubscribed$));
  }

  /**
   * This overrides the list of custom channels that the websocket will be
   * listening to: channels that are currently subscribed but missing from
   * `channels` are unsubscribed, and channels that are new are subscribed.
   *
   * @param channels The complete list of channels to listen to from now on.
   */
  setChannels(channels: readonly WebsocketChannel[]): void {
    // Snapshot taken up front: unsubscribing below mutates the live set.
    const currentSubscriptions = Array.from(this.channelSubscriptions);

    // Elements of `from` that are not in `excluded`, in their original order.
    const without = (
      from: readonly WebsocketChannel[],
      excluded: readonly WebsocketChannel[],
    ): WebsocketChannel[] => {
      const excludedSet = new Set(excluded);
      return from.filter(channel => !excludedSet.has(channel));
    };

    this.unsubscribeChannels(without(currentSubscriptions, channels));
    this.subscribeChannels(without(channels, currentSubscriptions));
  }

  /**
   * Queues a subscription message for the given channels and records them
   * as subscribed. Does nothing for an empty list.
   */
  private subscribeChannels(channels: readonly WebsocketChannel[]): void {
    if (channels.length === 0) {
      return;
    }

    this.enqueue(generateChannelSubMessage(channels));
    for (const channel of channels) {
      this.channelSubscriptions.add(channel);
    }
  }

  /**
   * Forgets the given channels and schedules a reconnection of the
   * websocket, debounced by 5 seconds: every call restarts the timer. Does
   * nothing for an empty list.
   */
  private unsubscribeChannels(channels: readonly WebsocketChannel[]): void {
    if (!channels.length) {
      return;
    }

    for (const channel of channels) {
      this.channelSubscriptions.delete(channel);
    }

    /* TODO: T267853 - Do this properly.
     * Unfortunately the websocket backend doesn't support unsubscribing
     * and so we need to disconnect and reconnect the websocket to unsub :(
     *
     * Ref T35544
     */
    if (this.unsubDebounceTime) {
      clearTimeout(this.unsubDebounceTime);
    }

    const reconnectAfterUnsub = (): void => {
      this.unsubDebounceTime = undefined;
      this.currentReconnectionSource = 'unsubscription';
      this.requestReconnectionSubject$.next();

      // We should also disconnect the other websocket that libnotify has.
      window.libnotify?.close?.();
    };
    this.unsubDebounceTime = this.timeUtils.setTimeout(
      reconnectAfterUnsub,
      5000,
    );
  }

  /**
   * Decides whether a disconnection event should be tracked.
   *
   * Below TRACKING_RETRY_COUNT_THRESHOLD every event is tracked. From the
   * threshold onwards only retry counts that are a multiple of
   * TRACKING_RETRY_COUNT_DEBOUNCE are.
   *
   * @param retryCount Number of retries so far.
   * @returns Whether the event should be tracked.
   */
  private shouldTrackWebsocketDisconnectionEvent(retryCount: number): boolean {
    if (retryCount < this.TRACKING_RETRY_COUNT_THRESHOLD) {
      return true;
    }
    return retryCount % this.TRACKING_RETRY_COUNT_DEBOUNCE === 0;
  }

  /**
   * Decides whether the receipt of a websocket message should be tracked.
   *
   * Tracking must be enabled in ON_MESSAGE_TRACKING_CONFIG for the
   * `<parent_type>-<type>` of the message. Private messages are additionally
   * skipped when they carry a `remove_reason`.
   *
   * @param event The received message.
   * @param userId Id of the current user (currently not used).
   */
  private shouldTrackWebsocketReceiveEvent(
    event: WebsocketMessage<BaseServerMessage>,
    userId: number,
  ): boolean {
    const configKey = `${event.body.parent_type}-${event.body.type}`;
    if (!ON_MESSAGE_TRACKING_CONFIG[configKey]) {
      return false;
    }

    if (this.isPrivateMessage(event)) {
      return !event.body.data.remove_reason;
    }
    return true;
  }

  /**
   * Type guard for private messages: parent type `messages` with type
   * `private`.
   */
  private isPrivateMessage(
    event: WebsocketMessage<{ readonly [index: string]: any }>,
  ): event is WebsocketMessage<MessageApi> {
    const { parent_type: parentType, type } = event.body;
    return parentType === 'messages' && type === 'private';
  }

  /**
   * Tracks the receipt of a websocket message as a `WS.ON_MESSAGE` event,
   * using `<parent_type>-<type>` as the section and the tracking uuid of the
   * message as `ws_uuid`.
   *
   * When no uuid can be generated for the message, nothing is tracked and a
   * warning is logged instead.
   *
   * @param event The received message.
   */
  private trackWebsocketReceiveEvent(
    event: WebsocketMessage<BaseServerMessage>,
  ): void {
    const websocketUUID = this.generateWebsocketTrackingUUID(event);
    if (!websocketUUID) {
      // Both placeholders are interpolated, so the log names the actual
      // parent type rather than the literal placeholder text.
      console.warn(
        `No configuration for generating uuid for websocket of parent type ${event.body.parent_type} and type ${event.body.type}`,
      );
      return;
    }

    const extraParams = { ws_uuid: websocketUUID };
    this.tracking.trackCustomEvent(
      'WS.ON_MESSAGE',
      `${event.body.parent_type}-${event.body.type}`,
      extraParams,
    );
  }

  /**
   * Returns the identifier used to track the receipt of a message, or
   * `undefined` for kinds of messages that have none. Only private messages
   * have one: the id in the message data.
   */
  private generateWebsocketTrackingUUID(
    event: WebsocketMessage<BaseServerMessage>,
  ): string | undefined {
    const eventType = `${event.body.parent_type}-${event.body.type}`;
    if (eventType === OnMessageTrackedEventTypes.MESSAGES_PRIVATE) {
      return event.body.data.id;
    }
    return undefined;
  }

  /**
   * The delay to wait before the next attempt to reconnect to the websocket.
   *
   * The base doubles with every retry, from 1 second up to a cap of 32
   * seconds, and up to 1 second of random jitter is added on top.
   *
   * @returns The retry delay, in milliseconds.
   */
  get retryDelay(): number {
    const exponent = Math.min(this.connectRetryCount, 5);
    const backoffMs = 2 ** exponent * 1000;
    const jitterMs = Math.round(Math.random() * 1000);
    return backoffMs + jitterMs;
  }

  /**
   * Reports the connection as closed to the user-facing status stream,
   * together with the retry delay rounded to whole seconds. Nothing is
   * reported while the retry count is still 0.
   *
   * @param retryDelay The retry delay, in milliseconds.
   */
  notifyUser(retryDelay: number): void {
    if (this.connectRetryCount <= 0) {
      return;
    }

    // If we're having trouble reconnecting, alert the user
    this.websocketConnectStatusForUserSubject$.next({
      status: ConnectionStatus.CLOSED,
      retryDelayInSeconds: Math.round(retryDelay / 1000),
    });
  }
}
