import { EMPTY, interval, merge } from 'rxjs';
import * as RxO from 'rxjs/operators';
import { ofType } from 'redux-observable';

import type { Epic } from 'flow-types/Epic';

export const startProcessWatcher: Epic = action$ =>
  action$.pipe(
    ofType('process/start'),
    RxO.filter(action => action.processId && !!action.config),
    RxO.mergeMap(action => {
      const result = [];

      const {
        config: { duration },
        processId
      } = action;

      if (duration) {
        result.push(
          interval(duration * 1000).pipe(
            RxO.take(1),
            RxO.map(() => ({ type: 'process/complete', processId })),
            RxO.takeUntil(
              action$.pipe(
                ofType('process/complete'),
                RxO.filter(untilAction => untilAction.processId === processId)
              )
            )
          )
        );
      }

      if (result.length === 0) return EMPTY;

      return merge(...result);
    })
  );
