Java Class com.facebook.presto.testing.QueryRunner Code Examples
Project: presto-rest
File: TestSlackQueries.java
public static QueryRunner createLocalQueryRunner()
        throws Exception
{
    Session defaultSession = testSessionBuilder()
            .setCatalog("slack")
            .setSchema("default")
            .build();
    QueryRunner queryRunner = new DistributedQueryRunner(defaultSession, 1);
    queryRunner.installPlugin(new SlackPlugin());
    queryRunner.createCatalog(
            "slack",
            "slack",
            ImmutableMap.of("token", System.getenv("SLACK_TOKEN")));
    return queryRunner;
}
Project: presto-rest
File: TestGithubQueries.java
public static QueryRunner createLocalQueryRunner()
        throws Exception
{
    Session defaultSession = testSessionBuilder()
            .setCatalog("github")
            .setSchema("default")
            .build();
    QueryRunner queryRunner = new DistributedQueryRunner(defaultSession, 1);
    queryRunner.installPlugin(new GithubPlugin());
    queryRunner.createCatalog(
            "github",
            "github",
            ImmutableMap.of("token", System.getenv("GITHUB_TOKEN")));
    return queryRunner;
}
Project: presto-rest
File: TestTwitterQueries.java
private static QueryRunner createLocalQueryRunner()
        throws Exception
{
    Session defaultSession = TestingSession.testSessionBuilder()
            .setCatalog("twitter")
            .setSchema("default")
            .build();
    QueryRunner queryRunner = new DistributedQueryRunner(defaultSession, 1);
    queryRunner.installPlugin(new TwitterPlugin());
    queryRunner.createCatalog(
            "twitter",
            "twitter",
            ImmutableMap.of(
                    "customer_key", System.getenv("TWITTER_CUSTOMER_KEY"),
                    "customer_secret", System.getenv("TWITTER_CUSTOMER_SECRET"),
                    "token", System.getenv("TWITTER_TOKEN"),
                    "secret", System.getenv("TWITTER_SECRET")));
    return queryRunner;
}
Project: presto
File: QueryAssertions.java
public static void assertUpdate(QueryRunner queryRunner, Session session, @Language("SQL") String sql, OptionalLong count)
{
    long start = System.nanoTime();
    MaterializedResult results = queryRunner.execute(session, sql);
    log.info("FINISHED in presto: %s", nanosSince(start));
    if (!results.getUpdateType().isPresent()) {
        fail("update type is not set");
    }
    if (results.getUpdateCount().isPresent()) {
        if (!count.isPresent()) {
            fail("update count should not be present");
        }
        assertEquals(results.getUpdateCount().getAsLong(), count.getAsLong(), "update count");
    }
    else if (count.isPresent()) {
        fail("update count is not present");
    }
}
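An illustrative call site (hedged: the table names and the surrounding queryRunner/session variables are assumptions, not from the original tests):

// Illustrative only: asserts that the INSERT reports an update count of 3.
// "example_table" and "source_table" are hypothetical names.
assertUpdate(queryRunner, session,
        "INSERT INTO example_table SELECT * FROM source_table LIMIT 3",
        OptionalLong.of(3));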
Project: presto-kinesis
File: TestUtils.java
/**
 * Install the plugin into the given query runner, using the mock client and the given table descriptions.
 *
 * The plugin is returned so that the injector can be accessed and other setup items tested.
 *
 * @param queryRunner the query runner to install the plugin into
 * @param streamDescriptions the stream (table) descriptions to serve, keyed by schema and table name
 * @return the installed KinesisPlugin instance
 */
public static KinesisPlugin installKinesisPlugin(QueryRunner queryRunner, Map<SchemaTableName, KinesisStreamDescription> streamDescriptions)
{
    KinesisPlugin kinesisPlugin = createPluginInstance();
    // Note: function literal with provided descriptions instead of KinesisTableDescriptionSupplier:
    kinesisPlugin.setTableDescriptionSupplier(() -> streamDescriptions);
    kinesisPlugin.setAltProviderClass(KinesisTestClientManager.class);
    queryRunner.installPlugin(kinesisPlugin);
    Map<String, String> kinesisConfig = ImmutableMap.of(
            "kinesis.default-schema", "default",
            "kinesis.access-key", "",
            "kinesis.secret-key", "");
    queryRunner.createCatalog("kinesis", "kinesis", kinesisConfig);
    return kinesisPlugin;
}
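A hedged sketch of how a test might use this helper; the queryRunner, the testStreamDescription object, and the "test_stream" table name are placeholders for illustration, not taken from the original tests:

// Hypothetical setup: streamDescriptions maps one assumed stream to a description.
Map<SchemaTableName, KinesisStreamDescription> streamDescriptions = ImmutableMap.of(
        new SchemaTableName("default", "test_stream"), testStreamDescription);
KinesisPlugin plugin = installKinesisPlugin(queryRunner, streamDescriptions);
MaterializedResult rows = queryRunner.execute("SELECT COUNT(*) FROM kinesis.default.test_stream");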
Project: presto
File: TestUtils.java
public static void installKafkaPlugin(EmbeddedKafka embeddedKafka, QueryRunner queryRunner, Map<SchemaTableName, KafkaTopicDescription> topicDescriptions)
{
    KafkaPlugin kafkaPlugin = new KafkaPlugin();
    kafkaPlugin.setTableDescriptionSupplier(() -> topicDescriptions);
    queryRunner.installPlugin(kafkaPlugin);
    Map<String, String> kafkaConfig = ImmutableMap.of(
            "kafka.nodes", embeddedKafka.getConnectString(),
            "kafka.table-names", Joiner.on(",").join(topicDescriptions.keySet()),
            "kafka.connect-timeout", "120s",
            "kafka.default-schema", "default");
    queryRunner.createCatalog("kafka", "kafka", kafkaConfig);
}
Project: presto
File: RedisTestUtils.java
public static void installRedisPlugin(EmbeddedRedis embeddedRedis, QueryRunner queryRunner, Map<SchemaTableName, RedisTableDescription> tableDescriptions)
{
    RedisPlugin redisPlugin = new RedisPlugin();
    redisPlugin.setTableDescriptionSupplier(Suppliers.ofInstance(tableDescriptions));
    queryRunner.installPlugin(redisPlugin);
    Map<String, String> redisConfig = ImmutableMap.of(
            "redis.nodes", embeddedRedis.getConnectString() + ":" + embeddedRedis.getPort(),
            "redis.table-names", Joiner.on(",").join(tableDescriptions.keySet()),
            "redis.default-schema", "default",
            "redis.hide-internal-columns", "true",
            "redis.key-prefix-schema-table", "true");
    queryRunner.createCatalog("redis", "redis", redisConfig);
}
Project: presto
File: QueryAssertions.java
public static void assertApproximateQuery(
        QueryRunner queryRunner,
        Session session,
        @Language("SQL") String actual,
        H2QueryRunner h2QueryRunner,
        @Language("SQL") String expected)
        throws Exception
{
    long start = System.nanoTime();
    MaterializedResult actualResults = queryRunner.execute(session, actual);
    log.info("FINISHED in %s", nanosSince(start));
    MaterializedResult expectedResults = h2QueryRunner.execute(session, expected, actualResults.getTypes());
    assertApproximatelyEqual(actualResults.getMaterializedRows(), expectedResults.getMaterializedRows());
}
Project: presto
File: QueryAssertions.java
public static void copyTpchTables(
        QueryRunner queryRunner,
        String sourceCatalog,
        String sourceSchema,
        Session session,
        Iterable<TpchTable<?>> tables)
        throws Exception
{
    log.info("Loading data from %s.%s...", sourceCatalog, sourceSchema);
    long startTime = System.nanoTime();
    for (TpchTable<?> table : tables) {
        copyTable(queryRunner, sourceCatalog, sourceSchema, table.getTableName().toLowerCase(ENGLISH), session);
    }
    log.info("Loading from %s.%s complete in %s", sourceCatalog, sourceSchema, nanosSince(startTime).toString(SECONDS));
}
Project: presto
File: QueryAssertions.java
public static void copyAllTables(QueryRunner queryRunner, String sourceCatalog, String sourceSchema, Session session)
        throws Exception
{
    for (QualifiedObjectName table : queryRunner.listTables(session, sourceCatalog, sourceSchema)) {
        if (table.getObjectName().equalsIgnoreCase("dual")) {
            continue;
        }
        copyTable(queryRunner, table, session);
    }
}
Project: presto
File: QueryAssertions.java
public static void copyTable(QueryRunner queryRunner, QualifiedObjectName table, Session session)
{
    long start = System.nanoTime();
    log.info("Running import for %s", table.getObjectName());
    @Language("SQL") String sql = format("CREATE TABLE %s AS SELECT * FROM %s", table.getObjectName(), table);
    long rows = checkType(queryRunner.execute(session, sql).getMaterializedRows().get(0).getField(0), Long.class, "rows");
    log.info("Imported %s rows for %s in %s", rows, table.getObjectName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}
Project: presto
File: TestMemoryManager.java
@Test(timeOut = 240_000, expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Query exceeded max memory size of 1kB.*")
public void testQueryMemoryLimit()
        throws Exception
{
    Map<String, String> properties = ImmutableMap.<String, String>builder()
            .put("query.max-memory", "1kB")
            .put("task.operator-pre-allocated-memory", "0B")
            .build();
    try (QueryRunner queryRunner = createQueryRunner(SESSION, properties)) {
        queryRunner.execute(SESSION, "SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
    }
}
Project: presto
File: TestMemoryManager.java
@Test(timeOut = 240_000, expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Query exceeded local memory limit of 1kB.*")
public void testQueryMemoryPerNodeLimit()
        throws Exception
{
    Map<String, String> properties = ImmutableMap.<String, String>builder()
            .put("query.max-memory-per-node", "1kB")
            .put("task.operator-pre-allocated-memory", "0B")
            .build();
    try (QueryRunner queryRunner = createQueryRunner(SESSION, properties)) {
        queryRunner.execute(SESSION, "SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
    }
}
Project: presto
File: PostgreSqlQueryRunner.java
public static QueryRunner createPostgreSqlQueryRunner(TestingPostgreSqlServer server, Iterable<TpchTable<?>> tables)
        throws Exception
{
    DistributedQueryRunner queryRunner = null;
    try {
        queryRunner = new DistributedQueryRunner(createSession(), 3);
        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("connection-url", server.getJdbcUrl())
                .put("allow-drop-table", "true")
                .build();
        createSchema(server.getJdbcUrl(), "tpch");
        queryRunner.installPlugin(new PostgreSqlPlugin());
        queryRunner.createCatalog("postgresql", "postgresql", properties);
        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);
        return queryRunner;
    }
    catch (Throwable e) {
        closeAllSuppress(e, queryRunner, server);
        throw e;
    }
}
Project: presto
File: MySqlQueryRunner.java
public static QueryRunner createMySqlQueryRunner(TestingMySqlServer server, Iterable<TpchTable<?>> tables)
        throws Exception
{
    DistributedQueryRunner queryRunner = null;
    try {
        queryRunner = new DistributedQueryRunner(createSession(), 3);
        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("connection-url", server.getJdbcUrl())
                .put("allow-drop-table", "true")
                .build();
        queryRunner.installPlugin(new MySqlPlugin());
        queryRunner.createCatalog("mysql", "mysql", properties);
        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);
        return queryRunner;
    }
    catch (Throwable e) {
        closeAllSuppress(e, queryRunner, server);
        throw e;
    }
}
Project: presto-kinesis
File: TestUtils.java
/**
 * Install the plugin into the given query runner, using normal setup but with the given table descriptions.
 *
 * Note that this uses the actual client and will incur charges from AWS when run. Mainly for full
 * integration tests.
 *
 * @param queryRunner the query runner to install the plugin into
 * @param streamDescriptions the stream (table) descriptions to serve, keyed by schema and table name
 * @param accessKey the AWS access key to use
 * @param secretKey the AWS secret key to use
 */
public static void installKinesisPlugin(QueryRunner queryRunner, Map<SchemaTableName, KinesisStreamDescription> streamDescriptions, String accessKey, String secretKey)
{
    KinesisPlugin kinesisPlugin = createPluginInstance();
    // Note: function literal with provided descriptions instead of KinesisTableDescriptionSupplier:
    kinesisPlugin.setTableDescriptionSupplier(() -> streamDescriptions);
    queryRunner.installPlugin(kinesisPlugin);
    Map<String, String> kinesisConfig = ImmutableMap.of(
            "kinesis.default-schema", "default",
            "kinesis.access-key", accessKey,
            "kinesis.secret-key", secretKey);
    queryRunner.createCatalog("kinesis", "kinesis", kinesisConfig);
}
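A short hedged example of calling this variant from an integration test; reading the credentials from environment variables is an assumption here, not something the source prescribes:

// Sketch only: credentials are pulled from the environment for illustration.
installKinesisPlugin(queryRunner, streamDescriptions,
        System.getenv("AWS_ACCESS_KEY_ID"),
        System.getenv("AWS_SECRET_ACCESS_KEY"));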
Project: presto
File: QueryAssertions.java
public static void assertQuery(QueryRunner actualQueryRunner,
        Session session,
        @Language("SQL") String actual,
        H2QueryRunner h2QueryRunner,
        @Language("SQL") String expected,
        boolean ensureOrdering,
        boolean compareUpdate)
        throws Exception
{
    long start = System.nanoTime();
    MaterializedResult actualResults = actualQueryRunner.execute(session, actual).toJdbcTypes();
    Duration actualTime = nanosSince(start);
    long expectedStart = System.nanoTime();
    MaterializedResult expectedResults = h2QueryRunner.execute(session, expected, actualResults.getTypes());
    log.info("FINISHED in presto: %s, h2: %s, total: %s", actualTime, nanosSince(expectedStart), nanosSince(start));
    if (actualResults.getUpdateType().isPresent() || actualResults.getUpdateCount().isPresent()) {
        if (!actualResults.getUpdateType().isPresent()) {
            fail("update count present without update type");
        }
        if (!compareUpdate) {
            fail("update type should not be present (use assertUpdate)");
        }
    }
    List<MaterializedRow> actualRows = actualResults.getMaterializedRows();
    List<MaterializedRow> expectedRows = expectedResults.getMaterializedRows();
    if (compareUpdate) {
        if (!actualResults.getUpdateType().isPresent()) {
            fail("update type not present");
        }
        if (!actualResults.getUpdateCount().isPresent()) {
            fail("update count not present");
        }
        assertEquals(actualRows.size(), 1);
        assertEquals(expectedRows.size(), 1);
        MaterializedRow row = expectedRows.get(0);
        assertEquals(row.getFieldCount(), 1);
        assertEquals(row.getField(0), actualResults.getUpdateCount().getAsLong());
    }
    if (ensureOrdering) {
        if (!actualRows.equals(expectedRows)) {
            assertEquals(actualRows, expectedRows);
        }
    }
    else {
        assertEqualsIgnoreOrder(actualRows, expectedRows);
    }
}
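A minimal sketch of a call site, assuming a queryRunner, session, and h2QueryRunner are already set up as in the parameters above; the query itself is illustrative:

// Illustrative call: compares a Presto aggregation against H2, ignoring row order,
// with no update-count comparison.
assertQuery(queryRunner, session,
        "SELECT orderstatus, count(*) FROM orders GROUP BY orderstatus",
        h2QueryRunner,
        "SELECT orderstatus, count(*) FROM orders GROUP BY orderstatus",
        false,
        false);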
Project: presto
File: QueryAssertions.java
public static void copyTable(QueryRunner queryRunner, String sourceCatalog, String sourceSchema, String sourceTable, Session session)
        throws Exception
{
    QualifiedObjectName table = new QualifiedObjectName(sourceCatalog, sourceSchema, sourceTable);
    copyTable(queryRunner, table, session);
}
Project: presto
File: AbstractTestQueries.java
protected AbstractTestQueries(QueryRunner queryRunner)
{
    super(queryRunner);
}
Project: presto
File: AbstractTestQueryFramework.java
protected AbstractTestQueryFramework(QueryRunner queryRunner)
{
    this.queryRunner = queryRunner;
    h2QueryRunner = new H2QueryRunner();
    sqlParser = new SqlParser();
}
Project: presto
File: AbstractTestIntegrationSmokeTest.java
protected AbstractTestIntegrationSmokeTest(QueryRunner queryRunner)
{
    this(queryRunner, Optional.empty());
}
Project: presto
File: AbstractTestIntegrationSmokeTest.java
protected AbstractTestIntegrationSmokeTest(QueryRunner queryRunner, Session sampledSession)
{
    this(queryRunner, Optional.of(requireNonNull(sampledSession, "sampledSession is null")));
}
Project: presto
File: AbstractTestIntegrationSmokeTest.java
private AbstractTestIntegrationSmokeTest(QueryRunner queryRunner, Optional<Session> sampledSession)
{
    super(queryRunner);
    this.sampledSession = requireNonNull(sampledSession, "sampledSession is null");
}
Project: presto
File: AbstractTestApproximateQueries.java
protected AbstractTestApproximateQueries(QueryRunner queryRunner)
{
    this(queryRunner, Optional.empty());
}
Project: presto
File: AbstractTestApproximateQueries.java
protected AbstractTestApproximateQueries(QueryRunner queryRunner, Session sampledSession)
{
    this(queryRunner, Optional.of(requireNonNull(sampledSession, "sampledSession is null")));
}
Project: presto
File: AbstractTestApproximateQueries.java
private AbstractTestApproximateQueries(QueryRunner queryRunner, Optional<Session> sampledSession)
{
    super(queryRunner);
    this.sampledSession = requireNonNull(sampledSession, "sampledSession is null");
}
Project: presto
File: AbstractTestDistributedQueries.java
protected AbstractTestDistributedQueries(QueryRunner queryRunner)
{
    super(queryRunner);
}
Project: presto
File: AbstractTestDistributedQueries.java
protected AbstractTestDistributedQueries(QueryRunner queryRunner, Session sampledSession)
{
    super(queryRunner, sampledSession);
}
Project: presto
File: AbstractTestIndexedQueries.java
protected AbstractTestIndexedQueries(QueryRunner queryRunner)
{
    super(queryRunner);
}
Project: presto
File: PostgreSqlQueryRunner.java
public static QueryRunner createPostgreSqlQueryRunner(TestingPostgreSqlServer server, TpchTable<?>... tables)
        throws Exception
{
    return createPostgreSqlQueryRunner(server, ImmutableList.copyOf(tables));
}
Project: presto
File: MySqlQueryRunner.java
public static QueryRunner createMySqlQueryRunner(TestingMySqlServer server, TpchTable<?>... tables)
        throws Exception
{
    return createMySqlQueryRunner(server, ImmutableList.copyOf(tables));
}
Project: presto
File: HiveQueryRunner.java
public static QueryRunner createQueryRunner(TpchTable<?>... tables)
        throws Exception
{
    return createQueryRunner(ImmutableList.copyOf(tables));
}
Project: presto
File: HiveQueryRunner.java
public static QueryRunner createQueryRunner(Iterable<TpchTable<?>> tables)
        throws Exception
{
    assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=Asia/Katmandu to your JVM arguments");
    DistributedQueryRunner queryRunner = new DistributedQueryRunner(createSession(), 4);
    try {
        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");
        queryRunner.installPlugin(new SampledTpchPlugin());
        queryRunner.createCatalog("tpch_sampled", "tpch_sampled");
        File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
        InMemoryHiveMetastore metastore = new InMemoryHiveMetastore(baseDir);
        metastore.createDatabase(createDatabaseMetastoreObject(baseDir, "tpch"));
        metastore.createDatabase(createDatabaseMetastoreObject(baseDir, "tpch_sampled"));
        queryRunner.installPlugin(new HivePlugin(HIVE_CATALOG, metastore));
        Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
                .put("hive.metastore.uri", "thrift://localhost:8080")
                .put("hive.allow-add-column", "true")
                .put("hive.allow-drop-table", "true")
                .put("hive.allow-rename-table", "true")
                .put("hive.allow-rename-column", "true")
                .put("hive.time-zone", TIME_ZONE.getID())
                .put("hive.security", "sql-standard")
                .build();
        queryRunner.createCatalog(HIVE_CATALOG, HIVE_CATALOG, hiveProperties);
        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);
        copyTpchTables(queryRunner, "tpch_sampled", TINY_SCHEMA_NAME, createSampledSession(), tables);
        return queryRunner;
    }
    catch (Exception e) {
        queryRunner.close();
        throw e;
    }
}
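A brief hedged sketch of spinning up this Hive-backed runner for an ad-hoc query; the main method and the queried table location are assumptions, and only createQueryRunner comes from the snippet above:

// Sketch only: loads the TPCH orders table into the Hive catalog and counts its rows.
public static void main(String[] args)
        throws Exception
{
    QueryRunner queryRunner = createQueryRunner(TpchTable.ORDERS);
    MaterializedResult result = queryRunner.execute("SELECT count(*) FROM hive.tpch.orders");
    System.out.println(result.getMaterializedRows().get(0).getField(0));
}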
Project: presto-bloomfilter
File: TestBloomFilterQueries.java
public QueryRunner get()
{
    return queryRunner;
}