4

I have tried running a flatMap on a Flux range followed by subscribeOn and it seems all operations run on the same thread. Is this normal?

Flux.range(0, 1000000).log().flatMap{ it + 1 }.subscribeOn(Schedulers.parallel()).subscribe()
honk
  • 9,137
  • 11
  • 75
  • 83
Andrew
  • 1,109
  • 1
  • 15
  • 27

2 Answers2

4

You can create a ParallelFlux as follows:

Flux.range(0, 100000).parallel(2).runOn(Schedulers.parallel()).log().map{ it + 1 }.subscribe()
                      ^^^^^^^^^^^  ^^^^^^use runOn ^^^^^^^^^^^
Rene
  • 5,730
  • 17
  • 20
  • So flatMap doesn't break your streams into sub-streams that are processed in parallel? This is how the method was documented and thought only subscribeOn is necessary. – Andrew Nov 21 '18 at 14:21
  • 1
    `flatmap` will only parallelize if you create a new Observable(Flux, Mono) inside the block. In your example with `it +1` the operator `flatMap` will not compile and you should use `map`. I just updated my answer. See https://stackoverflow.com/questions/43269275/parallelflux-vs-flatmap-for-a-blocking-i-o-task – Rene Nov 22 '18 at 10:39
0

Use .subscribeOn(Schedulers.X) to manage subscription

Use .publishOn(Schedulers.X) to manage publishing

Use .parallel(N).runOn(Schedulers.X) when using ParallelFlux.

Christian Meyer
  • 605
  • 8
  • 15