import {CollectionViewer, DataSource} from '@angular/cdk/collections';
import {BehaviorSubject, Observable, of} from 'rxjs';
import {catchError, finalize} from 'rxjs/operators';
import {KPaging} from '../../models';
import {PageResponse} from '../../models';

/**
 * Contract for the service a {@link KDatasource} queries for its rows.
 *
 * @typeParam T - element type of the returned page.
 */
export interface IDatasourceService<T> {
  /**
   * Runs a search and returns the matching page as an observable.
   *
   * @param paging - paging settings forwarded unchanged from `KDatasource.load`.
   * @param filter - filter criteria forwarded unchanged from `KDatasource.load`;
   *   the shape is defined by the implementing service.
   * @returns an observable emitting the page response for the request.
   */
  search(paging: KPaging, filter: any): Observable<PageResponse<T>>;
}

/**
 * Data source that loads pages through an {@link IDatasourceService} and
 * exposes the rows, the loading flag and the total element count as
 * observables.
 *
 * @typeParam T - row type.
 */
export class KDatasource<T> implements DataSource<T> {

  private readonly contentSubject = new BehaviorSubject<T[]>([]);
  private readonly loadingSubject = new BehaviorSubject<boolean>(false);
  private readonly totalElementsSubject = new BehaviorSubject<number>(0);

  /** Emits `true` when a load starts and `false` once it has finished. */
  public loading$ = this.loadingSubject.asObservable();
  /** Emits the total element count of the most recently loaded page. */
  public totalElements$ = this.totalElementsSubject.asObservable();
  /** Snapshot of the rows most recently published; `undefined` until the first load or update. */
  public content: any;

  constructor(private service: IDatasourceService<T>) {

  }

  /**
   * Requests a page from the service and publishes its rows and total count.
   * If the request fails, an empty page (no rows, total 0) is published.
   *
   * @param paging - paging settings passed through to the service.
   * @param filter - filter criteria passed through to the service.
   */
  load(paging: KPaging, filter: any): void {
    this.loadingSubject.next(true);
    this.service.search(paging, filter).pipe(
      // Map a failed request to `null` so the subscriber publishes an empty
      // page instead of reading page fields off a substitute value.
      catchError(() => of(null)),
      finalize(() => this.loadingSubject.next(false))
    ).subscribe((page: PageResponse<T> | null) => {
      const rows: T[] = page && page.content ? page.content : [];
      const total: number = page && page.totalElements ? page.totalElements : 0;
      this.content = rows;
      this.contentSubject.next(rows);
      this.totalElementsSubject.next(total);
    });
  }

  /**
   * Returns the row stream without requiring a collection viewer.
   *
   * @returns an observable of the current and future rows.
   */
  getObservable(): Observable<T[]> {
    return this.contentSubject.asObservable();
  }

  /**
   * `DataSource` hook: returns the row stream to render.
   *
   * @param collectionViewer - the connecting viewer; not used by this implementation.
   * @returns an observable of the current and future rows.
   */
  connect(collectionViewer: CollectionViewer): Observable<T[]> {
    return this.contentSubject.asObservable();
  }

  /**
   * Replaces the published rows without calling the service. The snapshot
   * returned by {@link getContent} is updated as well so both stay in sync.
   *
   * @param content - rows to publish.
   */
  updateContent(content: T[]): void {
    this.content = content;
    this.contentSubject.next(content);
  }

  /**
   * Returns the snapshot of the rows most recently published.
   *
   * @returns the current rows, or `undefined` if nothing has been published yet.
   */
  getContent() {
    return this.content;
  }

  /**
   * `DataSource` hook: completes all internal subjects. Subscribers of the
   * row stream, `loading$` and `totalElements$` receive a completion.
   *
   * @param collectionViewer - the disconnecting viewer; not used by this implementation.
   */
  disconnect(collectionViewer: CollectionViewer): void {
    this.contentSubject.complete();
    this.loadingSubject.complete();
    this.totalElementsSubject.complete();
  }
}
