I have a source of messages, which is an `Observable`. For every message I would like to make an HTTP call, which produces another `Observable`, so I combine them with `flatMap` and then sink the results to some subscriber. Here is the code for this scenario:
Rx.Observable.interval(1000)
  .flatMap (tick) ->
    # returns an `Observable`
    loadMessages()
  .flatMap (message) ->
    # also returns an `Observable`
    makeHttpRequest(message)
  .subscribe (result) ->
    console.info "Processed: ", result
This example is written in CoffeeScript, but I think the problem statement is valid for any other Rx implementation.
The issue I have with this approach is that loadMessages produces a lot of messages very quickly, which means I end up making a lot of HTTP requests in a very short period of time. This is not acceptable in my situation, so I would like to limit the number of parallel HTTP requests to 10 or so. In other words, I would like to throttle the pipeline, or apply some kind of backpressure, when making the HTTP requests.
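For illustration, what I conceptually want is something like the sketch below (loadMessages and makeHttpRequest are dummy stand-ins for my real functions, just to keep the sketch self-contained; I believe RxJS 4 has a flatMapWithMaxConcurrent operator that caps the number of concurrently subscribed inner observables, but I'm not sure it is the intended tool here):

Rx = require 'rx'

# stand-in for my real message source: emits a burst of messages
loadMessages = ->
  Rx.Observable.range(1, 100)

# stand-in for my real HTTP call: produces a result after a short delay
makeHttpRequest = (message) ->
  Rx.Observable.timer(200).map -> "response for #{message}"

Rx.Observable.interval(1000)
  .flatMap (tick) ->
    loadMessages()
  .flatMapWithMaxConcurrent 10, (message) ->
    # at most 10 of these should be in flight at the same time,
    # if I read the RxJS 4 docs correctly
    makeHttpRequest(message)
  .subscribe (result) ->
    console.info "Processed: ", result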
Is there any standard approach or best practice in Rx for dealing with this kind of situation?
Currently I have implemented a very simple (and pretty suboptimal) backpressure mechanism that ignores a tick if the system has too many messages in processing. It looks like this (simplified version):
Rx.Observable.interval(1000)
  .filter (tick) ->
    stats.applyBackpressureBasedOnTheMessagesInProcessing()
  .do (tick) ->
    stats.messageIn()
  .flatMap (tick) ->
    # returns an `Observable`
    loadMessages()
  .flatMap (message) ->
    # also returns an `Observable`
    makeHttpRequest(message)
  .do (result) ->
    stats.messageOut()
  .subscribe (result) ->
    console.info "Processed: ", result
I'm not sure, though, whether this can be done better, or whether Rx already has some mechanisms in place to deal with this kind of requirement.
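One alternative I was considering (assuming makeHttpRequest is lazy/cold, i.e. it does not actually start the request until something subscribes to it) is to map each message to an observable and then merge with a concurrency limit:

Rx.Observable.interval(1000)
  .flatMap (tick) ->
    loadMessages()
  .map (message) ->
    # nothing is started here yet; this only describes the request,
    # assuming makeHttpRequest does no work until subscription
    makeHttpRequest(message)
  .merge(10)  # subscribe to at most 10 inner observables at a time
  .subscribe (result) ->
    console.info "Processed: ", result

But I'm not sure whether this is really what people mean by backpressure in Rx, since the pending messages still accumulate in memory while they wait for a free slot.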