I created a source function according to this manual.

public static void main(String[] args) throws Exception {
    DirectProvider dp = new DirectProvider();
    Topology top = dp.newTopology();

    final URL url = new URL("http://finance.yahoo.com/d/quotes.csv?s=BAC+COG+FCX&f=snabl");

    TStream<String> linesOfWebsite = top.source(queryWebsite(url));
}

Now I'd like to filter this stream. I had something like this in mind:

TStream<Iterable<String>> simpleFiltered = source.filter(item-> item.contains("BAX");

This does not work. Does anybody have an idea how to filter the stream? I don't want to change the request URL to do the filtering up front.

approxiblue

1 Answer

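It's difficult to tell from the information provided, but two things stand out. First, dp.submit(top) is needed to run the topology; without it, nothing executes. Second, the filter tests for "BAX", which does not occur in the data for the URL being requested (the symbols there are BAC, COG, and FCX). For example: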

...
TStream<String> linesOfWebsite = top.source(queryWebsite(url));
linesOfWebsite.print(); // show what's received

TStream<String> filtered = linesOfWebsite.filter(t -> t.contains("BAC"));
filtered.sink(t -> System.out.println("filtered: " + t));

dp.submit(top);  // required
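
To see the predicate issue in isolation, here is a minimal sketch in plain Java that does not use the streaming library at all. It swaps in java.util.stream and applies the same kind of contains(...) test to a few lines. The class name FilterSketch, the helper filterLines, and the sample rows are hypothetical stand-ins; the real response format is not shown in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {
    // Keep only the lines that satisfy the predicate (hypothetical helper).
    static List<String> filterLines(List<String> lines, Predicate<String> keep) {
        return lines.stream().filter(keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder rows, one per requested symbol; not real quote data.
        List<String> lines = Arrays.asList(
            "BAC,sample-row-1",
            "COG,sample-row-2",
            "FCX,sample-row-3");

        // "BAX" appears in none of the rows, so nothing passes.
        System.out.println(filterLines(lines, t -> t.contains("BAX"))); // prints []

        // "BAC" appears in the first row only.
        System.out.println(filterLines(lines, t -> t.contains("BAC"))); // prints [BAC,sample-row-1]
    }
}
```

If the first print in the real topology shows lines containing the symbol you expect, the same predicate should behave the same way inside linesOfWebsite.filter(...).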
dlaboss