/**
 * Wait 60 seconds or until Elasticsearch is up and running, whichever
 * comes first
 * @param client the client to use to check if Elasticsearch is running
 * @return an observable that emits exactly one item when Elasticsearch
 * is running
 */
public Observable<Void> waitUntilElasticsearchRunning(
    ElasticsearchClient client) {
  final Throwable repeat = new NoStackTraceThrowable("");
  return Observable.defer(client::isRunning).flatMap(running -> {
    if (!running) {
      return Observable.error(repeat);
    }
    return Observable.just(running);
  }).retryWhen(errors -> {
    Observable<Throwable> o = errors.flatMap(t -> {
      if (t == repeat) {
        // Elasticsearch is still not up, retry
        return Observable.just(t);
      }
      // forward error
      return Observable.error(t);
    });
    // retry for 60 seconds
    return RxUtils.makeRetry(60, 1000, null).call(o);
  }).map(r -> null);
}
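A minimal sketch of how this observable might be consumed; the runner variable, the startIndexing callback, and the logger are hypothetical:

// Hypothetical usage: wait for Elasticsearch before continuing, or log the
// error that waitUntilElasticsearchRunning() propagates after 60 seconds
runner.waitUntilElasticsearchRunning(client).subscribe(
    v -> startIndexing(),
    err -> log.error("Elasticsearch did not come up in time", err));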
@Override
public Observable<Long> count(String type, JsonObject query) {
  String uri = "/" + index + "/" + type + "/_count";

  JsonObject source = new JsonObject();
  if (query != null) {
    source.put("query", query);
  }

  return performRequestRetry(HttpMethod.GET, uri, source.encode())
    .flatMap(sr -> {
      Long l = sr.getLong("count");
      if (l == null) {
        return Observable.error(new NoStackTraceThrowable(
            "Could not count documents"));
      }
      return Observable.just(l);
    });
}
/**
 * Ensure the Elasticsearch index exists
 * @return an observable that will emit a single item when the index has
 * been created or if it already exists
 */
@Override
public Observable<Void> ensureIndex() {
  // check if the index exists
  return indexExists().flatMap(exists -> {
    if (exists) {
      return Observable.just(null);
    } else {
      // index does not exist. create it.
      return createIndex().flatMap(ack -> {
        if (ack) {
          return Observable.just(null);
        }
        return Observable.error(new NoStackTraceThrowable("Index creation " +
            "was not acknowledged by Elasticsearch"));
      });
    }
  });
}
/**
 * Ensure the Elasticsearch mapping exists
 * @param type the target type for the mapping
 * @param mapping the mapping to create if it does not exist yet
 * @return an observable that will emit a single item when the mapping has
 * been created or if it already exists
 */
@Override
public Observable<Void> ensureMapping(String type, JsonObject mapping) {
  // check if the target type exists
  return typeExists(type).flatMap(exists -> {
    if (exists) {
      return Observable.just(null);
    } else {
      // target type does not exist. create the mapping.
      return putMapping(type, mapping).flatMap(ack -> {
        if (ack) {
          return Observable.just(null);
        }
        return Observable.error(new NoStackTraceThrowable("Mapping creation " +
            "was not acknowledged by Elasticsearch"));
      });
    }
  });
}
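A hedged sketch of how the two helpers above might be chained during startup; the TYPE_NAME constant, the mapping object, and the logger are assumptions:

// Hypothetical startup sequence: make sure the index and mapping exist
// before any documents are indexed
ensureIndex()
    .flatMap(v -> ensureMapping(TYPE_NAME, mapping))
    .subscribe(v -> log.info("Index and mapping are ready"),
        err -> log.error("Could not prepare Elasticsearch index", err));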
@Override
public void doRun(String[] remainingArgs, InputReader in, PrintWriter out,
    Handler<Integer> handler) throws OptionParserException, IOException {
  GeoRocketClient client = createClient();
  client.getStore().setProperties(query, layer, properties, ar -> {
    if (ar.failed()) {
      client.close();
      Throwable t = ar.cause();
      error(t.getMessage());
      if (!(t instanceof NoStackTraceThrowable)) {
        log.error("Could not set properties", t);
      }
      handler.handle(1);
    } else {
      handler.handle(0);
    }
  });
}
@Override
public void doRun(String[] remainingArgs, InputReader in, PrintWriter out,
    Handler<Integer> handler) throws OptionParserException, IOException {
  GeoRocketClient client = createClient();
  client.getStore().delete(query, layer, ar -> {
    if (ar.failed()) {
      client.close();
      Throwable t = ar.cause();
      error(t.getMessage());
      if (!(t instanceof NoStackTraceThrowable)) {
        log.error("Could not delete from store", t);
      }
      handler.handle(1);
    } else {
      handler.handle(0);
    }
  });
}
@Override
public void doRun(String[] remainingArgs, InputReader in, PrintWriter out,
    Handler<Integer> handler) throws OptionParserException, IOException {
  GeoRocketClient client = createClient();
  client.getStore().appendTags(query, layer, tags, ar -> {
    if (ar.failed()) {
      client.close();
      Throwable t = ar.cause();
      error(t.getMessage());
      if (!(t instanceof NoStackTraceThrowable)) {
        log.error("Could not add the tags", t);
      }
      handler.handle(1);
    } else {
      handler.handle(0);
    }
  });
}
@Override
public void doRun(String[] remainingArgs, InputReader in, PrintWriter out,
    Handler<Integer> handler) throws OptionParserException, IOException {
  GeoRocketClient client = createClient();
  client.getStore().removeProperties(query, layer, properties, ar -> {
    if (ar.failed()) {
      client.close();
      Throwable t = ar.cause();
      error(t.getMessage());
      if (!(t instanceof NoStackTraceThrowable)) {
        log.error("Could not remove properties", t);
      }
      handler.handle(1);
    } else {
      handler.handle(0);
    }
  });
}
@Override
public void doRun(String[] remainingArgs, InputReader in, PrintWriter out,
    Handler<Integer> handler) throws OptionParserException, IOException {
  GeoRocketClient client = createClient();
  client.getStore().getPropertyValues(property, query, layer, ar -> {
    if (ar.failed()) {
      Throwable t = ar.cause();
      error(t.getMessage());
      if (!(t instanceof NoStackTraceThrowable)) {
        log.error("Could not get values of property " + property, t);
      }
      handler.handle(1);
    } else {
      ar.result().handler(buf -> {
        out.write(buf.toString("utf-8"));
      });
      ar.result().endHandler(v -> {
        client.close();
        handler.handle(0);
      });
    }
  });
}
@Override
public void doRun(String[] remainingArgs, InputReader in, PrintWriter out,
    Handler<Integer> handler) throws OptionParserException, IOException {
  GeoRocketClient client = createClient();
  client.getStore().removeTags(query, layer, tags, ar -> {
    if (ar.failed()) {
      client.close();
      Throwable t = ar.cause();
      error(t.getMessage());
      if (!(t instanceof NoStackTraceThrowable)) {
        log.error("Could not remove the tags", t);
      }
      handler.handle(1);
    } else {
      handler.handle(0);
    }
  });
}
/**
 * Write an error and code to the response.
 *
 * @param exception the exception that caused the error.
 */
default void error(Throwable exception) {
  if (exception instanceof CoreException || exception instanceof CoreRuntimeException) {
    write(Protocol.response(((CoreExceptionFormat) exception).status(), exception));
  } else if (exception instanceof DecodeException
      || exception instanceof NoStackTraceThrowable) {
    write(Protocol.response(ResponseStatus.ERROR, exception));
  } else {
    write(Protocol.response(ResponseStatus.ERROR, new UnmappedException(exception)));
  }
}
private Future<RpcResponse> circuitBreakerWrapper(String traceId,
    Future<RpcResponse> circuitBreakerFuture) {
  Future<RpcResponse> future = Future.future();
  circuitBreakerFuture.setHandler(ar -> {
    if (ar.failed()) {
      if (ar.cause() instanceof NoStackTraceThrowable
          && "operation timeout".equals(ar.cause().getMessage())) {
        future.fail(SystemException.create(DefaultErrorCode.TIME_OUT)
            .set("timeout", config.getLong("timeout", 10000l)));
        return;
      }
      if (ar.cause() instanceof RuntimeException
          && "open circuit".equals(ar.cause().getMessage())) {
        Log.create(LOGGER)
            .setTraceId(traceId)
            .setModule("RPC")
            .setEvent("BreakerTripped")
            .error();
        future.fail(SystemException.create(DefaultErrorCode.BREAKER_TRIPPED)
            .set("details", String.format("Please try again after %ds",
                config.getLong("resetTimeout", 30000l) / 1000)));
        return;
      }
      future.fail(ar.cause());
    } else {
      future.complete(ar.result());
    }
  });
  return future;
}
/**
 * Open a chunk and convert it to an Elasticsearch document. Retry the
 * operation several times before failing.
 * @param path the path to the chunk to open
 * @param chunkMeta metadata about the chunk
 * @param indexMeta metadata used to index the chunk
 * @return an observable that emits the document
 */
private Observable<Map<String, Object>> openChunkToDocument(
    String path, ChunkMeta chunkMeta, IndexMeta indexMeta) {
  return Observable.defer(() -> store.rxGetOne(path)
      .flatMapObservable(chunk -> {
        List<? extends IndexerFactory> factories;
        Operator<? extends StreamEvent, Buffer> parserOperator;

        // select indexers and parser depending on the mime type
        String mimeType = chunkMeta.getMimeType();
        if (belongsTo(mimeType, "application", "xml") ||
            belongsTo(mimeType, "text", "xml")) {
          factories = xmlIndexerFactories;
          parserOperator = new XMLParserOperator();
        } else if (belongsTo(mimeType, "application", "json")) {
          factories = jsonIndexerFactories;
          parserOperator = new JsonParserOperator();
        } else {
          return Observable.error(new NoStackTraceThrowable(String.format(
              "Unexpected mime type '%s' while trying to index " +
              "chunk '%s'", mimeType, path)));
        }

        // call meta indexers
        Map<String, Object> metaResults = new HashMap<>();
        for (MetaIndexerFactory metaIndexerFactory : metaIndexerFactories) {
          MetaIndexer metaIndexer = metaIndexerFactory.createIndexer();
          metaIndexer.onIndexChunk(path, chunkMeta, indexMeta);
          metaResults.putAll(metaIndexer.getResult());
        }

        // convert chunk to document and close it
        return chunkToDocument(chunk, indexMeta.getFallbackCRSString(),
            parserOperator, factories)
            .doAfterTerminate(chunk::close)
            // add results from meta indexers to converted document
            .doOnNext(doc -> doc.putAll(metaResults));
      }))
      .retryWhen(makeRetry());
}
/**
 * Delete chunks from the index
 * @param body the message containing the paths to the chunks to delete
 * @return an observable that emits a single item when the chunks have
 * been deleted successfully
 */
private Observable<Void> onDelete(JsonObject body) {
  JsonArray paths = body.getJsonArray("paths");
  long totalChunks = body.getLong("totalChunks", (long)paths.size());
  long remainingChunks = body.getLong("remainingChunks", (long)paths.size());

  // execute bulk request
  long startTimeStamp = System.currentTimeMillis();
  onDeletingStarted(startTimeStamp, paths, totalChunks, remainingChunks);
  return client.bulkDelete(TYPE_NAME, paths).flatMap(bres -> {
    long stopTimeStamp = System.currentTimeMillis();
    if (client.bulkResponseHasErrors(bres)) {
      String error = client.bulkResponseGetErrorMessage(bres);
      log.error("One or more chunks could not be deleted");
      log.error(error);
      onDeletingFinished(stopTimeStamp - startTimeStamp, paths, totalChunks,
          remainingChunks, error);
      return Observable.error(new NoStackTraceThrowable(
          "One or more chunks could not be deleted"));
    } else {
      onDeletingFinished(stopTimeStamp - startTimeStamp, paths, totalChunks,
          remainingChunks, null);
      return Observable.just(null);
    }
  });
}
/**
 * Test that delete fails if no layer is given
 * @param context the test context
 */
@Test
public void noLayer(TestContext context) {
  Async async = context.async();
  client.getStore().delete(null, null, context.asyncAssertFailure(t -> {
    context.assertTrue(t instanceof NoStackTraceThrowable);
    async.complete();
  }));
}
/**
 * Test that delete fails if the query is empty
 * @param context the test context
 */
@Test
public void emptyQuery(TestContext context) {
  Async async = context.async();
  client.getStore().delete("", null, context.asyncAssertFailure(t -> {
    context.assertTrue(t instanceof NoStackTraceThrowable);
    async.complete();
  }));
}
static <R> CommandResponse<R> failure(String msg) {
  return failure(new NoStackTraceThrowable(msg), null);
}
static <R> CommandResponse<R> failure(String msg, TxStatus txStatus) {
  return failure(new NoStackTraceThrowable(msg), txStatus);
}
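A minimal sketch of how these factories might be called, assuming they live on the CommandResponse interface; the message text is made up for illustration:

// Hypothetical usage: report an expected failure without capturing a stack trace
CommandResponse<Void> response = CommandResponse.failure("aggregate id must not be null");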
@Override
public void fail(String failureMessage) {
  fail(new NoStackTraceThrowable(failureMessage));
}
/**
 * Create an Elasticsearch client. Either start an Elasticsearch instance or
 * connect to an external one, depending on the configuration.
 * @param indexName the name of the index the Elasticsearch client will
 * operate on
 * @return an observable emitting an Elasticsearch client and runner
 */
public Observable<ElasticsearchClient> createElasticsearchClient(String indexName) {
  JsonObject config = vertx.getOrCreateContext().config();

  boolean embedded = config.getBoolean(ConfigConstants.INDEX_ELASTICSEARCH_EMBEDDED, true);
  String host = config.getString(ConfigConstants.INDEX_ELASTICSEARCH_HOST, "localhost");
  int port = config.getInteger(ConfigConstants.INDEX_ELASTICSEARCH_PORT, 9200);

  ElasticsearchClient client = new RemoteElasticsearchClient(host, port, indexName, vertx);

  if (!embedded) {
    // just return the client
    return Observable.just(client);
  }

  return client.isRunning().flatMap(running -> {
    if (running) {
      // we don't have to start Elasticsearch again
      return Observable.just(client);
    }

    String home = config.getString(ConfigConstants.HOME);

    String defaultElasticsearchDownloadUrl;
    try {
      defaultElasticsearchDownloadUrl = IOUtils.toString(getClass().getResource(
          "/elasticsearch_download_url.txt"), StandardCharsets.UTF_8);
    } catch (IOException e) {
      return Observable.error(e);
    }

    String elasticsearchDownloadUrl = config.getString(
        ConfigConstants.INDEX_ELASTICSEARCH_DOWNLOAD_URL, defaultElasticsearchDownloadUrl);

    Pattern pattern = Pattern.compile("-([0-9]\\.[0-9]\\.[0-9])\\.zip$");
    Matcher matcher = pattern.matcher(elasticsearchDownloadUrl);
    if (!matcher.find()) {
      return Observable.error(new NoStackTraceThrowable("Could not extract " +
          "version number from Elasticsearch download URL: " +
          elasticsearchDownloadUrl));
    }
    String elasticsearchVersion = matcher.group(1);

    String elasticsearchInstallPath = config.getString(
        ConfigConstants.INDEX_ELASTICSEARCH_INSTALL_PATH,
        home + "/elasticsearch/" + elasticsearchVersion);

    // install Elasticsearch, start it and then create the client
    ElasticsearchInstaller installer = new ElasticsearchInstaller(vertx);
    ElasticsearchRunner runner = new ElasticsearchRunner(vertx);
    return installer.download(elasticsearchDownloadUrl, elasticsearchInstallPath)
        .flatMap(path -> runner.runElasticsearch(host, port, path))
        .flatMap(v -> runner.waitUntilElasticsearchRunning(client))
        .map(v -> new EmbeddedElasticsearchClient(client, runner));
  });
}
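A hedged sketch of consuming this factory method; the index name, the startIndexer callback, and the logger are only examples:

// Hypothetical usage: obtain a client (embedded or remote) and hand it to the indexer
createElasticsearchClient("georocket")
    .subscribe(client -> startIndexer(client),
        err -> log.error("Could not create Elasticsearch client", err));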
/**
 * Import a file from the given read stream into the store. Inspect the file's
 * content type and forward to the correct import method.
 * @param contentType the file's content type
 * @param f the file to import
 * @param correlationId a unique identifier for this import process
 * @param filename the name of the file currently being imported
 * @param timestamp denotes when the import process has started
 * @param layer the layer where the file should be stored (may be null)
 * @param tags the list of tags to attach to the file (may be null)
 * @param properties the map of properties to attach to the file (may be null)
 * @param fallbackCRSString the CRS which should be used if the imported
 * file does not specify one (may be <code>null</code>)
 * @return a single that will emit the number of chunks imported
 * when the file has been imported
 */
protected Single<Integer> importFile(String contentType, ReadStream<Buffer> f,
    String correlationId, String filename, long timestamp, String layer,
    List<String> tags, Map<String, Object> properties, String fallbackCRSString) {
  if (belongsTo(contentType, "application", "xml") ||
      belongsTo(contentType, "text", "xml")) {
    return importXML(f, correlationId, filename, timestamp, layer, tags,
        properties, fallbackCRSString);
  } else if (belongsTo(contentType, "application", "json")) {
    return importJSON(f, correlationId, filename, timestamp, layer, tags, properties);
  } else {
    return Single.error(new NoStackTraceThrowable(String.format(
        "Received an unexpected content type '%s' while trying to import " +
        "file '%s'", contentType, filename)));
  }
}
private void handleError(String message) {
  log.debug("handleError:" + message);
  errorHandler.handle(new NoStackTraceThrowable(message));
}
private void handleError(String message) {
  handleError(new NoStackTraceThrowable(message));
}
private void handleError(String message) {
  errorHandler.handle(new NoStackTraceThrowable(message));
}
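The handleError variants above share one pattern: expected, message-only failures are wrapped in Vert.x's NoStackTraceThrowable, whose fillInStackTrace() is a no-op, so reporting them stays cheap and log output stays readable. A self-contained sketch of that pattern; the ErrorReporter class and its field names are hypothetical:

import io.vertx.core.Handler;
import io.vertx.core.impl.NoStackTraceThrowable;

// Hypothetical helper: expected failures are reported as NoStackTraceThrowable
// (no stack trace is captured), unexpected exceptions are forwarded unchanged.
public class ErrorReporter {
  private final Handler<Throwable> errorHandler;

  public ErrorReporter(Handler<Throwable> errorHandler) {
    this.errorHandler = errorHandler;
  }

  public void handleError(String message) {
    errorHandler.handle(new NoStackTraceThrowable(message));
  }

  public void handleError(Throwable t) {
    errorHandler.handle(t);
  }
}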