First, a streaming loader. Provide it with a function that, given a max count, returns a vector of data (aka a loader<T>). It can store internal state, but it will be copied, so store that internal state in a std::shared_ptr. I guarantee that only one copy of it will be invoked at a time.

You are not responsible for returning all max elements from your loader, but as written you must return at least one. Returning more is gravy, and may reduce threading overhead.

You then call streaming_loader<T>( your_loader, count ). It returns a stream_data<T>, aka a std::shared_ptr< std::vector< std::future<T> > >. You can wait on these futures, but you must wait on them in order: the second one is not guaranteed to be ready until the first one has provided its data.
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <vector>

template<class T>
using loader = std::function< std::vector<T>(size_t max) >;

template<class T>
using stream_data = std::shared_ptr< std::vector< std::future<T> > >;

namespace details {
  template<class T>
  T streaming_load_some( loader<T> l, size_t start, stream_data<T> data ) {
    auto loaded = l(data->size()-start); // contract: returns 1..max elements
    // populate the entries after start first, so they are ready:
    for( size_t i = 1; i < loaded.size(); ++i ) {
      std::promise<T> promise;
      promise.set_value( std::move(loaded[i]) );
      (*data)[start+i] = promise.get_future();
    }
    if (start+loaded.size() < data->size()) {
      // recurse to load the rest asynchronously:
      std::size_t new_start = start+loaded.size();
      (*data)[new_start] = std::async(
        std::launch::async,
        [l, new_start, data]{ return streaming_load_some<T>( l, new_start, data ); }
      );
    }
    // this return value populates the future at index start:
    return std::move(loaded.front());
  }
}

template<class T>
stream_data<T>
streaming_loader( loader<T> l, size_t n ) {
  auto retval = std::make_shared<std::vector< std::future<T> >>(n);
  if (retval->empty()) return retval;
  retval->front() = std::async(
    std::launch::async,
    [retval, l]()->T{ return details::streaming_load_some<T>( l, 0, retval ); }
  );
  return retval;
}
For use, you take the stream_data<T> (aka a shared pointer to a vector of future data), iterate over it, and .get() each in turn. Then do your processing. If you need a block of 50 of them, call .get() on each in turn until you get to 50 -- do not skip ahead to number 50.
Here is a completely toy loader and test harness:
struct loader_state {
  int index = 0;
};

struct test_loader {
  std::shared_ptr<loader_state> state; // current loading state stored here
  std::vector<int> operator()( std::size_t max ) const {
    std::size_t amt = max/2+1; // really, really stupid way to decide how much to load
    std::vector<int> retval;
    retval.reserve(amt);
    for (size_t i = 0; i < amt; ++i) {
      retval.push_back( -(int)(state->index + i) ); // populate the return value
    }
    state->index += (int)amt;
    return retval;
  }
  // in real code, make this constructor do something:
  test_loader():state(std::make_shared<loader_state>()) {}
};
int main() {
  auto data = streaming_loader<int>( test_loader{}, 1024 );
  std::size_t count = 0;
  for( std::future<int>& x : *data ) {
    ++count;
    int value = x.get(); // get data, in order
    // process. In this case, print roughly 100 of the values, in 10 blocks:
    if (count * 100 / data->size() > (count-1) * 100 / data->size())
      std::cout << value << ", ";
    if (count * 10 / data->size() > (count-1) * 10 / data->size())
      std::cout << "\n";
  }
  std::cout << std::endl;
  return 0;
}
count may or may not be worthless. The internal state of the loader above is pretty darn worthless; I just use it to demonstrate how to store some state.
You can do something similar to destroy a pile of objects without waiting for their destructors to complete. Or, you can rely on the fact that destroying your data can happen while you are working on it and waiting for the next data to load.
live example
In an industrial strength solution, you'd need to include ways to abort all this stuff, among other things. Exceptions might be one way. Also, feedback to the loader about how far behind the processing code is can be helpful (if it is at your heels, return smaller chunks -- if it is way behind, return larger chunks). In theory, that can be arranged via a back channel in loader<T>.
Now that I have played with the above for a bit, a probably better fit is:
#include <iostream>
#include <future>
#include <functional>
#include <vector>
#include <memory>
// if it returns empty, there is nothing more to load:
template<class T>
using loader = std::function< std::vector<T>() >;

template<class T>
struct next_data;

template<class T>
struct streamer {
  std::vector<T> data;
  std::unique_ptr<next_data<T>> next;
};

// wraps the future so streamer<T> can name it while next_data<T>
// is still incomplete:
template<class T>
struct next_data : std::future<streamer<T>> {
  using parent = std::future<streamer<T>>;
  using parent::parent;
  next_data( parent&& o ):parent(std::move(o)) {}
};
live example. It requires some infrastructure to populate that very first streamer<T>, but the code will be simpler, and the strange requirements (knowing up front how much data there is, and only doing a .get() starting from the first element) go away.
template<class T>
streamer<T> stream_step( loader<T> l ) {
  streamer<T> retval;
  retval.data = l();
  if (retval.data.empty())
    return retval; // empty data and null next: end of stream
  retval.next.reset( new next_data<T>( std::async(
    std::launch::async, [l]{ return stream_step(l); } ) ) );
  return retval;
}

template<class T>
streamer<T> start_stream( loader<T> l ) {
  streamer<T> retval;
  retval.next.reset( new next_data<T>( std::async(
    std::launch::async, [l]{ return stream_step(l); } ) ) );
  return retval;
}
A downside is that writing a range-based iterator becomes a bit trickier.
Here is a sample use of the second implementation:
struct counter {
  std::size_t max;
  std::size_t current = 0;
  counter( std::size_t m ):max(m) {}
  std::vector<int> operator()() {
    std::vector<int> retval;
    std::size_t do_at_most = 100;
    while( current < max && (do_at_most-- > 0)) {
      retval.push_back( int(current) );
      ++current;
    }
    return retval;
  }
};
int main() {
  streamer<int> s = start_stream<int>( counter(1024) );
  while(true) {
    for (int x : s.data) {
      std::cout << x << ",";
    }
    std::cout << std::endl;
    if (!s.next)
      break;
    s = s.next->get(); // blocks until the next chunk has been loaded
  }
  return 0;
}
where counter is a trivial loader (an object that reads data into a std::vector<T> in whatever sized chunks it feels like). The processing of the data is in the main code, where we just print the values out in get-sized chunks.

The loading happens in a different thread, and will continue asynchronously regardless of what the main thread does. The main thread just gets delivered one std::vector<T> at a time to do with as it will. In your case, you'd make T a Mat.