import { Injectable } from '@angular/core';
import PCancelable from 'p-cancelable';
import { fromEvent, merge, Observable, throwError } from 'rxjs';
import { catchError, finalize, mergeMap, scan, take, takeUntil } from 'rxjs/operators';
import { NumberHelper } from '../../../../helpers/numberHelper';
import { ObjectHelper } from '../../../../helpers/objectHelper';
import { StoreHelper } from '../../../../helpers/storeHelper';
import { StringHelper } from '../../../../helpers/stringHelper';
import { EReplicationStyle } from '../../../../model/store/ereplication-style.enum';
import { IStoreDocument } from '../../../../model/store/IStoreDocument';
import { IStoreReplicationOptions } from '../../../../model/store/IStoreReplicationOptions';
import { IStoreReplicationResponse } from '../../../../model/store/IStoreReplicationResponse';
import { NetworkService } from '../../../../services/network.service';
import { PerformanceManager } from '../../../performance/PerformanceManager';
import { ISynchronizationEvent } from '../../model/isynchronization-event';
import { ReplicatorError } from '../models/errors/replicator-error';
import { IOnProgressFunction } from '../models/ion-progress-function';
import { IReplicatorParamsBase } from '../models/ireplicator-params-base';
import { ReplicatorBase } from '../models/replicator-base';

/**
 * Replicator based on the standard PouchDB `replicate.from` API.
 *
 * Offers a one-shot, cancelable replication (`replicateAsync`) and a live,
 * observable replication (`replicateLive$`).
 */
@Injectable()
export class ClassicReplicatorService extends ReplicatorBase {

	//#region FIELDS

	/** Prefix used for every console message emitted by this service. */
	private static readonly C_LOG_ID = "CLASSICREPLIC.S::";

	/** Default replication timeout (milliseconds), applied to non-live replications only. */
	private readonly C_DEFAULT_REPLICATION_TIMEOUT_MILLISECONDS = 120000;

	//#endregion

	//#region METHODS

	constructor(psvcNetwork: NetworkService) {
		super(psvcNetwork, "Classic");
	}

	/**
	 * Runs a one-shot (non-live) replication from `poParams.sourceInstance` to `poParams.targetInstance`.
	 * @param poParams Replication parameters; `replicateOptions.live` is forced to `false` when options are provided.
	 * @param pfOnProgress Optional callback notified with a progress event and the response accumulated so far.
	 * @returns A cancelable promise resolved with the replication response; canceling it cancels the underlying replication.
	 */
	public override replicateAsync(poParams: IReplicatorParamsBase, pfOnProgress?: IOnProgressFunction): PCancelable<IStoreReplicationResponse> {
		return new PCancelable(async (pfResolve, pfReject, pfOnCancel) => {
			try {
				if (poParams.replicateOptions)
					poParams.replicateOptions.live = false;

				await this.handleNetworkAsync(poParams.database);

				pfResolve(await this.innerReplicateAsync(poParams, pfOnCancel, pfOnProgress));
			}
			catch (poError) {
				pfReject(poError);
			}
		});
	}

	/**
	 * Runs a live replication from `poParams.sourceInstance` to `poParams.targetInstance`.
	 * @param poParams Replication parameters; `replicateOptions` is created when missing and its `live` flag is forced to `true`.
	 * @returns An observable emitting the accumulated replication response after each replication event.
	 */
	public override replicateLive$(poParams: IReplicatorParamsBase): Observable<IStoreReplicationResponse> {
		// Options are optional everywhere else in this class, so create them here instead of dereferencing `undefined`.
		if (!poParams.replicateOptions)
			poParams.replicateOptions = {};

		poParams.replicateOptions.live = true;

		return this.handleNetwork$(poParams.database).pipe(
			mergeMap(() => this.innerReplicate$(poParams))
		);
	}

	/**
	 * Starts a live replication and exposes its events as an observable.
	 * The underlying replication is canceled when the observable finalizes (completion, error or unsubscription).
	 * @param poParams Replication parameters.
	 * @throws `ReplicatorError` (through the observable error channel) if the replication fails.
	 */
	private innerReplicate$(
		poParams: IReplicatorParamsBase
	): Observable<IStoreReplicationResponse> {
		const lbIsSyncToRemote: boolean = StoreHelper.isRemoteDatabase(poParams.targetInstance.name);

		// This method is only reached through the live path, so the options are prepared as live options.
		poParams.replicateOptions = this.prepareReplicateOptions(true, lbIsSyncToRemote, poParams.replicateOptions);

		const loReplication: PouchDB.Replication.Replication<any> = poParams.targetInstance.replicate.from(poParams.sourceInstance, poParams.replicateOptions);
		const loCompleteEvent$: Observable<PouchDB.Replication.ReplicationResultComplete<any>> =
			fromEvent(loReplication, "complete") as Observable<PouchDB.Replication.ReplicationResultComplete<any>>;
		const loComplete$: Observable<PouchDB.Replication.ReplicationResultComplete<any>> = loCompleteEvent$.pipe(take(1));
		const loError$: Observable<never> = this.replicateOnError(loReplication, loCompleteEvent$);
		const loChange$: Observable<PouchDB.Replication.ReplicationResult<any>> =
			fromEvent(loReplication, "change").pipe(takeUntil(loCompleteEvent$)) as Observable<PouchDB.Replication.ReplicationResult<any>>;
		// Kept outside of `scan` so that the documents replicated so far can be attached to the error.
		let loResult: IStoreReplicationResponse | undefined;

		return merge(loChange$, loComplete$, loError$).pipe(
			scan((poAccumulator: IStoreReplicationResponse, poCurrent: PouchDB.Replication.ReplicationResult<any>) =>
				loResult = this.innerReplicate_reduceResults(poCurrent, poAccumulator)
				, {}),
			catchError(poError => {
				this.onReplicationError(poParams.sourceInstance, poParams.targetInstance, poError);
				return throwError(() => new ReplicatorError(poError, loResult?.docs));
			}),
			finalize(() => loReplication.cancel())
		);
	}

	/**
	 * Builds an observable that errors as soon as the replication emits an `error` event.
	 * @param poReplication Replication to listen to.
	 * @param poCompleteEvent$ Stream of `complete` events; the error stream stops listening once it emits.
	 */
	private replicateOnError(
		poReplication: PouchDB.Replication.Replication<any>,
		poCompleteEvent$: Observable<PouchDB.Replication.ReplicationResultComplete<any>>
	): Observable<never> {
		return fromEvent(poReplication, "error")
			.pipe(
				mergeMap(poError => throwError(() => poError)),
				takeUntil(poCompleteEvent$)
			);
	}

	/**
	 * Runs a one-shot replication and resolves once it is complete.
	 * @param poParams Replication parameters.
	 * @param pfOnCancel Registers the callback to run when the caller cancels; used to cancel the replication.
	 * @param pfOnProgress Optional callback notified with a progress event and the response accumulated so far.
	 * @throws `ReplicatorError` if the replication fails or if the source database information cannot be read.
	 */
	private async innerReplicateAsync(poParams: IReplicatorParamsBase, pfOnCancel: (pfCallback: () => void) => void, pfOnProgress?: IOnProgressFunction)
		: Promise<IStoreReplicationResponse> {

		const lbIsSyncFromRemote: boolean = StoreHelper.isRemoteDatabase(poParams.sourceInstance.name);
		const lbIsSyncToRemote: boolean = StoreHelper.isRemoteDatabase(poParams.targetInstance.name);

		poParams.replicateOptions = this.prepareReplicateOptions(false, lbIsSyncToRemote, poParams.replicateOptions);
		const loReplication: PouchDB.Replication.Replication<any> = poParams.targetInstance.replicate.from(poParams.sourceInstance, poParams.replicateOptions);

		pfOnCancel(() => loReplication.cancel());

		const loPerformanceManager = new PerformanceManager().markStart();
		let lnOriginSeqNumber: number;
		/** Sequence number of the source database; unknown until `info()` has answered. */
		let lnTargetTotalSeqNumber: number | undefined;
		let loLastChange: PouchDB.Replication.ReplicationResult<IStoreDocument> | undefined;
		let loResult: IStoreReplicationResponse | undefined;
		let lbFinished = false;

		if (lbIsSyncFromRemote)
			lnOriginSeqNumber = StoreHelper.getSequenceNumber(poParams.database.syncMarker?.remoteSequenceNumber ?? "0");
		else
			lnOriginSeqNumber = poParams.database.syncMarker?.localSequenceNumber ?? 0;

		// Listeners are attached synchronously, right after the replication has been started, so that no event
		// emitted while the source database information is being fetched can be missed.
		const loReplicationDone = new Promise<IStoreReplicationResponse>((pfResolve: (poResponse: IStoreReplicationResponse) => void, pfReject: (poError: any) => void) => {
			loReplication.on("complete", (poReplicationResult: PouchDB.Replication.ReplicationResultComplete<IStoreDocument>) => {
				lbFinished = true;
				console.debug(`${ClassicReplicatorService.C_LOG_ID}Replication from '${poParams.sourceInstance.name}' to '${poParams.targetInstance.name}' with replicate mode '${this.name}' finished in ${loPerformanceManager.markEnd().measure()}ms.`);
				pfResolve(this.innerReplicate_reduceResults(poReplicationResult, loResult));
			});

			loReplication.on("change", (poReplicationResult: PouchDB.Replication.ReplicationResult<IStoreDocument>) => {
				loResult = this.innerReplicate_reduceResults(poReplicationResult, loResult);
				loLastChange = poReplicationResult;

				const lnTotal: number | undefined = lnTargetTotalSeqNumber;

				// Progress can only be computed once the total is known; the accumulated result is kept regardless.
				if (pfOnProgress && lnTotal !== undefined) {
					try {
						pfOnProgress(this.createSyncEvent(lnOriginSeqNumber, lnTotal, poReplicationResult), loResult);
					}
					catch (poError) {
						// A failing progress callback must not break the replication event emitter.
						console.error(`${ClassicReplicatorService.C_LOG_ID}Progress callback failed.`, poError);
					}
				}
			});

			loReplication.on("error", (poError: any) => {
				lbFinished = true;
				this.onReplicationError(poParams.sourceInstance, poParams.targetInstance, poError);
				pfReject(poError);
			});
		});

		const loTotalReady: Promise<void> = poParams.sourceInstance.info().then(poInfo => {
			const lnTotal: number = StoreHelper.getSequenceNumber(poInfo.update_seq);
			lnTargetTotalSeqNumber = lnTotal;

			// Initial progress notification; it reflects the changes already received, if any.
			if (pfOnProgress && !lbFinished)
				pfOnProgress(this.createSyncEvent(lnOriginSeqNumber, lnTotal, loLastChange), loResult);
		});

		try {
			// Both promises are observed at once so that neither rejection can be left unhandled.
			const [loResponse] = await Promise.all([loReplicationDone, loTotalReady]);
			return loResponse;
		}
		catch (poError) {
			// Make sure the replication does not keep running in the background after a failure.
			loReplication.cancel();
			throw new ReplicatorError(poError, loResult?.docs);
		}
	}

	/**
	 * Builds a progress event from sequence numbers.
	 * @param pnOriginSeqNumber Sequence number the replication started from.
	 * @param pnTargetTotalSeqNumber Sequence number to reach.
	 * @param poReplicationResult Latest replication result, if any; when omitted `loaded` is `0`.
	 */
	private createSyncEvent(pnOriginSeqNumber: number, pnTargetTotalSeqNumber: number, poReplicationResult?: PouchDB.Replication.ReplicationResult<IStoreDocument>): ISynchronizationEvent {
		return {
			loaded: poReplicationResult ? (StoreHelper.getSequenceNumber(poReplicationResult.last_seq) - pnOriginSeqNumber) : 0,
			total: pnTargetTotalSeqNumber - pnOriginSeqNumber
		};
	}

	/** Logs a replication failure between the two given database instances. */
	private onReplicationError(poSourceInstance: PouchDB.Database<{}>, poTargetInstance: PouchDB.Database<{}>, poError: any): void {
		console.error(`${ClassicReplicatorService.C_LOG_ID}Replication from '${poSourceInstance.name}' to '${poTargetInstance.name}' with replicate mode '${this.name}' failed.`, poError);
	}

	/**
	 * Merges a replication event result into the accumulated response.
	 *
	 * When there is no accumulator yet (or it is empty), a new response is created from `poCurrent`.
	 * Otherwise only the documents of `poCurrent` are appended and `end_time` is refreshed.
	 * NOTE(review): the other fields of `poCurrent` are not merged into an existing accumulator — confirm this is intended.
	 * @param poCurrent Result carried by the replication event.
	 * @param poAccumulator Response accumulated so far, if any.
	 * @returns The accumulated response.
	 */
	private innerReplicate_reduceResults(
		poCurrent: PouchDB.Replication.ReplicationResult<any>,
		poAccumulator?: IStoreReplicationResponse
	): IStoreReplicationResponse {
		const ldNow = new Date;

		if (ObjectHelper.isNullOrEmpty(poAccumulator)) {
			poAccumulator = {
				...poCurrent,
				start_time: new Date(poCurrent.start_time).toISOString(),
				end_time: ldNow.toISOString(),
				status: "ok",
				replicationMode: this.name
			};

			// Own copy of the documents: later batches are appended to it and must not alter the event's array.
			if (poCurrent.docs)
				poAccumulator.docs = [...poCurrent.docs];
		}
		else {
			if (poCurrent.docs) {
				// The accumulator may have been created from a result without documents.
				if (!poAccumulator.docs)
					poAccumulator.docs = [];

				for (const loDocument of poCurrent.docs)
					poAccumulator.docs.push(loDocument);
			}

			poAccumulator.end_time = ldNow.toISOString();
		}

		return poAccumulator;
	}

	/**
	 * Updates (or creates) and returns the replication options object, forcing some predefined values.
	 * @param pbIsLiveReplication Whether the options are meant for a live replication.
	 * @param pbIsToRemote Whether the replication targets the server.
	 * @param poReplicateOptions Replication options object, may be undefined.
	 */
	private prepareReplicateOptions(pbIsLiveReplication: boolean, pbIsToRemote: boolean, poReplicateOptions?: IStoreReplicationOptions): IStoreReplicationOptions {
		if (!poReplicateOptions)
			poReplicateOptions = {};

		// When no style is given and the replication does not target the server, use the `mainOnly` style.
		if (StringHelper.isBlank(poReplicateOptions.style) && !pbIsToRemote)
			poReplicateOptions.style = EReplicationStyle.mainOnly;

		if (!pbIsLiveReplication && poReplicateOptions.live !== true) {
			poReplicateOptions.live = false;

			// Apply the default timeout unless `timeout` is a valid number or is explicitly `false` (type number|false).
			if (!NumberHelper.isValid(poReplicateOptions.timeout as number) && poReplicateOptions.timeout !== false)
				poReplicateOptions.timeout = this.C_DEFAULT_REPLICATION_TIMEOUT_MILLISECONDS;
		}
		else {
			poReplicateOptions.live = true;
			poReplicateOptions.retry = true;
		}

		return poReplicateOptions;
	}

	//#endregion

}