// @flow
import { Observable } from 'rxjs';
import { values } from 'ramda';
import { HORSERACING_ID, GREYHOUNDS_ID } from '../../../common/constants';
import { fromDiffusion } from './observable';
import { handleTopics } from '../handlers/handle-topics';
import { DIFFUSION_HANDLED_TOPICS } from '../handled-topics';
import * as paths from '../topic/paths';
import { diffusionMessageTypes } from '../message-descriptor';
import { DiffusionClientDefault } from './diffusion-client-default';
import type { Config, Message, Credentials } from '../types';
import type { LiveEventType, DiffusionData } from '../../types';
import {
  filter,
  map,
  catchError,
  bufferTime,
  onErrorResumeNext,
} from 'rxjs/operators';

// Topic types this client knows how to handle. Messages whose `topic.type`
// is not in this list are filtered out of `handledStream`.
const topicList = values(DIFFUSION_HANDLED_TOPICS);

// Module-level singleton slot: holds the first constructed DiffusionClient;
// subsequent `new DiffusionClient(...)` calls return this same object.
let instance;

// Minimal event shape required to build per-event topic paths.
type EventToSubscribeType = { +id: string, +sportId: string };

const createQueue = (predicate: () => boolean) => {
  const queue = [];

  const push = (item) => {
    queue.push(item);
    check();
  };

  const check = () => {
    if (predicate()) {
      queue.forEach((fn) => fn());
      queue.length = 0;
    }
  };

  return { push, check };
};

export class DiffusionClient {
  Diffusion: DiffusionClientDefault;
  _queue: Array<() => void>;
  queue: $Call<typeof createQueue, () => boolean>;
  config: Config;
  subscribedEvent: ?EventToSubscribeType;
  stream: rxjs$Observable<Message>;
  handledStream: rxjs$Observable<DiffusionData>;
  bufferedStream: rxjs$Observable<DiffusionData>;
  eventStream: rxjs$Observable<DiffusionData>;
  quoteStream: rxjs$Observable<DiffusionData>;
  quoteStreamBuffered: rxjs$Observable<Array<DiffusionData>>;
  subscribedTopics: Set<string>;
  isConnectedState: boolean;
  isConnectedTimer: TimeoutID;

  constructor(config: Config) {
    if (instance) {
      return instance;
    }

    instance = this;

    this.Diffusion = DiffusionClientDefault;
    this.queue = createQueue(() => this.Diffusion.isConnected());
    this.config = config;
    this.subscribedEvent = null;
    this.subscribedTopics = new Set();
    this.isConnectedState = false;

    this.stream = fromDiffusion({
      Diffusion: DiffusionClientDefault,
      onCallbackFunction: this.onCallbackFunction.bind(this),
      onDataFunction: this.onDataFunction.bind(this),
      ...config,
    });

    this.handledStream = this.stream.pipe(
      filter(({ topic }) => topicList.includes(topic.type)),
      map(handleTopics(config)),
      filter((v) => v),
      catchError((e) => {
        // eslint-disable-next-line
        console.error(e);
        throw e;
      }),
      onErrorResumeNext()
    );

    this.bufferedStream = this.handledStream.pipe(
      bufferTime(500),
      filter((batch) => batch.length > 0)
    );

    this.eventStream = this.handledStream.pipe(
      filter(
        ({ topicType }) => topicType === DIFFUSION_HANDLED_TOPICS.EVENT_TOPIC
      )
    );

    this.quoteStream = this.handledStream.pipe(
      filter(
        ({ topicType }) => topicType === DIFFUSION_HANDLED_TOPICS.QUOTE_TOPIC
      )
    );

    this.quoteStreamBuffered = this.quoteStream.pipe(
      bufferTime(1000),
      filter((batch) => batch.length > 0)
    );
  }

  isConnected() {
    /** NOTE: this check is so long, like a 10-30 seconds to get if false */
    return this.Diffusion && this.Diffusion.isConnected();
  }

  onDataFunction() {
    /** NOTE: this is some workaround for this.Diffusion?.isConnected() */
    this.isConnectedState = true;
    clearTimeout(this.isConnectedTimer);
    this.isConnectedTimer = setTimeout(() => {
      this.isConnectedState = false;
    }, 2000);
  }

  getIsConnectedState() {
    return this.isConnectedState;
  }

  onCallbackFunction(isConnected: boolean, isReconnect?: boolean) {
    if (isConnected) {
      this.queue.check();
    }
    if (!isConnected && isReconnect) {
      this.Diffusion.reconnect();
    }
  }

  subscribe(topic: string) {
    if (this.subscribedTopics.has(topic)) {
      return;
    }
    this.subscribedTopics.add(topic);

    this.queue.push(() => {
      this.Diffusion.subscribe(topic);
    });
  }

  unsubscribe(topic: string) {
    this.subscribedTopics.delete(topic);
    this.Diffusion.unsubscribe(topic);
  }

  // eslint-disable-next-line class-methods-use-this
  ignoreSubscribeToMarkets(event: EventToSubscribeType) {
    return !event;
  }

  subscribeToAllEvents() {
    this.subscribe(paths.sports());
    this.subscribe(paths.rankedSports());
    this.subscribe(paths.events());
    if (__OSG_CONFIG__.showHorseRacing) {
      this.subscribe(paths.rankedEventsForSport(HORSERACING_ID));
    }
    if (__OSG_CONFIG__.showGreyhounds) {
      this.subscribe(paths.rankedEventsForSport(GREYHOUNDS_ID));
    }
    this.subscribe(paths.clocks());
    this.eventStream
      .pipe(
        filter(
          ({ data, type }) =>
            type === diffusionMessageTypes.initial &&
            data &&
            data.primaryMarketId
        )
      )
      .subscribe(({ data }) => {
        const { sportId, primaryMarketId, id } = data;
        if (!this.ignoreSubscribeToMarkets(data)) {
          this.subscribe(paths.market(sportId, id, primaryMarketId));
        }
      });
  }

  subscribeToEvent(event: EventToSubscribeType) {
    const { sportId, id } = event;
    const { locale } = this.config;
    this.subscribe(paths.markets(sportId, id));
    this.subscribe(paths.outcomes(sportId, id));
    this.subscribe(paths.scoreboard(sportId, id));
    this.subscribe(paths.messages(sportId, id, locale));
  }

  unsubscribeFromEvent(event: LiveEventType) {
    const { sportId, id, primaryMarketId } = event;
    const { locale } = this.config;
    this.unsubscribe(paths.markets(sportId, id));
    this.unsubscribe(paths.outcomes(sportId, id));
    this.unsubscribe(paths.scoreboard(sportId, id));
    this.unsubscribe(paths.messages(sportId, id, locale));
    this.subscribe(paths.market(sportId, id, primaryMarketId));
  }

  toggleEventSubscribe(event: EventToSubscribeType) {
    if (this.ignoreSubscribeToMarkets(event)) return;
    this.subscribeToEvent(event);
    this.subscribedEvent = event;
  }

  subscribeToPrimaryMarket(event: LiveEventType, oldPrimaryMarketId: ?string) {
    const isEventMarketsSubscribed =
      this.subscribedEvent && this.subscribedEvent.id === event.id;
    if (this.ignoreSubscribeToMarkets(event) || isEventMarketsSubscribed) {
      return;
    }

    this.subscribe(
      paths.market(event.sportId, event.id, event.primaryMarketId)
    );
    if (oldPrimaryMarketId) {
      this.unsubscribe(
        paths.market(event.sportId, event.id, oldPrimaryMarketId)
      );
    }
  }

  quotes$(credentials: Credentials) {
    return Observable.create((observer) => {
      const { locale } = this.config;
      const topic = paths.quotes(credentials.username, locale);

      let timeout;

      this.queue.push(() => {
        this.Diffusion.sendCredentials(credentials);

        // This setTimeout is required because BCC doesn't have enough time
        // to authorise if we subscribe to quotes immediately after sendCredentials
        timeout = setTimeout(() => {
          this.subscribe(topic);
        }, 1000);
      });

      const subscription = this.quoteStreamBuffered.subscribe(observer);

      return () => {
        clearTimeout(timeout);
        this.unsubscribe(topic);
        subscription.unsubscribe();
      };
    });
  }

  live$ = Observable.create((observer) => {
    this.subscribeToAllEvents();
    const subscription = this.bufferedStream.subscribe(observer);
    return () => {
      subscription.unsubscribe();
    };
  });
}
