node.js - Highland.js for CSV parsing
I'm trying to write this in a functional manner. We're using Highland.js for managing the stream processing, but because I'm new to it, I think I'm getting confused about how to deal with this unique situation.
The issue here is that the data in the file stream is not consistent. The first line in the file is typically the header, which I want to store in memory and zip with the rest of the rows in the stream afterwards.
Here's my first go at it:
var _ = require('highland');
var fs = require('fs');

var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var headers = [];

var through = _.pipeline(
    _.split(),
    _.head(),
    _.doto(function(col) {
        headers = col.split(',');
        return headers;
    }),
    ......
    _.splitBy(','),
    _.zip(headers),
    _.wrapCallback(process)
);

_(stream)
    .pipe(through)
    .pipe(output);
The first command in the pipeline splits the file into lines. The next one grabs the header, and doto stores it in the global variable. The problem is that the next few lines in the stream never show up and the process stalls... likely because of the head() command above it.
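A quick way to see that behaviour (just an illustration, reusing the same requires and file path from above): head() keeps only the first value and drops the rest, so everything downstream of it sees a one-element stream.

var _ = require('highland');
var fs = require('fs');

_(fs.createReadStream('./data/gigfile.txt'))
    .split()
    .head()
    .each(function(line) {
        // only the first line of the file ever reaches this point;
        // head() drops every value after it
        console.log(line);
    });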
I've tried a few other variations, but I feel this example will give you a sense of how I need to go about it.
Any guidance on this would be helpful -- which also brings up the question of, if I have different values in each of the rows, how I can splinter the stream processing amongst a number of different stream operations of variable length/complexity.
Thanks.
EDIT: I've produced a better result, but I'm questioning the efficiency of it -- is there a way I can optimize this so that on every run I'm not checking whether the headers have been recorded? It still feels sloppy.
var through = _.pipeline(
    _.split(),
    _.filter(function(row) {
        // filter out bogus values
        if (!row || headers) {
            return true;
        }
        headers = row.split(',');
        return false;
    }),
    _.map(function(row) {
        return row.split(',');
    }),
    _.batch(500),
    _.compact(),
    _.map(function(row) {
        return JSON.stringify(row) + "\n";
    })
);

_(stream)
    .pipe(through)
You can use stream.observe() or stream.fork() to split the stream.
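The difference between the two, roughly: observe() gives you a passive copy that buffers values and does not apply back-pressure to the source, while fork() gives you a sibling with shared back-pressure, so the source only moves as fast as the slowest fork. A minimal sketch of observe() on a toy stream (illustrative values, not from your data):

var _ = require('highland');

var source = _(['a', 'b', 'c']);

// the observer receives a copy of every value once the source is consumed,
// but it cannot slow the source down
var copy = source.observe();

source.each(function(x) {
    console.log('source:', x);
});

copy.each(function(x) {
    console.log('observer:', x);
});

Applied to your header problem, it could look something like this: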
var _ = require('highland');
var fs = require('fs');

var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

var through = _.pipeline(function(s) {
    var headerStream, headers;
    // setup a shared variable to store the headers
    headers = [];

    // setup the csv processing
    s = s
        // split input into lines
        .split()
        // remove empty lines
        .compact()
        // split lines into arrays
        .map(function(row) {
            return row.split(',');
        });

    // create a new stream to grab the header
    headerStream = s.observe();

    // pause the original stream
    s.pause();

    // setup processing of the non-header rows
    s = s
        // drop the header row
        .drop(1)
        // convert the rest of the rows to objects
        .map(function(row) {
            var obj = headers.reduce(function(obj, key, i) {
                obj[key] = row[i];
                return obj;
            }, {});
            return JSON.stringify(obj) + "\n";
        });

    // grab the first row from the header stream,
    // save the headers and then resume the normal stream
    headerStream.head().toArray(function(rows) {
        headers = rows[0];
        s.resume();
    });

    return s;
});

_(stream)
    .pipe(through)
    .pipe(output);
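If I'm reading the flow right, the pause()/resume() pair is the key design choice here: the main stream stays paused until the observer has delivered the header row and populated headers, so the map that builds the objects never runs against an empty headers array.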
That being said, this CSV parsing doesn't account for escaping newlines and commas within values. Typically that is handled in CSV files by wrapping values in double quotes, with literal double quotes escaped by putting two next to each other. It's a bit tricky to get right, so I would recommend using a package that handles it, such as fast-csv.
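For example (an illustrative line, not from your data), a plain split(',') mangles quoted fields, while a CSV-aware parser recovers the intended three values:

var line = 'id,"Smith, John","He said ""hi"""';

console.log(line.split(','));
// [ 'id', '"Smith', ' John"', '"He said ""hi"""' ]  -- four mangled pieces

// a CSV parser such as fast-csv would instead yield the intended fields:
// [ 'id', 'Smith, John', 'He said "hi"' ]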
Then your code could look like this:
var _ = require('highland');
var fs = require('fs');
var csv = require('fast-csv');

var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

_(stream.pipe(csv({headers: true, ignoreEmpty: true})))
    .map(function(row) {
        return JSON.stringify(row) + "\n";
    })
    .pipe(output);
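With headers: true, fast-csv emits each data row as an object keyed by the header names, so (assuming, for example, a file beginning with id,name followed by a row 1,Alice) the output file would contain lines like {"id":"1","name":"Alice"}.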