import env from '@/plugins/env';
import { createSSE } from '@/api/api';
import { debug } from '@/utils/helpers';
import { HTTP_CODES as HTTP_CODES_NON_TYPE } from '@/utils/constants';

export enum SSETopics {
  /** События по активному пулу */
  ActivePool = 'processing:active_pool',

  /** События по пассивному пулу */
  PassivePool = 'processing:passive_pool',

  /** Возвращает событие `ready` */
  Internal = 'processing:internal',
}

export enum SSEType {
  Error = 'error',
  Heartbeat = 'heartbeat',
  Statistics = 'statistics',
}

export enum SSEActivePoolTypes {
  Statistics = 'statistics',
  Distribution = 'distribution',
  Timeout = 'timeout',
}

export enum SSEPassivePoolTypes {
  Ready = 'ready',
  Taken = 'taken',
  Released = 'released',
  Resolved = 'resolved',
  Timeout = 'timeout',
}

export type SSETypes = SSEType | SSEActivePoolTypes | SSEPassivePoolTypes;

const esLog = debug('MEDIC SSE', '#1976D2');
const sseUrl = env.get('VUE_APP_SSE_API');

const HTTP_CODES = HTTP_CODES_NON_TYPE as Record<number, string>;

const ATTEMPTS_TO_RECONNECT = 10;

export const SSE_STATUSES: Record<number, string> = {
  0: 'CONNECTING',
  1: 'OPEN',
  2: 'CLOSED',
};

/** Список SSE подключений */
const esStore: any = {};
const esStoreReconnect: any = {};

interface IWatchEvents {
  /** Ключ источника событий */
  key: string;

  /** Колбэк для ошибок */
  errCallback: (error: any) => any;

  /** Колбэк при получении данных */
  callback: (data: any) => any;

  /** Параметры для запроса */
  query?: {
    topics: SSETopics[];
    heartbeat: boolean;
  };
  isReconnect: boolean;
}

class SSE {
  /** Singleton */
  private static _instance: SSE;

  /** Существующий instance SSE */
  public static get Instance() {
    return this._instance || (this._instance = new this());
  }

  /**
   * Подписаться на получение событий
   * @public
   * @action SSE_EVT
   */
  async watchEvents({
    key,
    errCallback,
    callback,
    query,
    isReconnect,
  }: IWatchEvents) {
    if (!isReconnect) {
      esStoreReconnect[key] = 0;
    } else {
      esStoreReconnect[key] = (esStoreReconnect[key] || 0) + 1;
    }

    if (esStore[key]) {
      const error = new Error(`SSE error: "${key}" already watched`);
      console.error(error.message);
      await this.unwatchEvents(key, error);
    }

    try {
      esStore[key] = await createSSE(`${sseUrl}/v3/events`, query);
    } catch (err) {
      return errCallback(err);
    }

    esStore[key].onopen = () => {
      esLog('Connected');
    };

    esStore[key].onerror = async (err: any) => {
      await this.unwatchEvents(key, err);

      if (esStoreReconnect[key] > ATTEMPTS_TO_RECONNECT) {
        const error = err?.status
          ? HTTP_CODES[err?.status]
          : err?.message || 'Ошибка';
        return errCallback(new Error(error));
      } else {
        // do reconnect
        await this.watchEvents({
          key,
          errCallback,
          callback,
          query,
          isReconnect: true,
        });
      }
    };

    esStore[key].onmessage = (event: any) => {
      this._eventHandler(event, callback);
    };
  }

  /**
   * Остановить получение событий
   * @public
   *
   * @param key Ключ источника событий
   * @param err Ошибка по которой закрылось соединение
   */
  async unwatchEvents(key: string, err?: any) {
    if (esStore[key]) {
      await esStore[key].close();
      esStore[key] = null;
    }
    esLog('Closed', err ? err?.message : 'by user');
  }

  /**
   * Получить статус активности SSE соединения
   * @public
   *
   * @param key Ключ источника событий
   */
  getStatus(key: string) {
    if (!esStore[key]) return null;

    const num = Number(esStore[key].readyState);
    return SSE_STATUSES[num] || null;
  }

  /**
   * Обработка полученных событий
   * @private
   *
   * @param event Событие из подключенного SSE
   * @param callback Обработчик события
   * @param errCallback Обработчик ошибки
   */
  private async _eventHandler(event: any, callback: (data: any) => void) {
    try {
      const data = JSON.parse(event.data);

      // Heartbeat не имеет типа, поэтому проверяем на наличие поля time
      const type = data?.type
        ? (data?.type as SSETypes)
        : data?.time
        ? SSEType.Heartbeat
        : null;

      // Фильтруем сообщения и не выводим их в логи
      // иначе невозможно читать события в Sentry
      if (env.get('VUE_APP_SSE_DEBUG')) {
        // При наличии флага выводим все сообщения
        esLog(
          'Event',
          (type ? `[${type}]` : '') + ` ${event.lastEventId}:`,
          data?.payload || data,
        );
      } else if (type !== SSEType.Heartbeat && type !== SSEType.Statistics) {
        // Фильтруем в логах статистику и heartbeat
        esLog(
          'Event',
          (type ? `[${type}]` : '') + ` ${event.lastEventId}:`,
          data?.payload || data,
        );
      }

      if (type !== SSEType.Error) callback(data);
    } catch (err) {
      console.error(err);
    }
  }
}

export default SSE.Instance;
