import { BehaviorSubject } from 'rxjs';

import { Injectable, OnDestroy } from '@angular/core';
import { WebSocketSubject } from 'rxjs/webSocket';
import { environment } from 'src/environments/environment';
import { of, Subject } from 'rxjs';
import { map, switchMap, retryWhen, delay } from 'rxjs/operators';

/**
 * Envelope for every message exchanged over the websocket.
 *
 * @typeParam T - Shape of the payload. Defaults to `any`, so existing code
 *   that refers to plain `WebsocketData` keeps compiling unchanged, while new
 *   code can opt into a precise payload type (`WebsocketData<MyPayload>`).
 */
export interface WebsocketData<T = any> {
  /** Message discriminator; the service uses it to pick the subject to notify. */
  type: string;
  /** Payload delivered to observers registered for `type`. */
  data: T;
}

/**
 * Application-wide websocket client.
 *
 * Opens a single socket to `environment.webApiBase + 'ws'`, re-subscribes
 * after a fixed delay when the socket stream errors, and fans incoming
 * messages out to per-type subjects obtained through `observe()`.
 */
@Injectable({
  providedIn: 'root'
})
export class WebSocketService implements OnDestroy {
  /** Subject of the most recently opened socket; `null` before `connect()` and after `closeConnection()`. */
  private connection: WebSocketSubject<WebsocketData> | null = null;

  /** Emits `true` from the socket's open observer and `false` from its close observer. */
  connected = new BehaviorSubject<boolean>(false);

  /** Delay, in seconds, before re-subscribing after the socket stream errors. */
  private readonly RETRY_SECONDS = 10;

  /**
   * One subject per message type, created lazily by `observe()`.
   *
   * A `Map` is used instead of an object literal so that message types such as
   * "constructor" or "toString" are not mistaken for existing entries inherited
   * from `Object.prototype`.
   */
  private subscriptions = new Map<string, Subject<any>>();

  /**
   * Handle on the subscription created by `connect()`. It is kept so that
   * `closeConnection()` can also cancel a pending retry, not just an open socket.
   * Typed structurally (the members of an RxJS subscription that are used here).
   */
  private pipeline: { readonly closed: boolean; unsubscribe(): void } | null = null;

  /**
   * Opens the websocket and starts dispatching incoming messages.
   *
   * Does nothing while a previous `connect()` is still live (socket open,
   * connecting, or waiting to retry), so repeated calls cannot open a second
   * socket and deliver every message twice.
   */
  connect(): void {
    if (this.pipeline && !this.pipeline.closed) {
      return;
    }

    const apiAddress = environment.webApiBase + 'ws';
    const ws = of(apiAddress).pipe(
      // https becomes wss, http becomes ws
      map(apiUrl => apiUrl.replace(/^http/, 'ws')),
      // Runs again on every retry, so each attempt gets a fresh subject.
      switchMap(wsUrl => {
        const socket = new WebSocketSubject<WebsocketData>({
          url: wsUrl,
          openObserver: {
            next: () => this.connected.next(true)
          },
          closeObserver: {
            next: () => this.connected.next(false)
          }
        });
        this.connection = socket;

        return socket;
      }),
      // Re-subscribe RETRY_SECONDS after each error.
      retryWhen(errors => errors.pipe(delay(this.RETRY_SECONDS * 1000)))
    );

    this.pipeline = ws.subscribe({
      next: data => this.onMessageReceived(data)
    });
  }

  /**
   * Returns the subject that receives the payload of every incoming message
   * whose `type` equals `messageType`, creating it on first request.
   *
   * @param messageType - Value matched against `WebsocketData.type`.
   * @returns The same subject on every call for a given `messageType`.
   */
  observe(messageType: string): Subject<any> {
    let subject = this.subscriptions.get(messageType);
    if (!subject) {
      subject = new Subject<any>();
      this.subscriptions.set(messageType, subject);
    }

    return subject;
  }

  /**
   * Forwards an incoming message's payload to the subject registered for its
   * type. Messages with no registered subject, and empty frames, are dropped.
   *
   * @param data - Message received from the socket.
   */
  onMessageReceived(data: WebsocketData) {
    // A frame that deserializes to null/undefined has no type to dispatch on.
    if (!data) {
      return;
    }

    const subject = this.subscriptions.get(data.type);
    if (subject) {
      subject.next(data.data);
    }
  }

  /**
   * Sends a message over the current socket subject.
   * Silently does nothing when `connect()` has not been called or the
   * connection has been closed through `closeConnection()`.
   *
   * @param data - Message to send.
   */
  send(data: WebsocketData) {
    if (this.connection) {
      this.connection.next(data);
    }
  }

  /**
   * Closes the socket and stops any scheduled reconnect attempt.
   * Safe to call when not connected.
   */
  closeConnection() {
    if (this.connection) {
      this.connection.complete();
      this.connection = null;
    }

    // Completing the subject does not cancel a retry that is already waiting
    // on its delay; unsubscribing the whole pipeline does.
    if (this.pipeline) {
      this.pipeline.unsubscribe();
      this.pipeline = null;
    }
  }

  /** Angular lifecycle hook: releases the socket when the service is destroyed. */
  ngOnDestroy() {
    this.closeConnection();
  }

}
