在将 Streaming CloudWatch Logs Data to Amazon Elasticsearch Service, 将云监视日志流传输到具有一个日志组和 一个Lambda功能的ELK可以正常工作。
但是现在我想为其他日志组更改目标lambda函数,但 由于在AWS控制台中没有选项,因此无法执行此操作。在此处输入图片说明
任何帮助将不胜感激。
谢谢
我使用AWS控制台选项(开始流式传输 到Amazon Elasticsearch Service)流式传输到ELK ,但是我无法更改或选择其他 lambda函数,因为 使用此选项只能为任何日志组选择lambda函数。
因此,我创建了新的lambda函数并将流目标设置为AWS lambda函数,
这是您所需的全部代码,lambda函数的Node版本为4. *,因为 新版本存在一些问题,但最重要的是它 不需要任何额外的NPM软件包。 require any extra NPM packages.
// v1.1.2 var https = require('https'); var zlib = require('zlib'); var crypto = require('crypto'); var endpoint = 'search-my-test.us-west-2.es.amazonaws.com'; exports.handler = function(input, context) { // decode input from base64 var zippedInput = new Buffer(input.awslogs.data, 'base64'); // decompress the input zlib.gunzip(zippedInput, function(error, buffer) { if (error) { context.fail(error); return; } // parse the input from JSON var awslogsData = JSON.parse(buffer.toString('utf8')); // transform the input to Elasticsearch documents var elasticsearchBulkData = transform(awslogsData); // skip control messages if (!elasticsearchBulkData) { console.log('Received a control message'); context.succeed('Control message handled successfully'); return; } // post documents to the Amazon Elasticsearch Service post(elasticsearchBulkData, function(error, success, statusCode, failedItems) { console.log('Response: ' + JSON.stringify({ "statusCode": statusCode })); if (error) { console.log('Error: ' + JSON.stringify(error, null, 2)); if (failedItems && failedItems.length > 0) { console.log("Failed Items: " + JSON.stringify(failedItems, null, 2)); } context.fail(JSON.stringify(error)); } else { console.log('Success: ' + JSON.stringify(success)); context.succeed('Success'); } }); }); }; function transform(payload) { if (payload.messageType === 'CONTROL_MESSAGE') { return null; } var bulkRequestBody = ''; payload.logEvents.forEach(function(logEvent) { var timestamp = new Date(1 * logEvent.timestamp); // index name format: cwl-YYYY.MM.DD var indexName = [ 'prod-background-wo-' + timestamp.getUTCFullYear(), // year ('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month ('0' + timestamp.getUTCDate()).slice(-2) // day ].join('.'); var source = buildSource(logEvent.message, logEvent.extractedFields); source['response_time'] = source["end"] - source["start"]; source['@id'] = logEvent.id; source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString(); source['@message'] = logEvent.message; source['@owner'] = payload.owner; source['@log_group'] = payload.logGroup; source['@log_stream'] = payload.logStream; var action = { "index": {} }; action.index._index = indexName; action.index._type = payload.logGroup; action.index._id = logEvent.id; bulkRequestBody += [ JSON.stringify(action), JSON.stringify(source), ].join('\n') + '\n'; }); return bulkRequestBody; } function buildSource(message, extractedFields) { if (extractedFields) { var source = {}; for (var key in extractedFields) { if (extractedFields.hasOwnProperty(key) && extractedFields[key]) { var value = extractedFields[key]; if (isNumeric(value)) { source[key] = 1 * value; continue; } jsonSubString = extractJson(value); if (jsonSubString !== null) { source['$' + key] = JSON.parse(jsonSubString); } source[key] = value; } } return source; } jsonSubString = extractJson(message); if (jsonSubString !== null) { return JSON.parse(jsonSubString); } return {}; } function extractJson(message) { var jsonStart = message.indexOf('{'); if (jsonStart < 0) return null; var jsonSubString = message.substring(jsonStart); return isValidJson(jsonSubString) ? jsonSubString : null; } function isValidJson(message) { try { JSON.parse(message); } catch (e) { return false; } return true; } function isNumeric(n) { return !isNaN(parseFloat(n)) && isFinite(n); } function post(body, callback) { var requestParams = buildRequest(endpoint, body); var request = https.request(requestParams, function(response) { var responseBody = ''; response.on('data', function(chunk) { responseBody += chunk; }); response.on('end', function() { var info = JSON.parse(responseBody); var failedItems; var success; if (response.statusCode >= 200 && response.statusCode < 299) { failedItems = info.items.filter(function(x) { return x.index.status >= 300; }); success = { "attemptedItems": info.items.length, "successfulItems": info.items.length - failedItems.length, "failedItems": failedItems.length }; } var error = response.statusCode !== 200 || info.errors === true ? { "statusCode": response.statusCode, "responseBody": responseBody } : null; callback(error, success, response.statusCode, failedItems); }); }).on('error', function(e) { callback(e); }); request.end(requestParams.body); } function buildRequest(endpoint, body) { var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/); var region = endpointParts[2]; var service = endpointParts[3]; var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, ''); var date = datetime.substr(0, 8); var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date); var kRegion = hmac(kDate, region); var kService = hmac(kRegion, service); var kSigning = hmac(kService, 'aws4_request'); var request = { host: endpoint, method: 'POST', path: '/_bulk', body: body, headers: { 'Content-Type': 'application/json', 'Host': endpoint, 'Content-Length': Buffer.byteLength(body), 'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN, 'X-Amz-Date': datetime } }; var canonicalHeaders = Object.keys(request.headers) .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; }) .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; }) .join('\n'); var signedHeaders = Object.keys(request.headers) .map(function(k) { return k.toLowerCase(); }) .sort() .join(';'); var canonicalString = [ request.method, request.path, '', canonicalHeaders, '', signedHeaders, hash(request.body, 'hex'), ].join('\n'); var credentialString = [ date, region, service, 'aws4_request' ].join('/'); var stringToSign = [ 'AWS4-HMAC-SHA256', datetime, credentialString, hash(canonicalString, 'hex') ] .join('\n'); request.headers.Authorization = [ 'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString, 'SignedHeaders=' + signedHeaders, 'Signature=' + hmac(kSigning, stringToSign, 'hex') ].join(', '); return request; } function hmac(key, str, encoding) { return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding); } function hash(str, encoding) { return crypto.createHash('sha256').update(str, 'utf8').digest(encoding); }