/**
 * Converts a parameter into the representation used for the given target type.
 *
 * <ul>
 *   <li>{@code Short}/{@code Byte} with a matching target class are widened to {@code Integer}.</li>
 *   <li>A {@code LocalDate} targeting {@code java.sql.Date} becomes a {@code Date} built from
 *       the value's string form.</li>
 *   <li>For a {@code Time} target, {@code String}, {@code Long}, {@code Time} and
 *       {@code LocalTime} inputs are each mapped onto "today" via {@code toDateTimeToday()}.</li>
 * </ul>
 * Any other combination is returned unchanged.
 *
 * @param parameter the value to convert
 * @param clazz     the declared target type
 * @return the converted value, or {@code parameter} itself when no rule applies
 */
public Object replaceParameter(Object parameter, Class clazz) {
    if (parameter instanceof Short && Short.class.equals(clazz)) {
        return ((Short) parameter).intValue();
    }
    if (parameter instanceof Byte && Byte.class.equals(clazz)) {
        return ((Byte) parameter).intValue();
    }
    if (parameter instanceof LocalDate && java.sql.Date.class.equals(clazz)) {
        final java.sql.Date sqlDate = java.sql.Date.valueOf(String.valueOf(parameter));
        return new Date(sqlDate.getTime());
    }

    // Every remaining rule only applies when the target type is Time.
    if (!Time.class.equals(clazz)) {
        return parameter;
    }

    if (parameter instanceof String) {
        return LocalTime.parse((String) parameter).toDateTimeToday().toString();
    }
    if (parameter instanceof Long) {
        // The incoming value is divided by 1,000,000 before being used as millis-of-day.
        final long millisOfDay = (Long) parameter / 1000000L;
        return LocalTime.fromMillisOfDay(millisOfDay).toDateTimeToday().getMillis();
    }
    if (parameter instanceof Time) {
        return LocalTime.fromDateFields((Time) parameter).toDateTimeToday().toDate();
    }
    if (parameter instanceof LocalTime) {
        return ((LocalTime) parameter).toDateTimeToday().toString();
    }
    return parameter;
}
/**
 * Asynchronously looks up flights for a given date.
 *
 * <p>When {@code origin} is non-null the {@code capstone.flightinfo_origin} table is queried by
 * origin; otherwise {@code capstone.flightinfo_dest} is queried by {@code dest} with
 * {@code ALLOW FILTERING}.
 *
 * @param origin     origin to match, or {@code null} to search by destination instead
 * @param dest       destination to match; only used when {@code origin} is {@code null}
 * @param flightDate flight date to match
 * @return the future returned by {@code connect.executeAsync}
 */
ResultSetFuture getFlightInfo(String origin, String dest, LocalDate flightDate) {
    if (origin == null) {
        final Statement byDestination = QueryBuilder
                .select()
                .all()
                .from("capstone", "flightinfo_dest")
                .allowFiltering()
                .where(eq("dest", dest))
                .and(eq("flightdate", flightDate));
        return connect.executeAsync(byDestination);
    }

    final Statement byOrigin = QueryBuilder
            .select()
            .all()
            .from("capstone", "flightinfo_origin")
            .where(eq("origin", origin))
            .and(eq("flightdate", flightDate));
    return connect.executeAsync(byOrigin);
}
BoundStatement bindRow(Row row) { for (int i = 0 ; i < argumentIndex.length ; i++) { Object value = row.get(argumentIndex[i]); if (definitions.getType(i).getName().equals(DataType.date().getName())) { // the java driver only accepts com.datastax.driver.core.LocalDate for CQL type "DATE" value= LocalDate.fromDaysSinceEpoch((Integer) value); } bindBuffer[i] = value; if (bindBuffer[i] == null && !spec.partitionGenerator.permitNulls(argumentIndex[i])) throw new IllegalStateException(); } return statement.bind(bindBuffer); }
/**
 * Loads the stocks CSV file into the {@code demo.stocks} table, one insert per data row.
 *
 * <p>Dates are parsed in UTC: {@code LocalDate.fromMillisSinceEpoch} interprets its argument
 * relative to the UTC epoch, so parsing {@code yyyy-MM-dd} in a default time zone east of UTC
 * would produce the previous calendar day.
 *
 * <p>The session and cluster are closed in {@code finally} blocks so they are released even
 * when parsing or an insert fails.
 *
 * @param args unused
 * @throws Exception if reading, parsing or executing an insert fails
 */
public static void main(String[] args) throws Exception {
    Cluster cluster = Cluster
            .builder()
            .addContactPoint("localhost")
            .build();
    try {
        session = cluster.connect();
        try {
            String sql = "insert into demo.stocks (date, open, high, low, close, volume, adjclose, symbol) values (?, ?, ?, ?, ?, ?, ?, ?)";
            loadStatement = session.prepare(sql);

            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
            // Keep the parsed instant at UTC midnight so the day is not shifted on conversion.
            simpleDateFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));

            List<String[]> rows = readData("/home/training/Downloads/datasets/stocks.csv");
            for (String[] tokens : rows) {
                // Skip the CSV header line.
                if (tokens[0].startsWith("date")) {
                    continue;
                }
                Date date = simpleDateFormat.parse(tokens[0]);
                LocalDate localDate = LocalDate.fromMillisSinceEpoch(date.getTime());
                Double open = Double.valueOf(tokens[1]);
                Double high = Double.valueOf(tokens[2]);
                Double low = Double.valueOf(tokens[3]);
                Double close = Double.valueOf(tokens[4]);
                Double volume = Double.valueOf(tokens[5]);
                Double adjclose = Double.valueOf(tokens[6]);
                String symbol = tokens[7];
                session.execute(loadStatement.bind(localDate, open, high, low, close, volume, adjclose, symbol));
            }
        } finally {
            session.close();
        }
    } finally {
        cluster.close();
    }
}
/**
 * Checks that setting a date by index records the index and the date's string form in the
 * statement's argument list. Currently ignored.
 */
@Test
@Ignore
public void testSetDateIntDate() {
    final long nowMillis = System.currentTimeMillis();
    final LocalDate today = LocalDate.fromMillisSinceEpoch( nowMillis );

    statement.setDate( 1, today );

    final String[] expectedArguments = { "1", today.toString() };
    assertArrayEquals( expectedArguments, statement.getArgumentList() );
}
/**
 * Maps a declared data type onto the type used in its place.
 *
 * <ul>
 *   <li>{@code LocalDate} becomes {@code Date}.</li>
 *   <li>{@code Short} and {@code Byte} become {@code Integer}.</li>
 *   <li>{@code Long} becomes {@code Date} when the accompanying value is a {@code Date}.</li>
 * </ul>
 *
 * @param clazz the declared type
 * @param value the value associated with the type; only inspected for the {@code Long} rule
 * @return the replacement type, or {@code clazz} unchanged when no rule applies
 */
public Class replaceDataType(Class clazz, Object value) {
    if (LocalDate.class.equals(clazz)) {
        return Date.class;
    }
    if (Short.class.equals(clazz) || Byte.class.equals(clazz)) {
        return Integer.class;
    }
    if (value instanceof Date && Long.class.equals(clazz)) {
        return Date.class;
    }
    return clazz;
}
/**
 * Picks the best flight for a key via {@code getBestFlightWithMinArrDelay}, persists it and
 * emits a merged key for it.
 *
 * <p>Origin, destination and departure date are extracted from the key's string form. Nothing
 * is persisted or written when the first element of the selected flight data is {@code null}.
 *
 * @param key     composite key from which origin, destination and departure date are derived
 * @param values  candidate flights for the key
 * @param context reducer context that receives the merged output key
 */
@Override
public void reduce(Text key, Iterable<TextArrayWritable> values, Context context) throws IOException, InterruptedException {
    final String compositeKey = key.toString();
    final String origin = origin(compositeKey);
    final String dest = dest(compositeKey);
    final String dateStr = departureDate(compositeKey);
    final LocalDate depDate = date(dateStr);

    final String[] best = getBestFlightWithMinArrDelay(values);
    if (best[0] == null) {
        return;
    }

    persist(origin, dest, depDate, toInt(best[0]), toInt(best[1]), toInt(best[2]));
    final String mergedKey = mergeToKey(origin, dest, dateStr, best[0], best[1], best[2]);
    context.write(new Text(mergedKey), NullWritable.get());
}
/**
 * Inserts one flight row asynchronously.
 *
 * <p>Flights with {@code depTime <= 1200} go to {@code capstone.flightinfo_dest}; all others go
 * to {@code capstone.flightinfo_origin}.
 *
 * <p>NOTE(review): the insert statement is prepared again on every call.
 *
 * @param origin     origin value to store
 * @param dest       destination value to store
 * @param flightDate flight date to store
 * @param flightNum  flight number to store
 * @param depTime    departure time; also selects the target table
 * @param arrDelay   arrival delay to store
 */
@SuppressWarnings("MethodWithTooManyParameters")
void persist(String origin, String dest, LocalDate flightDate, Integer flightNum, Integer depTime, Integer arrDelay) {
    final String tableName = depTime <= 1200
            ? "capstone.flightinfo_dest"
            : "capstone.flightinfo_origin";
    final String cqlQuery = "INSERT INTO " + tableName
            + " (origin, dest, flightDate, FlightNum, depTime, arrDelay) VALUES (?,?,?,?,?,?)";

    final BoundStatement insert = connect.prepare(cqlQuery)
            .bind(origin, dest, flightDate, flightNum, depTime, arrDelay);
    connect.executeAsync(insert);
}
/**
 * Persists an x/origin/dest combination for every row that passes {@code valid}.
 *
 * <p>NOTE(review): {@code x} is read from the row's {@code "dest"} column. Presumably these
 * rows describe flights arriving at {@code origin}; confirm that {@code "dest"} rather than
 * {@code "origin"} is the intended column.
 *
 * @param origin    middle airport of the combination
 * @param dest      final airport of the combination
 * @param depDate   departure date of the origin-to-dest flight
 * @param flightNum flight number of the origin-to-dest flight
 * @param rows      candidate connecting flights
 */
private void saveXOriginDest(String origin, String dest, LocalDate depDate, Integer flightNum, ResultSet rows) {
    for (Row candidate : rows) {
        final String x = candidate.getString("dest");
        final LocalDate candidateDate = candidate.getDate("flightdate");
        final int candidateFlightNum = candidate.getInt("flightnum");

        if (!valid(x, origin, dest)) {
            continue;
        }
        persist(x, origin, dest, candidateDate, depDate, candidateFlightNum, flightNum);
    }
}
/**
 * Persists an origin/dest/z combination for every row that passes {@code valid}, where
 * {@code z} is the row's {@code "dest"} column.
 *
 * @param origin    first airport of the combination
 * @param dest      middle airport of the combination
 * @param depDate   departure date of the origin-to-dest flight
 * @param flightNum flight number of the origin-to-dest flight
 * @param rows      candidate onward flights
 */
private void saveOriginDestZ(String origin, String dest, LocalDate depDate, Integer flightNum, ResultSet rows) {
    for (Row candidate : rows) {
        final String z = candidate.getString("dest");
        final LocalDate onwardDate = candidate.getDate("flightdate");
        final int onwardFlightNum = candidate.getInt("flightnum");

        if (!valid(origin, dest, z)) {
            continue;
        }
        persist(origin, dest, z, depDate, onwardDate, flightNum, onwardFlightNum);
    }
}
@Test public void testGetFlightInfo() { MergeReducer reducer = new MergeReducer(); LocalDate date = LocalDate.fromYearMonthDay(2008, 1, 3); String dest = "POR"; // String origin = "NYC"; // assertTrue(reducer.getFlightInfo(origin, null, date).size() > 0); // assertTrue(reducer.getFlightInfo(null, dest, date).size() > 0); }
/**
 * Checks that {@code MergeReducer.localDate} yields a non-null date when given the string form
 * of a driver {@code LocalDate}.
 */
@Test
public void testDate() {
    final MergeReducer mergeReducer = new MergeReducer();
    final String dateText = LocalDate.fromDaysSinceEpoch(1).toString();

    final LocalDate parsed = mergeReducer.localDate(dateText);

    assertNotNull(parsed);
}
/**
 * Parses a string into a driver {@code LocalDate} using the configured {@code dateParser}.
 *
 * @param toparse the text to parse; may be {@code null}
 * @return the parsed date, or {@code null} when {@code toparse} is {@code null}
 * @throws ParseException if the underlying parser rejects the text
 */
public LocalDate parseIt(String toparse) throws ParseException {
    if (toparse != null) {
        final Date parsed = dateParser.parseIt(toparse);
        return LocalDate.fromMillisSinceEpoch(parsed.getTime());
    }
    return null;
}
/**
 * Formats a driver {@code LocalDate} using the configured {@code dateParser}.
 *
 * @param o the {@code LocalDate} to format; may be {@code null}
 * @return the formatted text, or {@code null} when {@code o} is {@code null}
 * @throws ClassCastException if {@code o} is non-null and not a {@code LocalDate}
 */
public String format(Object o) {
    if (o == null) {
        return null;
    }
    final long epochMillis = ((LocalDate) o).getMillisSinceEpoch();
    return dateParser.format(new Date(epochMillis));
}
/**
 * Builds a {@code LocalDate} from its textual form.
 *
 * <p>Text that splits into exactly three parts on {@code '-'} is read as year-month-day
 * (for example {@code "2014-12-31"}). Anything else is parsed as a number of milliseconds
 * since the epoch.
 *
 * @param s the text to convert; must not be {@code null}
 * @return the corresponding date
 * @throws NumberFormatException if {@code s} is {@code null} or a numeric part cannot be parsed
 */
public static LocalDate getLocalDate(String s) throws NumberFormatException {
    if (s == null) {
        throw new NumberFormatException();
    }

    final String[] parts = s.split("-");
    if (parts.length != 3) {
        return LocalDate.fromMillisSinceEpoch(Long.parseLong(s));
    }

    final int year = Integer.parseInt(parts[0]);
    final int month = Integer.parseInt(parts[1]);
    final int day = Integer.parseInt(parts[2]);
    return LocalDate.fromYearMonthDay(year, month, day);
}
/**
 * Records the date argument under the string form of its index, then delegates to the
 * superclass setter.
 *
 * @param i index of the value to set
 * @param v the date to bind
 * @return the result of the superclass {@code setDate} call
 */
@Override
public BoundStatement setDate( int i, LocalDate v ) {
    final String argumentKey = ProfilingUtilities.getIntegerString( i );
    arguments.put( argumentKey, v );
    return super.setDate( i, v );
}
/**
 * Records the date argument under its name, then delegates to the superclass setter.
 *
 * @param arg0 name of the value to set
 * @param arg1 the date to bind
 * @return the result of the superclass {@code setDate} call
 */
@Override
public BoundStatement setDate( String arg0, LocalDate arg1 ) {
    arguments.put( arg0, arg1 );
    final BoundStatement bound = super.setDate( arg0, arg1 );
    return bound;
}
/**
 * Binds the column values of {@code tuple} to a new {@code BoundStatement}.
 *
 * <p>For each configured column, the value is read from the tuple through the getter at the
 * same index and bound with the setter that matches the column's data type.
 *
 * <p>{@code TIMESTAMP} columns are bound with {@code setTimestamp(Date)}. {@code setDate} with
 * a {@code LocalDate} is the setter for CQL {@code date} columns; using it here would discard
 * the time of day and would fail on a {@code null} date.
 *
 * @param updateCommand the prepared statement to bind values to
 * @param tuple         the object the column values are extracted from
 * @return the bound statement
 * @throws DriverException  declared for driver failures while binding
 * @throws RuntimeException if a column has a data type this method does not handle
 */
@Override
@SuppressWarnings("unchecked")
protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException {
    final BoundStatement boundStmnt = new BoundStatement(updateCommand);
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
        final DataType type = columnDataTypes.get(i);
        switch (type.getName()) {
            case UUID:
                final UUID id = ((Getter<Object, UUID>)getters.get(i)).get(tuple);
                boundStmnt.setUUID(i, id);
                break;
            case ASCII:
            case VARCHAR:
            case TEXT:
                final String ascii = ((Getter<Object, String>)getters.get(i)).get(tuple);
                boundStmnt.setString(i, ascii);
                break;
            case BOOLEAN:
                final boolean bool = ((GetterBoolean<Object>)getters.get(i)).get(tuple);
                boundStmnt.setBool(i, bool);
                break;
            case INT:
                final int intValue = ((GetterInt<Object>)getters.get(i)).get(tuple);
                boundStmnt.setInt(i, intValue);
                break;
            case BIGINT:
            case COUNTER:
                final long longValue = ((GetterLong<Object>)getters.get(i)).get(tuple);
                boundStmnt.setLong(i, longValue);
                break;
            case FLOAT:
                final float floatValue = ((GetterFloat<Object>)getters.get(i)).get(tuple);
                boundStmnt.setFloat(i, floatValue);
                break;
            case DOUBLE:
                final double doubleValue = ((GetterDouble<Object>)getters.get(i)).get(tuple);
                boundStmnt.setDouble(i, doubleValue);
                break;
            case DECIMAL:
                final BigDecimal decimal = ((Getter<Object, BigDecimal>)getters.get(i)).get(tuple);
                boundStmnt.setDecimal(i, decimal);
                break;
            case SET:
                Set<?> set = ((Getter<Object, Set<?>>)getters.get(i)).get(tuple);
                boundStmnt.setSet(i, set);
                break;
            case MAP:
                final Map<?,?> map = ((Getter<Object, Map<?,?>>)getters.get(i)).get(tuple);
                boundStmnt.setMap(i, map);
                break;
            case LIST:
                final List<?> list = ((Getter<Object, List<?>>)getters.get(i)).get(tuple);
                boundStmnt.setList(i, list);
                break;
            case TIMESTAMP:
                final Date date = ((Getter<Object, Date>)getters.get(i)).get(tuple);
                // CQL timestamp maps to java.util.Date; setTimestamp keeps the full instant.
                boundStmnt.setTimestamp(i, date);
                break;
            default:
                throw new RuntimeException("unsupported data type " + type.getName());
        }
    }
    return boundStmnt;
}
@Test(groups = {"unit", "server"}, dependsOnMethods = {"testInsertBasicData"}) public void testQueryBasicDataAsObject() { String cql = "select tbl.id_uuid,tbl.binary_data,tbl.date_date,tbl.date_time,tbl.date_timestamp," + "tbl.id_timeuuid,tbl.net_inet,tbl.num_big_integer,tbl.num_decimal,tbl.num_double,tbl.num_float," + "tbl.num_int,tbl.num_small_int,tbl.num_tiny_int,tbl.num_varint,tbl.str_ascii,tbl.str_text," + "tbl.str_varchar,tbl.true_or_false from \"test_drive\".\"basic_data_type\" tbl limit 1"; try { java.sql.Statement s = conn.createStatement(); assertTrue(s instanceof CassandraStatement); ResultSet rs = s.executeQuery(cql); assertTrue(rs instanceof CassandraResultSet); assertNotNull(rs); assertTrue(rs == s.getResultSet()); while (rs.next()) { // only need to read one row int index = 1; validateObjectType(rs.getObject(index++), UUID.class); validateObjectType(rs.getObject(index++), ByteBuffer.class); validateObjectType(rs.getObject(index++), LocalDate.class); validateObjectType(rs.getObject(index++), Long.class); validateObjectType(rs.getObject(index++), java.util.Date.class); validateObjectType(rs.getObject(index++), UUID.class); validateObjectType(rs.getObject(index++), InetAddress.class); validateObjectType(rs.getObject(index++), Long.class); validateObjectType(rs.getObject(index++), BigDecimal.class); validateObjectType(rs.getObject(index++), Double.class); validateObjectType(rs.getObject(index++), Float.class); validateObjectType(rs.getObject(index++), Integer.class); validateObjectType(rs.getObject(index++), Short.class); validateObjectType(rs.getObject(index++), Byte.class); validateObjectType(rs.getObject(index++), BigInteger.class); validateObjectType(rs.getObject(index++), String.class); validateObjectType(rs.getObject(index++), String.class); validateObjectType(rs.getObject(index++), String.class); validateObjectType(rs.getObject(index++), Boolean.class); } rs.close(); s.close(); } catch (Exception e) { e.printStackTrace(); fail("Error occurred 
during testing: " + e.getMessage()); } }
@Test(groups = {"unit", "server"}) public void testInsertLists() { String cql = "insert into test_drive.list_data_type(id,id_uuid,binary_data,date_date,date_time," + "date_timestamp,id_timeuuid,net_inet,num_big_integer,num_decimal,num_double,num_float,num_int," + "num_small_int,num_tiny_int,num_varint,str_ascii,str_text,str_varchar,true_or_false)\n" + "values(5d19b3b2-a889-4913-81ec-164e5845cf36,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; try { java.sql.PreparedStatement s = conn.prepareStatement(cql); assertTrue(s instanceof CassandraPreparedStatement); CassandraDataTypeConverters c = ((CassandraPreparedStatement) s).getDataTypeConverters(); int index = 1; s.setObject(index++, Lists.newArrayList(UUID.randomUUID())); //s.setObject(index++, Lists.newArrayList(ByteBuffer.wrap(new byte[]{1, 2, 3}))); s.setObject(index++, Lists.newArrayList(new byte[]{1, 2, 3})); //s.setObject(index++, Lists.newArrayList("2017-01-01")); s.setObject(index++, Lists.newArrayList( CassandraTestHelper.getInstance().replaceParameter( LocalDate.fromMillisSinceEpoch(System.currentTimeMillis()), Date.class))); //s.setObject(index++, Lists.newArrayList("11:50:30")); //s.setObject(index++, Lists.newArrayList(LocalTime.now().getMillisOfDay() * 1000000L)); s.setObject(index++, Lists.newArrayList( CassandraTestHelper.getInstance().replaceParameter( new Time(LocalTime.now().toDateTimeToday().getMillis()), Time.class))); //s.setObject(index++, Lists.newArrayList("2017-02-02 11:50:30.123")); s.setObject(index++, Lists.newArrayList(LocalDateTime.now().toDate())); // or you'll likely end up with error like the following: // com.datastax.driver.core.exceptions.InvalidTypeException: xxx is not a Type 1 (time-based) UUID s.setObject(index++, Lists.newArrayList(((CassandraPreparedStatement) s) .getDataTypeConverters().defaultValueOf(UUID.class))); s.setObject(index++, Lists.newArrayList(InetAddress.getByName("192.168.10.11"))); s.setObject(index++, Lists.newArrayList(Long.MAX_VALUE)); s.setObject(index++, 
Lists.newArrayList(new BigDecimal("33333333333333333333333333333333333"))); s.setObject(index++, Lists.newArrayList(Double.MAX_VALUE)); s.setObject(index++, Lists.newArrayList(Float.MAX_VALUE)); s.setObject(index++, Lists.newArrayList(Integer.MAX_VALUE)); s.setObject(index++, Lists.newArrayList( CassandraTestHelper.getInstance().replaceParameter(Short.MAX_VALUE, Short.class))); s.setObject(index++, Lists.newArrayList( CassandraTestHelper.getInstance().replaceParameter(Byte.MAX_VALUE, Byte.class))); s.setObject(index++, Lists.newArrayList(new BigInteger("2222222222222222222222222222222222"))); s.setObject(index++, Lists.newArrayList("ascii")); s.setObject(index++, Lists.newArrayList("text")); s.setObject(index++, Lists.newArrayList("varchar")); s.setObject(index++, Lists.newArrayList(true)); assertFalse(s.execute()); assertNull(s.getResultSet()); assertEquals(s.getUpdateCount(), 1); s.close(); } catch (Exception e) { e.printStackTrace(); fail("Error occurred during testing: " + e.getMessage()); } }
@Test(groups = {"unit", "server"}) public void testInsertSets() { String cql = "insert into test_drive.set_data_type(id,id_uuid,binary_data,date_date,date_time," + "date_timestamp,id_timeuuid,net_inet,num_big_integer,num_decimal,num_double,num_float,num_int," + "num_small_int,num_tiny_int,num_varint,str_ascii,str_text,str_varchar,true_or_false)\n" + "values(5d19b3b2-a889-4913-81ec-164e5845cf36,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; try { java.sql.PreparedStatement s = conn.prepareStatement(cql); assertTrue(s instanceof CassandraPreparedStatement); CassandraDataTypeConverters c = ((CassandraPreparedStatement) s).getDataTypeConverters(); int index = 1; s.setObject(index++, Sets.newHashSet(UUID.randomUUID())); //s.setObject(index++, Lists.newArrayList(ByteBuffer.wrap(new byte[]{1, 2, 3}))); s.setObject(index++, Sets.newHashSet(new byte[]{1, 2, 3})); //s.setObject(index++, Lists.newArrayList("2017-01-01")); s.setObject(index++, Sets.newHashSet( CassandraTestHelper.getInstance().replaceParameter( LocalDate.fromMillisSinceEpoch(System.currentTimeMillis()), Date.class))); //s.setObject(index++, Lists.newArrayList("11:50:30")); //s.setObject(index++, Lists.newArrayList(LocalTime.now().getMillisOfDay() * 1000000L)); s.setObject(index++, Sets.newHashSet( CassandraTestHelper.getInstance().replaceParameter( new Time(LocalTime.now().toDateTimeToday().getMillis()), Time.class))); //s.setObject(index++, Lists.newArrayList("2017-02-02 11:50:30.123")); s.setObject(index++, Sets.newHashSet(LocalDateTime.now().toDate())); // or you'll likely end up with error like the following: // com.datastax.driver.core.exceptions.InvalidTypeException: xxx is not a Type 1 (time-based) UUID s.setObject(index++, Sets.newHashSet(((CassandraPreparedStatement) s) .getDataTypeConverters().defaultValueOf(UUID.class))); s.setObject(index++, Sets.newHashSet(InetAddress.getByName("192.168.10.11"))); s.setObject(index++, Sets.newHashSet(Long.MAX_VALUE)); s.setObject(index++, Sets.newHashSet(new 
BigDecimal("33333333333333333333333333333333333"))); s.setObject(index++, Sets.newHashSet(Double.MAX_VALUE)); s.setObject(index++, Sets.newHashSet(Float.MAX_VALUE)); s.setObject(index++, Sets.newHashSet(Integer.MAX_VALUE)); s.setObject(index++, Sets.newHashSet( CassandraTestHelper.getInstance().replaceParameter(Short.MAX_VALUE, Short.class))); s.setObject(index++, Sets.newHashSet( CassandraTestHelper.getInstance().replaceParameter(Byte.MAX_VALUE, Byte.class))); s.setObject(index++, Sets.newHashSet(new BigInteger("2222222222222222222222222222222222"))); s.setObject(index++, Sets.newHashSet("ascii")); s.setObject(index++, Sets.newHashSet("text")); s.setObject(index++, Sets.newHashSet("varchar")); s.setObject(index++, Sets.newHashSet(true)); assertFalse(s.execute()); assertNull(s.getResultSet()); assertEquals(s.getUpdateCount(), 1); s.close(); } catch (Exception e) { e.printStackTrace(); fail("Error occurred during testing: " + e.getMessage()); } }
@Test(groups = {"unit", "server"}) public void testInsertMaps() { String cql = "insert into test_drive.map_data_type(id,id_uuid,binary_data,date_date,date_time," + "date_timestamp,id_timeuuid,net_inet,num_big_integer,num_decimal,num_double,num_float,num_int," + "num_small_int,num_tiny_int,num_varint,str_ascii,str_text,str_varchar,true_or_false)\n" + "values(5d19b3b2-a889-4913-81ec-164e5845cf36,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; try { java.sql.PreparedStatement s = conn.prepareStatement(cql); assertTrue(s instanceof CassandraPreparedStatement); CassandraDataTypeConverters c = ((CassandraPreparedStatement) s).getDataTypeConverters(); int index = 1; s.setObject(index++, Maps.newHashMap(UUID.randomUUID(), UUID.randomUUID())); //s.setObject(index++, Lists.newArrayList(ByteBuffer.wrap(new byte[]{1, 2, 3}))); s.setObject(index++, Maps.newHashMap(new byte[]{1, 2, 3}, new byte[]{1, 2, 3})); //s.setObject(index++, Lists.newArrayList("2017-01-01")); s.setObject(index++, Maps.newHashMap( CassandraTestHelper.getInstance().replaceParameter( LocalDate.fromMillisSinceEpoch(System.currentTimeMillis()), Date.class), CassandraTestHelper.getInstance().replaceParameter( LocalDate.fromMillisSinceEpoch(System.currentTimeMillis()), Date.class))); //s.setObject(index++, Lists.newArrayList("11:50:30")); //s.setObject(index++, Lists.newArrayList(LocalTime.now().getMillisOfDay() * 1000000L)); s.setObject(index++, Maps.newHashMap( CassandraTestHelper.getInstance().replaceParameter( new Time(LocalTime.now().toDateTimeToday().getMillis()), Time.class), CassandraTestHelper.getInstance().replaceParameter( new Time(LocalTime.now().toDateTimeToday().getMillis()), Time.class))); //s.setObject(index++, Lists.newArrayList("2017-02-02 11:50:30.123")); s.setObject(index++, Maps.newHashMap(LocalDateTime.now().toDate(), LocalDateTime.now().toDate())); // or you'll likely end up with error like the following: // com.datastax.driver.core.exceptions.InvalidTypeException: xxx is not a Type 1 (time-based) 
UUID s.setObject(index++, Maps.newHashMap(((CassandraPreparedStatement) s) .getDataTypeConverters().defaultValueOf(UUID.class), ((CassandraPreparedStatement) s).getDataTypeConverters().defaultValueOf(UUID.class))); s.setObject(index++, Maps.newHashMap(InetAddress.getByName("192.168.10.11"), InetAddress.getByName("192.168.10.11"))); s.setObject(index++, Maps.newHashMap(Long.MAX_VALUE, Long.MAX_VALUE)); s.setObject(index++, Maps.newHashMap(new BigDecimal("33333333333333333333333333333333333"), new BigDecimal("33333333333333333333333333333333333"))); s.setObject(index++, Maps.newHashMap(Double.MAX_VALUE, Double.MAX_VALUE)); s.setObject(index++, Maps.newHashMap(Float.MAX_VALUE, Float.MAX_VALUE)); s.setObject(index++, Maps.newHashMap(Integer.MAX_VALUE, Integer.MAX_VALUE)); s.setObject(index++, Maps.newHashMap( CassandraTestHelper.getInstance().replaceParameter(Short.MAX_VALUE, Short.class), CassandraTestHelper.getInstance().replaceParameter(Short.MAX_VALUE, Short.class))); s.setObject(index++, Maps.newHashMap( CassandraTestHelper.getInstance().replaceParameter(Byte.MAX_VALUE, Byte.class), CassandraTestHelper.getInstance().replaceParameter(Byte.MAX_VALUE, Byte.class))); s.setObject(index++, Maps.newHashMap(new BigInteger("2222222222222222222222222222222222"), new BigInteger("2222222222222222222222222222222222"))); s.setObject(index++, Maps.newHashMap("ascii", "ascii")); s.setObject(index++, Maps.newHashMap("text", "text")); s.setObject(index++, Maps.newHashMap("varchar", "varchar")); s.setObject(index++, Maps.newHashMap(true, true)); assertFalse(s.execute()); assertNull(s.getResultSet()); assertEquals(s.getUpdateCount(), 1); s.close(); } catch (Exception e) { e.printStackTrace(); fail("Error occurred during testing: " + e.getMessage()); } }
/**
 * Converts dash-separated text into a driver {@code LocalDate}. The first three parts are read
 * as year, month and day.
 *
 * @param dateStr text of the form {@code year-month-day}
 * @return the corresponding date
 */
LocalDate date(String dateStr) {
    final String[] parts = dateStr.split("-");
    final int year = Integer.parseInt(parts[0]);
    final int month = Integer.parseInt(parts[1]);
    final int day = Integer.parseInt(parts[2]);
    return LocalDate.fromYearMonthDay(year, month, day);
}
/**
 * Converts dash-separated text into a {@code LocalDate} via {@code fromYearMonthDay}. The first
 * three parts are read as year, month and day.
 *
 * @param dateStr text of the form {@code year-month-day}
 * @return the corresponding date
 */
LocalDate localDate(String dateStr) {
    final String[] parts = dateStr.split("-");
    final int year = Integer.parseInt(parts[0]);
    final int month = Integer.parseInt(parts[1]);
    final int day = Integer.parseInt(parts[2]);
    return fromYearMonthDay(year, month, day);
}
/**
 * Binds the three airports, two departure dates and two flight numbers to the prepared
 * statement and executes it asynchronously.
 *
 * @param x          first value bound to the statement
 * @param y          second value bound to the statement
 * @param z          third value bound to the statement
 * @param xDepDate   departure date bound fourth
 * @param yDepDate   departure date bound fifth
 * @param xyFlightNr flight number bound sixth
 * @param yzFlightNr flight number bound seventh
 */
void persist(String x, String y, String z, LocalDate xDepDate, LocalDate yDepDate, Integer xyFlightNr, Integer yzFlightNr) {
    final BoundStatement bound =
            preparedStatement.bind(x, y, z, xDepDate, yDepDate, xyFlightNr, yzFlightNr);
    connect.executeAsync(bound);
}
/**
 * Converts a driver {@code LocalDate} into a {@code Date} at the same milliseconds-since-epoch
 * value.
 *
 * @param localDate the date to convert
 * @return a {@code Date} built from {@code localDate.getMillisSinceEpoch()}
 */
@Override
protected Date deserialize(LocalDate localDate) {
    final long epochMillis = localDate.getMillisSinceEpoch();
    return new Date(epochMillis);
}
/**
 * Converts a {@code Date} into a driver {@code LocalDate} from its epoch-millisecond value.
 *
 * @param d the date to convert
 * @return the result of {@code LocalDate.fromMillisSinceEpoch(d.getTime())}
 */
@Override
protected LocalDate serialize(Date d) {
    final long epochMillis = d.getTime();
    return LocalDate.fromMillisSinceEpoch(epochMillis);
}
/**
 * Writes a single nine-field record through the Cassandra target into {@code test.trips} and
 * verifies that no errors are reported and that the stored row matches the record.
 */
@Test
public void testWriteSingleRecord() throws InterruptedException, StageException {
    final String tableName = "test.trips";

    // Map list positions of the record onto table columns.
    final List<CassandraFieldMappingConfig> columnMappings = ImmutableList.of(
            new CassandraFieldMappingConfig("[0]", "driver_id"),
            new CassandraFieldMappingConfig("[1]", "trip_id"),
            new CassandraFieldMappingConfig("[2]", "time"),
            new CassandraFieldMappingConfig("[3]", "x"),
            new CassandraFieldMappingConfig("[4]", "y"),
            new CassandraFieldMappingConfig("[5]", "dt"),
            new CassandraFieldMappingConfig("[6]", "ts"),
            new CassandraFieldMappingConfig("[7]", "time_id"),
            new CassandraFieldMappingConfig("[8]", "unique_id")
    );

    final CassandraTargetConfig conf = new CassandraTargetConfig();
    conf.contactPoints.add("localhost");
    conf.port = CASSANDRA_NATIVE_PORT;
    conf.protocolVersion = ProtocolVersion.V4;
    conf.authProviderOption = AuthProviderOption.NONE;
    conf.compression = CassandraCompressionCodec.NONE;
    conf.columnNames = columnMappings;
    conf.qualifiedTableName = tableName;

    final Target target = new CassandraTarget(conf);
    final TargetRunner runner = new TargetRunner.Builder(CassandraDTarget.class, target).build();

    final long now = System.currentTimeMillis();
    final LocalDate dt = LocalDate.fromMillisSinceEpoch(now);
    final Date ts = new Date();

    // Build the record: one list field whose elements line up with the column mappings.
    final Record record = RecordCreator.create();
    final List<Field> values = new ArrayList<>();
    values.add(Field.create(1));
    values.add(Field.create(2));
    values.add(Field.create(3));
    values.add(Field.create(4.0));
    values.add(Field.create(5.0));
    values.add(Field.create(Field.Type.DATE, new Date(dt.getMillisSinceEpoch())));
    values.add(Field.create(Field.Type.DATETIME, ts));
    values.add(Field.create(SAMPLE_TIMEUUID));
    values.add(Field.create(SAMPLE_UUID));
    record.set(Field.create(values));

    final List<Record> singleRecord = ImmutableList.of(record);
    runner.runInit();
    runner.runWrite(singleRecord);

    // Should not be any error records.
    Assert.assertTrue(runner.getErrorRecords().isEmpty());
    Assert.assertTrue(runner.getErrors().isEmpty());

    runner.runDestroy();

    final ResultSet resultSet = session.execute("SELECT * FROM test.trips");
    final List<Row> stored = resultSet.all();
    Assert.assertEquals(1, stored.size());

    final Row row = stored.get(0);
    Assert.assertEquals(1, row.getInt("driver_id"));
    Assert.assertEquals(2, row.getInt("trip_id"));
    Assert.assertEquals(3, row.getInt("time"));
    Assert.assertEquals(4.0, row.getDouble("x"), EPSILON);
    Assert.assertEquals(5.0, row.getDouble("y"), EPSILON);
    Assert.assertEquals(dt, row.getDate("dt"));
    Assert.assertEquals(ts, row.getTimestamp("ts"));
    Assert.assertEquals(SAMPLE_TIMEUUID, row.getUUID("time_id").toString());
    Assert.assertEquals(SAMPLE_UUID, row.getUUID("unique_id").toString());
}
/**
 * Returns the value at index {@code i} as a {@code LocalDate} by delegating to the wrapped
 * {@code row}.
 *
 * @param i index of the value to read
 * @return whatever {@code row.getDate(i)} returns
 */
@Override public LocalDate getDate(int i) { return row.getDate(i); }
/**
 * Returns the value named {@code s} as a {@code LocalDate} by delegating to the wrapped
 * {@code row}.
 *
 * @param s name of the value to read
 * @return whatever {@code row.getDate(s)} returns
 */
@Override public LocalDate getDate(String s) { return row.getDate(s); }
/**
 * Not supported by this implementation.
 *
 * @param i ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override public LocalDate getDate(int i) { throw new UnsupportedOperationException(); }
/**
 * Not supported by this implementation.
 *
 * @param name ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override public LocalDate getDate(String name) { throw new UnsupportedOperationException(); }
/**
 * Checks whether the exchange's inbound body is a single-row result matching the expected
 * values.
 *
 * <p>The body must be a {@code List} with exactly one element, and that element must be a
 * {@code Map} of the same size as the expected {@code map}. Within it:
 * <ul>
 *   <li>{@code test_inet} must be an {@code InetAddress} (only the type is checked);</li>
 *   <li>{@code test_uuid} must be a {@code UUID} that compares equal to {@code test_uuid};</li>
 *   <li>{@code test_date} must be a {@code LocalDate} whose {@code getDay()} equals that of
 *       {@code test_date} (no other date component is compared).</li>
 * </ul>
 *
 * @param exchange the exchange whose inbound body is inspected
 * @return {@code true} when every check passes, {@code false} otherwise
 */
public boolean matches(Exchange exchange) {
    final Object body = exchange.getIn().getBody();
    if (!(body instanceof List)) {
        // Also covers a null body.
        return false;
    }

    final List<?> rows = (List<?>) body;
    if (rows.size() != 1) {
        return false;
    }

    final Object firstRow = rows.get(0);
    if (!(firstRow instanceof Map)) {
        return false;
    }

    final Map<?, ?> rmap = (Map<?, ?>) firstRow;
    if (rmap.size() != map.size()) {
        return false;
    }

    if (!(rmap.get("test_inet") instanceof InetAddress)) {
        return false;
    }

    final Object uuidValue = rmap.get("test_uuid");
    if (!(uuidValue instanceof UUID) || test_uuid.compareTo((UUID) uuidValue) != 0) {
        return false;
    }

    final Object dateValue = rmap.get("test_date");
    return dateValue instanceof LocalDate
            && test_date.getDay() == ((LocalDate) dateValue).getDay();
}