/**
 * Creates a {@link DistributedQueryRunner} with the Slack plugin installed and a {@code slack}
 * catalog whose {@code token} property is read from the {@code SLACK_TOKEN} environment variable.
 *
 * The default session uses catalog {@code slack} and schema {@code default}.
 *
 * @return the configured query runner; it is returned open and is not closed by this method
 * @throws NullPointerException if the {@code SLACK_TOKEN} environment variable is not set
 * @throws Exception if the query runner cannot be created or configured
 */
public static QueryRunner createLocalQueryRunner()
        throws Exception
{
    // Resolve the token before anything is started so a missing variable fails fast with a clear message
    String token = java.util.Objects.requireNonNull(System.getenv("SLACK_TOKEN"), "SLACK_TOKEN environment variable is not set");

    Session defaultSession = testSessionBuilder()
            .setCatalog("slack")
            .setSchema("default")
            .build();

    QueryRunner queryRunner = new DistributedQueryRunner(defaultSession, 1);
    try {
        queryRunner.installPlugin(new SlackPlugin());
        queryRunner.createCatalog("slack", "slack", ImmutableMap.of("token", token));
        return queryRunner;
    }
    catch (Throwable e) {
        // Setup failed: release the runner instead of leaking it, keeping the original failure primary
        try {
            queryRunner.close();
        }
        catch (Throwable closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Creates a {@link DistributedQueryRunner} with the GitHub plugin installed and a {@code github}
 * catalog whose {@code token} property is read from the {@code GITHUB_TOKEN} environment variable.
 *
 * The default session uses catalog {@code github} and schema {@code default}.
 *
 * @return the configured query runner; it is returned open and is not closed by this method
 * @throws NullPointerException if the {@code GITHUB_TOKEN} environment variable is not set
 * @throws Exception if the query runner cannot be created or configured
 */
public static QueryRunner createLocalQueryRunner()
        throws Exception
{
    // Resolve the token before anything is started so a missing variable fails fast with a clear message
    String token = java.util.Objects.requireNonNull(System.getenv("GITHUB_TOKEN"), "GITHUB_TOKEN environment variable is not set");

    Session defaultSession = testSessionBuilder()
            .setCatalog("github")
            .setSchema("default")
            .build();

    QueryRunner queryRunner = new DistributedQueryRunner(defaultSession, 1);
    try {
        queryRunner.installPlugin(new GithubPlugin());
        queryRunner.createCatalog("github", "github", ImmutableMap.of("token", token));
        return queryRunner;
    }
    catch (Throwable e) {
        // Setup failed: release the runner instead of leaking it, keeping the original failure primary
        try {
            queryRunner.close();
        }
        catch (Throwable closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Creates a {@link DistributedQueryRunner} with the Twitter plugin installed and a {@code twitter}
 * catalog configured from the {@code TWITTER_CUSTOMER_KEY}, {@code TWITTER_CUSTOMER_SECRET},
 * {@code TWITTER_TOKEN} and {@code TWITTER_SECRET} environment variables.
 *
 * The default session uses catalog {@code twitter} and schema {@code default}.
 *
 * @return the configured query runner; it is returned open and is not closed by this method
 * @throws NullPointerException if any of the required environment variables is not set
 * @throws Exception if the query runner cannot be created or configured
 */
private static QueryRunner createLocalQueryRunner()
        throws Exception
{
    // Resolve all credentials before anything is started so a missing variable fails fast and names itself
    String customerKey = requiredEnv("TWITTER_CUSTOMER_KEY");
    String customerSecret = requiredEnv("TWITTER_CUSTOMER_SECRET");
    String token = requiredEnv("TWITTER_TOKEN");
    String secret = requiredEnv("TWITTER_SECRET");

    Session defaultSession = TestingSession.testSessionBuilder()
            .setCatalog("twitter")
            .setSchema("default")
            .build();

    QueryRunner queryRunner = new DistributedQueryRunner(defaultSession, 1);
    try {
        queryRunner.installPlugin(new TwitterPlugin());
        queryRunner.createCatalog(
                "twitter",
                "twitter",
                ImmutableMap.of(
                        "customer_key", customerKey,
                        "customer_secret", customerSecret,
                        "token", token,
                        "secret", secret));
        return queryRunner;
    }
    catch (Throwable e) {
        // Setup failed: release the runner instead of leaking it, keeping the original failure primary
        try {
            queryRunner.close();
        }
        catch (Throwable closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}

/** Returns the value of the named environment variable, failing with a descriptive message when it is unset. */
private static String requiredEnv(String name)
{
    return java.util.Objects.requireNonNull(System.getenv(name), name + " environment variable is not set");
}
/**
 * Executes {@code sql} and verifies the update metadata of the result.
 *
 * The result must carry an update type. The update count must be present exactly when
 * {@code count} is present, and in that case the two values must be equal.
 *
 * @param queryRunner runner used to execute the statement
 * @param session session the statement is executed in
 * @param sql the statement to execute
 * @param count the expected update count, or empty when no update count is expected
 */
public static void assertUpdate(QueryRunner queryRunner, Session session, @Language("SQL") String sql, OptionalLong count)
{
    long start = System.nanoTime();
    MaterializedResult results = queryRunner.execute(session, sql);
    log.info("FINISHED in presto: %s", nanosSince(start));

    if (!results.getUpdateType().isPresent()) {
        fail("update type is not set");
    }

    if (results.getUpdateCount().isPresent()) {
        if (!count.isPresent()) {
            fail("update count should not be present");
        }
        // Compare the values themselves: both sides are known to be present here, so comparing
        // the isPresent() flags could never fail
        assertEquals(results.getUpdateCount().getAsLong(), count.getAsLong(), "update count");
    }
    else if (count.isPresent()) {
        fail("update count is not present");
    }
}
/** * Install the plugin into the given query runner, using the mock client and the given table descriptions. * * The plug in is returned so that the injector can be accessed and other setup items tested. * * @param queryRunner * @param streamDescriptions * @return */ public static KinesisPlugin installKinesisPlugin(QueryRunner queryRunner, Map<SchemaTableName, KinesisStreamDescription> streamDescriptions) { KinesisPlugin kinesisPlugin = createPluginInstance(); // Note: function literal with provided descriptions instead of KinesisTableDescriptionSupplier: kinesisPlugin.setTableDescriptionSupplier(() -> streamDescriptions); kinesisPlugin.setAltProviderClass(KinesisTestClientManager.class); queryRunner.installPlugin(kinesisPlugin); Map<String, String> kinesisConfig = ImmutableMap.of( "kinesis.default-schema", "default", "kinesis.access-key", "", "kinesis.secret-key", ""); queryRunner.createCatalog("kinesis", "kinesis", kinesisConfig); return kinesisPlugin; }
/**
 * Installs a Kafka plugin serving the given topic descriptions into the query runner and
 * registers a {@code kafka} catalog that points at the embedded Kafka instance.
 *
 * @param embeddedKafka supplies the connect string used for {@code kafka.nodes}
 * @param queryRunner the query runner to install the plugin and catalog into
 * @param topicDescriptions topic descriptions by table name; the keys also populate {@code kafka.table-names}
 */
public static void installKafkaPlugin(EmbeddedKafka embeddedKafka, QueryRunner queryRunner, Map<SchemaTableName, KafkaTopicDescription> topicDescriptions)
{
    KafkaPlugin plugin = new KafkaPlugin();
    plugin.setTableDescriptionSupplier(() -> topicDescriptions);
    queryRunner.installPlugin(plugin);

    String nodes = embeddedKafka.getConnectString();
    String tableNames = Joiner.on(",").join(topicDescriptions.keySet());

    queryRunner.createCatalog(
            "kafka",
            "kafka",
            ImmutableMap.of(
                    "kafka.nodes", nodes,
                    "kafka.table-names", tableNames,
                    "kafka.connect-timeout", "120s",
                    "kafka.default-schema", "default"));
}
/**
 * Installs a Redis plugin serving the given table descriptions into the query runner and
 * registers a {@code redis} catalog that points at the embedded Redis instance.
 *
 * @param embeddedRedis supplies the connect string and port joined into {@code redis.nodes}
 * @param queryRunner the query runner to install the plugin and catalog into
 * @param tableDescriptions table descriptions by table name; the keys also populate {@code redis.table-names}
 */
public static void installRedisPlugin(EmbeddedRedis embeddedRedis, QueryRunner queryRunner, Map<SchemaTableName, RedisTableDescription> tableDescriptions)
{
    RedisPlugin plugin = new RedisPlugin();
    plugin.setTableDescriptionSupplier(Suppliers.ofInstance(tableDescriptions));
    queryRunner.installPlugin(plugin);

    String nodes = embeddedRedis.getConnectString() + ":" + embeddedRedis.getPort();
    String tableNames = Joiner.on(",").join(tableDescriptions.keySet());

    queryRunner.createCatalog(
            "redis",
            "redis",
            ImmutableMap.of(
                    "redis.nodes", nodes,
                    "redis.table-names", tableNames,
                    "redis.default-schema", "default",
                    "redis.hide-internal-columns", "true",
                    "redis.key-prefix-schema-table", "true"));
}
/**
 * Executes {@code actual} on the query runner and {@code expected} on H2 (using the column types
 * of the actual result), then checks the two row lists with {@code assertApproximatelyEqual}.
 *
 * @param queryRunner runner that executes the query under test
 * @param session session used for both executions
 * @param actual the query under test
 * @param h2QueryRunner runner that produces the expected rows
 * @param expected the query producing the expected rows
 * @throws Exception if either execution fails
 */
public static void assertApproximateQuery(
        QueryRunner queryRunner,
        Session session,
        @Language("SQL") String actual,
        H2QueryRunner h2QueryRunner,
        @Language("SQL") String expected)
        throws Exception
{
    long startNanos = System.nanoTime();
    MaterializedResult prestoResult = queryRunner.execute(session, actual);
    log.info("FINISHED in %s", nanosSince(startNanos));

    MaterializedResult h2Result = h2QueryRunner.execute(session, expected, prestoResult.getTypes());

    assertApproximatelyEqual(prestoResult.getMaterializedRows(), h2Result.getMaterializedRows());
}
/**
 * Copies each of the given TPCH tables from {@code sourceCatalog.sourceSchema} by delegating to
 * {@code copyTable}, using the lower-cased table name, and logs the total elapsed time.
 *
 * @param queryRunner runner used to execute the copy statements
 * @param sourceCatalog catalog the tables are read from
 * @param sourceSchema schema the tables are read from
 * @param session session the copy statements run in
 * @param tables the tables to copy
 * @throws Exception if copying any table fails
 */
public static void copyTpchTables(
        QueryRunner queryRunner,
        String sourceCatalog,
        String sourceSchema,
        Session session,
        Iterable<TpchTable<?>> tables)
        throws Exception
{
    log.info("Loading data from %s.%s...", sourceCatalog, sourceSchema);
    long loadStart = System.nanoTime();

    for (TpchTable<?> tpchTable : tables) {
        String tableName = tpchTable.getTableName().toLowerCase(ENGLISH);
        copyTable(queryRunner, sourceCatalog, sourceSchema, tableName, session);
    }

    log.info("Loading from %s.%s complete in %s", sourceCatalog, sourceSchema, nanosSince(loadStart).toString(SECONDS));
}
/**
 * Copies every table the query runner lists for {@code sourceCatalog.sourceSchema}, except a
 * table named {@code dual} (compared case-insensitively).
 *
 * @param queryRunner runner used to list and copy the tables
 * @param sourceCatalog catalog the tables are listed from
 * @param sourceSchema schema the tables are listed from
 * @param session session used for listing and copying
 * @throws Exception if listing or copying fails
 */
public static void copyAllTables(QueryRunner queryRunner, String sourceCatalog, String sourceSchema, Session session)
        throws Exception
{
    for (QualifiedObjectName candidate : queryRunner.listTables(session, sourceCatalog, sourceSchema)) {
        boolean isDual = candidate.getObjectName().equalsIgnoreCase("dual");
        if (!isDual) {
            copyTable(queryRunner, candidate, session);
        }
    }
}
/**
 * Copies a table by running {@code CREATE TABLE <object name> AS SELECT * FROM <qualified name>}
 * in the given session, then logs the row count and the elapsed time.
 *
 * The row count is read from the first field of the first result row, which must be a {@code Long}.
 *
 * @param queryRunner runner used to execute the statement
 * @param table the source table; its object name is also used as the name of the created table
 * @param session session the statement runs in
 */
public static void copyTable(QueryRunner queryRunner, QualifiedObjectName table, Session session)
{
    long importStart = System.nanoTime();
    String targetName = table.getObjectName();
    log.info("Running import for %s", targetName);

    @Language("SQL") String createTableAs = format("CREATE TABLE %s AS SELECT * FROM %s", targetName, table);
    long importedRows = checkType(
            queryRunner.execute(session, createTableAs).getMaterializedRows().get(0).getField(0),
            Long.class,
            "rows");

    log.info("Imported %s rows for %s in %s", importedRows, targetName, nanosSince(importStart).convertToMostSuccinctTimeUnit());
}
/**
 * Runs a grouped aggregation with {@code query.max-memory} set to {@code 1kB} and expects a
 * {@link RuntimeException} whose message reports that the max memory size was exceeded.
 */
@Test(timeOut = 240_000, expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Query exceeded max memory size of 1kB.*")
public void testQueryMemoryLimit()
        throws Exception
{
    Map<String, String> memoryLimits = ImmutableMap.of(
            "query.max-memory", "1kB",
            "task.operator-pre-allocated-memory", "0B");

    try (QueryRunner runner = createQueryRunner(SESSION, memoryLimits)) {
        runner.execute(SESSION, "SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
    }
}
/**
 * Runs a grouped aggregation with {@code query.max-memory-per-node} set to {@code 1kB} and expects a
 * {@link RuntimeException} whose message reports that the local memory limit was exceeded.
 */
@Test(timeOut = 240_000, expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Query exceeded local memory limit of 1kB.*")
public void testQueryMemoryPerNodeLimit()
        throws Exception
{
    Map<String, String> memoryLimits = ImmutableMap.of(
            "query.max-memory-per-node", "1kB",
            "task.operator-pre-allocated-memory", "0B");

    try (QueryRunner runner = createQueryRunner(SESSION, memoryLimits)) {
        runner.execute(SESSION, "SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
    }
}
/**
 * Creates a {@link DistributedQueryRunner} with a {@code tpch} catalog and a {@code postgresql}
 * catalog connected to the given server, creates the {@code tpch} schema on that server, and
 * copies the requested TPCH tables from the tiny schema.
 *
 * On any failure both the query runner (if already created) and the server are passed to
 * {@code closeAllSuppress} before the failure is rethrown.
 *
 * @param server the PostgreSQL test server supplying the JDBC URL
 * @param tables the TPCH tables to copy
 * @return the configured query runner
 * @throws Exception if setup fails
 */
public static QueryRunner createPostgreSqlQueryRunner(TestingPostgreSqlServer server, Iterable<TpchTable<?>> tables)
        throws Exception
{
    DistributedQueryRunner runner = null;
    try {
        runner = new DistributedQueryRunner(createSession(), 3);

        runner.installPlugin(new TpchPlugin());
        runner.createCatalog("tpch", "tpch");

        Map<String, String> connectorProperties = ImmutableMap.of(
                "connection-url", server.getJdbcUrl(),
                "allow-drop-table", "true");

        createSchema(server.getJdbcUrl(), "tpch");

        runner.installPlugin(new PostgreSqlPlugin());
        runner.createCatalog("postgresql", "postgresql", connectorProperties);

        copyTpchTables(runner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);

        return runner;
    }
    catch (Throwable failure) {
        closeAllSuppress(failure, runner, server);
        throw failure;
    }
}
/**
 * Creates a {@link DistributedQueryRunner} with a {@code tpch} catalog and a {@code mysql}
 * catalog connected to the given server, then copies the requested TPCH tables from the tiny schema.
 *
 * On any failure both the query runner (if already created) and the server are passed to
 * {@code closeAllSuppress} before the failure is rethrown.
 *
 * @param server the MySQL test server supplying the JDBC URL
 * @param tables the TPCH tables to copy
 * @return the configured query runner
 * @throws Exception if setup fails
 */
public static QueryRunner createMySqlQueryRunner(TestingMySqlServer server, Iterable<TpchTable<?>> tables)
        throws Exception
{
    DistributedQueryRunner runner = null;
    try {
        runner = new DistributedQueryRunner(createSession(), 3);

        runner.installPlugin(new TpchPlugin());
        runner.createCatalog("tpch", "tpch");

        Map<String, String> connectorProperties = ImmutableMap.of(
                "connection-url", server.getJdbcUrl(),
                "allow-drop-table", "true");

        runner.installPlugin(new MySqlPlugin());
        runner.createCatalog("mysql", "mysql", connectorProperties);

        copyTpchTables(runner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);

        return runner;
    }
    catch (Throwable failure) {
        closeAllSuppress(failure, runner, server);
        throw failure;
    }
}
/** * Install the plug in into the given query runner, using normal setup but with the given table descriptions. * * Note that this uses the actual client and will incur charges from AWS when run. Mainly for full * integration tests. * * @param queryRunner * @param streamDescriptions * @param accessKey * @param secretKey */ public static void installKinesisPlugin(QueryRunner queryRunner, Map<SchemaTableName, KinesisStreamDescription> streamDescriptions, String accessKey, String secretKey) { KinesisPlugin kinesisPlugin = createPluginInstance(); // Note: function literal with provided descriptions instead of KinesisTableDescriptionSupplier: kinesisPlugin.setTableDescriptionSupplier(() -> streamDescriptions); queryRunner.installPlugin(kinesisPlugin); Map<String, String> kinesisConfig = ImmutableMap.of( "kinesis.default-schema", "default", "kinesis.access-key", accessKey, "kinesis.secret-key", secretKey); queryRunner.createCatalog("kinesis", "kinesis", kinesisConfig); }
/**
 * Executes {@code actual} on the query runner (converted to JDBC types) and {@code expected} on
 * H2, then asserts that both produce the same rows.
 *
 * When the actual result carries an update type or update count, {@code compareUpdate} must be
 * set. With {@code compareUpdate} set, both must be present, each side must have exactly one row,
 * and the single field of the expected row must equal the actual update count.
 *
 * @param actualQueryRunner runner that executes the query under test
 * @param session session used for both executions
 * @param actual the query under test
 * @param h2QueryRunner runner that produces the expected rows
 * @param expected the query producing the expected rows
 * @param ensureOrdering if true rows are compared in order, otherwise order is ignored
 * @param compareUpdate if true the update count of the actual result is verified as well
 * @throws Exception if either execution fails
 */
public static void assertQuery(QueryRunner actualQueryRunner, Session session, @Language("SQL") String actual, H2QueryRunner h2QueryRunner, @Language("SQL") String expected, boolean ensureOrdering, boolean compareUpdate)
        throws Exception
{
    long prestoStart = System.nanoTime();
    MaterializedResult actualResults = actualQueryRunner.execute(session, actual).toJdbcTypes();
    Duration prestoTime = nanosSince(prestoStart);

    long h2Start = System.nanoTime();
    MaterializedResult expectedResults = h2QueryRunner.execute(session, expected, actualResults.getTypes());
    log.info("FINISHED in presto: %s, h2: %s, total: %s", prestoTime, nanosSince(h2Start), nanosSince(prestoStart));

    boolean hasUpdateType = actualResults.getUpdateType().isPresent();
    boolean hasUpdateCount = actualResults.getUpdateCount().isPresent();

    // Update metadata is only acceptable when the caller asked for it to be compared
    if (hasUpdateType || hasUpdateCount) {
        if (!hasUpdateType) {
            fail("update count present without update type");
        }
        if (!compareUpdate) {
            fail("update type should not be present (use assertUpdate)");
        }
    }

    List<MaterializedRow> actualRows = actualResults.getMaterializedRows();
    List<MaterializedRow> expectedRows = expectedResults.getMaterializedRows();

    if (compareUpdate) {
        if (!hasUpdateType) {
            fail("update type not present");
        }
        if (!hasUpdateCount) {
            fail("update count not present");
        }
        assertEquals(actualRows.size(), 1);
        assertEquals(expectedRows.size(), 1);

        MaterializedRow expectedRow = expectedRows.get(0);
        assertEquals(expectedRow.getFieldCount(), 1);
        assertEquals(expectedRow.getField(0), actualResults.getUpdateCount().getAsLong());
    }

    if (!ensureOrdering) {
        assertEqualsIgnoreOrder(actualRows, expectedRows);
    }
    else if (!actualRows.equals(expectedRows)) {
        // assertEquals is only reached on a mismatch, where it produces the failure
        // (presumably the equals() pre-check skips assertEquals' own comparison on the passing path — confirm)
        assertEquals(actualRows, expectedRows);
    }
}
/**
 * Copies the table identified by catalog, schema and table name by building a
 * {@link QualifiedObjectName} and delegating to the overload that accepts one.
 *
 * @param queryRunner runner used to execute the copy
 * @param sourceCatalog catalog of the source table
 * @param sourceSchema schema of the source table
 * @param sourceTable name of the source table
 * @param session session the copy runs in
 * @throws Exception declared for callers; the body itself only delegates
 */
public static void copyTable(QueryRunner queryRunner, String sourceCatalog, String sourceSchema, String sourceTable, Session session)
        throws Exception
{
    copyTable(queryRunner, new QualifiedObjectName(sourceCatalog, sourceSchema, sourceTable), session);
}
/**
 * Creates the test base, handing the query runner to the superclass constructor.
 *
 * @param queryRunner the query runner passed to the superclass
 */
protected AbstractTestQueries(QueryRunner queryRunner)
{
    super(queryRunner);
}
/**
 * Stores the given query runner and creates a new {@link H2QueryRunner} and {@link SqlParser}
 * for this instance.
 *
 * @param queryRunner the query runner to store
 */
protected AbstractTestQueryFramework(QueryRunner queryRunner)
{
    this.queryRunner = queryRunner;
    this.h2QueryRunner = new H2QueryRunner();
    this.sqlParser = new SqlParser();
}
/**
 * Creates the smoke test without a sampled session (delegates with {@code Optional.empty()}).
 *
 * @param queryRunner the query runner used by the tests
 */
protected AbstractTestIntegrationSmokeTest(QueryRunner queryRunner)
{
    this(queryRunner, Optional.empty());
}
/**
 * Creates the smoke test with a sampled session.
 *
 * @param queryRunner the query runner used by the tests
 * @param sampledSession the sampled session; must not be null
 */
protected AbstractTestIntegrationSmokeTest(QueryRunner queryRunner, Session sampledSession)
{
    this(
            queryRunner,
            Optional.of(requireNonNull(sampledSession, "sampledSession is null")));
}
/**
 * Shared constructor: passes the query runner to the superclass and stores the optional
 * sampled session after a null check.
 *
 * @param queryRunner the query runner passed to the superclass
 * @param sampledSession the sampled session, or empty; the {@link Optional} itself must not be null
 */
private AbstractTestIntegrationSmokeTest(QueryRunner queryRunner, Optional<Session> sampledSession)
{
    super(queryRunner);
    this.sampledSession = requireNonNull(sampledSession, "sampledSession is null");
}
/**
 * Creates the approximate query tests without a sampled session (delegates with {@code Optional.empty()}).
 *
 * @param queryRunner the query runner used by the tests
 */
protected AbstractTestApproximateQueries(QueryRunner queryRunner)
{
    this(queryRunner, Optional.empty());
}
/**
 * Creates the approximate query tests with a sampled session.
 *
 * @param queryRunner the query runner used by the tests
 * @param sampledSession the sampled session; must not be null
 */
protected AbstractTestApproximateQueries(QueryRunner queryRunner, Session sampledSession)
{
    this(
            queryRunner,
            Optional.of(requireNonNull(sampledSession, "sampledSession is null")));
}
/**
 * Shared constructor: passes the query runner to the superclass and stores the optional
 * sampled session after a null check.
 *
 * @param queryRunner the query runner passed to the superclass
 * @param sampledSession the sampled session, or empty; the {@link Optional} itself must not be null
 */
private AbstractTestApproximateQueries(QueryRunner queryRunner, Optional<Session> sampledSession)
{
    super(queryRunner);
    this.sampledSession = requireNonNull(sampledSession, "sampledSession is null");
}
/**
 * Creates the distributed query tests, handing the query runner to the superclass constructor.
 *
 * @param queryRunner the query runner passed to the superclass
 */
protected AbstractTestDistributedQueries(QueryRunner queryRunner)
{
    super(queryRunner);
}
/**
 * Creates the distributed query tests, handing the query runner and the sampled session to
 * the superclass constructor.
 *
 * @param queryRunner the query runner passed to the superclass
 * @param sampledSession the sampled session passed to the superclass
 */
protected AbstractTestDistributedQueries(QueryRunner queryRunner, Session sampledSession)
{
    super(queryRunner, sampledSession);
}
/**
 * Creates the indexed query tests, handing the query runner to the superclass constructor.
 *
 * @param queryRunner the query runner passed to the superclass
 */
protected AbstractTestIndexedQueries(QueryRunner queryRunner)
{
    super(queryRunner);
}
/**
 * Varargs convenience overload: copies the tables into an immutable list and delegates to the
 * {@link Iterable} overload.
 *
 * @param server the PostgreSQL test server
 * @param tables the TPCH tables to copy
 * @return the configured query runner
 * @throws Exception if setup fails
 */
public static QueryRunner createPostgreSqlQueryRunner(TestingPostgreSqlServer server, TpchTable<?>... tables)
        throws Exception
{
    ImmutableList<TpchTable<?>> requestedTables = ImmutableList.copyOf(tables);
    return createPostgreSqlQueryRunner(server, requestedTables);
}
/**
 * Varargs convenience overload: copies the tables into an immutable list and delegates to the
 * {@link Iterable} overload.
 *
 * @param server the MySQL test server
 * @param tables the TPCH tables to copy
 * @return the configured query runner
 * @throws Exception if setup fails
 */
public static QueryRunner createMySqlQueryRunner(TestingMySqlServer server, TpchTable<?>... tables)
        throws Exception
{
    ImmutableList<TpchTable<?>> requestedTables = ImmutableList.copyOf(tables);
    return createMySqlQueryRunner(server, requestedTables);
}
/**
 * Varargs convenience overload: copies the tables into an immutable list and delegates to the
 * {@link Iterable} overload.
 *
 * @param tables the TPCH tables to copy
 * @return the configured query runner
 * @throws Exception if setup fails
 */
public static QueryRunner createQueryRunner(TpchTable<?>... tables)
        throws Exception
{
    ImmutableList<TpchTable<?>> requestedTables = ImmutableList.copyOf(tables);
    return createQueryRunner(requestedTables);
}
/**
 * Creates a {@link DistributedQueryRunner} with {@code tpch}, {@code tpch_sampled} and Hive
 * catalogs, backed by an in-memory Hive metastore stored under the coordinator's base data
 * directory, and copies the requested TPCH tables from both TPCH catalogs.
 *
 * The JVM default time zone must equal {@code TIME_ZONE}; this is asserted before anything is started.
 *
 * If setup fails for any reason (including {@link Error}s), the query runner is closed before
 * the failure is rethrown; a failure while closing is attached as a suppressed exception so it
 * does not hide the original cause.
 *
 * @param tables the TPCH tables to copy
 * @return the configured query runner; it is returned open and is not closed by this method
 * @throws Exception if setup fails
 */
public static QueryRunner createQueryRunner(Iterable<TpchTable<?>> tables)
        throws Exception
{
    assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=Asia/Katmandu to your JVM arguments");

    DistributedQueryRunner queryRunner = new DistributedQueryRunner(createSession(), 4);

    try {
        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");

        queryRunner.installPlugin(new SampledTpchPlugin());
        queryRunner.createCatalog("tpch_sampled", "tpch_sampled");

        File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
        InMemoryHiveMetastore metastore = new InMemoryHiveMetastore(baseDir);
        metastore.createDatabase(createDatabaseMetastoreObject(baseDir, "tpch"));
        metastore.createDatabase(createDatabaseMetastoreObject(baseDir, "tpch_sampled"));
        queryRunner.installPlugin(new HivePlugin(HIVE_CATALOG, metastore));

        Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
                .put("hive.metastore.uri", "thrift://localhost:8080")
                .put("hive.allow-add-column", "true")
                .put("hive.allow-drop-table", "true")
                .put("hive.allow-rename-table", "true")
                .put("hive.allow-rename-column", "true")
                .put("hive.time-zone", TIME_ZONE.getID())
                .put("hive.security", "sql-standard")
                .build();
        queryRunner.createCatalog(HIVE_CATALOG, HIVE_CATALOG, hiveProperties);

        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);
        copyTpchTables(queryRunner, "tpch_sampled", TINY_SCHEMA_NAME, createSampledSession(), tables);

        return queryRunner;
    }
    catch (Throwable e) {
        // Catch Throwable so Errors do not leak the runner, and keep the original failure primary
        try {
            queryRunner.close();
        }
        catch (Throwable closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Returns the query runner held by this instance.
 *
 * @return the {@code queryRunner} field
 */
public QueryRunner get()
{
    return this.queryRunner;
}