reactive programming - RXJS : Idiomatic way to create an observable stream from a paged interface -


i have paged interface. given starting point request produce list of results , continuation indicator.

i've created observable built constructing , flat mapping observable reads page. result of observable contains both data page , value continue with. pluck data , flat map subscriber. producing stream of values.

to handle paging i've created subject next page values. it's seeded initial value each time receive response valid next page push pages subject , trigger read until such time there no more read.

is there more idiomatic way of doing this?

function records(start = 'latest', limit = 1000) {   let pages = new rx.subject();    this.connect(start)     .subscribe(page => pages.onnext(page));    let records = pages     .flatmap(page => {       return this.read(page, limit)         .doonnext(result => {           let next = result.next;           if (next === undefined) {             pages.oncompleted();           } else {             pages.onnext(next);           }         });     })     .pluck('data')     .flatmap(data => data);    return records; } 

that's reasonable way it. has couple of potential flaws in (that may or may not impact depending upon use case):

  1. you provide no way observe errors occur in this.connect(start)
  2. your observable hot. if caller not immediately subscribe observable (perhaps store , subscribe later), they'll miss completion of this.connect(start) , observable appear never produce anything.
  3. you provide no way unsubscribe initial connect call if caller changes mind , unsubscribes early. not real big deal, when 1 constructs observable, 1 should try chain disposables call cleans if caller unsubscribes.

here's modified version:

  1. it passes errors this.connect observer.
  2. it uses observable.create create cold observable starts business when caller subscribes there no chance of missing initial page value , stalling stream.
  3. it combines this.connect subscription disposable overall subscription disposable

code:

function records(start = 'latest', limit = 1000) {     return rx.observable.create(observer => {         let pages = new rx.subject();         let connectsub = new rx.singleassignmentdisposable();         let resultssub = new rx.singleassignmentdisposable();         let sub = new rx.compositedisposable(connectsub, resultssub);          // make sure subscribe pages before issue this.connect()         // in case this.connect() finishes synchronously (possible if caches values or something?)         let results = pages             .flatmap(page => this.read(page, limit))             .doonnext(r => this.next !== undefined ? pages.onnext(this.next) : pages.oncompleted())             .flatmap(r => r.data);         resultssub.setdisposable(results.subscribe(observer));          // query first page         connectsub.setdisposable(this.connect(start)             .subscribe(p => pages.onnext(p), e => observer.onerror(e)));          return sub;     }); } 

note: i've not used es6 syntax before, didn't mess here.


Comments

Popular posts from this blog

Payment information shows nothing in one page checkout page magento -

tcpdump - How to check if server received packet (acknowledged) -