1

I have a source of messages, which is an Observable. For every message I would like to make an HTTP call, which produces another Observable, so I combine them with flatMap and then sink the results to a subscriber. Here is the code for this scenario:

Rx.Observable.interval(1000)
.flatMap (tick) ->
  # returns an `Observable`
  loadMessages()
.flatMap (message) ->
  # also returns an `Observable`
  makeHttpRequest(message)
.subscribe (result) ->
  console.info "Processed: ", result

This example is written in CoffeeScript, but I think the problem statement is valid for any other Rx implementation.

The issue I have with this approach is that loadMessages produces a lot of messages very quickly. This means that I make a lot of HTTP requests in a very short period of time. This is not acceptable in my situation, so I would like to limit the number of parallel HTTP requests to 10 or so. In other words, I would like to throttle the pipeline or apply some kind of backpressure when I am making HTTP requests.

Is there a standard approach or best practice in Rx for dealing with this kind of situation?

Currently I have implemented a very simple (and pretty suboptimal) backpressure mechanism that ignores a tick if the system has too many messages in processing. It looks like this (simplified version):

Rx.Observable.interval(1000)
.filter (tick) ->
  stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
  stats.messageIn()
.flatMap (tick) ->
  # returns an `Observable`
  loadMessages()
.flatMap (message) ->
  # also returns an `Observable`
  makeHttpRequest(message)
.do (result) ->
  stats.messageOut()
.subscribe (result) ->
  console.info "Processed: ", result

I'm not sure, though, whether this can be done better, or whether Rx already has some mechanism in place to deal with this kind of requirement.

tenshi

3 Answers

2

This isn't strictly backpressure, this is just limiting concurrency. Here's an easy way to do it (ignore my possibly wrong syntax, coding via TextArea):

Rx.Observable.interval(1000)
    .flatMap (tick) ->
        # returns an `Observable`
        loadMessages()
    .map (message) ->
        # also returns an `Observable`, but only when
        # someone first subscribes to it
        Rx.Observable.defer ->
            makeHttpRequest(message)
    .merge 10 # at a time
    .subscribe (result) ->
        console.info "Processed: ", result

In C#, the equivalent idea is to use Select(Defer(x)).Merge(n) instead of SelectMany. Merge(int) subscribes to at most n in-flight Observables and buffers the rest until later. The Defer is there so that no work is done until Merge(n) subscribes to us.
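
To make the "at most n in flight, buffer the rest" behaviour concrete, here is a minimal sketch of the same idea in plain JavaScript with Promises rather than Rx. It is not how RxJS implements merge, only an illustration: `runLimited` and `makeFakeClient` are hypothetical names invented here, and the fake client merely stands in for `makeHttpRequest`. Each task is passed as a function so that nothing starts until the limiter calls it, which is the role Defer plays above.

```javascript
// Hypothetical helper: run task factories with at most `limit` in flight.
// Each factory is a zero-argument function returning a Promise, so no work
// starts until the limiter calls it. Waiting factories simply stay in the
// array, which corresponds to the buffering described above.
// Assumes limit >= 1.
function runLimited(factories, limit) {
  return new Promise((resolve, reject) => {
    const results = new Array(factories.length);
    let next = 0;     // index of the next factory to start
    let inFlight = 0; // tasks started but not yet finished

    function startMore() {
      if (next === factories.length && inFlight === 0) {
        resolve(results);
        return;
      }
      while (inFlight < limit && next < factories.length) {
        const index = next++;
        inFlight++;
        // Wrapping the call also turns a synchronous throw into a rejection.
        new Promise((done) => done(factories[index]())).then((value) => {
          results[index] = value;
          inFlight--;
          startMore();
        }, reject);
      }
    }

    startMore();
  });
}

// Hypothetical stand-in for makeHttpRequest that records peak concurrency.
function makeFakeClient(delayMs) {
  const stats = { active: 0, peak: 0 };
  function request(message) {
    stats.active++;
    stats.peak = Math.max(stats.peak, stats.active);
    return new Promise((resolve) => {
      setTimeout(() => {
        stats.active--;
        resolve("done:" + message);
      }, delayMs);
    });
  }
  return { request, stats };
}

// Usage: 25 messages, at most 10 requests running at any moment.
const client = makeFakeClient(10);
const messages = Array.from({ length: 25 }, (_, i) => i);
runLimited(messages.map((m) => () => client.request(m)), 10)
  .then((results) => console.info("Processed:", results.length, "peak:", client.stats.peak));
```

Note that the whole list of pending factories is held in memory while it waits, so this limits concurrency but does nothing to slow down the producer.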

Ana Betts
  • But you have a problem in that your job queue of observables builds up and is only processed 10 at a time. If you don't process them fast enough you get a memory leak. You want to replace `Rx.Observable.interval(1000)` with a sampler that generates new observables when the number of active jobs drops below 10. That would be backpressure. – bradgonesurfing Mar 24 '14 at 07:57
  • You're correct in the general case, but looking at the sample they gave, I don't think that'll be an issue. – Ana Betts Mar 24 '14 at 15:28
  • Nice to know about the defer. I think @bradgonesurfing has a valid point, so message buffering should also be taken into consideration. – tenshi Mar 26 '14 at 13:42
1

It sounds like you want to pull from a queue rather than push your http requests. Is Rx really the right choice of technology here?

EDIT:

In general, I would not design a solution using Rx where I had complete imperative control over the source events. It's just not a reactive scenario.

The backpressure module in RxJS is clearly written to deal with situations where you don't own the source stream. Here you do.

TPL Dataflow sounds like a far better fit here.

If you must use Rx, you could set up a loop like this: to limit the pipeline to X concurrent events, set up a Subject to act as your message source and imperatively push (OnNext) X messages into it. In your subscriber, push one new message to the Subject in each iteration of the OnNext handler until the source is exhausted. This guarantees a maximum of X messages in flight.
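
The loop described above can be sketched without Rx at all, which also shows why it keeps memory bounded: the next message is pulled only when a slot frees up, so nothing queues. This is plain JavaScript and not a Subject-based implementation; `pullWithLimit` and `fakeMessages` are hypothetical names, and the source is assumed to be any iterator.

```javascript
// Hypothetical helper: keep at most `limit` messages in flight, pulling the
// next message from `source` (any iterator) only when one finishes.
// `handle` returns a Promise per message; `onResult` receives each result.
// Assumes limit >= 1.
function pullWithLimit(source, limit, handle, onResult) {
  return new Promise((resolve, reject) => {
    let inFlight = 0; // messages pulled but not yet finished

    // Pull one message and start handling it.
    // Returns false once the source has nothing left.
    function pullOne() {
      const step = source.next();
      if (step.done) return false;
      inFlight++;
      new Promise((done) => done(handle(step.value)))
        .then((result) => {
          inFlight--;
          onResult(result);
          // Replace the finished message with a fresh one; if there is none
          // and nothing else is running, the whole run is complete.
          if (!pullOne() && inFlight === 0) resolve();
        })
        .catch(reject);
      return true;
    }

    // Seed the pipeline with up to `limit` messages.
    for (let i = 0; i < limit; i++) {
      if (!pullOne()) break;
    }
    if (inFlight === 0) resolve(); // the source was empty
  });
}

// Usage with a made-up message source and a fake asynchronous handler.
function* fakeMessages() {
  for (let i = 1; i <= 8; i++) yield "message-" + i;
}

pullWithLimit(
  fakeMessages(),
  3,
  (message) => new Promise((resolve) => setTimeout(() => resolve(message.toUpperCase()), 10)),
  (result) => console.info("Processed:", result)
);
```

The trade-off is that the source has to be pullable, which is exactly the "you own the source" assumption made above.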

James World
  • You are right, in this particular case I'm pulling. I provided more detail just to make the example more concrete, but this does not mean that it will always remain like this. At some point I can (and would like to) switch to the push model for the message source. IMHO, it will not change much. There is still a high probability that messages will be pushed in large amounts very fast, and I need to somehow "guard" the middle part of the pipeline that calls the `makeHttpRequest` method. – tenshi Mar 17 '14 at 22:50
  • "Is Rx really the right choice of technology here?" This is something that I want to find out. So far it has worked pretty well for the relatively complex message processing flow I'm currently implementing, so I'm getting the feeling that it suits the task very well. I have only several small concerns, and this is one of them. I need to apply this kind of backpressure in several places. I found my own solutions for them (mostly by buffering some amount of messages and applying backpressure), but I was just wondering whether there is a more idiomatic way to solve this kind of issue. – tenshi Mar 17 '14 at 23:04
  • [This question](http://stackoverflow.com/questions/21860133/throttling-events-and-locking-methods/), while not directly addressing yours, is probably of interest. I am concerned by the need to parallelize - Rx alone is often not well suited here - TPL [Dataflow](http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx) may work better. Otherwise, you can combine approaches by pushing the Rx pipeline onto a queue and using workers to handle requests, and push worker results out through a Subject. That's often simpler than the gymnastics of handling back-pressure directly in Rx. – James World Mar 17 '14 at 23:25
  • Updated my answer with a simple way of limiting the messages in the pipe. – James World Mar 20 '14 at 13:20
1

In RxJS you can use the backpressure submodule:

http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/

Disclaimer: I have never used the JS version of Rx, but you did ask for a standard way of implementing backpressure, and the core library seems to have support for it. Rx for C# does not yet have this support. Not sure why.

bradgonesurfing