reactive programming - RXJS : Idiomatic way to create an observable stream from a paged interface -
i have paged interface. given starting point request produce list of results , continuation indicator.
i've created observable built constructing , flat mapping observable reads page. result of observable contains both data page , value continue with. pluck data , flat map subscriber. producing stream of values.
to handle paging i've created subject next page values. it's seeded initial value each time receive response valid next page push pages subject , trigger read until such time there no more read.
is there more idiomatic way of doing this?
function records(start = 'latest', limit = 1000) { let pages = new rx.subject(); this.connect(start) .subscribe(page => pages.onnext(page)); let records = pages .flatmap(page => { return this.read(page, limit) .doonnext(result => { let next = result.next; if (next === undefined) { pages.oncompleted(); } else { pages.onnext(next); } }); }) .pluck('data') .flatmap(data => data); return records; }
that's reasonable way it. has couple of potential flaws in (that may or may not impact depending upon use case):
- you provide no way observe errors occur in
this.connect(start)
- your observable hot. if caller not immediately subscribe observable (perhaps store , subscribe later), they'll miss completion of
this.connect(start)
, observable appear never produce anything. - you provide no way unsubscribe initial
connect
call if caller changes mind , unsubscribes early. not real big deal, when 1 constructs observable, 1 should try chain disposables call cleans if caller unsubscribes.
here's modified version:
- it passes errors
this.connect
observer. - it uses
observable.create
create cold observable starts business when caller subscribes there no chance of missing initial page value , stalling stream. - it combines
this.connect
subscription disposable overall subscription disposable
code:
function records(start = 'latest', limit = 1000) { return rx.observable.create(observer => { let pages = new rx.subject(); let connectsub = new rx.singleassignmentdisposable(); let resultssub = new rx.singleassignmentdisposable(); let sub = new rx.compositedisposable(connectsub, resultssub); // make sure subscribe pages before issue this.connect() // in case this.connect() finishes synchronously (possible if caches values or something?) let results = pages .flatmap(page => this.read(page, limit)) .doonnext(r => this.next !== undefined ? pages.onnext(this.next) : pages.oncompleted()) .flatmap(r => r.data); resultssub.setdisposable(results.subscribe(observer)); // query first page connectsub.setdisposable(this.connect(start) .subscribe(p => pages.onnext(p), e => observer.onerror(e))); return sub; }); }
note: i've not used es6 syntax before, didn't mess here.
Comments
Post a Comment