// @flow
import { interval } from 'rxjs';
import * as RxO from 'rxjs/operators';
import { ofType, combineEpics } from 'redux-observable';
import { toast } from 'react-toastify';
import { camelizeKeys } from 'humps';
import type { Epic } from 'flow-types/Epic';
import responseParser from 'common/epicHelpers/responseParser';
import request from 'utils/request';
import { TASK_STATUS } from 'common/helpers/response';
import type { RegisterTask, UpdateTaskStatus } from '../reducer/action.flow';
import { composeId } from '../utils';

const registerTaskWatcher$: Epic = action$ =>
  action$.pipe(
    ofType('tasks-watchers/register'),
    // only new tasks will be watched
    RxO.mergeMap((action: RegisterTask) => {
      const {
        taskId,
        taskType,
        remote,
        taskTitle,
        successMessage,
        errorMessage,
        onSuccess,
        onError
      } = action;

      const id = composeId({ taskId, taskType });

      return interval(3000).pipe(
        // request
        RxO.switchMap(() =>
          request({
            url: remote.url,
            method: remote.method
          }).pipe(responseParser, RxO.pluck('data'))
        ),
        RxO.takeWhile(data => {
          const task = camelizeKeys(data);

          return ![TASK_STATUS.READY, TASK_STATUS.ERROR].includes(task.status);
        }, true),
        RxO.mergeMap((data: Object) => {
          if (data.status === TASK_STATUS.READY) {
            toast.success(
              successMessage ?? `Процесс ${taskTitle} успешно завершён`
            );

            onSuccess?.();

            return [{ type: 'tasks-watchers/unregister', taskId, taskType }];
          }

          if (data.status === TASK_STATUS.ERROR) {
            toast.success(
              errorMessage ?? `Во время процесса ${taskTitle} возникла ошибки`
            );

            onError?.();

            return [{ type: 'tasks-watchers/unregister', taskId, taskType }];
          }

          const updateTask$: UpdateTaskStatus = {
            type: 'tasks-watchers/update-status',
            status: data.status,
            taskId,
            taskType
          };

          return [updateTask$];
        }),
        RxO.takeUntil(
          action$.pipe(
            ofType('tasks-watchers/unregister'),
            RxO.filter(unwatchAction => composeId(unwatchAction) === id)
          )
        )
      );
    }),
    RxO.takeUntil(action$.pipe(ofType('auth/sign-out')))
  );

// Module's default export: the task-watcher epic wrapped with combineEpics
// so it can be plugged into a parent epic. The Flow suppression below
// silences a type error reported on the combineEpics call.
// $FlowIgnore
export default combineEpics(registerTaskWatcher$);
