Introducing concat-stream

In notebook:
FrontEndMasters Networking and Streams
Created at:
2017-09-24
Updated:
2017-11-08
Tags:
backend Node JS JavaScript libraries

Another userland module

concat-stream

npm install concat-stream

Note that this goes against the idea of streaming, since it buffers the entire contents in memory. So it is a compromise, but sometimes a necessary one.

concat-stream buffers up all the data in the stream:

// Buffer everything from stdin, then print how many bytes arrived.
var concat = require('concat-stream')

var sink = concat(function (body) {
  console.log(body.length)
})
process.stdin.pipe(sink)

concat always takes a callback function. This is not very memory efficient: the above example only invokes the callback once the stream ends (Ctrl+D for stdin), and only then prints the length.

You can only write to a concat-stream. You can't read from a concat-stream.

Keep in mind that all the data will be in memory.

More useful examples of concat-stream

For example, you have an HTTP server and need to parse URL-encoded form data, but the parser only accepts the complete body rather than small chunks.

For example, we want to only let a certain amount of bytes pass through:

  var concat = require('concat-stream')
var http = require('http')
var qs = require('querystring')

// Buffer each request body as a string, parse it as a querystring,
// log the result, and answer with 'ok'.
var server = http.createServer(function (req, res) {
  var sink = concat({ encoding: 'string' }, function (body) {
    var params = qs.parse(body)
    console.log(params)
    res.end('ok\n')
  })
  req.pipe(sink)
})
server.listen(5000)

The req object above is a readable stream. Passing { encoding: 'string' } makes concat hand the callback a string instead of a Buffer.

This would work, but it would be very easy to take down the server if a request were too big (say, a 50 GB file...).

So let's add a through stream to make sure we don't buffer too much data. We set a limit of 20 bytes.

  var concat = require('concat-stream')
var http = require('http')
var qs = require('querystring')

// Same server as before, with the request handler and the body
// callback pulled out into named functions for readability.
function handler (req, res) {
  // **** 1. first just clean up the code  ↴
  req.pipe(concat({ encoding: 'string' }, onbody))

  // Runs once the whole request body has been buffered.
  function onbody (body) {
    console.log(qs.parse(body))
    res.end('ok\n')
  }
}

var server = http.createServer(handler)
server.listen(5000)

Now, we add a counter function to count the number of bytes we received.

  var concat = require('concat-stream')
// **** 2. require through2 — it provides the (chunk, enc, next)
// transform API used in counter below (classic `through` does not)  ↴
var through = require('through2')
var http = require('http')
var qs = require('querystring')

var server = http.createServer(function (req, res) {
  // **** 1. then add a new step to the pipeline  ↴
  req
  // **** 3. add the counter stream. NOTE: counter is a factory that
  // RETURNS a through stream, so it must be CALLED — piping through
  // `through(counter)` would wrap the factory itself as a transform
  // function that never calls next, stalling the pipeline forever  ↴
  .pipe(counter())
  .pipe(concat({ encoding: 'string' }, onbody))

  // **** 4. write the counter factory: returns a transform stream
  // that stops passing data once more than 20 bytes have flowed  ↴
  function counter () {
    var size = 0 // ☛  number of bytes received so far
    return through(function (buf, enc, next) {
      size += buf.length
      if (size > 20) {
        next(null, null) // ☛ pushing null ends the stream early
      } else {
        next(null, buf)
      }
    })
  }

  // Parse whatever body made it through the limiter and acknowledge.
  function onbody (body) {
      var params = qs.parse(body)
      console.log(params)
      res.end('ok\n')
  }
})
server.listen(5000)

Now if your request is over 20 bytes, the server will still reply ok, but it will only parse the data that passed through before the 20-byte limit was reached — the rest is discarded.