一尘不染

使用Node.js实时读取文件

node.js

我需要找到一种最佳方法,即使用node.js实时读取正在写入文件的数据。麻烦的是,Node是一艘飞速前进的船,这使得很难找到解决问题的最佳方法。

我要做什么我
有一个Java进程正在执行某项操作,然后将其执行的结果写入文本文件。运行通常需要5分钟到5个小时的时间,整个时间都会写入数据,并且可以达到相当高的吞吐率(大约1000行/秒)。

我想实时读取此文件,然后使用节点聚合数据并将其写入套接字,以便在客户端上以图形方式显示。

客户端,图形,套接字和聚合逻辑都已完成,但是我对读取文件的最佳方法感到困惑。

我尝试过(或至少尝试过)的内容
FIFO
-我可以告诉我的Java进程写入fifo并使用节点读取此内容,实际上这是我们目前使用Perl时遇到的问题,但是因为其他所有操作都在节点中运行移植代码很有意义。

Unix Sockets - 如上。

fs.watchFile -可以满足我们的需求吗?

fs.createReadStream -这比watchFile好吗?

fstail -f-好像是hack。

实际上,这是我
要使用Unix套接字的问题,这似乎是最快的选择。但是,节点是否具有更好的内置功能可实时从fs读取文件?


阅读 409

收藏
2020-07-07

共1个答案

一尘不染

如果您希望将文件作为数据的永久存储,以防止系统崩溃或正在运行的进程网络中的某个成员死亡,则流丢失,您仍然可以继续写入文件并读取从中。

如果您不需要此文件作为Java流程产生的结果的持久存储,那么使用Unix套接字在简便性和性能上要好得多。

fs.watchFile() 并不是您所需要的,因为它在文件系统报告文件时就可以处理文件统计信息,并且由于您要读取已写入的文件,因此这不是您想要的。

简短更新:
很遗憾地意识到,尽管我fs.watchFile()在上一段中指责过使用文件统计信息,但我自己在下面的示例代码中做了同样的事情!尽管我已经警告过读者“保重!”
因为我在短短几分钟内就写了它,甚至还没有很好的测试;仍然可以通过使用fs.watch()代替,watchFile或者fstatSync如果基础系统支持它,则可以做得更好。

为了读取/写入文件,我在下面写了些有趣的文章:

test-fs-writer.js :[由于在Java进程中写入文件,因此您将不需要此文件]

var fs = require('fs'),
    lineno=0;

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});

stream.on('open', function() {
    console.log('Stream opened, will start writing in 2 secs');
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
});

test-fs-reader.js :[请注意,这只是演示,请检查err对象!]

var fs = require('fs'),
    bite_size = 256,
    readbytes = 0,
    file;

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });

function readsome() {
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
    if(stats.size<readbytes+1) {
        console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
        setTimeout(readsome, 3000);
    }
    else {
        fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
    }
}

function processsome(err, bytecount, buff) {
    console.log('Read', bytecount, 'and will process it now.');

    // Here we will process our incoming data:
        // Do whatever you need. Just be careful about not using beyond the bytecount in buff.
        console.log(buff.toString('utf-8', 0, bytecount));

    // So we continue reading from where we left:
    readbytes+=bytecount;
    process.nextTick(readsome);
}

您可以放心地避免使用nextTickreadsome()而是直接致电。由于我们仍在此处进行同步,因此从任何意义上讲都是没有必要的。我喜欢 :p

以上面的示例为例,但将其扩展为读取CSV数据可得到:

var lastLineFeed,
    lineArray;
function processsome(err, bytecount, buff) {
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');

    if(lastLineFeed > -1){

        // Split the buffer by line
        lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');

        // Then split each line by comma
        for(i=0;i<lineArray.length;i++){
            // Add read rows to an array for use elsewhere
            valueArray.push(lineArray[i].split(','));
        }

        // Set a new position to read from
        readbytes+=lastLineFeed+1;
    } else {
        // No complete lines were read
        readbytes+=bytecount;
    }
    process.nextTick(readFile);
}
2020-07-07