import * as _ from 'lodash';
import { grpc } from '@improbable-eng/grpc-web';
import { stream_pb, legacy_pb } from 'libs/generated-grpc-web';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import { first } from 'rxjs/operators';
import { stream } from 'xlsx';

export type IMgStreamFilterValue =
  | string
  | number
  | boolean
  | stream_pb.StreamFilterOp;

export interface IMgStreamFilter {
  [key: string]: IMgStreamFilterValue;
}

export interface IMgStreamControlRestartOptions {
  /**
   * CLear stream items when restarting. Otherwise the items will persist until
   * the restart is complete. Defaults to `false`
   */
  clear?: boolean;
}

export enum MgStreamErrorStatus {
  StreamControlError = 'StreamControlError',
  GrpcError = 'GrpcError',
  NetworkError = 'NetworkError',
}

export interface IMgStreamGrpcError {
  originalError: any;
  status: MgStreamErrorStatus.GrpcError;
  grpcErrorCode: grpc.Code;
}

export interface IMgStreamNetworkError {
  originalError: any;
  status: MgStreamErrorStatus.NetworkError;
}

export interface IMgStreamControlError {
  originalError: any;
  status: MgStreamErrorStatus.StreamControlError;
  action: stream_pb.StreamAction;
}

export type IMgStreamError =
  | IMgStreamGrpcError
  | IMgStreamNetworkError
  | IMgStreamControlError;

export interface IMgStreamItem<ItemT> {
  itemIndex: number;
  itemId: string;
  item: ItemT;
}

export interface IMgStreamControl<ResponseT> {
  /**
   * Close active stream's connection
   */
  close(): void;

  /**
   * Observable of items in stream
   */
  asObservable(): Observable<IMgStreamItem<ResponseT>[]>;

  /**
   * Request more items at the begining of the stream (backward)
   */
  seekBack(): void;

  /**
   * Request more items at the end of the stream (forward)
   */
  seekFront(): void;

  /**
   * Updates the filter - restarting the stream if the fitler is different
   */
  updateFilter(
    filter: IMgStreamFilter | null,
    options?: IMgStreamControlRestartOptions,
  ): void;

  /**
   * Close stream and get more items again
   */
  restart(options?: IMgStreamControlRestartOptions): void;

  /**
   * Call `callback` on item with id `id`. Underling observable is updated
   * after the callback is called.
   */
  updateItem(id: string | number, callback: (item: ResponseT) => void): void;

  /**
   * Insert item at front of stream
   */
  unshiftItem(itemId: string, item: ResponseT): void;

  /**
   * Removes and replaces item in stream changing it's id.
   */
  replaceItem(itemId: string, newItemId: string, item: ResponseT): void;

  /**
   * Removes item with id in stream
   */
  removeItem(itemId: string): void;

  /**
   * Underlying subject has completed and no more streams can be started
   */
  done(): void;

  /**
   * Checks if done has been called
   */
  readonly isDone: boolean;

  /**
   * @deprecated - Use loading$ instead
   * Check if there is an active stream control
   */
  readonly isLoading: boolean;

  /**
   * Check if there is an active stream control
   */
  readonly loading$: Observable<boolean>;

  /**
   * Check if any more items are available at the end (front) of the stream
   */
  readonly frontExhausted: boolean;

  /**
   * Check if any more items are available at the start (back) of the stream
   */
  readonly backExhausted: boolean;

  /**
   * Check if an item being loaded after a back seek has occured
   */
  readonly isBackLoading: boolean;

  /**
   * Check if an item being loaded after a front seek has occured
   */
  readonly isFrontLoading: boolean;

  /**
   * If there is an error this will be filled with details otherwise null
   */
  readonly error: IMgStreamError | null;

  /**
   * Number of items in stream
   */
  readonly length: number;

  /**
   * Only use as a last ditch effort
   */
  _getItems(): IMgStreamItem<ResponseT>[];
}

function makeRestartOptions(
  options?: IMgStreamControlRestartOptions,
): IMgStreamControlRestartOptions {
  options = options || {};
  options.clear = !!options.clear;
  return options;
}

export function toStreamFilterMessage(
  filter: IMgStreamFilter,
): stream_pb.StreamFilter {
  let streamFilter = new stream_pb.StreamFilter();
  let keyValues = streamFilter.getKeyValuesMap();

  const setNumberValue = (
    msg: {
      setUint32Value(n: number): void;
      setInt32Value(n: number): void;
      setDoubleValue(n: number): void;
    },
    value: number,
  ) => {
    if (Number.isInteger(value)) {
      if (value > 2147483647) {
        msg.setUint32Value(value);
      } else {
        msg.setInt32Value(value);
      }
    } else {
      msg.setDoubleValue(value);
    }
  };

  for (const filterKey in filter) {
    const value = filter[filterKey];
    let streamFilterValue = new stream_pb.StreamFilterType();

    if (typeof value === 'string') {
      if (value.startsWith('> ')) {
        const gtValue = parseFloat(value.substr(2));

        if (isNaN(gtValue)) {
          streamFilterValue.setStringValue(value);
        } else {
          const streamFilterOp = new stream_pb.StreamFilterOp();
          const streamFilterGtValue = new stream_pb.StreamFilterOp.GtValue();
          const streamFilterGtValueValue = new stream_pb.StreamFilterOp.Value();
          setNumberValue(streamFilterGtValueValue, gtValue);
          streamFilterGtValue.setValue(streamFilterGtValueValue);
          streamFilterOp.setGtValue(streamFilterGtValue);
          streamFilterValue.setOp(streamFilterOp);
        }
      } else if (value.startsWith('>= ')) {
        const gteValue = parseFloat(value.substr(3));

        if (isNaN(gteValue)) {
          streamFilterValue.setStringValue(value);
        } else {
          const streamFilterOp = new stream_pb.StreamFilterOp();
          const streamFilterGteValue = new stream_pb.StreamFilterOp.GteValue();
          const streamFilterGteValueValue =
            new stream_pb.StreamFilterOp.Value();
          setNumberValue(streamFilterGteValueValue, gteValue);
          streamFilterGteValue.setValue(streamFilterGteValueValue);
          streamFilterOp.setGteValue(streamFilterGteValue);
          streamFilterValue.setOp(streamFilterOp);
        }
      } else if (value.startsWith('< ')) {
        const ltValue = parseFloat(value.substr(2));

        if (isNaN(ltValue)) {
          streamFilterValue.setStringValue(value);
        } else {
          const streamFilterOp = new stream_pb.StreamFilterOp();
          const streamFilterLtValue = new stream_pb.StreamFilterOp.LtValue();
          const streamFilterLtValueValue = new stream_pb.StreamFilterOp.Value();
          setNumberValue(streamFilterLtValueValue, ltValue);
          streamFilterLtValue.setValue(streamFilterLtValueValue);
          streamFilterOp.setGteValue(streamFilterLtValue);
          streamFilterValue.setOp(streamFilterOp);
        }
      } else if (value.startsWith('<= ')) {
        const lteValue = parseFloat(value.substr(3));

        if (isNaN(lteValue)) {
          streamFilterValue.setStringValue(value);
        } else {
          const streamFilterOp = new stream_pb.StreamFilterOp();
          const streamFilterLteValue = new stream_pb.StreamFilterOp.LteValue();
          const streamFilterLteValueValue =
            new stream_pb.StreamFilterOp.Value();
          setNumberValue(streamFilterLteValueValue, lteValue);
          streamFilterLteValue.setValue(streamFilterLteValueValue);
          streamFilterOp.setGteValue(streamFilterLteValue);
          streamFilterValue.setOp(streamFilterOp);
        }
      } else {
        streamFilterValue.setStringValue(value);
      }
    } else if (typeof value === 'boolean') {
      streamFilterValue.setBoolValue(value);
    } else if (typeof value === 'number') {
      setNumberValue(streamFilterValue, value);
    } else if (value instanceof stream_pb.StreamFilterOp) {
      streamFilterValue.setOp(value);
    } else {
      console.warn('<mg-stream> canot cannot use value as filter:', value);
      break;
    }

    keyValues.set(filterKey, streamFilterValue);
  }

  return streamFilter;
}

export function mgSetupStreamControlFilter(
  control: stream_pb.StreamControl,
  filter: IMgStreamFilter,
) {
  const streamFilter = toStreamFilterMessage(filter);

  control.setStreamFilter(streamFilter);
}

/**
 * @param itemMapper - Defaults to item.toObject() for legacy purposes
 */
export function mgStreamControlObservable<ResponseT, ServiceT>(
  service: ServiceT,
  controlRpcName: keyof ServiceT,
  streamRpcName: keyof ServiceT,
  filter: IMgStreamFilter | null = null,
  itemMapper: (item: any) => any = item => item.toObject(),
): IMgStreamControl<ResponseT> {
  const subject = new BehaviorSubject<IMgStreamItem<ResponseT>[]>([]);
  const items: (IMgStreamItem<ResponseT> | null)[] = [];

  // Boolean to indicate an item was recieved on the stream after a control has
  // been called.
  const itemAfterControl = {
    front: false,
    back: false,
  };

  let streamError: IMgStreamError | null = null;
  let cancelActiveControl: () => void = () => {};
  let activeControl: Promise<void> | null = null;
  let frontExhausted: boolean = false;
  let backExhausted: boolean = false;
  let isClosed: boolean = false;
  let isDone: boolean = false;
  let streamSub = new Subscription();
  let stream: ({ cancel(): void } & Observable<any>) | null = null;
  let streamId: stream_pb.StreamID | null = null;
  let initialItemRecieved: boolean = false;
  const loading$ = new BehaviorSubject<boolean>(false);

  const isFullyExhausted: () => boolean = () => {
    return frontExhausted && backExhausted;
  };

  const clearItems = () => {
    items.splice(0, items.length);
  };

  const updateItems = () => {
    const filteredItems = items.filter(item => !!item);
    subject.next(<IMgStreamItem<ResponseT>[]>filteredItems);
  };

  const makeStream = (
    streamId: stream_pb.StreamID,
  ): { cancel(): void } & Observable<any> => {
    return (<any>service)[streamRpcName](streamId);
  };

  const doClose = () => {
    if (stream && !isClosed) {
      try {
        stream.cancel();
      } catch (err) {
        // This isn't a stop-all error
        console.warn('Stream cancel error:', err);
      }

      isClosed = true;
    }
  };

  const handleStreamError = (err: any) => {
    if (err.code) {
      streamError = {
        originalError: err,
        status: MgStreamErrorStatus.GrpcError,
        grpcErrorCode: err.code,
      };
    } else {
      streamError = {
        originalError: err,
        status: MgStreamErrorStatus.NetworkError,
      };
    }
  };

  const catchStreamThrow = (err: any) => {
    handleStreamError(err);
    subject.error(err);
    throw err;
  };

  const _setupStreamObs = (streamId: stream_pb.StreamID) => {
    isClosed = false;
    isDone = false;
    frontExhausted = false;
    backExhausted = false;
    initialItemRecieved = false;
    streamError = null;
    activeControl = null;
    loading$.next(false);
    cancelActiveControl();
    cancelActiveControl = () => {};

    doClose();

    stream = makeStream(streamId);
    const itemIds: { [key: string]: number } = {};

    streamSub.unsubscribe();
    streamSub = stream.subscribe(
      res => {
        if (!initialItemRecieved) {
          clearItems();
          initialItemRecieved = true;
        }

        itemAfterControl.back = true;
        itemAfterControl.front = true;

        let index = items.length;
        let itemId: string = '';
        let itemPosition = stream_pb.StreamItemPosition.NONE;
        const item = res.getItem();
        const itemMetadata: stream_pb.StreamItemMetadata =
          res.getItemMetadata();

        if (itemMetadata) {
          index = itemMetadata.getIndex();
          itemId = itemMetadata.getId();
          itemPosition = itemMetadata.getPosition();
        }

        if (item) {
          if (itemId && typeof itemIds[itemId] !== 'undefined') {
            items[itemIds[itemId]] = null;
          }

          if (itemPosition === stream_pb.StreamItemPosition.UPSERT) {
            items.splice(index, 1, {
              itemIndex: index,
              itemId: itemId,
              item: itemMapper(item),
            });
          } else if (itemPosition === stream_pb.StreamItemPosition.PUSH_FRONT) {
            items.push({
              itemIndex: items.length,
              itemId: itemId,
              item: itemMapper(item),
            });
          } else if (itemPosition === stream_pb.StreamItemPosition.PUSH_BACK) {
            items.unshift({
              itemIndex: 0,
              itemId: itemId,
              item: itemMapper(item),
            });
          }

          updateItems();
        }
      },
      err => {
        isClosed = true;
        isDone = true;
        frontExhausted = true;
        backExhausted = true;
        if (!streamError) {
          handleStreamError(err);
        }
        if (!initialItemRecieved) {
          clearItems();
          initialItemRecieved = true;
        }
        updateItems();
      },
      () => {
        isClosed = true;
        isDone = true;
        frontExhausted = true;
        backExhausted = true;
        activeControl = null;
        loading$.next(false);
        cancelActiveControl();

        if (!initialItemRecieved) {
          clearItems();
          initialItemRecieved = true;
        }
        updateItems();
      },
    );
  };

  const _doControl = async (streamAction: stream_pb.StreamAction) => {
    const controlRequest = new stream_pb.StreamControl();
    controlRequest.setAction(streamAction);

    if (streamId) {
      controlRequest.setStreamId(streamId);
    } else if (filter) {
      mgSetupStreamControlFilter(controlRequest, filter);
    }

    const controlResponsePromise: Promise<stream_pb.StreamControlResponse> = (<
      any
    >service)[controlRpcName](controlRequest);

    cancelActiveControl = () => {
      try {
        (<any>controlResponsePromise).cancel();
      } catch (err) {
        console.warn('Stream cancel control error:', err.message);
      }
    };

    const controlResponse = await controlResponsePromise.catch(
      catchStreamThrow,
    );

    cancelActiveControl = () => {};

    const streamStatus = controlResponse.getStreamStatus();
    const controlResponseStatus = controlResponse.getStatus();

    if (controlResponseStatus != legacy_pb.StatusCode.OK) {
      // If we get an error on a seek action control we mark it as exhausted
      switch (streamAction) {
        case stream_pb.StreamAction.SEEK_ANY:
          frontExhausted = true;
          backExhausted = true;
          break;
        case stream_pb.StreamAction.SEEK_FRONT:
          frontExhausted = true;
          break;
        case stream_pb.StreamAction.SEEK_BACK:
          backExhausted = true;
          break;
      }

      // If we're fully exhausted and we've errored from it then that's bad
      if (isFullyExhausted()) {
        streamError = {
          originalError: null,
          status: MgStreamErrorStatus.StreamControlError,
          action: streamAction,
        };
        subject.error(
          new Error(
            `Stream control status: ${controlResponseStatus} (StatusCode.ERROR)`,
          ),
        );
      }

      return;
    }

    switch (streamStatus) {
      case stream_pb.StreamStatus.STREAM_EXHAUSTED_BACK:
        backExhausted = true;
        break;
      case stream_pb.StreamStatus.STREAM_EXHAUSTED_FRONT:
        frontExhausted = true;
        break;
      case stream_pb.StreamStatus.STREAM_EXHAUSTED:
        frontExhausted = true;
        backExhausted = true;
        break;
    }

    streamId = controlResponse.getStreamId();

    if (!stream) {
      _setupStreamObs(streamId);
    }
  };

  const doControl = async (streamAction: stream_pb.StreamAction) => {
    // Only allow 1 control to be done at a time
    while (activeControl !== null && activeControl !== undefined) {
      // We don't need to report the error of other controls so we mute it here
      await activeControl.catch(() => {});
    }

    try {
      if (!isDone && !isClosed) {
        activeControl = _doControl(streamAction);
        loading$.next(true);
        await activeControl;
      }
    } finally {
      activeControl = null;
      loading$.next(false);
    }

    updateItems();
  };

  const doRestart = (_options?: IMgStreamControlRestartOptions) => {
    const options = makeRestartOptions(_options);

    doClose();
    cancelActiveControl();
    stream = null;
    streamId = null;
    isClosed = false;
    isDone = false;
    activeControl = null;
    loading$.next(false);

    if (options.clear) {
      clearItems();
      initialItemRecieved = false;
    }

    doControl(stream_pb.StreamAction.SEEK_FRONT);
  };

  const updateItem = (
    id: string | number,
    callback: (item: ResponseT) => void,
  ) => {
    const foundItem = items.find(item => item?.itemId === id);
    if (foundItem) {
      callback(foundItem.item);
      updateItems();
    }
  };

  return {
    close: () => doClose(),
    asObservable: () => subject.asObservable(),
    seekFront: () => doControl(stream_pb.StreamAction.SEEK_FRONT),
    seekBack: () => doControl(stream_pb.StreamAction.SEEK_BACK),
    restart: doRestart,
    updateItem: updateItem,
    updateFilter: (
      newFilter: IMgStreamFilter | null,
      options?: IMgStreamControlRestartOptions,
    ) => {
      if (_.isEqual(newFilter, filter)) {
        return;
      }

      filter = newFilter;
      doRestart(options);
    },
    unshiftItem: (itemId: string, item: ResponseT) => {
      items.unshift({
        item: itemMapper(item),
        itemId,
        itemIndex: items[0] ? items[0].itemIndex - 1 : 0,
      });

      updateItems();
    },
    replaceItem: (itemId: string, newItemId: string, item: ResponseT) => {
      const rmvIdx = items.findIndex(item => item?.itemId === itemId);
      if (rmvIdx != -1) {
        const rmvItem = items[rmvIdx];

        if (!rmvItem) {
          throw new Error('stream replaceItem found index, but it was null');
        }

        items.splice(rmvIdx, 1, {
          item: itemMapper(item),
          itemId: newItemId,
          itemIndex: rmvItem.itemIndex,
        });
        updateItems();
      }
    },
    removeItem: (itemId: string) => {
      const rmvIdx = items.findIndex(item => item?.itemId === itemId);
      items.splice(rmvIdx, 1);
      updateItems();
    },
    done: () => {
      doClose();
      subject.complete();
      isDone = true;
      frontExhausted = true;
      backExhausted = true;
    },
    get frontExhausted() {
      return frontExhausted;
    },
    get backExhausted() {
      return backExhausted;
    },
    get isLoading() {
      return loading$.getValue();
    },
    loading$: loading$.asObservable(),
    get isFrontLoading() {
      if (activeControl !== null && activeControl !== undefined) return true;
      return false;
    },
    get isBackLoading() {
      if (activeControl !== null && activeControl !== undefined) return true;
      return false;
    },
    get isDone() {
      return isDone;
    },
    get error() {
      return streamError ? Object.assign({}, streamError) : null;
    },
    get length() {
      return items.length;
    },
    _getItems() {
      return subject.getValue();
    },
  };
}
