
First time with RxJS. Basically, I'm trying to make a Twitter scraper that retrieves tweets matching a query string. The search URL allows a min_position parameter, which can be set to the last id of the previous request as a rough form of pagination.

The process kind of looks like this (where it loops back at the end):

get page -> next() each scraped tweet -> set min_position -> get page (until !has_more_items)

Requesting a page returns a promise, so I somehow have to wait for it to resolve before I can proceed. I was hoping to pass an async function to Observable.create(), but that doesn't seem to work: it's only called a single time.
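In plain async/await terms, the loop I'm after looks roughly like this (the Page shape and fetchPage are just placeholders for my actual request code):

// Hypothetical shape of one page of search results
interface Page { tweets: string[]; minPosition: string; hasMoreItems: boolean }

// Placeholder for the actual request helper
declare function fetchPage(query: string, minPosition?: string): Promise<Page>

async function scrape(query: string, emit: (tweet: string) => void) {
    let minPosition: string | undefined
    let hasMoreItems = true
    while (hasMoreItems) {
        const page = await fetchPage(query, minPosition) // returns a promise
        page.tweets.forEach(emit)                        // next() each scraped tweet
        minPosition = page.minPosition                   // carry the last id forward
        hasMoreItems = page.hasMoreItems                 // stop when !has_more_items
    }
}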

EDIT

I've had a play around after reading your resources as best I could. I came up with the following abstraction of my problem.

import { from, Observable } from 'rxjs'
import { concatMap, map, switchMap } from 'rxjs/operators'

let pageNumber = 0
const PAGE_SIZE = 3, MAX_PAGES = 3
async function nextPage() {
    if (pageNumber >= MAX_PAGES) {
        throw new Error('No more pages available')
    }

    await new Promise(res => setTimeout(res, 500)) // delay 500ms

    const output = []
    const base = pageNumber++ * PAGE_SIZE
    for (let i = 0; i < PAGE_SIZE; i++) {
        output.push(base + i)
    }
    return output
}

function parseTweet(tweet: number): string {
    // simply prepend 'tweet' to the tweet
    return 'tweet ' + tweet
}

const getTweets = (): Observable<string> => {
    return from(nextPage()) // gets _html_ of next page
        .pipe(
          concatMap(page => page), // spreads out tweet strings in page
          map(tweet => parseTweet(tweet)), // parses each tweet's html
          switchMap(() => getTweets()) // concat to next page's tweets
          // stop/finish observable when getTweets() observable returns an error
        )
}

getTweets()
    .subscribe(val => console.log(val))

It's quite close to working, but whenever nextPage() returns a rejected promise, the whole observable errors and nothing is logged to the console.

I've attempted inserting catchError into the pipe so the observable finishes cleanly instead of throwing, but I can't get it to work.
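Roughly what I mean is something along these lines, which still doesn't behave the way I want (a sketch using the abstraction above):

import { empty, from, Observable } from 'rxjs'
import { catchError, concatMap, map, switchMap } from 'rxjs/operators'

const getTweets = (): Observable<string> => {
    return from(nextPage())
        .pipe(
            concatMap(page => page),
            map(tweet => parseTweet(tweet)),
            switchMap(() => getTweets()),
            catchError(() => empty()) // swallow the rejection and complete instead of erroring
        )
}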

Also, this implementation is recursive, which I was hoping to avoid because it doesn't scale: I don't know how many tweets/pages the observable will have to process in future. It also seems that tweets from all 3 pages must be processed before the observable starts emitting any values, which of course is not how it should work.

Thanks for your help! :)

James Middleton

2 Answers


So we need to load tweets until some condition is met, and somehow work with a Promise along the way? Take a look at this example:

import { BehaviorSubject, from } from 'rxjs'
import { concatMap, map, tap } from 'rxjs/operators'

function loadTwits(id) {
  // BehaviorSubject replays its last value and starts with a default one
  const twitId$ = new BehaviorSubject(id);

  return twitId$.pipe(
    // concatMap - handles each outer emission in order, waiting for the inner Observable
    // from - converts the Promise into an Observable
    concatMap(id => from(fetchTwits(id))),
    map(parseTwits),
    // load more twits or complete
    tap(twits => getLastTwitId(twits) ? twitId$.next(getLastTwitId(twits)) : twitId$.complete())
  )
}
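Here fetchTwits, parseTwits and getLastTwitId are placeholders for your own request and parsing code, roughly with these shapes:

// Placeholders for your own code, roughly:
declare function fetchTwits(id: number): Promise<string[]>          // request the page of tweets starting after `id`
declare function parseTwits(html: string[]): string[]               // parse each tweet's html
declare function getLastTwitId(twits: string[]): number | undefined // undefined once the last page is reached

loadTwits(0).subscribe(twits => console.log(twits))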
Buggy
  • Hi Buggy, thanks for your answer. With the final line, `tap`, instead of circling back after each page, would this not do it after each tweet? – James Middleton Sep 12 '18 at 08:40
  • Hi James, I'm assuming that `fetchTwits(id)` does the request for a new page. If that's not correct, please describe more clearly what you have and what you want to get. – Buggy Sep 12 '18 at 08:47

I figured it out after looking further into `expand` and realising that recursion was exactly what I needed in my observable. This is the code that creates the observable:

import { empty, from } from 'rxjs'
import { concatMap, expand, map } from 'rxjs/operators'

const nextPage$f = () => from(nextPage()) // gets _html_ of next page
    .pipe(
        concatMap(page => page), // spreads out tweet strings in page
        map(tweet => parseTweet(tweet)) // parses each tweet's html
    )

const tweets$ = nextPage$f()
    .pipe(
        expand(() => morePages() ? nextPage$f() : empty())
    )
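morePages() isn't shown above; with the abstraction from my question it could be as simple as checking the page counter, and the stream is then consumed like any other observable:

function morePages(): boolean {
    return pageNumber < MAX_PAGES // true while there are pages left to fetch
}

tweets$.subscribe(val => console.log(val)) // tweets emit page by page as each request resolves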
James Middleton