This is my first time using RxJS. I'm trying to build a Twitter scraper that retrieves tweets for a search query. The search URL accepts a min_position parameter, which can be set to the last id from the previous search to paginate through the results.
The process looks roughly like this, looping back at the end:
get page -> next() each scraped tweet -> set min_position -> get page (until !has_more_items)
Requesting a page returns a promise, so I have to wait for it to resolve before I can proceed. I was hoping to pass an async function to Observable.create(), but that doesn't seem to work: it's only called a single time.
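Here is a minimal sketch of that loop, assuming the whole pagination can run inside one subscription. The function passed to new Observable(...), which is what Observable.create() wraps, runs once per subscription. So the entire page loop lives inside it: it calls next() for each tweet and complete() once there are no more items. Page, fetchPage and its fake three-page data are hypothetical stand-ins for the real request and response. hasMoreItems and minPosition are assumed to mirror has_more_items and min_position.

```typescript
import { Observable } from 'rxjs'

// Hypothetical page shape: hasMoreItems and minPosition are assumed to
// mirror has_more_items and min_position from the search response.
interface Page {
  tweets: string[]
  hasMoreItems: boolean
  minPosition: string
}

// Hypothetical fake of the real request: 3 pages of 2 tweets each.
async function fetchPage(minPosition?: string): Promise<Page> {
  await new Promise<void>(resolve => setTimeout(resolve, 10)) // simulated network delay
  const pageIndex = minPosition === undefined ? 0 : Number(minPosition) + 1
  return {
    tweets: [`tweet ${pageIndex * 2}`, `tweet ${pageIndex * 2 + 1}`],
    hasMoreItems: pageIndex < 2,
    minPosition: String(pageIndex),
  }
}

function scrapeTweets(): Observable<string> {
  return new Observable<string>(subscriber => {
    let cancelled = false

    // This function runs once per subscription, so it starts one async loop:
    // get page -> next() each tweet -> set min_position -> get page (until !hasMoreItems)
    const run = async (): Promise<void> => {
      let minPosition: string | undefined = undefined
      while (!cancelled) {
        const page: Page = await fetchPage(minPosition)
        for (const tweet of page.tweets) {
          if (cancelled) return
          subscriber.next(tweet)
        }
        if (!page.hasMoreItems) break
        minPosition = page.minPosition
      }
      subscriber.complete()
    }
    run().catch(err => subscriber.error(err))

    // Teardown: stop requesting further pages after unsubscribe
    return () => {
      cancelled = true
    }
  })
}

scrapeTweets().subscribe(tweet => console.log(tweet))
```

The cancelled flag is one way to stop fetching further pages after unsubscribing. It does not abort a request that is already in flight.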
EDIT
I've experimented a bit after reading through your resources as best I could, and came up with the following abstraction of my problem:
```typescript
import { from, Observable } from 'rxjs'
import { concatMap, map, switchMap } from 'rxjs/operators'

let pageNumber = 0
const PAGE_SIZE = 3, MAX_PAGES = 3

async function nextPage() {
  if (pageNumber >= MAX_PAGES) {
    throw new Error('No more pages available')
  }
  await new Promise(res => setTimeout(res, 500)) // delay 500ms
  const output = []
  const base = pageNumber++ * PAGE_SIZE
  for (let i = 0; i < PAGE_SIZE; i++) {
    output.push(base + i)
  }
  return output
}

function parseTweet(tweet: number): string {
  // simply prepend 'tweet' to the tweet
  return 'tweet ' + tweet
}

const getTweets = (): Observable<string> => {
  return from(nextPage()) // gets _html_ of next page
    .pipe(
      concatMap(page => page), // spreads out tweet strings in page
      map(tweet => parseTweet(tweet)), // parses each tweet's html
      switchMap(() => getTweets()) // concat to next page's tweets
      // stop/finish observable when getTweets() observable returns an error
    )
}

getTweets()
  .subscribe(val => console.log(val))
```
It's quite close to working, but whenever nextPage() returns a rejected promise, the entire observable breaks and nothing is logged to the console. I've tried inserting a catchError after the pipe so that the observable completes instead of propagating the error, but I can't get it to work.
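Here is a minimal sketch of one place catchError could go, assuming a rejection should simply end the stream. catchError only sees errors from the observable it is piped onto. Here it sits directly on the single-page observable and replaces the error with EMPTY, which completes without emitting. nextPage is a trimmed copy of the one above that rejects after one page. safePage and logPage are hypothetical helper names. defer() is used so that nextPage() runs on each subscription rather than once when the observable is built.

```typescript
import { defer, EMPTY, Observable } from 'rxjs'
import { catchError } from 'rxjs/operators'

// Trimmed, hypothetical copy of nextPage(): one page, then a rejected promise
let pageNumber = 0
async function nextPage(): Promise<number[]> {
  if (pageNumber >= 1) {
    throw new Error('No more pages available')
  }
  pageNumber++
  return [0, 1, 2]
}

// catchError sits directly on the page observable, so a rejection is
// replaced by EMPTY (a plain completion) instead of erroring downstream.
function safePage(): Observable<number[]> {
  return defer(() => nextPage()).pipe(catchError(() => EMPTY))
}

const logPage = () =>
  safePage().subscribe({
    next: page => console.log('page', page),
    error: err => console.log('error', err.message),
    complete: () => console.log('complete'),
  })

logPage() // emits the page, then completes
logPage() // completes without emitting: the rejection was swallowed
```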
Also, this implementation is recursive, which I was hoping to avoid because it doesn't scale: I don't know how many tweets or pages the observable will have to process in the future. It also seems that tweets from all 3 pages must be processed before the observable starts emitting values, which is not how it should work.
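One alternative worth trying is the expand operator in place of the switchMap recursion. This is a sketch under stated assumptions, not tested against the real scraper. expand emits each page and passes it to a function that returns the request for the next page, or EMPTY once hasMoreItems is false, so getTweets() no longer calls itself. concatMap spreads each page into tweets as soon as that page arrives, so values start emitting after the first request instead of after the last. Page, fetchPage and its fake data are hypothetical. The fake reuses PAGE_SIZE and MAX_PAGES from the abstraction above with a shorter 50ms delay, and minPosition stands in for min_position.

```typescript
import { defer, EMPTY, Observable } from 'rxjs'
import { concatMap, expand, map } from 'rxjs/operators'

const PAGE_SIZE = 3, MAX_PAGES = 3

// Hypothetical page shape; minPosition stands in for the min_position cursor
interface Page {
  tweets: number[]
  hasMoreItems: boolean
  minPosition: number
}

// Hypothetical fake request: PAGE_SIZE ids per page, MAX_PAGES pages in total
async function fetchPage(minPosition: number): Promise<Page> {
  await new Promise<void>(resolve => setTimeout(resolve, 50)) // simulated network delay
  const tweets: number[] = []
  for (let i = 0; i < PAGE_SIZE; i++) {
    tweets.push(minPosition + i)
  }
  const next = minPosition + PAGE_SIZE
  return { tweets, hasMoreItems: next < PAGE_SIZE * MAX_PAGES, minPosition: next }
}

const getTweets = (): Observable<string> =>
  defer(() => fetchPage(0)).pipe(
    // Feed each page's cursor back in; EMPTY ends the expansion when there are no more items
    expand(page => (page.hasMoreItems ? defer(() => fetchPage(page.minPosition)) : EMPTY)),
    concatMap(page => page.tweets), // spread each page into tweets as soon as it arrives
    map(tweet => 'tweet ' + tweet), // stand-in for parseTweet()
  )

getTweets().subscribe(val => console.log(val))
```

A rejected fetchPage() would still error this stream. Adding catchError(() => EMPTY) to each defer(...) is one way to end it quietly instead.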
Thanks for your help! :)