13

I played around with angular2 and got stuck after a while.

Using http.get works fine for a single request, but I want to poll live-data every 4 seconds, after tinkering for quite a while and reading a lot of reactivex stuff i ended up with:

Observable.timer(0,4000)
  .flatMap(
    () => this._http.get(this._url)
       .share()
       .map(this.extractData)
       .catch(this.handleError)
  )
  .share(); 

Is there a simple way to start a (4 second) interval after the http.get-observable has emitted the result of the request? (Or will I end up in observable-hell?)

Timeline i want:

Time(s): 0 - - - - - 1 - - - - - 2 - - - - - 3 - - - - - 4 - - - - - 5 - - - - - 6
Action:  Request - - Response - - - - - - - - - - - - - - - - - - - -Request-... 
Wait:                | wait for 4 seconds -------------------------> |
frido
  • 13,065
  • 5
  • 42
  • 56
SpazzMarticus
  • 1,218
  • 1
  • 20
  • 40

4 Answers4

13

Update to RxJS 6

import { timer } from 'rxjs';
import { concatMap, map, expand, catchError } from 'rxjs/operators';

pollData$ = this._http.get(this._url)
  .pipe(
    map(this.extractData),
    catchError(this.handleError)
  );

pollData$.pipe(
  expand(_ => timer(4000).pipe(concatMap(_ => pollData$)))
).subscribe();

I'm using RxJS 5 and I'm not sure what the RxJS 4 equivalent operators are. Anyway here is my RxJS 5 solution, hope it helps:

var pollData = this._http.get(this._url)
            .map(this.extractData)
            .catch(this.handleError);
pollData.expand(
  () => Observable.timer(4000).concatMap(() => pollData)
).subscribe();

The expand operator will emit the data and recursively start a new Observable with each emission

frido
  • 13,065
  • 5
  • 42
  • 56
Can Nguyen
  • 1,470
  • 10
  • 11
  • 1
    Okay, this one is better than mine! :) Bummer that the dokumentation on `expand` is quite vague. :( – SpazzMarticus Jun 27 '16 at 08:04
  • 5
    FYI: The recursion won't hit a `Maximum call stack size exceeded.` error. Tested it with >100k network-calls. Was still fast and didn't eat up all RAM. – SpazzMarticus Jun 27 '16 at 08:12
  • 1
    @Alex Whether polling stops on http errors depends on your `this.handleError` function. If you emit an Error using `throwError` polling will terminate but if you emit a default value on errors using `of` etc. then polling will continue. – frido Jan 24 '19 at 16:34
  • Do you have some example of marble testing this scenario? I tried to test it for last 3 days without any luck. – bartosz.baczek Jan 29 '19 at 12:04
2

I managed to do it myself, with the only downside beeing that http.get can't be repeated more easily.

pollData(): Observable<any> {

  //Creating a subject
  var pollSubject = new Subject<any>();

  //Define the Function which subscribes our pollSubject to a new http.get observable (see _pollLiveData() below)
  var subscribeToNewRequestObservable = () => {
    this._pollLiveData()
      .subscribe(
      (res) => { pollSubject.next(res) }
      );
  };

  //Subscribe our "subscription-function" to custom subject (observable) with 4000ms of delay added
  pollSubject.delay(4000).subscribe(subscribeToNewRequestObservable);

  //Call the "subscription-function" to execute the first request
  subscribeToNewRequestObservable();

  //Return observable of our subject
  return pollSubject.asObservable();

}

private _pollLiveData() {

  var url = 'http://localhost:4711/poll/';

  return this._http.get(url)
    .map(
    (res) => { return res.json(); }
    );
};

Here is why you can't use the more straight forward subscription:

var subscribeToNewRequestObservable = () => {
    this._pollLiveData()
      .subscribe(pollSubject);
  };

The completion the http.get-observable would also complete your subject and prevent it from emitting further items.


This is still a cold observable, so unless you subscribe to it no requests will be made.

this._pollService.pollData().subscribe(
  (res) => { this.count = res.count; }
);
SpazzMarticus
  • 1,218
  • 1
  • 20
  • 40
1

A minor rework of the answer from Can Nguyen, in case you want polling delay to depend on previous request completion status.

var pollData = () => request()   // make request
    .do(handler, errorHandler)   // handle response data or error
    .ignoreElements()            // ignore request progress notifications
    .materialize();              // wrap error/complete notif-ns into Notification

pollData()                            // get our Observable<Notification>...
  .expand(                            // ...and recursively map...
    (n) => Rx.Observable              // ...each Notification object...
      .timer(n.error ? 1000 : 5000)   // ...(with delay depending on previous completion status)...
      .concatMap(() => pollData()))   // ...to new Observable<Notification>
  .subscribe();

Plunk.

Or alternatively:

var pollData = () => request()             // make request
    .last()                                // take last progress value
    .catch(() => Rx.Observable.of(null));  // replace error with null-value

pollData()
  .expand(
    (data) => Rx.Observable
      .timer(data ? 5000 : 1000)           // delay depends on a value
      .concatMap(() => pollData()))
  .subscribe((d) => {console.log(d);});    // can subscribe to the value stream at the end

Plunk.

Alex Che
  • 6,659
  • 4
  • 44
  • 53
-1

You can try using interval if that is more convenient. Calling subscribe gives you Subscription that lets you cancel the polling after sometime.

let observer = Observable.interval(1000 * 4);
let subscription = observer.subsscribe(x => {
    this._http.get(this._url)
     .share()
     .map(this.extractData)
     .catch(this.handleError)
});

....
// if you don't require to poll anymore..
subscription.unsubscribe();
Jagannath
  • 3,995
  • 26
  • 30
  • 2
    `interval` will start after 4 seconds, not like timer at once for the first request and then wait. And it also doesn't count from when the `http.get`-request is done. – SpazzMarticus Jun 21 '16 at 08:31