import { Injectable } from '@angular/core';
import { EMPTY, Observable, Observer, Subscription, throwError } from 'rxjs';
import { catchError, filter, first, timeout } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { environment } from '../environments/environment';

/**
 * Application-wide wrapper around a single RxJS `WebSocketSubject` pointed at
 * `environment.WSS_ENDPOINT`.
 *
 * Call {@link connect} before using any other method. Methods differ in how
 * they react to a missing/closed socket: {@link first} and {@link get$} throw
 * synchronously, while {@link sendMessage}, {@link subscribe} and
 * {@link close} are silent no-ops.
 */
@Injectable({
  providedIn: 'root',
})
export class HermesSocketService {
  /** How long `first()` waits for a matching message before failing. */
  private static readonly RESPONSE_TIMEOUT_MS = 3000;

  // Message payloads are untyped at this layer; callers narrow via `first<T>()`.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private socket: WebSocketSubject<any> | undefined = undefined;

  /**
   * Creates the socket subject if there is none yet or the current one reports
   * `closed`. Calling it again while a usable socket exists does nothing.
   *
   * NOTE(review): RxJS `webSocket` subjects are documented to open the actual
   * connection lazily on first subscription — confirm callers subscribe.
   */
  public connect(): void {
    if (!this.usableSocket()) {
      this.socket = this.getNewWebSocket();
    }
  }

  /**
   * Emits the first incoming message accepted by `predicate`, then completes.
   *
   * The 3-second timeout only counts messages that satisfy `predicate`;
   * unrelated traffic on the socket does not postpone it.
   *
   * @param predicate Returns `true` for the message the caller is waiting for.
   * @returns An observable that emits the matching message, or errors with the
   *   string `'No response after 3 seconds.'` on timeout or socket error.
   * @throws Error synchronously if the socket is missing or closed.
   */
  public first<T>(predicate: (data: T) => boolean): Observable<T> {
    const socket = this.requireSocket();

    return socket.pipe(
      // Filter *before* the timeout: a numeric timeout restarts on every
      // emission, so it must only see messages the caller actually cares about.
      filter(predicate),
      timeout(HermesSocketService.RESPONSE_TIMEOUT_MS),
      catchError(() => throwError(() => 'No response after 3 seconds.')),
      first(),
    );
  }

  /** Builds a fresh socket subject for the configured endpoint. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private getNewWebSocket(): WebSocketSubject<any> {
    return webSocket({
      url: environment.WSS_ENDPOINT,
    });
  }

  /**
   * Pushes `msg` onto the socket. The message is silently dropped when there
   * is no usable socket.
   *
   * @param msg Payload handed to the socket subject's `next()`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  public sendMessage(msg: any): void {
    this.usableSocket()?.next(msg);
  }

  /**
   * Exposes the incoming message stream. Stream errors are swallowed and turn
   * into a plain completion.
   *
   * @returns The message stream. The `undefined` in the signature is kept for
   *   caller compatibility; this method never actually returns it.
   * @throws Error synchronously if the socket is missing or closed.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  public get$(): Observable<any> | undefined {
    return this.requireSocket()
      .asObservable()
      .pipe(catchError(() => EMPTY));
  }

  /**
   * Subscribes to incoming messages. Stream errors are swallowed and turn into
   * a plain completion, so an `error` handler on the observer is never called.
   *
   * @param subscriptions A partial observer or a plain `next` callback.
   * @returns The subscription, or `undefined` when there is no usable socket.
   */
  public subscribe(
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    subscriptions: Partial<Observer<any>> | ((value: any) => void),
  ): Subscription | undefined {
    const socket = this.usableSocket();
    if (!socket) {
      return undefined;
    }

    return socket.pipe(catchError(() => EMPTY)).subscribe(subscriptions);
  }

  /** Completes the socket subject; does nothing when there is no usable socket. */
  public close(): void {
    this.usableSocket()?.complete();
  }

  /** Returns the current socket when it exists and is not closed, else `undefined`. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private usableSocket(): WebSocketSubject<any> | undefined {
    const socket = this.socket;
    return socket && !socket.closed ? socket : undefined;
  }

  /**
   * Returns the current socket or throws the shared "Socket is closed/null."
   * error used by the strict accessors.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private requireSocket(): WebSocketSubject<any> {
    const socket = this.socket;
    if (!socket || socket.closed) {
      throw new Error('Socket is ' + (socket ? 'closed' : 'null') + '.');
    }
    return socket;
  }
}