import { Inject, Injectable, InjectionToken, OnDestroy } from '@angular/core';
import { IRingMessage } from '@bk/jscommondatas';
import { MESSAGE_TOPICS, SingleAPIConfig, WebsocketConfigurationModel } from '@libs/shared/models';
import { Observable, Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Base64, parseRingMessage } from '../utils';
import { ConfigurationService } from './configuration.service';

const ParamsURL = new InjectionToken<string>('client url');
const ParamsWebsocketConfig = new InjectionToken<WebsocketConfigurationModel>('websocket configuration');

@Injectable()
export class WebsocketService<T extends IRingMessage<T>> implements OnDestroy {
	private socket$: WebSocketSubject<T> | WebSocketSubject<IRingMessage<T>> | undefined = undefined;
	private url: string = '';
	private observables$: Record<string, Observable<T> | Observable<IRingMessage<T>>> = {};
	private readonly ngUnsubscribe$ = new Subject<void>();

	constructor(
		@Inject(ParamsURL) private configurationService: ConfigurationService<any>,
		@Inject(ParamsWebsocketConfig) private websocketConfigurationModel: WebsocketConfigurationModel
	) {
		this.configurationService
			.getApiDefinition$('websockets', 'ring')
			.pipe(
				takeUntil(this.ngUnsubscribe$),
				filter((data) => !!data)
			)
			.subscribe((data: SingleAPIConfig) => {
				this.url = data.url;

				const deserializer = (msg: MessageEvent): IRingMessage<T> => {
					return this.processMessage(msg);
				};

				if (this.url) {
					this.socket$ = webSocket({ url: this.url, deserializer: (msg) => deserializer(msg) });
				}
			});
	}

	private processMessage(msg: MessageEvent): T | IRingMessage<T> {
		if (this.websocketConfigurationModel.parseRingMessage) {
			return parseRingMessage(msg.data);
		} else {
			return this.websocketConfigurationModel.deserialize ? JSON.parse(msg.data) : msg.data;
		}
	}

	// identifier is used for distinguishing websocket streams
	public addNewMultiplexListener(identifier: string, filterRules: (data: any) => boolean): Observable<IRingMessage<T> | T> | undefined {
		if (!this.observables$[identifier]) {
			const sub$ = this.socket$?.multiplex(
				() => ({ subscribe: identifier }),
				() => ({ unsubscribe: identifier }),
				(message) => {
					return filterRules(message);
				}
			);

			if (sub$) {
				this.observables$[identifier] = sub$;
			}
			return sub$;
		} else {
			console.warn(`Websocket connection for identifier: ${identifier} already exists to client ${this.url}`);
		}

		return undefined;
	}

	removeMultiplexListener(identifier: string): void {
		if (this.observables$?.[identifier]) {
			delete this.observables$[identifier];
		} else {
			console.warn(`Websocket connection for identifier: ${identifier} does not exist to client ${this.url}`);
		}
	}

	sendMessage(topic: MESSAGE_TOPICS, message: Record<any, any> | string): void {
		if (this.socket$) {
			if (typeof message === 'string') {
				try {
					// if its base64, i can decode it without error, thus send stomp message as base 64
					const base64 = Base64.encode(message);
					this.socket$.next(base64 as any);
				} catch (e) {
					//if message was not
					this.socket$.next(Base64.decode(message) as any);
				}
			} else if (typeof message === 'object' && message !== null) {
				const msg = JSON.stringify(message);
				const msgToSend = this.getMessageInS3dPostFormat(topic, msg);
				this.socket$.next(msgToSend as any);
			}
		}
	}

	private getMessageInS3dPostFormat(topic: MESSAGE_TOPICS, message: string): string {
		if (topic.startsWith('/topic/')) {
			topic = topic.slice(7) as MESSAGE_TOPICS;
		}

		//only ring messages has to be base64 encoded
		const shouldEncode = topic === MESSAGE_TOPICS.TO_BK_GROUP;

		let contentToSend = message;
		if (shouldEncode) {
			contentToSend = Base64.encode(message);
		}

		return '[s3d_post]:' + topic + '|' + contentToSend;
	}

	ngOnDestroy() {
		this.ngUnsubscribe$.next();
		this.ngUnsubscribe$.complete();

		Object.keys(this.observables$).forEach(this.removeMultiplexListener);
	}
}
