import { EMPTY, from, Observable, ObservableInput, of, Subject, Subscriber } from 'rxjs';
import { filter, finalize, map, switchMap, take, takeUntil, tap } from 'rxjs/operators';
import { ArrayHelper } from '../../../../helpers/arrayHelper';

/** Empile les événements dans un buffer et relache le buffer lorsque la condition est vraie ou si le flux envoie une valeur. Crée un autre buffer directement à la suite. */
export function bufferUntil<T>(pfCallback: (poValue: T) => ObservableInput<any> | boolean): (poSource: Observable<T>) => Observable<T[]> {
	return (poSource: Observable<T>) => {
		let laBuffer: T[] = [];
		const loCompleteSubject = new Subject<void>();
		return new Observable<T[]>((poSubscriber: Subscriber<T[]>) => {
			return poSource
				.pipe(finalize(() => loCompleteSubject.next(undefined))) // On gère la complétion de l'observable source.
				.pipe(
					tap((poValue: T) => laBuffer.push(poValue)),
					switchMap((poValue: T) => { // A chaque valeur émise par la source
						const loResult: ObservableInput<any> | boolean = pfCallback(poValue);

						if (typeof loResult === "boolean") // On effectue le test
							return loResult ? of(true) : EMPTY;

						return from(loResult).pipe(take(1), takeUntil(loCompleteSubject.asObservable())); // Ou on attends un événement.
					}),
					map(() => { // Si on passe on envoie le buffer et on le remet à 0.
						const laSendBuffer: T[] = laBuffer;
						laBuffer = [];
						return laSendBuffer;
					}),
					filter((paBuffer: T[]) => ArrayHelper.hasElements(paBuffer)) // Il faut qu'il y ai au moins une valeur dans le buffer pour pouvoir le consommer.
				).subscribe(
					(paSendBuffer: T[]) => poSubscriber.next(paSendBuffer), // On envoie le buffer dans le nouveau flux en sortie.
					(poError: any) => poSubscriber.error(poError), // On envoie l'erreur.
					() => {
						if (ArrayHelper.hasElements(laBuffer)) { // On envoie le buffer si il reste des éléments et on le remet à 0 pour libérer la mémoire.
							poSubscriber.next(laBuffer);
							laBuffer = undefined;
						}
						loCompleteSubject.complete();
						poSubscriber.complete();
					}
				);
		});
	};
}
