Python elasticsearch.helpers module: bulk() example source code
We extracted the following 49 code examples from open-source Python projects to illustrate how to use elasticsearch.helpers.bulk().
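Before the individual project examples, here is a minimal sketch of the calling pattern they all share. The client URL, index name and documents below are illustrative assumptions, not taken from any of the projects; note that many of the snippets also set a "_type" field, which recent Elasticsearch versions no longer require.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")  # hypothetical cluster address

# actions are plain dicts; helpers.bulk() accepts a list or a generator
actions = (
    {
        "_index": "demo-index",                   # assumed index name
        "_id": i,                                 # optional; ES generates ids if omitted
        "_source": {"title": "document %d" % i},
    }
    for i in range(3)
)

# with the default arguments this returns (number of successful actions, list of errors)
# and raises helpers.BulkIndexError if any action fails
success, errors = helpers.bulk(es, actions)
print("indexed %d documents" % success)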
def index_bulk(self, docs, step=None):
index_name = self.ES_INDEX_NAME
doc_type = self.ES_INDEX_TYPE
step = step or self.step
def _get_bulk(doc):
doc.update({
"_index": index_name,
"_type": doc_type,
})
return doc
actions = (_get_bulk(doc) for doc in docs)
try:
return helpers.bulk(self.conn, actions, chunk_size=step)
except helpers.BulkIndexError as e:
# try again...
print("Bulk error, try again...")
return self.index_bulk(docs, step)
##return helpers.bulk(self.conn, actions, chunk_size=step)
except Exception as e:
print("Err...")
import pickle
pickle.dump(e,open("err","wb"))
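One hedged caveat about the retry above: the actions generator is consumed by the first helpers.bulk() call, so the recursive retry only re-sends the same data if docs itself is a re-iterable sequence (a list, not a generator). The names below are made up for illustration:

docs = list(generate_docs())   # hypothetical generator, materialized so a retry sees the same items
indexer.index_bulk(docs)       # indexer: an instance of the (unnamed) class defining index_bulk()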
def delete_docs(self, ids, step=None):
index_name = self.ES_INDEX_NAME
doc_type = self.ES_INDEX_TYPE
step = step or self.step
def _get_bulk(_id):
doc = {
'_op_type': 'delete',
"_index": index_name,
"_type": doc_type,
"_id": _id
}
return doc
actions = (_get_bulk(_id) for _id in ids)
return helpers.bulk(self.conn, actions, chunk_size=step,
stats_only=True, raise_on_error=False)
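A hedged note on the return value used here: with stats_only=True and raise_on_error=False, helpers.bulk() returns two counters (successes, failures) rather than a list of per-item errors. The instance and ids below are assumptions for illustration:

deleted, failed = indexer.delete_docs(ids=["a1", "b2", "c3"])   # indexer: hypothetical instance
print("deleted %d docs, %d failures" % (deleted, failed))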
def handle(self, **options):
self._initialize(**options)
if (options['rebuild'] and
not options['dry_run'] and
self.es.indices.exists(self.INDEX_NAME)):
self.es.indices.delete(index=self.INDEX_NAME)
if (not options['dry_run'] and
not self.es.indices.exists(self.INDEX_NAME)):
self.es.indices.create(index=self.INDEX_NAME)
if self.is_local_tm:
self._set_latest_indexed_revision(**options)
helpers.bulk(self.es, self._parse_translations(**options))
def upload_data(events_df, es_write_index, es_write):
# Uploading info to the new ES
rows = events_df.to_dict("index")
docs = []
for row_index in rows.keys():
row = rows[row_index]
item_id = row[Events.PERCEVAL_UUID] + "_" + row[Git.FILE_PATH] +\
"_" + row[Git.FILE_EVENT]
header = {
"_index": es_write_index,
"_type": "item",
"_id": item_id,
"_source": row
}
docs.append(header)
helpers.bulk(es_write, docs)
logging.info("Written: " + str(len(docs)))
def upload_data(events_df, es_write_index, es_write, uniq_id):
# Uploading info to the new ES
test = events_df.to_dict("index")
docs = []
for i in test.keys():
header = {
"_index": es_write_index,
"_type": "item",
"_id": int(uniq_id),
"_source": test[i]
}
docs.append(header)
uniq_id = uniq_id + 1
print (len(docs))
helpers.bulk(es_write, docs)
items = []
return uniq_id
def update(typeNameES, listId):
logger.info('bulkOp.update launched')
hippoCfg = getHippoConf()
es = getES()
now = strftime("%Y%m%dT%H%M%S%z")
indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
# k is a generator expression that produces an update
# action for every doc whose id is in listId
k = ({'_op_type': 'update', '_index':indexNameES, '_type':typeNameES, 'doc':{'lastQuery': now}, '_id': id}
for id in listId)
res = helpers.bulk(es, k)
logger.info('bulkOp.update res: %s', res)
#res looks like
#(2650, [])
logger.info('bulkOp.update end')
return res[0]
def index(cfgPath, listData):
logger.info('bulkOp.index launched')
hippoCfg = getHippoConf()
indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
cfg = getConf(cfgPath)
typeNameES = cfg.get('elasticsearch', 'typeIntel')
# create the index only if it does not exist
index = IndexIntel(cfgPath)
index.createIndexIntel()
es = getES()
k = ({'_op_type': 'index', '_index':indexNameES, '_type':typeNameES, '_source': data}
for data in listData)
res = helpers.bulk(es,k, raise_on_error=False)
#res = helpers.bulk(es,k, raise_on_exception=False)
#res = helpers.bulk(es,k)
logger.info('bulkOp.index res: %s', res)
logger.info('bulkOp.index end')
return res
def indexNew(coreIntelligence, listData):
logger.info('bulkOp.indexNew launched')
hippoCfg = getHippoConf()
indexNameES = hippoCfg.get('elasticsearch', 'indexNameES')
typeNameES = hippoCfg.get('elasticsearch', 'typeNameESNew')
indexNew = IndexNew()
indexNew.createIndexNew()
es = getES()
k = ({'_op_type': 'index', '_index':indexNameES, '_type':typeNameES, '_source': {'type': coreIntelligence, 'toSearch': data[coreIntelligence]}}
for data in listData)
# k.next() gives, e.g.:
# {'_op_type': 'index', '_index': 'hippocampe', '_type': 'new', '_source': {'type': 'ip', 'toSearch': '1.1.1.1'}}
res = helpers.bulk(es,k)
logger.info('bulkOp.index res: %s', res)
logger.info('bulkOp.indexNew end')
return res[0]
def genAddToES(self, msgs, component):
def menuAddToES(e):
progress = ProgressMonitor(component, "Feeding ElasticSearch", "", 0, len(msgs))
i = 0
docs = list()
for msg in msgs:
if not Burp_onlyResponses or msg.getResponse():
docs.append(self.genESDoc(msg, timeStampFromResponse=True).to_dict(True))
i += 1
progress.setProgress(i)
success, failed = bulk(self.es, docs, True, raise_on_error=False)
progress.close()
JOptionPane.showMessageDialog(self.panel, "<html><p style='width: 300px'>Successfully imported %d messages, %d messages failed.</p></html>" % (success, failed), "Finished", JOptionPane.INFORMATION_MESSAGE)
return menuAddToES
### Interface to ElasticSearch ###
def update_commits(self, source_it, field='repos'):
""" Take the sha from each doc and use
it to reference the doc to update. This method only
support updating a single field for now. The default one
is repos because that's the only one to make sense in
this context.
"""
def gen(it):
for source in it:
d = {}
d['_index'] = self.index
d['_type'] = self.dbname
d['_op_type'] = 'update'
d['_id'] = source['sha']
d['_source'] = {'doc': {field: source[field]}}
yield d
bulk(self.es, gen(source_it))
self.es.indices.refresh(index=self.index)
def load_data(input_file, index, doc_type, seed):
doc_no = seed
successful = 0
docs = []
with open(input_file) as ifp:
for line in ifp:
doc_id = str(doc_no)
doc = csv2json(index, doc_type, doc_id, line.rstrip())
docs.append(doc)
doc_no += 1
if len(docs) == batch_size:
docs_iter = iter(docs)
(added, tmp) = helpers.bulk(es, docs_iter)
successful += added
docs = []
if doc_no % 100000 == 0:
print 'success: %d failed: %s' % (successful, doc_no - successful - seed)
if len(docs) > 0:
docs_iter = iter(docs)
(added, tmp) = helpers.bulk(es, docs_iter)
successful += added
print 'Finished! Inserted: %d Failed: %d' % (successful, doc_no - successful - seed)
def single_bulk_to_es(bulk, config, attempt_retry):
bulk = bulk_builder(bulk, config)
max_attempt = 1
if attempt_retry:
max_attempt += 3
for attempt in range(1, max_attempt+1):
try:
helpers.bulk(config['es_conn'], bulk, chunk_size=config['bulk_size'])
except Exception as e:
if attempt < max_attempt:
wait_seconds = attempt*3
log('warn', 'attempt [%s/%s] got exception, will retry after %s seconds' % (attempt,max_attempt,wait_seconds) )
time.sleep(wait_seconds)
continue
log('error', 'attempt [%s/%s] got exception, it is a permanent data loss, no retry any more' % (attempt,max_attempt) )
raise e
if attempt > 1:
log('info', 'attempt [%s/%s] succeed. we just get recovered from previous error' % (attempt,max_attempt) )
# completed successfully
break
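As a hedged alternative to the hand-rolled retry loop above, recent releases of the helpers module can retry rejected chunks themselves: helpers.streaming_bulk() re-sends a chunk that came back with HTTP 429 up to max_retries times with exponential backoff. The wrapper below is only a sketch; the connection, action iterable and settings are assumptions:

from elasticsearch import helpers

def bulk_with_builtin_retry(es_conn, actions, bulk_size=500):
    # streaming_bulk yields one (ok, item) tuple per action and retries
    # chunks rejected with 429 Too Many Requests before reporting failure
    ok_count = 0
    for ok, item in helpers.streaming_bulk(
            es_conn, actions,
            chunk_size=bulk_size,
            max_retries=3,
            initial_backoff=2,
            max_backoff=60,
            raise_on_error=False):
        if ok:
            ok_count += 1
    return ok_count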
def add_docs_bulk(self, docs):
"""Adds a set of documents to the index in a bulk.
:param docs: dictionary {doc_id: doc}
"""
actions = []
for doc_id, doc in docs.items():
action = {
"_index": self.__index_name,
"_type": self.DOC_TYPE,
"_id": doc_id,
"_source": doc
}
actions.append(action)
if len(actions) > 0:
helpers.bulk(self.__es, actions)
def store(self, df, table, **kwargs):
if isinstance(df, pd.DataFrame):
es = self.open()
records = df.to_dict(orient='records')
if df.index.name:
actions = [{
"_index": self.datasource.db,
"_type": table,
"_id": record[df.index.name],
"_source": record
} for record in records]
else:
actions = [{
"_index": self.datasource.db,
"_type": table,
"_source": record
} for record in records]
if len(actions) > 0:
helpers.bulk(es, actions)
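For very large DataFrames, a hedged variation is helpers.parallel_bulk(), which sends the same action dicts from several worker threads; unlike helpers.bulk() it returns a lazy generator that must be consumed for anything to be indexed. The helper below is a sketch with assumed parameters:

from elasticsearch import helpers

def store_parallel(es, actions, threads=4):
    # parallel_bulk yields (ok, info) tuples lazily; iterating drives the upload
    failures = []
    for ok, info in helpers.parallel_bulk(es, actions,
                                          thread_count=threads,
                                          chunk_size=500):
        if not ok:
            failures.append(info)
    return failures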
def BulkIndexRecords(self,records):
'''
Bulk Index Records
IN
self: EsHandler
records: a list of records to bulk index
'''
ELASTIC_LOGGER.debug('[starting] Indexing Bulk Records')
success_count,failed_items = es_bulk(
self.esh,
records,
chunk_size=10000,
raise_on_error=False
)
if len(failed_items) > 0:
ELASTIC_LOGGER.error('[PID {}] {} index errors'.format(
os.getpid(),len(failed_items)
))
for failed_item in failed_items:
ELASTIC_LOGGER.error(unicode(failed_item))
ELASTIC_LOGGER.debug('[finished] Indexing Bulk Records')
def send_buffered_operations(self):
"""Send buffered operations to Elasticsearch.
This method is periodically called by the AutoCommitThread.
"""
with self.lock:
try:
action_buffer = self.BulkBuffer.get_buffer()
if action_buffer:
successes, errors = bulk(self.elastic, action_buffer)
LOG.debug("Bulk request finished, successfully sent %d "
"operations", successes)
if errors:
LOG.error(
"Bulk request finished with errors: %r", errors)
except es_exceptions.ElasticsearchException:
LOG.exception("Bulk request failed with exception")
def __init__(self, docman):
# Parent object
self.docman = docman
# Action buffer for bulk indexing
self.action_buffer = []
# Docs to update
# Dict stores all documents for which firstly
# source has to be retrieved from Elasticsearch
# and then apply_update needs to be performed
# Format: [ (doc, update_spec, action_buffer_index, get_from_ES) ]
self.doc_to_update = []
# Below dictionary contains ids of documents
# which need to be retrieved from Elasticsearch
# It prevents from getting same document multiple times from ES
# Format: {"_index": {"_type": {"_id": True}}}
self.doc_to_get = {}
# Dictionary of sources
# Format: {"_index": {"_type": {"_id": {"_source": actual_source}}}}
self.sources = {}
def load_data(self):
"""
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-suggesters-completion.html
"""
es = self.connect()
items = self._load_item_data()
helpers.bulk(es, items)
# id_field = "id"
# es_index = self.es_config["es_index"]
# es_type = self.es_config["es_type"]
# for item in items:
# item = item['_source']
# logging.info(json.dumps(item, ensure_ascii=False, indent=4))
#
# ret = es.index(index=es_index, doc_type=es_type, id=item[id_field], body=item)
# logging.info(ret)
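The docstring above links to the completion suggester; as a hedged illustration of what _load_item_data() might yield for that use case (the index, type and fields are assumptions, not taken from the project), each bulk action would carry a field mapped as type "completion":

# assumed mapping for the index: {"properties": {"suggest": {"type": "completion"}}}
items = [
    {
        "_index": "suggest-demo",    # hypothetical index
        "_type": "item",
        "_id": 1,
        "_source": {
            "name": "elasticsearch bulk helpers",
            "suggest": {             # completion field used for suggestions
                "input": ["elasticsearch", "bulk", "helpers"],
                "weight": 10,
            },
        },
    },
]
helpers.bulk(es, items)              # es: hypothetical Elasticsearch client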
def update(self, thing, refresh=None, action='index', **kwargs):
"""
Update each document in ES for a model, iterable of models or queryset
"""
if refresh is True or (
refresh is None and self._doc_type.auto_refresh
):
kwargs['refresh'] = True
if isinstance(thing, models.Model):
object_list = [thing]
else:
object_list = thing
return self.bulk(
self._get_actions(object_list, action), **kwargs
)
def bulk(chunk_size=100, filepath=None, **kwargs):
if sys.stdin.isatty() is False:
infile = sys.stdin
elif filepath is not None:
infile = open(filepath, "r")
else:
abort(bulk.__doc__)
es = get_client(env.elasticsearch_alias)
actions = []
for action in infile.readlines():
actions.append(json.loads(action))
success, errors = helpers.bulk(es, actions, ignore=IGNORE, **kwargs)
res = {
"success": success, "errors": errors,
"bulk": {
"host": es.transport.get_connection().host
}
}
infile.close()
jsonprint(res)
return res
def bulk_update(es, actions, batch_size=250):
indexed = 0
for i in range(0, len(actions), batch_size):
resp = bulk(es, actions[i:(i+batch_size)])
indexed += resp[0]
print('\tindexed %s / %s' % (indexed, len(actions)))
return indexed
def index_worker(self, queue, size=200):
actions = []
indexed = 0
while True:
item = queue.get()
if item is None:
break
id_submission, analysis = item
doc = {
'_index': 'fcc-comments',
'_type': 'document',
'_op_type': 'update',
'_id': id_submission,
'doc': {'analysis': analysis},
}
actions.append(doc)
if len(actions) == size:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
try:
response = bulk(self.es, actions)
indexed += response[0]
print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit,
int(indexed / self.limit * 100)))
actions = []
except ConnectionTimeout:
print('error indexing: connection timeout')
with warnings.catch_warnings():
warnings.simplefilter('ignore')
response = bulk(self.es, actions)
indexed += response[0]
print('indexed %s' % (indexed))
def bulk_index(self, queue, size=20):
actions = []
indexed = 0
ids = set()
while True:
item = queue.get()
if item is None:
break
doc_id = item
doc = {
'_index': 'fcc-comments',
'_type': 'document',
'_op_type': 'update',
'_id': doc_id,
'doc': {'analysis.sentiment_sig_terms_ordered': True},
}
actions.append(doc)
ids.add(doc_id)
if len(actions) == size:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
try:
response = bulk(self.es, actions)
indexed += response[0]
if not indexed % 200:
print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
int(indexed / self.limit * 100)))
actions = []
except ConnectionTimeout:
print('error indexing: connection timeout')
with warnings.catch_warnings():
warnings.simplefilter('ignore')
response = bulk(self.es, actions)
indexed += response[0]
print('indexed %s' % (indexed))
ids = list(ids)
#print('%s\n%s' % (len(ids), ' '.join(ids)))
def bulk_index_from_it(
self, index, it, transform=lambda x: x, last_updated=True):
gc.collect()
err_ids = []
def _it():
for doc_body in it:
try:
log.debug('Working on record: %s', doc_body)
_id = doc_body.get(self.id_field)
try:
doc_body = transform(doc_body)
except Exception as e:
log.warn(
'Error while transforming doc ID = %s: %s',
_id, e)
raise e
if doc_body:
if last_updated:
doc_body['last_updated'] = datetime.now()
op = self.partial_index_op(
doc_id=_id,
index=index,
doc_body=doc_body,
doc_type=self.doc_type)
yield op
except Exception as e:
log.warn('Cannot process doc ID = %s: %s', _id, e)
err_ids.append(_id)
try:
self.bulk(_it())
log.info('Invoked self.bulk(_it())')
except Exception as e:
log.warn('Error in bulk index because: %s', e)
return err_ids
def bulk(self, it):
try:
log.info('Sending bulk request on iterable/generator')
args = dict(client=self.client,
actions=it,
chunk_size=self.bulk_size,
raise_on_exception=False,
raise_on_error=False,
stats_only=False,
request_timeout=self.timeout)
res_succ, res_err = helpers.bulk(**args)
log.info(
'Sent bulk request on queue iterator: '
'successful ops = %d, failed ops = %d',
res_succ, len(res_err))
for res in res_err:
log.warn('Error response: %s', res)
except Exception as e:
log.error('Error in storing: %s', e, exc_info=True)
def bulk_index(self, index_name, dict_list):
res = helpers.bulk( self.es, dict_list)
print(" response: '%s'" % (str(res)))
print res
print str(res)
return
def main():
parser = ArgumentParser()
parser.add_argument('-d', '--dump-file')
parser.add_argument('-e', '--elasticsearch-host', default='localhost:9200')
parser.add_argument('-i', '--index', default='wikipedia')
parser.add_argument('-l', '--limit', default=0, type=int)
parser.add_argument('-p', '--id-prefix')
opts = parser.parse_args()
dump_fn = opts.dump_file
es_host = opts.elasticsearch_host
es_index = opts.index
limit = opts.limit if opts.limit > 0 else None
prefix = opts.id_prefix
if not dump_fn:
logging.error('missing filenames ...')
sys.exit(1)
gen = articles(dump_fn, limit=limit)
es = Elasticsearch(hosts=[es_host])
ic = IndicesClient(es)
if not ic.exists(es_index):
ic.create(es_index)
while True:
chunk = islice(gen, 0, 1000)
actions = [{'_index': es_index,
'_type': 'article',
'_id': article['id'] if not prefix else '%s-%s' % (prefix, article['id']),
'_source': article}
for article in chunk]
if not actions:
break
helpers.bulk(es, actions)
def update(self, id, extra_doc, index_type=None, bulk=False):
'''update an existing doc with extra_doc.'''
conn = self.conn
index_name = self.ES_INDEX_NAME
index_type = index_type or self.ES_INDEX_TYPE
# old way, update locally and then push it back.
# return self.conn.update(extra_doc, self.ES_INDEX_NAME,
# index_type, id)
if not bulk:
body = {'doc': extra_doc}
return conn.update(index_name, index_type, id, body)
else:
raise NotImplementedError
'''
# ES supports bulk update since v0.90.1.
op_type = 'update'
cmd = {op_type: {"_index": index_name,
"_type": index_type,
"_id": id}
}
doc = json.dumps({"doc": extra_doc}, cls=conn.encoder)
command = "%s\n%s" % (json.dumps(cmd, cls=conn.encoder), doc)
conn.bulker.add(command)
return conn.flush_bulk()
'''
def update_docs(self, partial_docs, **kwargs):
index_name = self.ES_INDEX_NAME
doc_type = self.ES_INDEX_TYPE
def _get_bulk(doc):
doc = {
'_op_type': 'update',
"_index": index_name,
"_type": doc_type,
"_id": doc['_id'],
"doc": doc
}
return doc
actions = (_get_bulk(doc) for doc in partial_docs)
return helpers.bulk(self.conn, actions, chunk_size=self.step, **kwargs)
def create_all_dictionary_data(connection, index_name, doc_type, logger, entity_data_directory_path=None,
csv_file_paths=None, **kwargs):
"""
Indexes all entity data from csv files stored at entity_data_directory_path, one file at a time
Args:
connection: Elasticsearch client object
index_name: The name of the index
doc_type: The type of the documents being indexed
logger: logging object to log at debug and exception level
entity_data_directory_path: Optional, Path of the directory containing the entity data csv files.
Default is None
csv_file_paths: Optional, list of file paths to csv files. Default is None
kwargs:
Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
"""
logger.debug('%s: +++ Started: create_all_dictionary_data() +++' % log_prefix)
if entity_data_directory_path:
logger.debug('%s: \t== Fetching from variants/ ==' % log_prefix)
csv_files = get_files_from_directory(entity_data_directory_path)
for csv_file in csv_files:
csv_file_path = os.path.join(entity_data_directory_path, csv_file)
create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
csv_file_path=csv_file_path, update=False, logger=logger, **kwargs)
if csv_file_paths:
for csv_file_path in csv_file_paths:
if csv_file_path and csv_file_path.endswith('.csv'):
create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
csv_file_path=csv_file_path, update=False, logger=logger, **kwargs)
logger.debug('%s: +++ Finished: create_all_dictionary_data() +++' % log_prefix)
def recreate_all_dictionary_data(connection, index_name, doc_type, logger, entity_data_directory_path=None,
csv_file_paths=None, **kwargs):
"""
Re-indexes all entity data from csv files stored at entity_data_directory_path, one file at a time
Args:
connection: Elasticsearch client object
index_name: The name of the index
doc_type: The type of the documents being indexed
logger: logging object to log at debug and exception level
entity_data_directory_path: Optional, Path of the directory containing the entity data csv files.
Default is None
csv_file_paths: Optional, list of file paths to csv files. Default is None
kwargs:
Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
"""
logger.debug('%s: +++ Started: recreate_all_dictionary_data() +++' % log_prefix)
if entity_data_directory_path:
logger.debug('%s: \t== Fetching from variants/ ==' % log_prefix)
csv_files = get_files_from_directory(entity_data_directory_path)
for csv_file in csv_files:
csv_file_path = os.path.join(entity_data_directory_path, csv_file)
create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
csv_file_path=csv_file_path, update=True, logger=logger, **kwargs)
if csv_file_paths:
for csv_file_path in csv_file_paths:
if csv_file_path and csv_file_path.endswith('.csv'):
create_dictionary_data_from_file(connection=connection, index_name=index_name, doc_type=doc_type,
csv_file_path=csv_file_path, update=True, logger=logger, **kwargs)
logger.debug('%s: +++ Finished: recreate_all_dictionary_data() +++' % log_prefix)
def get_variants_dictionary_value_from_key(csv_file_path, dictionary_key, logger, **kwargs):
"""
Reads the csv file at csv_file_path and creates a dictionary mapping each entity value to a list of its variants.
The entity values are in the first column of the csv file and their corresponding variants are stored in the
second column, delimited by '|'
Args:
csv_file_path: absolute file path of the csv file populate entity data from
dictionary_key: name of the entity to be put the values under
logger: logging object to log at debug and exception level
kwargs:
Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
Returns:
Dictionary mapping entity value to a list of their variants.
"""
dictionary_value = defaultdict(list)
try:
csv_reader = read_csv(csv_file_path)
next(csv_reader)
for data_row in csv_reader:
try:
data = map(str.strip, data_row[1].split('|'))
# remove empty strings
data = [variant for variant in data if variant]
dictionary_value[data_row[0].strip().replace('.', ' ')].extend(data)
except Exception as e:
logger.exception('%s: \t\t== Exception in dict creation for keyword: %s -- %s -- %s =='
% (log_prefix, dictionary_key, data_row, e))
except Exception as e:
logger.exception(
'%s: \t\t\t=== Exception in __get_variants_dictionary_value_from_key() Dictionary Key: %s \n %s ===' % (
log_prefix,
dictionary_key, e.message))
return dictionary_value
def create_dictionary_data_from_file(connection, index_name, doc_type, csv_file_path, update, logger, **kwargs):
"""
Indexes all entity data from the csv file at path csv_file_path
Args:
connection: Elasticsearch client object
index_name: The name of the index
doc_type: The type of the documents being indexed
csv_file_path: absolute file path of the csv file to populate entity data from
update: boolean, True if this is a update type operation, False if create/index type operation
logger: logging object to log at debug and exception level
kwargs:
Refer http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk
"""
base_file_name = os.path.basename(csv_file_path)
dictionary_key = os.path.splitext(base_file_name)[0]
if update:
delete_entity_by_name(connection=connection, index_name=index_name, doc_type=doc_type,
entity_name=dictionary_key, logger=logger, **kwargs)
dictionary_value = get_variants_dictionary_value_from_key(csv_file_path=csv_file_path,
dictionary_key=dictionary_key, logger=logger,
**kwargs)
if dictionary_value:
add_data_elastic_search(connection=connection, index_name=index_name, doc_type=doc_type,
dictionary_key=dictionary_key,
dictionary_value=remove_duplicate_data(dictionary_value), logger=logger, **kwargs)
if os.path.exists(csv_file_path) and os.path.splitext(csv_file_path)[1] == '.csv':
os.path.basename(csv_file_path)
def _index_chunk(chunk, doc_type, index):
"""
Add/update a list of records in Elasticsearch
Args:
chunk (list):
List of serialized items to index
doc_type (str):
The doc type for each item
index (str): An Elasticsearch index
Returns:
int: Number of items inserted into Elasticsearch
"""
conn = get_conn(verify_index=index)
insert_count, errors = bulk(
conn,
chunk,
index=index,
doc_type=doc_type,
)
if len(errors) > 0:
raise ReindexException("Error during bulk insert: {errors}".format(
errors=errors
))
refresh_index(index)
return insert_count
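Note how this snippet passes index= and doc_type= straight to bulk(): extra keyword arguments are forwarded to the underlying client bulk call, so they act as defaults for actions that omit _index or _type. A hedged sketch with made-up data, for older clusters that still use mapping types:

# conn: hypothetical Elasticsearch client; the actions omit _index/_type on purpose
chunk = [
    {"_id": 1, "_source": {"title": "first"}},
    {"_id": 2, "_source": {"title": "second"}},
]
inserted, errors = bulk(conn, chunk, index="demo-index", doc_type="doc")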
def output_es(es, records, start_record_num, end_record_num, total_records, per_batch):
print("Inserting records %d through %d of %s" % (start_record_num, end_record_num,
(str(total_records) if total_records > 0 else '???')))
num_success, error_list = helpers.bulk(es, records, chunk_size=1000)
if num_success != per_batch:
print("[ERROR] %d of %d inserts succeeded!" % (num_success,per_batch))
print("[ERROR] Errors:")
print error_list
def insert_image(chunk_size, max_results=5000, from_file=None):
count = 0
success_count = 0
es = search.init()
search.Image.init()
mapping = search.Image._doc_type.mapping
mapping.save(settings.ELASTICSEARCH_INDEX)
for chunk in grouper_it(chunk_size, import_from_file(from_file)):
if not from_file and count >= max_results: # Load everything if loading from file
break
else:
images = []
for result in chunk:
images.append(result)
if len(images) > 0:
try:
# Bulk update the search engine too
search_objs = [search.db_image_to_index(img).to_dict(include_meta=True) for img in images]
models.Image.objects.bulk_create(images)
helpers.bulk(es, search_objs)
log.debug("*** Committed set of %d images", len(images))
success_count += len(images)
except IntegrityError as e:
log.warn("Got one or more integrity errors on batch: %s", e)
finally:
count += len(images)
return success_count
def write2ES(AllData, indexName, typeName , elasticPort=9220, elasticHost="localhost"):
es = Elasticsearch([{'host': elasticHost, 'port': elasticPort}], http_auth=(elasticUsername, elasticPassword))
messages = []
logging.debug('Running preset')
for record in AllData:
# print(record)
tmpMap = {"_op_type": "index", "_index": indexName, "_type": typeName }
if len(record)>2:
tmpMap.update({"label":record[0].replace("http://dbpedia.org/resource/",""), "entity_linking":{"URI":record[0].replace("/resource/","/page/"),"connection":record[1] ,"target":record[2].replace("/resource/","/page/")}})
else:
tmpMap.update({"label":record[0].replace("http://dbpedia.org/resource/",""), "entity_linking":{"URI":record[0].replace("/resource/","/page/"),"target":record[1]}})
messages.append(tmpMap)
#print messages
result = bulk(es, messages)
return result
def writeDashboard2ES(name, indexName=".kibi", typeName="dashboard" , elasticPort=9220, elasticHost="localhost"):
es = Elasticsearch([{'host': elasticHost, 'port': elasticPort}], http_auth=(elasticUsername, elasticPassword))
messages = []
PercentageEmotions = "PercentageEmotions"
logging.debug('Writing dashboard')
print('Writing dashboard')
EmotionDistribution = "EmotionDistribution"
cloudVisualization= name[:name.find("_")]+"Cloud"
nameFull= name[:name.find("_")]
tmpMap = {"_op_type": "index", "_index": indexName, "_type": typeName, "_id":name }
tmpMap.update({"defaultIndex": "reviews", "kibi:relationalPanel": "true"})
tmpMap.update({"savedSearchId": name})
tmpMap.update({"sort": ["_score", "desc" ], "version": 1,"description": "", "hits": 0, "optionsJSON": "{\"darkTheme\":false}", "uiStateJSON": "{}", "timeRestore": "false"})
tmpMap.update({"kibanaSavedObjectMeta": { "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}]}"}})
if name.find("tweets")>= 0:
tmpMap.update({ "title":name, "panelsJSON": "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":3,\"size_x\":7,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"emotions.emotion\",\"detected_entities\",\"text\"],\"sort\":[\"entity_linking.URI\",\"desc\"]},{\"id\":\""+EmotionDistribution+"\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":5,\"size_y\":4,\"col\":8,\"row\":3},{\"id\":\""+PercentageEmotions+"\",\"type\":\"visualization\",\"panelIndex\":5,\"size_x\":5,\"size_y\":2,\"col\":8,\"row\":1}]"})
else:
# tmpMap.update({ "title":name,"panelsJSON": "[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":1,\"size_x\":6,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"label\",\"entity_linking.target\"],\"sort\":[\"_score\",\"desc\"]},{\"id\":\"entityEmotion\",\"type\":\"visualization\",\"panelIndex\":3,\"size_x\":6,\"size_y\":4,\"col\":7,\"row\":6},{\"id\":\"Location\",\"type\":\"search\",\"panelIndex\":2,\"size_x\":6,\"size_y\":2,\"col\":1,\"row\":7,\"columns\":[\"label\",\"entity_linking.connection\",\"entity_linking.target\"],\"sort\":[\"connection\",\"asc\"]},{\"id\":\"PercentageEmotions\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":6,\"size_y\":5,\"col\":7,\"row\":1}]"})
tmpMap.update({ "title":name,"panelsJSON": "[{\"col\":1,\"columns\":[\"label\",\"entity_linking.target\"],\"id\":\""+name+"\",\"panelIndex\":1,\"row\":1,\"size_x\":6,\"size_y\":2,\"sort\":[\"_score\",\"desc\"],\"type\":\"search\"},{\"col\":7,\"id\":\""+PercentageEmotions+"\",\"panelIndex\":4,\"row\":1,\"size_x\":6,\"size_y\":3,\"type\":\"visualization\"},{\"col\":1,\"columns\":[\"label\",\"entity_linking.connection\",\"entity_linking.target\"],\"id\":\""+nameFull+"\",\"panelIndex\":2,\"row\":3,\"size_x\":6,\"size_y\":2,\"sort\":[\"connection\",\"asc\"],\"type\":\"search\"},{\"id\":\""+cloudVisualization+"\",\"type\":\"visualization\",\"panelIndex\":5,\"size_x\":3,\"size_y\":2,\"col\":1,\"row\":5},{\"id\":\""+EmotionDistribution+"\",\"type\":\"visualization\",\"panelIndex\":6,\"size_x\":6,\"size_y\":3,\"col\":7,\"row\":4}]"})
#"[{\"id\":\""+name+"\",\"type\":\"search\",\"panelIndex\":1,\"size_x\":12,\"size_y\":6,\"col\":1,\"row\":1,\"columns\":[\"label\", \"entity_linking.connection\", \"entity_linking.target\"],\"sort\":[\"_score\",\"desc\"]}]"})
messages.append(tmpMap)
print (messages)
result = bulk(es, messages)
return result
def add_commits(self, source_it):
def gen(it):
for source in it:
d = {}
d['_index'] = self.index
d['_type'] = self.dbname
d['_op_type'] = 'create'
d['_id'] = source['sha']
d['_source'] = source
yield d
bulk(self.es, gen(source_it))
self.es.indices.refresh(index=self.index)
def del_commits(self, sha_list):
def gen(it):
for sha in it:
d = {}
d['_index'] = self.index
d['_type'] = self.dbname
d['_op_type'] = 'delete'
d['_id'] = sha
yield d
bulk(self.es, gen(sha_list))
self.es.indices.refresh(index=self.index)
def execute(self):
"""
Index data of specified queryset
"""
client = elasticsearch.Elasticsearch(
hosts=settings.ELASTIC_SEARCH_HOSTS,
# sniff_on_start=True,
retry_on_timeout=True,
refresh=True
)
start_time = time.time()
duration = time.time()
loop_time = elapsed = duration - start_time
for batch_i, total_batches, start, end, total, qs in self.batch_qs():
loop_start = time.time()
total_left = ((total_batches - batch_i) * loop_time)
progress_msg = \
'%s of %s : %8s %8s %8s duration: %.2f left: %.2f' % (
batch_i, total_batches, start, end, total, elapsed,
total_left
)
log.debug(progress_msg)
helpers.bulk(
client, (self.convert(obj).to_dict(include_meta=True)
for obj in qs),
raise_on_error=True,
refresh=True
)
now = time.time()
elapsed = now - start_time
loop_time = now - loop_start
def save_to_elasticsearch(chars, es, es_index):
print('\r', "[elasticsearch] %s charities to save" % len(chars))
print('\r', "[elasticsearch] saving %s charities to %s index" % (len(chars), es_index))
results = bulk(es, list(chars.values()))
print('\r', "[elasticsearch] saved %s charities to %s index" % (results[0], es_index))
print('\r', "[elasticsearch] %s errors reported" % len(results[1]))
def load(lines, config):
bulks = grouper(lines, config['bulk_size'] * 3)
if config['progress']:
bulks = [x for x in bulks]
with click.progressbar(bulks) as pbar:
for i, bulk in enumerate(pbar):
try:
single_bulk_to_es(bulk, config, config['with_retry'])
except Exception as e:
log('warn', 'Chunk {i} got exception ({e}) while processing'.format(e=e, i=i))
def gen_action(self, **kwargs):
"""
single ES doc that gets consumed inside bulk uploads
"""
action = {
"_op_type": _op_type,
"_index": kwargs['index_name'],
"_type": kwargs['doc_type'],
"_id": kwargs['uid'],
# "@timestamp": kwargs['timestamp'],
"_source": kwargs['data']
}
return action
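A hedged usage sketch for the action builder above: the uploader instance, index and type names, and records are all assumptions; the point is only that the dicts returned by gen_action() (which also relies on a module-level _op_type such as 'index') are exactly what helpers.bulk() consumes:

records = [{"uid": 1, "data": {"msg": "hello"}},
           {"uid": 2, "data": {"msg": "world"}}]
actions = (
    uploader.gen_action(index_name="demo-index",   # uploader: hypothetical instance
                        doc_type="item",
                        uid=rec["uid"],
                        data=rec["data"])
    for rec in records
)
helpers.bulk(es, actions)                          # es: hypothetical client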
def buildEvent(timestamp = None):
event = {}
#if we don't have a desired timestamp passed in for the event, use the current time in UTC
if timestamp == None:
event['timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
else:
event['timestamp'] = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
#TODO make some of these random inputs from seed lists
#add these 2 for bulk API goodness
event['_index'] = 'smoke_event'
event['_type'] = 'smoke_event'
event['request'] = '/index.html'
event['response'] = '200'
event['agent'] = 'Firefox'
event['remote_ip'] = '1.1.1.1'
event['remote_user'] = ''
event['bytes'] = '1234'
event['referrer'] = 'http://example.com'
json_event = json.dumps(event)
return json_event
def main():
bulkSize = 10000 # elasticsearch bulk size
daysBack = 7
anomalyPeriod = 30 # period for anomaly to last, in minutes
anomalyMagnification = 10 # e.g. 10x more than the normal
buildEventSeries(daysBack, bulkSize)
buildAnomalyEventSeries(daysBack, anomalyPeriod, anomalyMagnification, bulkSize)