BaseRollingFileStream.js 3.54 KB
"use strict";
var fs = require('fs')
, zlib = require('zlib')
, debug = require('debug')('streamroller:BaseRollingFileStream')
, mkdirp = require('mkdirp')
, path = require('path')
, util = require('util')
, stream = require('readable-stream');

module.exports = BaseRollingFileStream;

function BaseRollingFileStream(filename, options) {
  debug("In BaseRollingFileStream");
  this.filename = filename;
  this.options = options || {};
  this.options.encoding = this.options.encoding || 'utf8';
  this.options.mode = this.options.mode || parseInt('0644', 8);
  this.options.flags = this.options.flags || 'a';

  this.currentSize = 0;

  function currentFileSize(file) {
    var fileSize = 0;
    try {
      fileSize = fs.statSync(file).size;
    } catch (e) {
      // file does not exist
    }
    return fileSize;
  }

  function throwErrorIfArgumentsAreNotValid() {
    if (!filename) {
      throw new Error("You must specify a filename");
    }
  }

  throwErrorIfArgumentsAreNotValid();
  debug("Calling BaseRollingFileStream.super");
  BaseRollingFileStream.super_.call(this);
  this.openTheStream();
  this.currentSize = currentFileSize(this.filename);
}
util.inherits(BaseRollingFileStream, stream.Writable);

BaseRollingFileStream.prototype._writeTheChunk = function(chunk, encoding, callback) {
  debug("writing the chunk to the underlying stream");
  this.currentSize += chunk.length;
  try {
    if (!this.theStream.write(chunk,encoding)) {
      debug('waiting for drain event');
      this.theStream.once('drain',callback);
    } else {
      process.nextTick(callback);
    }
    debug("chunk written");
  } catch (err) {
    debug(err);
    if (callback) {
      callback(err);
    }
  }
};

BaseRollingFileStream.prototype._write = function(chunk, encoding, callback) {
  debug("in _write");

  if (this.shouldRoll()) {
    this.currentSize = 0;
    this.roll(this.filename, this._writeTheChunk.bind(this, chunk, encoding, callback));
  } else {
    this._writeTheChunk(chunk, encoding, callback);
  }
};

BaseRollingFileStream.prototype.openTheStream = function(cb) {
  debug("opening the underlying stream");
  var that = this;
  mkdirp.sync(path.dirname(this.filename));
  this.theStream = fs.createWriteStream(this.filename, this.options);
  this.theStream.on('error', function(err) {
    that.emit('error', err);
  });
  if (cb) {
    this.theStream.on("open", cb);
  }
};

BaseRollingFileStream.prototype.closeTheStream = function(cb) {
  debug("closing the underlying stream");
  this.theStream.end(cb);
};

BaseRollingFileStream.prototype.compress = function(filename, cb) {
    debug('Compressing ', filename, ' -> ', filename, '.gz');
    var gzip = zlib.createGzip();
    var inp = fs.createReadStream(filename);
    var out = fs.createWriteStream(filename+".gz");
    inp.pipe(gzip).pipe(out);

    out.on('finish', function(err) {
      debug('Removing original ', filename);
      fs.unlink(filename, cb);
    });
};

BaseRollingFileStream.prototype.shouldRoll = function() {
  return false; // default behaviour is never to roll
};

BaseRollingFileStream.prototype.roll = function(filename, callback) {
  callback(); // default behaviour is not to do anything
};

BaseRollingFileStream.prototype.end = function(chunk, encoding, callback) {
  var self = this;
  debug('end called - first close myself');
  stream.Writable.prototype.end.call(self, function() {
    debug('writable end callback, now close underlying stream');
    self.theStream.end(chunk, encoding, function(err) {
      debug('underlying stream closed');
      if (callback) {
        callback(err);
      }
    });
  });
};