import { ApolloError } from "@apollo/client";
import moment from "moment";
import { useCallback, useContext, useEffect, useMemo, useRef, useState } from "react";
import { BroadcastStream } from "../../views/adminConsole/interfaces/broadcastStream/broadcastStream";
import { AssetBroadcastStream } from "../../views/adminConsole/vendors/openTok/components/AssetBroadcastStream";
import { TimeUnit, useHeartBeat } from "../heartBeat/heartBeat.hook";
import useTimeDelta from "../timeDelta/timeDelta.hook";
import {
  AssetControlsActions,
  AssetControlsData,
  AssetControlsHook,
  AssetControlsProps,
  useChangeAssetControlsMutation,
  useGetAssetControlsQuery,
  useSubscribeToAssetControlsChange,
  CHANGE_VOLUME_ONLY_SECONDS,
  ERROR_POOLING_THRESHOLD,
} from "./remoteAssetControls.definition";
import EventContext from "../../contexts/event/event.context";

/**
 * Hook for controlling assets with OT-PR enabled. This differs from the existing
 * controlAssets.hook.ts, which is used for controlling assets using OT signals.
 *
 * Remote state reaches the local streams through three paths, all of which go
 * through `canApplyAssetChange` / `applyChange`:
 *  - an initial `getAssetControls` query once a meeting id is known,
 *  - an optional heartbeat that re-runs the query (`enablePollingInterval`),
 *  - the `subscribeToAssetControlsChange` subscription.
 *
 * Local changes made through the returned service are applied to the stream
 * immediately and then pushed with the change-asset-controls mutation.
 *
 * @param props `streams` are the candidate streams (only those flagged
 *   `isV2AssetStream` are controlled); `skip` (default `false`) disables the
 *   initial fetch, the polling and the subscription; `enablePollingInterval`
 *   (default `false`) turns the heartbeat polling on.
 * @returns an object exposing `assetControlsService.changeSlide` and
 *   `assetControlsService.changeAudioVideo`.
 */
export default function useRemoteAssetControlsHook(props: AssetControlsProps): AssetControlsHook {
  const { streams, skip = false, enablePollingInterval = false } = props;

  // Offset, in seconds, applied when comparing local time with signal times.
  // 0 doubles as the "not fetched yet" marker (see the time-delta effect below).
  const [serverToClientTimeDelta, setServerToClientTimeDelta] = useState(0);

  // Number of query failures that carried GraphQL errors; used to stop polling.
  const getAssetControlsErrorCounts = useRef(0);

  const { currentEvent } = useContext(EventContext);

  const { getTimeDelta } = useTimeDelta();
  const { meetingId } = currentEvent ?? {};

  const [getAssetControls, { data: queryData }] = useGetAssetControlsQuery({
    fetchPolicy: "cache-and-network",
    onError: (error: ApolloError) => {
      console.error(error);
      // Only failures that include GraphQL errors count towards the polling threshold.
      if (error?.graphQLErrors?.length) {
        getAssetControlsErrorCounts.current++;
      }
    },
  });

  /**
   * Heartbeat tick: re-fetches the asset controls and calls `next()` only while
   * the recorded error count is below ERROR_POOLING_THRESHOLD (presumably
   * `next` schedules the following tick - confirm against useHeartBeat).
   */
  const heartBeatCallback = useCallback(
    (_index, next) => {
      getAssetControls({ variables: { meetingId } });
      if (getAssetControlsErrorCounts.current < ERROR_POOLING_THRESHOLD) {
        next();
      }
    },
    [getAssetControls, meetingId]
  );

  const { clearHeartBeat, scheduleHeartBeat } = useHeartBeat({
    callback: heartBeatCallback,
    startTime: 10,
    step: 10,
    timeUnit: TimeUnit.second,
  });

  // Lookup of the controllable (V2 asset) streams keyed by stream id.
  const assetStreamsById: Record<string, AssetBroadcastStream> = useMemo(() => {
    return streams.reduce((result, stream) => {
      if (stream?.isV2AssetStream) {
        result[stream.id] = stream;
      }
      return result;
    }, {});
  }, [streams]);

  // Per-stream ISO timestamp of the last change recorded by this hook.
  const assetSignalTimes = useRef<Record<string, string>>({});

  /**
   * Projects a position reported at `signalTime` forward to now by adding the
   * seconds elapsed between the delta-shifted signal time and the current UTC time.
   * @param position position carried by the signal, in seconds
   * @param signalTime timestamp carried by the signal
   */
  const getAdjustedPosition = useCallback(
    (position: number, signalTime: string) => {
      const timeDiffMs = moment
        .utc()
        .diff(moment.utc(signalTime).add(serverToClientTimeDelta, "seconds"));
      return position + timeDiffMs / 1000;
    },
    [serverToClientTimeDelta]
  );

  /**
   * Current UTC time shifted back by `serverToClientTimeDelta` seconds, as an ISO string.
   */
  const getAdjustedTime = useCallback(() => {
    return moment.utc().subtract(serverToClientTimeDelta, "seconds").toISOString();
  }, [serverToClientTimeDelta]);

  /**
   * Play/pause/update slides for a stream depending on the action coming in.
   * Unknown actions are ignored.
   * @param stream stream to apply the change to
   * @param data incoming asset controls payload
   */
  const applyChange = useCallback(
    (stream: AssetBroadcastStream, data: AssetControlsData) => {
      switch (data.action) {
        case AssetControlsActions.UPDATE_SLIDE:
          // Skip the call when the stream already reports the requested slide.
          if (stream._stats?.currentSlide !== data.data) {
            stream.changeSlide(data.data);
          }
          break;
        case AssetControlsActions.PLAY: {
          // Block scope keeps these declarations from leaking into the other cases.
          const adjustedPosition = getAdjustedPosition(data.data, data.signalTime);
          // When the local playback time is already within CHANGE_VOLUME_ONLY_SECONDS
          // of the target, no position is passed so only the volume is applied.
          const ignorePositionLocally =
            Math.abs(adjustedPosition - stream.videoElement().currentTime) <=
            CHANGE_VOLUME_ONLY_SECONDS;
          const playPosition = !ignorePositionLocally ? adjustedPosition : undefined;
          stream.play(data.volume, playPosition);
          break;
        }
        case AssetControlsActions.PAUSE:
          stream.pause(data.volume, data.data);
          break;
        default:
          return;
      }
    },
    [getAdjustedPosition]
  );

  /**
   * Helper function for not applying past changes if new ones have already been executed and
   * verifying that the stream is an AssetBroadcastStream.
   *
   * NOTE(review): the incoming signal time has the delta added here, while the stored
   * timestamps come from `getAdjustedTime`, which subtracts it - confirm both sides of
   * the comparison are meant to be in the same time frame.
   * @param stream
   * @param signalTime
   * @returns `true` when the stream is a V2 asset stream and either nothing has been
   *   recorded for it yet or the shifted signal time is after the recorded one
   */
  const canApplyAssetChange = useCallback(
    (stream: BroadcastStream, signalTime: string) => {
      if (!stream?.isV2AssetStream) return false;

      const lastSignalTime = assetSignalTimes.current[stream.id];
      if (!lastSignalTime) return true;

      const lastChange = moment.utc(lastSignalTime);
      const currentChange = moment.utc(signalTime).add(serverToClientTimeDelta, "seconds");

      return currentChange.isAfter(lastChange);
    },
    [serverToClientTimeDelta]
  );

  // Fetch the time delta until a non-zero value has been stored.
  useEffect(() => {
    if (serverToClientTimeDelta) return;
    getTimeDelta(new Date())
      // Fall back to 0 so the numeric state never holds `undefined`.
      .then((result) => setServerToClientTimeDelta(result?.delta ?? 0))
      // A failed lookup is logged instead of surfacing as an unhandled rejection.
      .catch(console.error);
  }, [serverToClientTimeDelta, getTimeDelta]);

  // Apply queried asset states to their matching streams.
  useEffect(
    function setAssetStates() {
      const assetStates = queryData?.getAssetControls ?? [];
      assetStates.forEach((assetState) => {
        const streamToUpdate = assetStreamsById[assetState.assetId];

        if (!streamToUpdate) return;

        if (canApplyAssetChange(streamToUpdate, assetState.signalTime)) {
          // Records the (adjusted) time of application, not the signal's own time.
          assetSignalTimes.current[streamToUpdate.id] = getAdjustedTime();
          applyChange(streamToUpdate, assetState);
        }
      });
    },
    [queryData, assetStreamsById, applyChange, getAdjustedTime, canApplyAssetChange]
  );

  // Start the polling heartbeat when enabled; stop it and reset the error count on cleanup.
  useEffect(
    function keepAssetControlsInSync() {
      if (!meetingId || !enablePollingInterval || skip) return;
      scheduleHeartBeat();
      return () => {
        clearHeartBeat();
        getAssetControlsErrorCounts.current = 0;
      };
    },
    [meetingId, enablePollingInterval, scheduleHeartBeat, clearHeartBeat, skip]
  );

  const [updateAssetControls] = useChangeAssetControlsMutation({
    onError: console.error,
  });

  /**
   * Sends an asset state to the backend through the change-asset-controls mutation.
   * @param meetingId meeting the asset belongs to
   * @param assetState payload to publish
   */
  const updateAssetState = useCallback(
    async (meetingId: number, assetState: AssetControlsData) => {
      return updateAssetControls({
        variables: {
          meetingId,
          assetControls: assetState,
        },
      });
    },
    [updateAssetControls]
  );

  /**
   * Subscription handler: applies an incoming change to its stream when allowed.
   *
   * NOTE(review): unlike the query path, this does not update `assetSignalTimes`;
   * confirm that is intentional.
   */
  const onAssetControlsUpdated = useCallback(
    ({ data: subscriptionData }) => {
      const updateEvent = (subscriptionData?.data?.subscribeToAssetControlsChange ??
        {}) as AssetControlsData;

      // May be undefined for unknown asset ids; canApplyAssetChange rejects that case.
      const streamToUpdate = assetStreamsById[updateEvent.assetId];

      if (canApplyAssetChange(streamToUpdate, updateEvent.signalTime)) {
        applyChange(streamToUpdate, updateEvent);
      }
    },
    [applyChange, assetStreamsById, canApplyAssetChange]
  );

  useSubscribeToAssetControlsChange({
    variables: {
      meetingId,
    },
    skip: !meetingId || skip,
    onData: onAssetControlsUpdated,
  });

  // Run the query once a meeting id is available and no query data has been received yet.
  useEffect(
    function fetchInitialAssetStates() {
      if (!meetingId || queryData || skip) return;
      getAssetControls({ variables: { meetingId } });
    },
    [meetingId, queryData, getAssetControls, skip]
  );

  /**
   * Changes the slide on the local stream, records the change time and publishes it.
   * Failures (including a rejected mutation) are logged and result in `undefined`.
   * @param stream stream whose slide changes
   * @param slideNumber slide to show
   */
  const changeSlide = useCallback(
    async (stream: AssetBroadcastStream, slideNumber: number) => {
      try {
        stream.changeSlide(slideNumber);
        const signalTime = getAdjustedTime();
        assetSignalTimes.current[stream.id] = signalTime;

        const changeSlidePayload: AssetControlsData = {
          assetId: stream.id,
          data: slideNumber,
          signalTime,
          action: AssetControlsActions.UPDATE_SLIDE,
        };

        // Awaited so that a rejected mutation is handled by the catch below.
        return await updateAssetState(meetingId, changeSlidePayload);
      } catch (error) {
        console.error(
          `Failed to change slide (meeting: ${meetingId}, assetId: ${stream.id})`,
          error
        );
      }
    },
    [getAdjustedTime, updateAssetState, meetingId]
  );

  /**
   * Plays or pauses the local stream, records the change time and publishes it.
   * Synchronous failures are logged and result in `undefined`.
   * @param stream stream to control
   * @param action PLAY or PAUSE
   * @param volume volume passed to the stream and published (default 100)
   * @param position position to use; defaults to the video element's current time
   */
  const changeAudioVideo = useCallback(
    (
      stream: AssetBroadcastStream,
      action: Exclude<AssetControlsActions, AssetControlsActions.UPDATE_SLIDE>,
      volume = 100,
      position?: number
    ) => {
      try {
        const data = position ?? stream.videoElement().currentTime;
        const signalTime = getAdjustedTime();
        assetSignalTimes.current[stream.id] = signalTime;

        const changeAVPayload: AssetControlsData = {
          assetId: stream.id,
          data,
          signalTime,
          volume,
          action,
        };

        // Do not set the video position when playing if the requested position is within
        // CHANGE_VOLUME_ONLY_SECONDS of the current playback time (allows volume to be
        // modified smoothly).
        const ignorePositionLocally =
          Math.abs(data - stream.videoElement().currentTime) <= CHANGE_VOLUME_ONLY_SECONDS;
        const playPosition = !ignorePositionLocally ? data : undefined;

        action === AssetControlsActions.PLAY
          ? stream.play(volume, playPosition)
          : stream.pause(volume, data);
        return updateAssetState(meetingId, changeAVPayload);
      } catch (error) {
        const actionLower = action.toLowerCase();
        const mediaType = stream.assetType.toLowerCase();
        console.error(
          `Failed to ${actionLower} ${mediaType} (meeting: ${meetingId}, assetId: ${stream.id})`,
          error
        );
      }
    },
    [getAdjustedTime, updateAssetState, meetingId]
  );

  return {
    assetControlsService: {
      changeSlide,
      changeAudioVideo,
    },
  };
}
