一尘不染

Inserting multiple documents into Elasticsearch: bulk document formatter

elasticsearch

TL;DR: How do I bulk-format a JSON file for ingestion into Elasticsearch?

I am trying to ingest some NOAA data into Elasticsearch and have been using the NOAA Python SDK.

I wrote the following Python script to load the data and store it in JSON format.

from noaa_sdk import noaa
import json

n = noaa.NOAA()
alerts = n.alerts()
# The with block closes the file even if json.dump raises
with open('nhc_alerts.json', 'w') as f:
    json.dump(alerts, f)
    f.write('\n')

JSON output:

{"@context": ["https://raw.githubusercontent.com/geojson/geojson-ld/master/contexts/geojson-base.jsonld", {"wx": "https://api.weather.gov/ontology#", "@vocab": "https://api.weather.gov/ontology#"}], "type": "FeatureCollection", "features": [{"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-KEEPALIVE-5246", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-KEEPALIVE-5246", "@type": "wx:Alert", "id": "NWS-IDP-PROD-KEEPALIVE-5246", "areaDesc": "Montgomery", "geocode": {"UGC": ["MDC031"], "SAME": ["024031"]}, "affectedZones": ["https://api.weather.gov/zones/county/MDC031"], "references": [], "sent": "2020-04-25T19:21:03+00:00", "effective": "2020-04-25T19:21:03+00:00", "onset": null, "expires": "2020-04-25T19:31:03+00:00", "ends": null, "status": "Test", "messageType": "Alert", "category": "Met", "severity": "Unknown", "certainty": "Unknown", "urgency": "Unknown", "event": "Test Message", "sender": "w-nws.webmaster@noaa.gov", "senderName": "NWS", "headline": null, "description": "Monitoring message only. Please disregard.", "instruction": "Monitoring message only. 
Please disregard.", "response": "None", "parameters": {"PIL": ["NWSKEPWBC"], "BLOCKCHANNEL": ["CMAS", "EAS", "NWEM"]}}}, {"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179499-3536427", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179499-3536427", "@type": "wx:Alert", "id": "NWS-IDP-PROD-4179499-3536427", "areaDesc": "La Salle; Livingston", "geocode": {"UGC": ["ILZ019", "ILZ032"], "SAME": ["017099", "017105"]}, "affectedZones": ["https://api.weather.gov/zones/forecast/ILZ019", "https://api.weather.gov/zones/forecast/ILZ032"], "references": [{"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179245-3536278", "identifier": "NWS-IDP-PROD-4179245-3536278", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-25T10:02:00-05:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4178935-3536074", "identifier": "NWS-IDP-PROD-4178935-3536074", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-25T03:09:00-05:00"}], "sent": "2020-04-25T14:21:00-05:00", "effective": "2020-04-25T14:21:00-05:00", "onset": "2020-04-25T14:21:00-05:00", "expires": "2020-04-25T22:30:00-05:00", "ends": "2020-04-26T01:00:00-05:00", "status": "Actual", "messageType": "Update", "category": "Met", "severity": "Severe", "certainty": "Possible", "urgency": "Future", "event": "Flood Watch", "sender": "w-nws.webmaster@noaa.gov", "senderName": "NWS Chicago IL", "headline": "Flood Watch issued April 25 at 2:21PM CDT until April 26 at 1:00AM CDT by NWS Chicago IL", "description": "The Flood Watch is now in effect for\n\n* Livingston and La Salle counties in north central Illinois\n\n* Until 1 AM CDT Sunday\n\n* WHAT...Steady rain. One to two inches of rain has already\nfallen. Additional rainfall amounts of one inch or locally more\nare possible which may lead to total rainfall amounts in excess\nof three inches.\n\n* IMPACTS...Rises in rivers and small streams will occur with\nflooding possible. 
This especially includes the Vermilion River\nand its tributary streams, and the Illinois River. Roadways,\nviaducts, ditches, agricultural land, and other poor drainage\nareas may become flooded.", "instruction": "A Flood Watch means there is a potential for flooding based on\ncurrent forecasts.\n\nYou should monitor later forecasts and be alert for possible\nFlood Warnings. Those living in areas prone to flooding should be\nprepared to take action should flooding develop.", "response": "Prepare", "parameters": {"NWSheadline": ["FLOOD WATCH NOW IN EFFECT UNTIL 1 AM CDT SUNDAY"], "VTEC": ["/O.EXT.KLOT.FA.A.0002.000000T0000Z-200426T0600Z/"], "EAS-ORG": ["WXR"], "PIL": ["LOTFFALOT"], "BLOCKCHANNEL": ["CMAS", "EAS", "NWEM"], "eventEndingTime": ["2020-04-26T01:00:00-05:00"]}}}, {"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179497-3536425", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179497-3536425", "@type": "wx:Alert", "id": "NWS-IDP-PROD-4179497-3536425", "areaDesc": "San Luis Obispo County Central Coast; Santa Barbara County Central Coast; Santa Ynez Valley", "geocode": {"UGC": ["CAZ034", "CAZ035", "CAZ036"], "SAME": ["006079", "006083"]}, "affectedZones": ["https://api.weather.gov/zones/forecast/CAZ034", "https://api.weather.gov/zones/forecast/CAZ035", "https://api.weather.gov/zones/forecast/CAZ036"], "references": [{"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4177692-3535278", "identifier": "NWS-IDP-PROD-4177692-3535278", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-24T08:54:00-07:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4178774-3535999", "identifier": "NWS-IDP-PROD-4178774-3535999", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-24T21:37:00-07:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179040-3536147", "identifier": "NWS-IDP-PROD-4179040-3536147", "sender": "w-nws.webmaster@noaa.gov", "sent":

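This script solved some of the formatting problems I was having. My next hurdle is formatting the output so that I can use the bulk import feature in Elasticsearch. I stumbled upon an answer that works to some extent: it inserts the appropriate index action line, but it does so after every single character instead of after every document.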

Bulk conversion script:

import json


JSON_FILE_IN = "nhc_alerts.json"
JSON_FILE_OUT = "nhc_bulk.json"


out = open(JSON_FILE_OUT, 'w')
with open(JSON_FILE_IN, 'r') as json_in:
    docs = json.dumps(json_in.read())
    for doc in docs:
        out.write('%s\n' % json.dumps({'index': {}}));
        out.write('%s\n' % json.dumps(doc, indent=0).replace('\n', ''))

Output of the bulk script:

{"index": {}}
"\""
{"index": {}}
"{"
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
"@"
{"index": {}}
"c"
{"index": {}}
"o"
{"index": {}}
"n"
{"index": {}}
"t"
{"index": {}}
"e"
{"index": {}}
"x"
{"index": {}}
"t"
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
":"
{"index": {}}
" "
{"index": {}}
"["
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
"h"
{"index": {}}
"t"
{"index": {}}
"t"
{"index": {}}
"p"
{"index": {}}
"s"
{"index": {}}
":"
{"index": {}}
"/"
{"index": {}}
"/"
{"index": {}}
"r"
{"index": {}}
"a"
{"index": {}}
"w"
{"index": {}}
"."
{"index": {}}
"g"
{"index": {}}
"i"
{"index": {}}
"t"
{"index": {}}
"h"
{"index": {}}
"u"
{"index": {}}
"b"
{"index": {}}
"u"
{"index": {}}
"s"
{"index": {}}
"e"
{"index": {}}
"r"
{"index": {}}
"c"
{"index": {}}
"o"
{"index": {}}
"n"
{"index": {}}

Ideally I would like to combine these two scripts into one, but at this point I will run two separate scripts if that gets the job done.


2020-06-22

1 answer

一尘不染

How about using the bulk helper from the official Python client?

import json

from noaa_sdk import noaa
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


noaa_client = noaa.NOAA()
alerts = noaa_client.alerts()['features']

# Assumes a local node; elasticsearch-py 8.x requires an explicit URL
es = Elasticsearch("http://localhost:9200")


def save_alerts():
    with open('nhc_alerts.json', 'w') as f:
        f.write(json.dumps(alerts))


def bulk_sync():
    actions = [
        {
            "_index": "my_noaa_index",
            "_source": alert
        } for alert in alerts
    ]

    bulk(es, actions)


save_alerts()
bulk_sync()
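
If you would rather keep the file-based workflow from the question, here is a minimal sketch of a converter. The per-character output comes from calling json.dumps on the raw file text: that returns a str, and looping over a str yields single characters. Parsing the file with json.load and looping over the features list avoids that. The names to_bulk_lines and convert, and the optional index_name parameter, are hypothetical and only for illustration.

```python
import json


def to_bulk_lines(data, index_name=None):
    """Yield NDJSON lines for the bulk API: an action line, then the document."""
    # Accept either the full FeatureCollection dict (as written by the
    # question's first script) or a bare list of features.
    features = data["features"] if isinstance(data, dict) else data
    # index_name is a hypothetical option; leave it as None when the
    # index is already named in the request URL.
    action = {"index": {"_index": index_name}} if index_name else {"index": {}}
    for feature in features:
        yield json.dumps(action)
        # Without indent, json.dumps emits a single line; newlines inside
        # string values are escaped, so each document stays on one line.
        yield json.dumps(feature)


def convert(json_file_in, json_file_out, index_name=None):
    """Read a JSON file of alerts and write a bulk-formatted NDJSON file."""
    with open(json_file_in) as json_in:
        # json.load parses the text into Python objects (json.dumps would
        # only wrap the raw text in another layer of quoting).
        data = json.load(json_in)
    with open(json_file_out, "w") as out:
        for line in to_bulk_lines(data, index_name):
            # Every line, including the last, must end with a newline.
            out.write(line + "\n")
```

Calling convert("nhc_alerts.json", "nhc_bulk.json") should produce a file that can be posted to the _bulk endpoint of an index with the Content-Type header set to application/x-ndjson. I have not run this against the live NOAA feed, so treat it as a starting point.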
2020-06-22