I am trying to extract features based on a sliding window over time series data. In Scala, there appears to be a sliding function for this, based on this post and the documentation:
import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
My question is: is there a similar function in PySpark? Or, if no such function exists yet, how can we implement an equivalent sliding window transformation?
As far as I can tell, the sliding function is not available from Python, and SlidingRDD is a private class that cannot be accessed outside of MLlib.
If you want to use sliding on an existing RDD, you can create a poor man's sliding like this:
def sliding(rdd, n):
    assert n > 0

    def gen_window(xi, n):
        x, i = xi
        return [(i - offset, (i, x)) for offset in range(n)]

    return (
        rdd.
        zipWithIndex().  # Add index
        flatMap(lambda xi: gen_window(xi, n)).  # Generate pairs with offset
        groupByKey().  # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey().  # Sort to make sure we keep original order
        values().  # Get values
        filter(lambda x: len(x) == n))  # Drop incomplete windows at the beginning and end
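For example, a minimal sketch reproducing the moving average from the Scala snippet in the question (assuming a live SparkContext named sc):

# Mean of each length-3 window, mirroring the Scala example
avgs = (sliding(sc.parallelize(range(1, 101), 10), 3)
        .map(lambda window: sum(window) / len(window))
        .collect())

Keep in mind that groupByKey forces a full shuffle here, so this version is mainly suitable when simplicity matters more than performance.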
Alternatively, you can try something like this (with a little help from toolz):
from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, it):
        """Return last n - 1 elements from the partition"""
        return [(i, [x for x in it][(-n + 1):])]

    def slide(i, it):
        """Prepend previous items and return sliding windows"""
        return sliding_window(n, concat([last_items.value[i - 1], it]))

    def clean_last_items(last_items):
        """Adjust for empty or too small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    # Collect the tail of every partition and broadcast it
    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)
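Unlike the first version, this one avoids the groupByKey shuffle: only the last n - 1 elements of each partition are collected and broadcast, and each partition then prepends its predecessor's tail. Note that, as written, the windows at the very start of the RDD are padded with None (from clean_last_items), so a caller computing statistics may want to drop them first. A minimal usage sketch, again assuming a live SparkContext named sc:

# Same moving average as above, computed without a full shuffle;
# the leading None-padded windows are filtered out first
avgs = (sliding2(sc.parallelize(range(1, 101), 10), 3)
        .filter(lambda window: None not in window)
        .map(lambda window: sum(window) / len(window))
        .collect())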