import { AsyncSubject, Observable, of, ReplaySubject, Subscription, timer } from 'rxjs';
import { catchError, finalize, flatMap, shareReplay, tap, toArray } from 'rxjs/operators';

/** The handles returned by createRefreshSubject(). */
export interface RefreshSubject<T> {
  /** Emits the refreshed values. Subscriptions to this observable are what count as "observers" for deciding whether to refresh. */
  subject: Observable<T>;
  /** Requests a refresh. The returned observable reports the outcome (value or error) of a refresh attempt. */
  scheduleRefresh: () => Observable<T>;
  /** The outcome of the *first* refresh attempt; may emit an error if that attempt fails. */
  initialValue: Observable<T>;
}

/**
 * Create an observable which can be refreshed by calling the scheduleRefresh() function. A refresh function must
 * be provided in order to perform refreshes.
 *
 * - `subject` replays the most recent value to new subscribers and emits every refreshed value.
 * - `initialValue` provides the result of the *first* refresh attempt and may emit an error. Subscribing to it
 *   triggers a refresh if none is in progress and the first attempt has not finished yet.
 * - `scheduleRefresh()` returns an observable that reports the outcome (value or error) of the refresh it requested.
 *
 * Refresh subjects are lazy: they do not call the refresh function unless `subject` has subscribers. If there are no
 * subscribers, the refresh is merely scheduled and runs once a subscriber arrives. A refresh requested while another
 * one is in progress is run after the current one finishes.
 *
 * @param refreshFn The function to call whenever a refresh is executed.
 * @param ready An observable specifying whether the refreshFn is "ready", that is, all refreshes will be postponed
 * until the ready observable completes.
 * @returns The subject, the scheduleRefresh function and the initialValue observable.
 */
export const createRefreshSubject = <T>(refreshFn: () => Observable<T>, ready: Observable<unknown> = of()): RefreshSubject<T> => {
  // Kept in a local instead of reassigning the parameter.
  const ready$ = ready.pipe(
      // toArray emits exactly once, when `ready` completes, which ensures exactly one invocation of the succeeding operators
      toArray(),
      shareReplay()
  );
  // The subject we use for emitting to subscribers
  const internalSubject = new ReplaySubject<T>(1);

  // Whether a refresh is scheduled. If a refresh is scheduled, a refresh will take place when there is a subscriber and there is
  // no refresh already in progress. Starts out true so that the first subscriber triggers the initial refresh.
  let refreshScheduled = true;
  // Whether a refresh operation is currently in progress
  let refreshInProgress = false;
  // The number of subscribers. We count subscribers such that we don't refresh unless someone is using the result.
  let subscriberCount = 0;
  // This subject is returned from scheduleRefresh, and will contain the value returned from that particular refresh attempt.
  // It is replaced by a fresh subject every time a refresh attempt settles.
  let refreshCompleteSubject = new AsyncSubject<T>();
  // The initial value returned by the first refresh attempt. May fail
  const initialValueSubject = new AsyncSubject<T>();
  // Whether the initial value subject has been completed (with a value or an error)
  let initialValueCompleted = false;

  // Refresh now
  const refreshNow = () => {
    // NOTE: only the "in progress" condition is checked here; the subscriber count is the callers' responsibility
    // (initialValue deliberately refreshes without any subscribers on `subject`).
    if (refreshInProgress) {
      throw new Error('Cannot refresh when a refresh is already in progress or subscriber count is zero');
    }
    refreshInProgress = true;
    refreshScheduled = false;
    const result$ = ready$.pipe(
        flatMap(() => refreshFn()),
        shareReplay()
    );
    result$.pipe(
        tap((value) => {
          internalSubject.next(value);
          // Settle the subject handed out by scheduleRefresh and start a new one for the next attempt
          refreshCompleteSubject.next(value);
          refreshCompleteSubject.complete();
          refreshCompleteSubject = new AsyncSubject<T>();
          if (!initialValueCompleted) {
            initialValueSubject.next(value);
            initialValueSubject.complete();
            initialValueCompleted = true;
          }
        }),
        catchError(e => {
          console.log('Refresh failed', e);
          // Failures are reported to scheduleRefresh/initialValue callers only; `subject` itself keeps running
          refreshCompleteSubject.error(e);
          refreshCompleteSubject.complete();
          refreshCompleteSubject = new AsyncSubject<T>();
          if (!initialValueCompleted) {
            initialValueSubject.error(e);
            initialValueCompleted = true;
          }
          // TODO automatically schedule new refresh?
          return of();
        }),
        finalize(() => {
              refreshInProgress = false;
              // Run a refresh that was requested while this one was in progress
              if (refreshScheduled && subscriberCount > 0) {
                refreshNow();
              }
            }
        )).subscribe();
    return result$;
  };
  const subject = new Observable<T>(subscriber => {
    ++subscriberCount;
    const internalSubscription = internalSubject.subscribe(subscriber);
    if (refreshScheduled && !refreshInProgress) {
      refreshNow();
    }
    return () => {
      internalSubscription.unsubscribe();
      --subscriberCount;
    };
  });

  // We wrap the initialValueSubject in order to schedule a refresh on first subscription.
  const initialValue = new Observable<T>(subscriber => {
    const internalSubscription = initialValueSubject.subscribe(subscriber);
    if (!refreshInProgress && !initialValueCompleted) {
      refreshNow();
    }
    return () => {
      internalSubscription.unsubscribe();
    };
  });

  const scheduleRefresh = (): Observable<T> => {
    // Capture the completion subject *before* triggering the refresh: if the refresh settles synchronously,
    // refreshNow() has already completed and replaced refreshCompleteSubject by the time it returns, and the
    // caller would otherwise receive a subject belonging to some future refresh.
    const completion$ = refreshCompleteSubject;
    if (subscriberCount === 0 || refreshInProgress) {
      refreshScheduled = true;
    } else {
      refreshNow();
    }
    return completion$;
  };
  return {subject, scheduleRefresh, initialValue};
};

/**
 * Create a subject (observable) that is automatically refreshed every intervalMs milliseconds. the refreshFn is called
 * to provide refreshed values. If refreshFn emits an error, the error is logged and otherwise ignored, and the value
 * is not refreshed. This applies to both timed refreshes and refreshes triggered through refreshNow.
 *
 * The timer only runs while the subject has at least one subscriber: it is started (with an immediate first refresh)
 * when the first subscriber arrives and stopped when the last one unsubscribes. New subscribers receive the most
 * recently emitted value, if any.
 *
 * @param intervalMs The period between timed refreshes, in milliseconds.
 * @param refreshFn The function to call whenever a refresh is executed.
 * @returns The subject, and a refreshNow function that triggers an immediate refresh in addition to the timed ones.
 */
export const refreshSubject = <T>(intervalMs: number,
                                  refreshFn: () => Observable<T>): { subject: Observable<T>, refreshNow: () => void } => {
  const internalSubject = new ReplaySubject<T>(1);
  let subscriberCount = 0;

  // Only set while the timer is running
  let timerSubscription: Subscription | undefined;

  // A single refresh attempt. Errors are logged and replaced by an empty stream, so that neither the timer
  // nor a manual refresh is broken by a failing refreshFn.
  const refreshOnce = (): Observable<T> => refreshFn().pipe(
      catchError((e) => {
        console.error('refresh function failed', e);
        return of<T>();
      })
  );

  const publish = (value: T) => internalSubject.next(value);

  const setupTimer = () => {
    timerSubscription = timer(0, intervalMs).pipe(
        flatMap(() => refreshOnce())
    ).subscribe(publish);
  };

  const teardownTimer = () => {
    if (timerSubscription) {
      timerSubscription.unsubscribe();
      timerSubscription = undefined;
    }
  };
  const subject = new Observable<T>(subscriber => {
    ++subscriberCount;
    if (subscriberCount === 1) {
      setupTimer();
    }
    const subscription = internalSubject.subscribe(subscriber);
    return () => {
      --subscriberCount;
      subscription.unsubscribe();
      if (subscriberCount === 0) {
        teardownTimer();
      }
    };
  });
  // Goes through refreshOnce so that a failing manual refresh is ignored just like a failing timed refresh,
  // instead of surfacing as an unhandled error.
  const refreshNow = () => refreshOnce().subscribe(publish);
  return {subject, refreshNow};
};
