我正在尝试从Spark RDD向Ignite缓存执行插入操作。我正在使用2.2版本的Ignite和2.1版本的Spark。
我要做的第一步是在单独的scala脚本中创建缓存,如下所示:
object Create_Ignite_Cache { case class Custom_Class( @(QuerySqlField @field)(index = true) a: String, @(QuerySqlField @field)(index = true) b: String, @(QuerySqlField @field)(index = true) c: String, @(QuerySqlField @field)(index = true) d: String, @(QuerySqlField @field)(index = true) e: String, @(QuerySqlField @field)(index = true) f: String, @(QuerySqlField @field)(index = true) g: String, @(QuerySqlField @field)(index = true) h: String ) def main(args: Array[String]): Unit = { val spi = new TcpDiscoverySpi val ipFinder = new TcpDiscoveryMulticastIpFinder val adresses = new util.ArrayList[String] adresses.add("127.0.0.1:48500..48520") ipFinder.setAddresses(adresses) spi.setIpFinder(ipFinder) val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true) val cache_conf = new CacheConfiguration[String, Custom_Class]().setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(1).setIndexedTypes(classOf[String], classOf[Custom_Class]).setName("Spark_Ignite") val ignite = Ignition.getOrStart(cfg) ignite.getOrCreateCache(cache_conf) System.out.println("[INFO] CACHE CREATED") ignite.close() } }
接下来,我运行了一个Spark应用程序,将igniteRDD的内容插入到缓存中:
object Spark_Streaming_Processing { case class Custom_Class( @(QuerySqlField @field)(index = true) a: String, @(QuerySqlField @field)(index = true) b: String, @(QuerySqlField @field)(index = true) c: String, @(QuerySqlField @field)(index = true) d: String, @(QuerySqlField @field)(index = true) e: String, @(QuerySqlField @field)(index = true) f: String, @(QuerySqlField @field)(index = true) g: String, @(QuerySqlField @field)(index = true) h: String ) //START IGNITE CONTEXT val addresses=new util.ArrayList[String]() addresses.add("127.0.0.1:48500..48520") val igniteContext:IgniteContext=new IgniteContext(sqlContext.sparkContext,()=> new IgniteConfiguration().setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses)) ).setCacheConfiguration(new CacheConfiguration[String,Custom_Class]() .setName("Spark_Ignite").setBackups(1).setIndexedTypes(classOf[String],classOf[Custom_Class])) ,true) println(igniteContext.ignite().cacheNames()) val ignite_cache_rdd:IgniteRDD[String,Custom_Class] =igniteContext.fromCache[String,Custom_Class]("Spark_Ignite") val processed_Pair:RDD[(String,Custom_Class)]=(...)// rdd with data, which as you can see has the correct datatypes as parameters ignite_cache_rdd.savePairs(processed_PairRDD) } }
可以看出,这些类是完全相同的。
成功运行该应用程序后,我可以在ignitevisor中看到该缓存包含63条记录,如在控制台的上一个快照中所见。
但是,如果我尝试对高速缓存执行sql查询,如下所示:
ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)
结果我得到一个空表。
如果我通过外部sql服务器查询,也会发生同样的事情。
奇怪的是,如果我不创建缓存先验,而是运行Spark应用程序,则IgniteContext会创建缓存(如果该缓存不存在), 然后 我就可以在查询中看到记录了!
这可能是什么问题?
据我所知,键和值的数据类型完全相同,因此我在查询时应该能够看到它们。
感谢您的时间。
这里的问题是您使用不同的类来创建缓存并将数据插入到其中。即使这两个类的字段匹配,它们的完全限定名称也不同,因此这是两个不同的类。
如果希望能够从SQL查询数据,则应在缓存创建和数据插入期间使用相同的类。
跳过创建缓存解决了该问题的原因是Spark应用程序自己创建了一个缓存,而不是使用现有的缓存。因此,当Spark创建它时,在缓存创建过程中将使用实际对象的类。