import { Injectable } from '@angular/core';
import { ActionCreator, ActionType } from '@ngrx/store/src/models';
import { ConnectableObservable, EMPTY, from, Observable, ObservableInput, ReplaySubject, Subject, timer } from 'rxjs';
import { Action } from '@ngrx/store';
import { delayWhen, distinctUntilChanged, map, publishReplay, retryWhen, scan, tap } from 'rxjs/operators';
import { ErrorHandlerService } from '../error-handler.service';

/**
 * A backend task: a function that receives an action and returns an
 * `ObservableInput` of follow-up `Action`s.
 *
 * @typeParam C - Array/tuple of action creators; the `action` argument is typed
 *   as `ActionType` of any one element of `C` (`C[number]`).
 */
export type BackendTask<C extends ActionCreator[]> = (action: ActionType<C[number]>) => ObservableInput<Action>;

/**
 * Retry-count threshold: `retryAlert$` reports `true` only when the latest
 * retry count is strictly greater than this value. The same value is also
 * used as the pivot of the retry delay formula in `execute`.
 */
const RETRY_ALERT_THRESHOLD = 2;

/**
 * Registry and runner for backend tasks keyed by action type.
 *
 * Tasks are registered once per action type via `register` and run via
 * `execute`. While tasks run, the service exposes three status streams:
 * `backgroundTaskRunning$`, `retrying$` and `retryAlert$`.
 */
@Injectable({ providedIn: 'root' })
export class BackendTasksService {
  /** Stream of `+1` / `-1` deltas: `+1` when a task starts, `-1` when it ends. */
  private pendingTasks$ = new Subject<number>();

  /**
   * `true` while the running sum of `pendingTasks$` deltas is greater than zero.
   * Only changes are emitted, and the latest value is replayed to late subscribers.
   */
  backgroundTaskRunning$: Observable<boolean>;

  /**
   * Latest retry count reported by any running task (`0` after a task completes).
   * Replays the most recent value; emits nothing until the first retry or completion.
   */
  private retries$ = new ReplaySubject<number>(1);

  /** `true` when the latest retry count is greater than zero. */
  retrying$: Observable<boolean>;

  /** `true` when the latest retry count is greater than `RETRY_ALERT_THRESHOLD`. */
  retryAlert$: Observable<boolean>;

  /**
   * Registered tasks by action type. A `Map` is used instead of a plain object so
   * that action types colliding with `Object.prototype` members (e.g. "constructor")
   * are not mistaken for registered tasks.
   */
  private backendTasks = new Map<string, BackendTask<any>>();

  constructor(private errorHandler: ErrorHandlerService) {
    this.backgroundTaskRunning$ = this.pendingTasks$.pipe(
      scan((acc, next) => acc + next, 0),
      map(next => next > 0),
      distinctUntilChanged(),
      publishReplay(1)
    );

    // Connect eagerly so deltas are counted even before anyone subscribes.
    (this.backgroundTaskRunning$ as ConnectableObservable<boolean>).connect();

    this.retrying$ = this.retries$.pipe(map(retries => retries > 0));
    this.retryAlert$ = this.retries$.pipe(map(retries => retries > RETRY_ALERT_THRESHOLD));
  }

  /**
   * Registers `task` as the handler for actions created by `action`.
   *
   * The `S` type parameter is unused; it is kept so existing callers that pass
   * explicit type arguments keep compiling.
   *
   * @param action - Action creator whose `type` identifies the task.
   * @param task - Function to run when an action of that type is executed.
   * @throws Error if a task is already registered for the same action type.
   */
  register<C1 extends ActionCreator, S>(action: C1, task: BackendTask<[C1]>): void {
    if (this.backendTasks.has(action.type)) {
      throw new Error(`There is already a backend task registered for action type "${action.type}"`);
    }
    this.backendTasks.set(action.type, task);
  }

  /**
   * Runs the task registered for `action.type`.
   *
   * The returned observable is cold: the task is invoked on subscription. Source
   * errors are passed through `errorHandler.logErrors()` and then retried after a
   * delay; each retry publishes the per-subscription retry count. The pending-task
   * counter is incremented on start and decremented exactly once when the task
   * completes, errors, or is unsubscribed before finishing.
   *
   * @param action - The action to handle.
   * @returns The actions emitted by the task, or `EMPTY` if no task is registered
   *   for the action's type.
   */
  execute(action: Action): Observable<Action> {
    const task = this.backendTasks.get(action.type);

    if (!task) {
      return EMPTY;
    }

    return new Observable<Action>(observer => {
      // Build the source BEFORE touching the counter: if the task (or `from`)
      // throws synchronously, no `+1` is left behind without a matching `-1`.
      const source$ = from(task(action));

      let retryCount = 0;
      // Guards the `-1` so it is sent once, whichever of complete / error /
      // unsubscribe happens first.
      let settled = false;

      this.pendingTasks$.next(1);

      const subscription = source$
        .pipe(
          // Opaque project operator; presumably logs errors passing through — verify in ErrorHandlerService.
          this.errorHandler.logErrors(),
          retryWhen(errors =>
            errors.pipe(
              tap({
                next: () => {
                  retryCount++;
                  console.log(`retrying... (count: ${retryCount})`);
                  this.retries$.next(retryCount);
                }
              }),
              // Delay in ms = (|retryCount - RETRY_ALERT_THRESHOLD| + 1) * 500.
              // NOTE(review): this is non-monotonic (1000, 500, 1000, 1500, ... for a
              // threshold of 2) — confirm the dip at the threshold is intended.
              delayWhen(() => timer((Math.abs(retryCount - RETRY_ALERT_THRESHOLD) + 1) * 500))
            )
          ),
          tap({
            complete: () => {
              settled = true;
              this.pendingTasks$.next(-1);
              this.retries$.next(0);
            },
            error: () => {
              settled = true;
              this.pendingTasks$.next(-1);
            }
          })
        )
        .subscribe(observer);

      // Teardown: runs on unsubscribe and also right after complete/error.
      return () => {
        subscription.unsubscribe();
        if (!settled) {
          // Cancelled before finishing: the tap callbacks above never fired, so
          // balance the counter here to keep `backgroundTaskRunning$` accurate.
          settled = true;
          this.pendingTasks$.next(-1);
          if (retryCount > 0) {
            // This task had published a retry count; clear it so the retry
            // indicators do not stay on for a task that no longer exists.
            this.retries$.next(0);
          }
        }
      };
    });
  }
}
