import {
    RealTimeDataSource,
    Listeners,
    Unsubscribe,
    Message,
    RealtimeActionTypes,
} from '../types';
import { bindModuleLogger } from 'utils/logger';
import sendTraceReport from '../sendTraceReport';
import { DATA_SOURCE } from 'utils/constants/featureNames';
import Ably from 'ably';

// Logger obtained from bindModuleLogger with the module label 'Ably DataSource';
// used for the debug output emitted by this file.
const logger = bindModuleLogger('Ably DataSource');

/**
 * Real-time data source backed by an Ably Realtime client.
 *
 * Messages received on an Ably channel are wrapped in the shared `Message`
 * shape and routed to the caller-supplied listeners according to the Ably
 * message name.
 */
class AblyDataSource implements RealTimeDataSource {
    private readonly ablyClient: Ably.Realtime;
    // Forwarded untouched as `perf` on every trace report.
    private readonly perf: any;

    /**
     * @param ablyClient Ably Realtime client used to look up channels.
     * @param perf Value passed through to `sendTraceReport` as `perf`.
     */
    constructor(ablyClient: Ably.Realtime, perf: any) {
        this.ablyClient = ablyClient;
        this.perf = perf;
    }

    /**
     * Subscribes to every message published on `channel` and dispatches each
     * one to the matching listener:
     *
     * - `ADDED`    -> `onAdd`, then `onMessage`
     * - `CHANGED`  -> `onChange`, then `onMessage`
     * - `REMOVED`  -> `onRemove`, then `onMessage`
     * - `SNAPSHOT` -> `onSnapshot` only (a debug line is logged if it is absent)
     * - any other name -> `onMessage`
     *
     * When the message data carries a `metrics` object, the receive time is
     * stamped on it as `metrics.tr` and a trace report is sent before the
     * listeners run.
     *
     * @param channel Name of the Ably channel to subscribe to.
     * @param listeners Callbacks to invoke; every callback is optional.
     * @param options Currently only included in the debug log output.
     * @returns A function that removes the listener registered by this call.
     * @throws Error if the data source was constructed without a client.
     */
    subscribe(
        channel: string,
        listeners: Listeners,
        options: any = {}
    ): Unsubscribe {
        if (!this.ablyClient) {
            throw new Error('Invalid AblyClient');
        }

        logger.debug(
            'Adding Listeners',
            Object.keys(listeners),
            channel,
            options
        );

        const ablyChannel = this.ablyClient.channels.get(channel);

        // Arrow function so that `this` is the data source (and `this.perf`
        // resolves) no matter how the SDK invokes the listener. The handler is
        // kept in a constant because the very same reference has to be handed
        // to `unsubscribe` later on.
        // NOTE(review): typed as `any` because hoisting the handler loses the
        // contextual type from `subscribe()`; swap in the SDK's message type
        // for the installed Ably version.
        const handleMessage = (ablyMessage: any): void => {
            const key = ablyMessage.id;
            const message: Message = {
                metadata: { key },
                payload: ablyMessage.data,
                action: ablyMessage.name,
            };

            const metrics = message.payload?.metrics;
            if (metrics) {
                // Stamp the client-side receive time before reporting.
                metrics.tr = Date.now();
                sendTraceReport({
                    perf: this.perf,
                    source: DATA_SOURCE.ABLY,
                    key,
                    metrics,
                });
            }

            switch (ablyMessage.name) {
                case RealtimeActionTypes.ADDED:
                    listeners.onAdd?.(message);
                    listeners.onMessage?.(message);
                    break;
                case RealtimeActionTypes.CHANGED:
                    listeners.onChange?.(message);
                    listeners.onMessage?.(message);
                    break;
                case RealtimeActionTypes.REMOVED:
                    listeners.onRemove?.(message);
                    listeners.onMessage?.(message);
                    break;
                case RealtimeActionTypes.SNAPSHOT:
                    // Snapshots are deliberately not forwarded to onMessage.
                    if (listeners.onSnapshot) {
                        listeners.onSnapshot(message);
                    } else {
                        logger.debug('No on snapshot listener added');
                    }
                    break;
                default:
                    listeners.onMessage?.(message);
            }
        };

        ablyChannel.subscribe(handleMessage);

        return () => {
            logger.debug('Removing listeners', channel);
            // Pass the listener itself: the return value of `subscribe()` is
            // not a subscription handle, and calling `unsubscribe` without a
            // listener would affect other subscribers on the same channel.
            ablyChannel.unsubscribe(handleMessage);
        };
    }

    /**
     * Not supported by this data source yet.
     *
     * @throws Error always, synchronously.
     */
    fetch(query: object): Promise<any> {
        throw new Error('Not Yet implemented');
    }

    /**
     * Not supported by this data source yet.
     *
     * @throws Error always, synchronously.
     */
    registerOnDisconnect(channel: string, message: any): Promise<boolean> {
        throw new Error('Not Yet implemented');
    }

    /**
     * Not supported by this data source yet.
     *
     * @throws Error always, synchronously.
     */
    cancelOnDisconnect(channel: string): Promise<boolean> {
        throw new Error('Not Yet implemented');
    }
}

export default AblyDataSource;
