Mongoose 4.5.0 is just around the corner (current ETA is June 3), and it's bringing several powerful new features. If you want to get a sneak peek, check out the branch on GitHub. In this article, I'll highlight one feature I'm especially pumped for: the new cursor API, or, in other words, modern streaming for mongoose.
A Brief History of Mongoose Streams
Mongoose has supported a streaming interface for queries since v2.4.0 in 2011. It supported streaming before the underlying mongodb driver did. The point of streaming is to process documents one at a time for query results that are too big to fit into memory at once. Here's an example of how the streaming API works in mongoose.
const stream = Customer.find({ name: 'Axl' }).stream();
// Print every document that matches the query, one at a time
stream.on('data', doc => { console.log(doc); });
The bad news is that mongoose's stream code has been largely stagnant since 2012. Since then, Node.js has released 2 major overhauls of the stream API, which means that mongoose's streaming API is obsolete as well as formally deprecated. With 4.5.0, we're introducing a new query method, .cursor(), that behaves like .stream(), but with a few subtle differences.
What are Cursors?
In MongoDB parlance, a cursor is an object that you can use to iterate through the results of a query. If you execute a query against a MongoDB server directly, the result is a cursor rather than a bunch of documents. Similarly, the MongoDB Node.js driver will return a cursor from find(). In most cases the cursor API is overkill, so mongoose hides it from you by default.
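To make the distinction concrete, here's a rough sketch of the difference, assuming db is an already-connected driver Db instance and Customer is a mongoose model (the 'customers' collection name is just for illustration):

// MongoDB Node.js driver: find() hands you a cursor, not an array of docs
const driverCursor = db.collection('customers').find({ name: 'Axl' });

// Mongoose's default API hides the cursor: exec() buffers every matching
// document into a single array for you
Customer.find({ name: 'Axl' }).exec(function(error, docs) {
  console.log(docs.length);
});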
The most important part of the MongoDB Node.js driver's cursor API is the next() function, which gets the next document that matches the query. The mongoose 4.5 cursor API wraps the underlying driver's cursor API and gives you a next() function that you can use to iterate through each document:
const cursor = Customer.find({ name: 'Axl' }).cursor();
// Print the first document. Can also use callbacks
cursor.next().then(doc => { console.log(doc); });
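As the comment above hints, next() also accepts a Node-style callback if you'd rather not use promises:

// Same query as above, using a callback instead of a promise
cursor.next(function(error, doc) {
  console.log(doc);
});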
Iterating over documents one-at-a-time using next() is cumbersome if you're using callbacks and promise chaining. It's doable:
next(cursor.next());

function next(promise) {
  promise.then(doc => {
    // `doc` is null once the cursor is exhausted
    if (doc) {
      console.log(doc);
      next(cursor.next());
    }
  });
}
However, if you prefer to use streams, cursors in mongoose 4.5 are streams3-compatible streams:
cursor.on('data', function(doc) {
  console.log(doc);
});
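Because the cursor is a readable stream in object mode, you should also be able to pipe() it into other streams. Here's a sketch that stringifies each document and writes it to stdout; the transform stream below is my own example, not part of mongoose:

const Transform = require('stream').Transform;

// Turn each document into a line of JSON
const toNdjson = new Transform({
  objectMode: true,
  transform: function(doc, encoding, callback) {
    callback(null, JSON.stringify(doc) + '\n');
  }
});

Customer.find({ name: 'Axl' }).cursor().pipe(toNdjson).pipe(process.stdout);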
New Alternatives to Streams
Streams are not necessarily the perfect way to model pulling documents one-at-a-time from MongoDB. A readable stream is a push-based concurrency primitive: by default, it will keep spitting documents at you as fast as it possibly can. However, MongoDB cursors are inherently pull-based. In other words, to get more data from MongoDB, you have to explicitly ask the server for more data. Mongoose cursors introduce a couple new ways to process documents one at a time.
Since the next() function returns a promise, you can use co to iterate through the cursor using a for loop:
co(function*() {
  const cursor = User.find({ name: 'Axl' }).populate('band').cursor();
  for (let doc = yield cursor.next(); doc != null; doc = yield cursor.next()) {
    // Print the user, with the `band` field populated
    console.log(doc);
  }
});
If you're not using co or a similar concurrency framework, I would highly recommend you start doing so. Co makes complex async tasks like iterating through a cursor as simple as writing a for loop. If you're uncomfortable with co, check out my ebook, The 80/20 Guide to ES2015 Generators. It'll help you master co by writing your own co from scratch.
If you're not using co, mongoose 4.5 cursors also have an eachAsync() function akin to RethinkDB's. The point of eachAsync() is to make it easy to wait for an async operation to complete before processing the next document. For example, let's say you wanted to read documents from your mongoose collection and send them over the network one at a time:
cursor.on('data', function(doc) {
  superagent.post('/saveDoc', doc).end(function() {
    console.log('Saved', doc._id);
  });
});
Looks simple enough, but what if your documents are huge and the network
is really slow? The stream will keep reading data from the cursor and you'll
eventually run out of memory. The ideal solution would only load the next
document once the POST request has completed. Streams have a pause() method,
but it buffers internally, so you'll run out of memory anyway.
The eachAsync() function is the way to go:
cursor.eachAsync(doc => superagent.post('/saveDoc', doc)).
  then(() => console.log('done!'));
The eachAsync() function takes a function fn that gets executed for every document in the cursor. If fn returns a promise, eachAsync() will wait until the promise resolves before pulling the next document from the collection. In the above case, fn returns a superagent HTTP request, which has a .then() function. If fn returns something that isn't a promise, eachAsync() will pull the next document immediately.
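For instance, here's a sketch that simulates a slow async operation with setTimeout(); since fn returns a promise, eachAsync() waits out each delay before pulling the next document. The 100ms delay is arbitrary and just for illustration:

const cursor = Customer.find({ name: 'Axl' }).cursor();

cursor.eachAsync(doc => new Promise(resolve => {
  setTimeout(() => {
    console.log('Processed', doc._id);
    resolve();
  }, 100);
})).then(() => console.log('All documents processed'));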
Conclusion
The cursor() function is the future of streaming in mongoose. The current stream() function is now formally deprecated and will be removed in 5.0. In the majority of cases, cursor() is a drop-in replacement for stream(). However, cursor() maps more naturally to how MongoDB drivers work under the hood, and has cool new features that let you process query results one-at-a-time.