4

I have some code which polls until a task is complete

See below

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

I have tried using takeUntil and takeWhile the problem is that that the last value is never published once the task is complete.

To get around this I have to include the tap method to with the stopPoll subject and incrementing the stopCount to get the last value.

So the above works but just feels a bit messy, I'm sure there must be a better way of achieving this?

I would have expected takeUntil to publish the last value or have an override to tell it to e.g takeUntil(observable, {publishLast: true})

BTW Update, the observable is subscribed to by an Angular 6 template Thanks in advance

Lenny D
  • 1,734
  • 4
  • 22
  • 43

3 Answers3

2

One thing you can do is use a custom takeWhile-like operator like this:

const completeWith = <T>(predicate: (arg: T) => boolean) => (
  source: Observable<T>,
) =>
  new Observable<T>(observer =>
    source.subscribe(
      value => {
        observer.next(value);
        if (predicate(value)) {
          observer.complete();
        }
      },
      error => observer.error(error),
      () => observer.complete(),
    ),
  );

It doesn't seem like a good idea to see it as a variation of takeWhite because it's not just taking values while a condition holds, but also emits an extra value.

It might be that a more elegant solution would be make the simulation status observable emit two kinds of values: next notifications and completion notifications, similarly to how materialize/dematerialize operators work.

Ivan
  • 1,317
  • 13
  • 23
  • Looks good, I will probably use this. I agree with your comments regarding the name of takeWhile. However I'm using takeUntil and I think given it's name it makes sense that this has an overload option to emit the last value. Looking on StackOverflow others have certainly come across this situation and need this functionality built in. – Lenny D Aug 10 '18 at 19:11
  • The most elegant solution that I've found. Thanks! – lord5et Apr 09 '21 at 10:53
2

This has in the meantime been implemented in rxjs as takeWhile(condition, ?inclusive):

timer(0, 10).pipe(
    takeWhile((x) => x < 3, true)
)

emits 0, 1, 2, 3

vbraun
  • 1,851
  • 17
  • 14
1

You can also create subject and emit using next() if you want to complete the observable.

this.stopPoll: Subject<any> = new Subject<any>();

If you want to do complete the subscription. you can call this.stopPoll.next(true);

you can access the data in subscribe()

this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});
Suresh Kumar Ariya
  • 9,516
  • 1
  • 18
  • 27
  • Yes this is exactly what I'm doing. and calling this.stopPoll.next(true) in the tap method, but I have 7 even 9 extra lines of code including declarations of stopCount and stopPoll just because the last value isn't published by default. – Lenny D Aug 09 '18 at 12:47
  • You can get the last value in the complete method. i.e third options of subscribe() – Suresh Kumar Ariya Aug 09 '18 at 12:47
  • Also this is an angular template that is subscribing to the observable , probably should mention that also, but yes I can subscribe manually, it might be cleaner – Lenny D Aug 09 '18 at 12:48
  • Subscribing to complete => {} will work, but I lose the async binding to my template so I need to push it again onto another subject that the template binds to. So still a bit messy :/ ! Either that or lose the OnPush changeDetectionStrategy which becomes inefficient – Lenny D Aug 09 '18 at 12:56
  • Found this with takeWhile https://stackoverflow.com/questions/44641246/rxjs-takewhile-but-include-the-last-value all solutions are hacky. I would suggest this is missing functionality in rxjs – Lenny D Aug 09 '18 at 13:04
  • Yes, will raise Issue to Ben Lesh RxJS Github. – Suresh Kumar Ariya Aug 09 '18 at 13:07
  • Was just creating this issue https://github.com/ReactiveX/rxjs/issues/4000, point him at that. But thanks for your time and effort! – Lenny D Aug 09 '18 at 13:13