我正在使用Node.js运行服务器,并且需要从正在运行的另一台服务器(localhost:3001)请求数据。我需要向数据服务器发出许多请求(〜200)并收集数据(响应大小从〜20Kb到〜20Mb不等)。每个请求都是独立的,我想将响应保存为以下形式的一个巨大数组:
localhost:3001
[{"urlAAA": responseAAA}, {"urlCCC": responseCCC}, {"urlBBB": responseBBB}, etc ]
请注意,项目的顺序并不重要,理想情况下,它们应该以数据可用的顺序填充数组。
// Express route module from the question. While the module is being loaded it
// requests 20 randomly chosen files from the data server on localhost:3001 and
// stores the parsed responses in pinnacle_data.
var express = require('express');
var router = express.Router();
var async = require("async");
var papa = require("papaparse"); // not used in the code shown here
var sync_request = require('sync-request');
var request = require("request"); // not used in the code shown here

// Collected responses, keyed by "file_<n>".
var pinnacle_data = {};

// Work list: the integers 0..19, one entry per request to issue.
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    lookup_list.push(i);
}

// Stores value under key on object. Not called anywhere in the code shown here.
function write_delayed_files(object, key, value) {
    object[key] = value;
    return;
}

// Fetches one randomly chosen file and stores its parsed JSON body in
// pinnacle_data under "file_<file_number>".
//
// The value returned by sync_request() is consumed on the very next statement,
// so the HTTP call has to finish before this function continues; every
// invocation therefore completes before the next one can start.
//
// NOTE(review): the function declares only file_number and never invokes a
// completion callback, even though it is handed to async.each as the iterator
// -- confirm the expected iterator signature against the async documentation.
var show_file = function (file_number) {
    // Random integer in the range 1..496.
    var file_index = Math.round(Math.random() * 495) + 1;
    var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index.toString();
    var response_json = sync_request('GET', pinnacle_file_index);
    var pinnacle_json = JSON.parse(response_json.getBody('utf8'));
    var object_key = "file_" + file_number.toString();
    pinnacle_data[object_key] = pinnacle_json;
    console.log("We've handled file: " + file_number);
    return;
};

// The final callback is empty, so an error reported to it would be ignored.
async.each(lookup_list, show_file, function (err) {});

// Executed as soon as the async.each(...) call above returns.
console.log(pinnacle_data);

/* GET contact us page. */
router.get('/', function (req, res, next) {
    res.render('predictionsWtaLinks', {title: 'Async Trial'});
});

module.exports = router;
现在,当运行该程序时,它将显示:
We've handled file: 0 We've handled file: 1 We've handled file: 2 We've handled file: 3 We've handled file: 4 We've handled file: 5 etc
现在,由于文件大小差异很大,我原本期望请求能够“并行”执行,但它们似乎是按顺序执行的,而这正是我试图通过使用 async.each() 来避免的情况。目前,连接到数据服务器大约需要1-2秒,因此对许多文件执行此操作会花费很长时间。
async.each()
我意识到我正在使用同步请求,因此希望把下面这行代码替换掉:
// The statement the question wants to replace: the return value of
// sync_request() is the response itself, so execution waits here for the reply.
var response_json = sync_request('GET', pinnacle_file_index);
换成类似下面这样的代码:
// Callback-based variant of the request: the response is delivered to the
// callback, and only a successful (HTTP 200) reply is parsed and stored in
// pinnacle_data under object_key.
request(pinnacle_file_index, function (error, response, body) {
    // Nothing is stored when the request failed or the status is not 200.
    if (error || response.statusCode != 200) {
        return;
    }
    pinnacle_data[object_key] = JSON.parse(body);
});
任何帮助将非常感激。
另外,我还尝试过:
// Attempt using async.parallel: function_list holds the task functions and
// the final callback receives their collected results.
// The comment inside the callback sits on its own line so that it cannot
// swallow the closing "});" of the call.
async.parallel(function_list, function (err, results) {
    // add results to pinnacle_data[]
});
现在,以下代码完成了该任务(每个请求大约需要80毫秒,包括必须使用 npm 包 requestretry 进行的重复请求)。同样,这种方式的扩展性很好:总请求数从5个到1000个,平均每个请求的时间都约为80毫秒。
npmrequestretry
// Issues total_requests HTTP requests concurrently through async.map, retrying
// failed attempts with requestretry, then merges all responses into
// pinnacle_data and reports how long the whole run took.
var performance = require("performance-now");
var time_start = performance();
var async = require("async");
var request_retry = require('requestretry');

// Work list: one entry per request to issue.
var lookup_list = [];
var total_requests = 50;
for (var i = 0; i < total_requests; i++) {
    lookup_list.push(i);
}

// Merged responses, keyed by the requested file index.
var pinnacle_data = {};

async.map(lookup_list, function (item, callback) {
    // Random integer in the range 1..496; the same file may be picked twice.
    var file_index = Math.round(Math.random() * 495) + 1;
    var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index;

    request_retry({
        url: pinnacle_file_index,
        maxAttempts: 20,
        retryDelay: 20,
        retryStrategy: request_retry.RetryStrategies.HTTPOrNetworkError
    }, function (error, response, body) {
        if (error || response.statusCode !== 200) {
            console.log(error);
            return callback(error || response.statusCode);
        }

        // JSON.parse runs inside an asynchronous callback: a malformed body
        // must be reported through callback() instead of being thrown, or the
        // exception would escape the async.map bookkeeping entirely.
        var parsed_body;
        try {
            parsed_body = JSON.parse(body);
        } catch (parse_error) {
            console.log(parse_error);
            return callback(parse_error);
        }

        // Each result is a single-entry object: { "<file_index>": <parsed body> }.
        var entry = {};
        entry[file_index.toString()] = parsed_body;
        callback(null, entry);
    });
}, function (err, results) {
    var time_finish = performance();
    console.log("It took " + (time_finish - time_start).toFixed(3) + "ms to complete " + total_requests + " requests.");
    console.log("This gives an average rate of " + ((time_finish - time_start) / total_requests).toFixed(3) + " ms/request");

    if (err) {
        console.log("We had an error somewhere.");
        return;
    }

    // Merge the single-entry objects; entries with the same file index
    // overwrite each other, hence the "unique entries" count below.
    // Object.keys() is used so that no loop variable leaks as an implicit global.
    results.forEach(function (entry) {
        Object.keys(entry).forEach(function (key) {
            pinnacle_data[key] = entry[key];
        });
    });

    var length_array = Object.keys(pinnacle_data).length.toString();
    console.log("We've got all the data, totalling " + length_array + " unique entries.");
});
谢谢您的帮助。
您已经发现,async.parallel()只能并行化本身是异步的操作。如果操作是同步的,则由于node.js具有单线程特性,因此这些操作将一个接一个地运行,而不是并行运行。但是,如果操作本身是异步的,则async.parallel()(或其他异步方法)将立即启动所有操作并为您协调结果。
async.parallel()
这是使用 async.map() 的一般思路。我之所以使用 async.map(),是因为它接收一个数组作为输入,并以与原始数组相同的顺序生成一个结果数组,同时并行运行所有请求,这似乎与您想要的内容一致:
async.map()
// Fetches a list of URLs concurrently with async.map and hands the parsed
// JSON bodies to the completion function as one results array.
var async = require("async");
var request = require("request");

// create list of URLs
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    // Random integer in the range 1..496.
    var index = Math.round(Math.random() * 495) + 1;
    var url = 'http://localhost:3001/generate?file=' + index;
    lookup_list.push(url);
}

async.map(lookup_list, function (url, callback) {
    // iterator function
    request(url, function (error, response, body) {
        if (error || response.statusCode !== 200) {
            return callback(error || response.statusCode);
        }

        // JSON.parse runs inside an asynchronous callback, so a malformed
        // body has to be reported through callback() rather than thrown.
        var parsed_body;
        try {
            parsed_body = JSON.parse(body);
        } catch (parse_error) {
            return callback(parse_error);
        }

        // do any further processing of the data here
        callback(null, parsed_body);
    });
}, function (err, results) {
    // completion function
    if (!err) {
        // process all results in the array here
        console.log(results);
        for (var i = 0; i < results.length; i++) {
            // do something with results[i]
        }
    } else {
        // handle error here
    }
});
而且,这是一个使用Bluebird Promise的版本,它以类似的方式使用Promise.map()来迭代初始数组:
Promise.map()
// Bluebird version: fetches a list of URLs concurrently with Promise.map and
// resolves with an array of the parsed JSON bodies.
var Promise = require("bluebird");
// multiArgs makes the promisified methods resolve with [response, body],
// which .spread() below unpacks into separate arguments.
var request = Promise.promisifyAll(require("request"), {multiArgs: true});

// create list of URLs
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    // Random integer in the range 1..496.
    var index = Math.round(Math.random() * 495) + 1;
    var url = 'http://localhost:3001/generate?file=' + index;
    lookup_list.push(url);
}

Promise.map(lookup_list, function (url) {
    return request.getAsync(url).spread(function (response, body) {
        if (response.statusCode !== 200) {
            // Reject with a real Error (carrying the status code) so the
            // rejection handler receives a message and a stack trace.
            var status_error = new Error("Unexpected HTTP status " + response.statusCode + " for " + url);
            status_error.statusCode = response.statusCode;
            throw status_error;
        }
        // A JSON.parse failure throws inside the promise chain and therefore
        // becomes a rejection handled by the error function below.
        return JSON.parse(body);
    });
}).then(function (results) {
    console.log(results);
    for (var i = 0; i < results.length; i++) {
        // process results[i] here
    }
}, function (err) {
    // process error here
});