import { Injectable } from '@angular/core';
import { BehaviorSubject, Subject, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';


/** Signature of a topic subscriber callback: it is handed a value of type `T` and returns nothing. */
type CallbackFunction<T = any> = (event: T) => void;

/** Envelope that pairs a value with the topic it belongs to. */
interface EventSubject<T> {
  /** Name of the topic this envelope is associated with. */
  topic: string;
  /** The value carried by the envelope. */
  data: T;
}


// A Message Bus Using An RxJS Subject
// credits: https://www.bennadel.com/blog/3518-trying-to-create-a-message-bus-using-an-rxjs-subject-in-angular-6-1-10.html

// @Injectable({
//   providedIn: 'root'
// })
/**
 * Topic-based message bus built on RxJS subjects.
 *
 * Every published value is pushed onto two channels:
 *  - a single live stream shared by all topics (subscribers only see events
 *    published after they subscribed), and
 *  - a per-topic replay stream that remembers the most recent event, so a
 *    late subscriber can opt in to receiving the last value immediately.
 */
@Injectable({ providedIn: 'root' })
export class EventService {

  /** Live stream carrying every published event, regardless of topic. */
  private readonly eventStream = new Subject<EventSubject<unknown>>();

  /**
   * Replay streams keyed by topic. Each one holds the latest event for its
   * topic, or `null` when nothing has been published on it yet.
   */
  private readonly topicStreams = new Map<string, BehaviorSubject<EventSubject<unknown> | null>>();

  // Initialize the event service.
  constructor() { }

  /**
   * Publish `data` on `topic`.
   *
   * Live subscribers are notified first, then the topic's replay stream is
   * updated (and created on first use) so the value becomes the "previous
   * value" offered to future replay subscribers.
   *
   * @param topic Name of the topic to publish on.
   * @param data  Value handed to every subscriber of the topic.
   */
  public publish(topic: string, data: any): void {
    const payload: EventSubject<unknown> = { topic, data };

    this.eventStream.next(payload);
    this.getOrCreateTopicStream(topic).next(payload);
  }

  /**
   * Subscribe `callback` to values published on `topic`.
   *
   * An exception thrown by `callback` is caught and logged rather than
   * rethrown, so one failing subscriber cannot disturb the publisher or the
   * other subscribers.
   *
   * @param topic    Name of the topic to listen to.
   * @param callback Invoked with the published data of each matching event.
   * @param canSendOnePreviousValue When true, the callback is also invoked
   *        immediately with the most recently published value for the topic,
   *        if there is one. Such subscriptions are ended by `unsubscribe(topic)`.
   * @returns The RxJS subscription; call `unsubscribe()` on it to stop listening.
   */
  public subscribe<U>(topic: string, callback: CallbackFunction<U>, canSendOnePreviousValue: boolean = false): Subscription {
    const deliver = (event: EventSubject<unknown>): void => {
      try {
        // The bus is heterogeneous; the caller chooses U for the topic it subscribes to.
        callback(event.data as U);
      } catch (error) {
        // Best-effort delivery: report the failure instead of hiding it.
        console.error(`EventService: subscriber for topic "${topic}" threw an error`, error);
      }
    };

    if (canSendOnePreviousValue) {
      return this.getOrCreateTopicStream(topic)
        .pipe(
          // Drops the initial null placeholder emitted before anything is published.
          filter((event: EventSubject<unknown> | null): event is EventSubject<unknown> =>
            event !== null && event.topic === topic)
        )
        .subscribe(deliver);
    }

    return this.eventStream
      .pipe(filter((event: EventSubject<unknown>) => event.topic === topic))
      .subscribe(deliver);
  }

  /**
   * Discard the replay stream of `topic`, forgetting its last published value.
   *
   * Subscribers attached with `canSendOnePreviousValue = true` are completed,
   * which closes their subscriptions. Live subscribers are not affected. A
   * later `publish` or replay `subscribe` on the same topic starts a fresh
   * stream. Does nothing when the topic has no replay stream.
   *
   * @param topic Name of the topic whose replay stream should be released.
   */
  public unsubscribe(topic: string): void {
    const stream = this.topicStreams.get(topic);
    if (!stream) {
      return;
    }
    // Remove before completing so a re-entrant publish/subscribe gets a new stream.
    this.topicStreams.delete(topic);
    // complete() notifies and tears down subscribers; Subject.unsubscribe()
    // would drop them silently and leave their subscriptions open but dead.
    stream.complete();
  }

  /** Return the replay stream for `topic`, creating it (seeded with null) on first use. */
  private getOrCreateTopicStream(topic: string): BehaviorSubject<EventSubject<unknown> | null> {
    let stream = this.topicStreams.get(topic);
    if (!stream) {
      stream = new BehaviorSubject<EventSubject<unknown> | null>(null);
      this.topicStreams.set(topic, stream);
    }
    return stream;
  }
}
