Python elasticsearch.helpers module: streaming_bulk() example source code
The following 8 code examples, extracted from open-source Python projects, illustrate how to use elasticsearch.helpers.streaming_bulk().
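Before the project excerpts, here is a minimal, self-contained sketch of the call pattern they all share: build a generator of action dicts and iterate over the (ok, info) pairs that streaming_bulk() yields, one per document. The host URL, index name and documents are placeholders, not taken from any of the projects below.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

def gen_actions(docs, index_name):
    # Each action is a plain dict; '_source' carries the document body.
    for i, doc in enumerate(docs):
        yield {'_index': index_name, '_id': str(i), '_source': doc}

client = Elasticsearch(['http://localhost:9200'])  # placeholder host
docs = [{'title': 'document %d' % i} for i in range(10)]

oks = errs = 0
# raise_on_error=False yields failed actions as (False, info) instead of
# raising BulkIndexError on the first failure.
for ok, info in streaming_bulk(client, gen_actions(docs, 'demo-index'),
                               chunk_size=500, raise_on_error=False):
    if ok:
        oks += 1
    else:
        errs += 1
print('%d indexed, %d failed' % (oks, errs))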
def index_all(self, index_name):
    """Index all available documents, using streaming_bulk for speed.

    Args:
        index_name (string): The name of the index to write documents to.
    """
    oks = 0
    notoks = 0
    for ok, item in streaming_bulk(
            self.es_client,
            self._iter_documents(index_name)):
        if ok:
            oks += 1
        else:
            notoks += 1
    logging.info(
        "Import results: %d ok, %d not ok",
        oks,
        notoks
    )
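The excerpt above leans on a _iter_documents() helper that is not shown; all it has to do is yield one action dict per document. A hypothetical version (the self.documents attribute and field layout are invented for illustration) could look like:

def _iter_documents(self, index_name):
    # Hypothetical helper: yield one bulk action per document held by
    # this object; 'self.documents' is an assumed {doc_id: body} mapping.
    for doc_id, body in self.documents.items():
        yield {
            '_op_type': 'index',
            '_index': index_name,
            '_id': doc_id,
            '_source': body,
        }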
def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):
    if primary_key is None or len(primary_key) == 0:
        raise ValueError('primary_key cannot be an empty list')

    def actions(rows_, doc_type_, primary_key_, update_):
        if update_:
            for row_ in rows_:
                yield {
                    '_op_type': 'update',
                    '_index': bucket,
                    '_type': doc_type_,
                    '_id': self.generate_doc_id(row_, primary_key_),
                    '_source': {
                        'doc': row_,
                        'doc_as_upsert': True
                    }
                }
        else:
            for row_ in rows_:
                yield {
                    '_op_type': 'index',
                    '_index': bucket,
                    '_type': doc_type_,
                    '_id': self.generate_doc_id(row_, primary_key_),
                    '_source': row_
                }

    iterables = itertools.tee(rows)
    actions_iterable = actions(iterables[0], doc_type, primary_key, update)
    results = zip(streaming_bulk(self.__es, actions=actions_iterable), iterables[1])
    if as_generator:
        for result, row in results:
            yield row
    else:
        # Exhaust the iterator without keeping the bulk results in memory.
        collections.deque(results, maxlen=0)
    self.__es.indices.flush(bucket)
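Two idioms in this excerpt are easy to miss: itertools.tee() duplicates the rows iterator so each original row can be yielded back alongside its bulk result, and collections.deque(..., maxlen=0) simply runs an iterator to exhaustion without storing anything. A tiny standalone illustration of the drain idiom:

import collections

def handle_rows():
    for i in range(3):
        print('handling row', i)   # stands in for the bulk indexing work
        yield i

# Consume the generator for its side effects only; nothing is kept.
collections.deque(handle_rows(), maxlen=0)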
def record_events(self, events):
    def _build_bulk_index(event_list):
        for ev in event_list:
            traits = {t.name: t.value for t in ev.traits}
            yield {'_op_type': 'create',
                   '_index': '%s_%s' % (self.index_name,
                                        ev.generated.date().isoformat()),
                   '_type': ev.event_type,
                   '_id': ev.message_id,
                   '_source': {'timestamp': ev.generated.isoformat(),
                               'traits': traits,
                               'raw': ev.raw}}

    error = None
    for ok, result in helpers.streaming_bulk(
            self.conn, _build_bulk_index(events)):
        if not ok:
            __, result = result.popitem()
            if result['status'] == 409:
                LOG.info('Duplicate event detected, skipping it: %s',
                         result)
            else:
                LOG.exception('Failed to record event: %s', result)
                error = storage.StorageUnknownWriteError(result)

    if self._refresh_on_write:
        self.conn.indices.refresh(index='%s_*' % self.index_name)
        while self.conn.cluster.pending_tasks(local=True)['tasks']:
            pass
    if error:
        raise error
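One caveat when adapting this excerpt: depending on the elasticsearch-py version, streaming_bulk() raises BulkIndexError on a failed action by default, in which case the `if not ok` branch is never reached. Passing raise_on_error=False makes per-item error handling, such as the 409 duplicate check above, possible. A standalone sketch with placeholder host and index names:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

client = Elasticsearch(['http://localhost:9200'])   # placeholder host
actions = ({'_op_type': 'create', '_index': 'events-demo',
            '_id': str(i), '_source': {'n': i}} for i in range(5))

# With raise_on_error=False, a failed 'create' (e.g. an HTTP 409 conflict
# for an existing _id) is yielded as (False, info) instead of raising.
for ok, info in streaming_bulk(client, actions, raise_on_error=False):
    if not ok:
        op_type, details = info.popitem()
        if details.get('status') == 409:
            print('duplicate, skipping:', details)
        else:
            print('failed:', details)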
def index(self, points):
    for p in points:
        p['_index'] = self.config['indexer']['idx_name']
        p['_type'] = self.es_type
    results = helpers.streaming_bulk(self.client, points)
    for status, r in results:
        if not status:
            log.debug("index err result %s", r)
def index_events(client, events):
    results = helpers.streaming_bulk(client, events)
    for status, r in results:
        if not status:
            log.debug("index err result %s", r)
def handle_command(self, doc, namespace, timestamp):
    # Flush the buffer before handling the command.
    self.commit()
    db = namespace.split('.', 1)[0]

    if doc.get('dropDatabase'):
        dbs = self.command_helper.map_db(db)
        for _db in dbs:
            self.elastic.indices.delete(index=_db.lower())

    if doc.get('renameCollection'):
        raise errors.OperationFailed(
            "elastic_doc_manager does not support renaming a mapping.")

    if doc.get('create'):
        db, coll = self.command_helper.map_collection(db, doc['create'])
        if db and coll:
            self.elastic.indices.put_mapping(
                index=db.lower(), doc_type=coll,
                body={
                    "_source": {"enabled": True}
                })

    if doc.get('drop'):
        db, coll = self.command_helper.map_collection(db, doc['drop'])
        if db and coll:
            # This will delete the items in coll, but not get rid of the
            # mapping.
            warnings.warn("Deleting all documents of type %s on index %s. "
                          "The mapping definition will persist and must be "
                          "removed manually." % (coll, db))
            responses = streaming_bulk(
                self.elastic,
                (dict(result, _op_type='delete') for result in scan(
                    self.elastic, index=db.lower(), doc_type=coll)))
            for ok, resp in responses:
                if not ok:
                    LOG.error(
                        "Error occurred while deleting ElasticSearch "
                        "document during handling of 'drop' command: %r"
                        % resp)
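The 'drop' branch illustrates a general delete-by-scan pattern: helpers.scan() streams every matching hit, and since each hit already carries _index and _id, adding '_op_type': 'delete' turns it into a bulk delete action. A standalone sketch of the same idea, with placeholder client and index name:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, streaming_bulk

def delete_all_docs(client, index):
    # Each scan() hit contains '_index' and '_id'; tagging it with
    # '_op_type': 'delete' makes streaming_bulk() issue a delete for it.
    actions = (dict(hit, _op_type='delete')
               for hit in scan(client, index=index))
    for ok, info in streaming_bulk(client, actions, raise_on_error=False):
        if not ok:
            print('delete failed: %r' % (info,))

delete_all_docs(Elasticsearch(['http://localhost:9200']), 'demo-index')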
def bulk_upsert(self, docs, namespace, timestamp):
    """Insert multiple documents into Elasticsearch."""
    def docs_to_upsert():
        doc = None
        for doc in docs:
            # Remove metadata and redundant _id
            index, doc_type = self._index_and_mapping(namespace)
            doc_id = u(doc.pop("_id"))
            document_action = {
                '_index': index,
                '_type': doc_type,
                '_id': doc_id,
                '_source': self._formatter.format_document(doc)
            }
            document_meta = {
                '_index': self.meta_index_name,
                '_type': self.meta_type,
                '_id': doc_id,
                '_source': {
                    'ns': namespace,
                    '_ts': timestamp
                }
            }
            yield document_action
            yield document_meta
        if doc is None:
            raise errors.EmptyDocsError(
                "Cannot upsert an empty sequence of "
                "documents into Elastic Search")
    try:
        kw = {}
        if self.chunk_size > 0:
            kw['chunk_size'] = self.chunk_size

        responses = streaming_bulk(client=self.elastic,
                                   actions=docs_to_upsert(),
                                   **kw)

        for ok, resp in responses:
            if not ok:
                LOG.error(
                    "Could not bulk-upsert document "
                    "into ElasticSearch: %r" % resp)
        if self.auto_commit_interval == 0:
            self.commit()
    except errors.EmptyDocsError:
        # This can happen when mongo-connector starts up, there is no
        # config file, and there is nothing to dump.
        pass
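Note that docs_to_upsert() emits two actions per source document: the document itself and a companion entry in a metadata index recording the namespace and timestamp. A hypothetical invocation, where doc_manager stands for an already-initialized document manager and the documents, namespace and timestamp are placeholders:

docs = [
    {'_id': 'a1', 'name': 'alice'},
    {'_id': 'a2', 'name': 'bob'},
]
doc_manager.bulk_upsert(docs, 'testdb.users', 1)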
def record_events(self, events):
    datetime_trait_fields = [
        'audit_period_beginning',
        'audit_period_ending',
        'deleted_at',
        'created_at',
        'launched_at',
        'modify_at'
    ]

    def _build_bulk_index(event_list):
        for ev in event_list:
            traits = {}
            for t in ev.traits:
                name = t.name
                value = t.value
                if name in datetime_trait_fields:
                    try:
                        ts = timeutils.parse_isotime(value)
                        ts = timeutils.normalize_time(ts)
                        value = timeutils.strtime(ts)
                    except ValueError:
                        LOG.exception(
                            _LE('Could not parse timestamp [%s] from '
                                '[%s] traits field' % (value, name)))
                        value = t.value
                traits[name] = value
            yield {'_op_type': 'create',
                   '_index': '%s_%s' % (self.index_name,
                                        ev.generated.date().isoformat()),
                   '_type': ev.event_type,
                   '_id': ev.message_id,
                   '_source': {'timestamp': ev.generated.isoformat(),
                               'traits': traits,
                               'raw': ev.raw}}

    error = None
    for ok, result in helpers.streaming_bulk(
            self.conn, _build_bulk_index(events)):
        if not ok:
            __, result = result.popitem()
            if result['status'] == 409:
                LOG.info(_LI('Duplicate event detected, skipping it: %s')
                         % result)
            else:
                LOG.exception(_LE('Failed to record event: %s') % result)
                error = storage.StorageUnknownWriteError(result)

    if self._refresh_on_write:
        self.conn.indices.refresh(index='%s_*' % self.index_name)
        while self.conn.cluster.pending_tasks(local=True)['tasks']:
            pass
    if error:
        raise error
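The only difference from the earlier record_events() is that datetime traits are normalized to a single UTC string format before indexing, so that Elasticsearch maps these fields consistently. The same normalization step without the oslo_utils.timeutils helpers, shown purely for illustration:

from datetime import datetime, timezone

def normalize_datetime_trait(value):
    # Parse an ISO 8601 string and re-emit it as a naive UTC timestamp,
    # mirroring the parse_isotime / normalize_time / strtime chain above.
    ts = datetime.fromisoformat(value)
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
    return ts.strftime('%Y-%m-%dT%H:%M:%S.%f')

print(normalize_datetime_trait('2016-05-04T12:00:00+02:00'))
# -> 2016-05-04T10:00:00.000000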