The following 50 code examples, extracted from open-source Python projects, illustrate how to use the scipy.interpolate module.
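Before the project examples, here is a minimal self-contained sketch (not taken from any of the projects below) of the most common entry point, scipy.interpolate.interp1d: fit an interpolant to coarse samples, then evaluate it on a finer grid.

import numpy as np
import scipy.interpolate

# sample a coarse sine curve
x = np.linspace(0, 10, 11)
y = np.sin(x)

# build a cubic interpolant and evaluate it on a finer grid
f = scipy.interpolate.interp1d(x, y, kind='cubic')
x_fine = np.linspace(0, 10, 101)
y_fine = f(x_fine)  # interpolated values at the new points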
def __call__(self, vals, fill_value=np.nan):
    """
    Evaluate interpolator for values given at the source points.

    Parameters
    ----------
    vals : ndarray of float, shape (numsourcepoints, ...)
        Values at the source points which to interpolate
    fill_value : float
        is needed if linear interpolation fails; defaults to np.nan

    Returns
    -------
    output : ndarray of float with shape (numtargetpoints,...)

    """
    self._check_shape(vals)
    ip = LinearNDInterpolator(self.src, vals, fill_value=fill_value)
    return ip(self.trg)

# -----------------------------------------------------------------------------
# Covariance routines needed for Kriging
# -----------------------------------------------------------------------------
def apply_grouping(self, energy_channel, grouping, verbose=False):
    """
    Group the ARF channels (INTERPOLATED with respect to the spectral
    channels) by the supplied grouping specification.

    Arguments:
      * energy_channel: energies of the spectral channel
      * grouping: spectral grouping specification

    Return: `self.specresp_grp'
    """
    if self.groupped:
        return
    if verbose:
        print("INFO: Grouping ARF '%s' ..." % self.filename)
    self.energy_channel = energy_channel
    self.grouping = grouping
    # interpolate the ARF w.r.t. the spectral channel energies
    arf_interp = self.interpolate(x=energy_channel, verbose=verbose)
    self.specresp_grp = group_data(arf_interp, grouping)
    self.groupped = True
# class ARF }}}
def getGriddata(x, y, z, extend):
    '''data x,y,z and boundbox to print'''
    (xmin, xmax, ymin, ymax) = extend
    grid_y, grid_x = np.mgrid[xmin:xmax:(xmax-xmin)*10j, ymin:ymax:(ymax-ymin)*10j]

    points = []
    for i in range(x.shape[0]):
        points.append([y[i], x[i]])
    values = z

    # see http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
    from scipy.interpolate import griddata
    # grid_z0 = griddata(points, values, (grid_x, grid_y), method='nearest')
    # grid_z1 = griddata(points, values, (grid_x, grid_y), method='linear')
    grid_z2 = griddata(points, values, (grid_x, grid_y), method='cubic')
    return grid_z2
def _prepare_interpolators(self):
    """
    Updates the interpolator functions the user calls to interpolate the current plan.

    """
    if len(self.x_seq) == 1:
        self.get_state = lambda t: self.x_seq[0]
        self.get_effort = lambda t: np.zeros(self.ncontrols)
    else:
        self.get_state = interp1d(self.t_seq, np.array(self.x_seq), axis=0,
                                  assume_sorted=True, bounds_error=False,
                                  fill_value=self.x_seq[-1][:])
        self.get_effort = interp1d(self.t_seq, np.array(self.u_seq), axis=0,
                                   assume_sorted=True, bounds_error=False,
                                   fill_value=self.u_seq[-1][:])

#################################################
def regrid(x, y, z, xnew, ynew, method='cubic'):
    """
    Regrid 1D arrays (x,y,z) -- where z is some scalar field mapped at positions
    x,y -- to a 2d array Z defined on the cartesian grids xnew,ynew (1D arrays
    with the new grid). For the interpolation method, choose nearest, linear or cubic.

    >>> rho = regrid(d.x, d.y, d.rho, xnew, ynew)

    .. todo:: need to create a 3d version of this method, paving the road for
       the 3d simulations.
    """
    import scipy.interpolate

    # regrid the data to a nice cartesian grid
    Z = scipy.interpolate.griddata((x, y), z, (xnew[None, :], ynew[:, None]), method=method)

    # get rid of NaNs
    return nanzero(Z)
def lambda_interp(lambdau, s):
    # lambda is the index sequence that is produced by the model
    # s is the new vector at which evaluations are required.
    # the value is a vector of left and right indices, and a vector of fractions.
    # the new values are interpolated between the two using the fraction
    # Note: lambda decreases. you take:
    # sfrac*left + (1 - sfrac)*right
    if len(lambdau) == 1:
        nums = len(s)
        left = scipy.zeros([nums, 1], dtype=scipy.integer)
        right = left
        sfrac = scipy.zeros([nums, 1], dtype=scipy.float64)
    else:
        s[s > scipy.amax(lambdau)] = scipy.amax(lambdau)
        s[s < scipy.amin(lambdau)] = scipy.amin(lambdau)
        k = len(lambdau)
        sfrac = (lambdau[0] - s)/(lambdau[0] - lambdau[k - 1])
        lambdau = (lambdau[0] - lambdau)/(lambdau[0] - lambdau[k - 1])
        coord = scipy.interpolate.interp1d(lambdau, range(k))(sfrac)
        left = scipy.floor(coord).astype(scipy.integer, copy=False)
        right = scipy.ceil(coord).astype(scipy.integer, copy=False)
        #
        tf = left != right
        sfrac[tf] = (sfrac[tf] - lambdau[right[tf]])/(lambdau[left[tf]] - lambdau[right[tf]])
        sfrac[~tf] = 1.0
        # if left != right:
        #     sfrac = (sfrac - lambdau[right])/(lambdau[left] - lambdau[right])
        # else:
        #     sfrac[left == right] = 1.0

    result = dict()
    result['left'] = left
    result['right'] = right
    result['frac'] = sfrac

    return(result)
# end of lambda_interp
# =========================================
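To make the left/right/frac convention above concrete, here is a hypothetical call with invented values; it assumes the same scipy-as-numpy aliasing and imports that the snippet itself relies on.

# hypothetical usage sketch -- values invented for illustration
lambdau = scipy.array([1.0, 0.5, 0.25, 0.1])   # decreasing, as produced by the model
s = scipy.array([0.75, 0.3])                   # new points at which to evaluate
result = lambda_interp(lambdau, s)
# result['left'] and result['right'] bracket each value of s;
# an interpolated quantity is then frac*value[left] + (1 - frac)*value[right]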
def test_interpolate(self):
    ts = Series(np.arange(len(self.ts), dtype=float), self.ts.index)

    ts_copy = ts.copy()
    ts_copy[5:10] = np.NaN

    linear_interp = ts_copy.interpolate(method='linear')
    self.assert_numpy_array_equal(linear_interp, ts)

    ord_ts = Series([d.toordinal() for d in self.ts.index],
                    index=self.ts.index).astype(float)

    ord_ts_copy = ord_ts.copy()
    ord_ts_copy[5:10] = np.NaN

    time_interp = ord_ts_copy.interpolate(method='time')
    self.assert_numpy_array_equal(time_interp, ord_ts)

    # try time interpolation on a non-TimeSeries
    # Only raises ValueError if there are NaNs.
    non_ts = self.series.copy()
    non_ts[0] = np.NaN
    self.assertRaises(ValueError, non_ts.interpolate, method='time')
def test_interpolate_index_values(self):
    s = Series(np.nan, index=np.sort(np.random.rand(30)))
    s[::3] = np.random.randn(10)

    vals = s.index.values.astype(float)

    result = s.interpolate(method='index')

    expected = s.copy()
    bad = isnull(expected.values)
    good = ~bad
    expected = Series(np.interp(vals[bad], vals[good], s.values[good]),
                      index=s.index[bad])

    assert_series_equal(result[bad], expected)

    # 'values' is synonymous with 'index' for the method kwarg
    other_result = s.interpolate(method='values')

    assert_series_equal(other_result, result)
    assert_series_equal(other_result[bad], expected)
def test_interp_limit_before_ends(self):
    # These tests are for issue #11115 -- limit ends properly.
    s = Series([np.nan, np.nan, 5, 7, np.nan, np.nan])

    expected = Series([np.nan, np.nan, 5., 7., 7., np.nan])
    result = s.interpolate(method='linear', limit=1,
                           limit_direction='forward')
    assert_series_equal(result, expected)

    expected = Series([np.nan, 5., 5., 7., np.nan, np.nan])
    result = s.interpolate(method='linear', limit=1,
                           limit_direction='backward')
    assert_series_equal(result, expected)

    expected = Series([np.nan, 5., 5., 7., 7., np.nan])
    result = s.interpolate(method='linear', limit=1,
                           limit_direction='both')
    assert_series_equal(result, expected)
def test_interp_basic(self):
    df = DataFrame({'A': [1, 2, np.nan, 4],
                    'B': [1, 4, 9, np.nan],
                    'C': [1, 2, 3, 5],
                    'D': list('abcd')})
    expected = DataFrame({'A': [1., 2., 3., 4.],
                          'B': [1., 4., 9., 9.],
                          'C': [1, 2, 3, 5],
                          'D': list('abcd')})
    result = df.interpolate()
    assert_frame_equal(result, expected)

    result = df.set_index('C').interpolate()
    expected = df.set_index('C')
    expected.loc[3, 'A'] = 3
    expected.loc[5, 'B'] = 9
    assert_frame_equal(result, expected)
def test_interp_rowwise(self):
    df = DataFrame({0: [1, 2, np.nan, 4],
                    1: [2, 3, 4, np.nan],
                    2: [np.nan, 4, 5, 6],
                    3: [4, np.nan, 6, 7],
                    4: [1, 2, 3, 4]})
    result = df.interpolate(axis=1)
    expected = df.copy()
    expected.loc[3, 1] = 5
    expected.loc[0, 2] = 3
    expected.loc[1, 3] = 3
    expected[4] = expected[4].astype(np.float64)
    assert_frame_equal(result, expected)

    # scipy route
    tm._skip_if_no_scipy()
    result = df.interpolate(axis=1, method='values')
    assert_frame_equal(result, expected)

    result = df.interpolate(axis=0)
    expected = df.interpolate()
    assert_frame_equal(result, expected)
def test_interp_ignore_all_good(self):
    # GH
    df = DataFrame({'A': [1, 2, np.nan, 4],
                    'B': [1, 2, 3, 4],
                    'C': [1., 2., np.nan, 4.],
                    'D': [1., 2., 3., 4.]})
    expected = DataFrame({'A': np.array([1, 2, 3, 4], dtype='float64'),
                          'B': np.array([1, 2, 3, 4], dtype='int64'),
                          'C': np.array([1., 2., 3, 4.], dtype='float64'),
                          'D': np.array([1., 2., 3., 4.], dtype='float64')})

    result = df.interpolate(downcast=None)
    assert_frame_equal(result, expected)

    # all good
    result = df[['B', 'D']].interpolate(downcast=None)
    assert_frame_equal(result, df[['B', 'D']])
def __call__(self, vals):
    """
    Evaluate interpolator for values given at the source points.

    Parameters
    ----------
    vals : ndarray of float, shape (numsources, ...)
        Values at the source points which to interpolate

    Returns
    -------
    output : None

    """
    self._check_shape(vals)
    return None
def __call__(self, vals, maxdist=None):
    """
    Evaluate interpolator for values given at the source points.

    Parameters
    ----------
    vals : ndarray of float, shape (numsourcepoints, ...)
        Values at the source points which to interpolate
    maxdist : the maximum distance up to which an interpolated value is
        assigned - if maxdist is exceeded, np.nan will be assigned
        If maxdist==None, values will be assigned everywhere

    Returns
    -------
    output : ndarray of float with shape (numtargetpoints,...)

    """
    self._check_shape(vals)
    out = vals[self.ix]
    if maxdist is None:
        return out
    else:
        return np.where(self.dists > maxdist, np.nan, out)
def interpolate(self, x=None, verbose=False):
    """
    Interpolate the ARF curve using `scipy.interpolate'

    If the requested point is outside of the data range, the
    fill value of *zero* is returned.

    Arguments:
      * x: points at which the interpolation to be calculated.

    Return:
      If x is None, then the interpolated function is returned,
      otherwise, the interpolated data are returned.
    """
    if not hasattr(self, "f_interp") or self.f_interp is None:
        arf = self.get_data(copy=False)
        if verbose:
            print("INFO: Interpolating ARF '%s' (may take a while) ..." %
                  self.filename)
        f_interp = scipy.interpolate.interp1d(
            self.energy, arf, kind="quadratic", bounds_error=False,
            fill_value=0.0, assume_sorted=True)
        self.f_interp = f_interp

    if x is not None:
        return self.f_interp(x)
    else:
        return self.f_interp
def interpolate(x, y, z, gridsize, mode='thin_plate', rbfmode=True, shape=None):
    grids = gridsize
    dx = np.max(x) - np.min(x)
    dy = np.max(y) - np.min(y)

    if dx > dy:
        gridx = grids
        gridy = int(round(dy/dx*grids))
    else:
        gridy = grids
        gridx = int(round(dx/dy*grids))

    if shape is not None:
        (gridy, gridx) = shape

    xi, yi = np.linspace(np.min(x), np.max(x), gridx), np.linspace(np.min(y), np.max(y), gridy)
    xi, yi = np.meshgrid(xi, yi)

    if rbfmode:
        rbf = scipy.interpolate.Rbf(x, y, z, function=mode)
        rbf2 = scipy.interpolate.Rbf(y, x, z, function=mode)
    else:
        print("interp2d not implemented")
        rbf = scipy.interpolate.interp2d(x, y, z, kind=mode)
        rbf2 = rbf  # fall back so the evaluation below does not raise a NameError

    zi = rbf2(yi, xi)
    return [rbf, xi, yi, zi]
def interpolate(coords, var, interp_coords, missing_value=None, fill=True, kind="linear"):
    """Interpolate globally defined data to a different (regular) grid.

    Arguments:
        coords: Tuple of coordinate arrays for each dimension.
        var (:obj:`ndarray` of dim (nx1, ..., nxd)): Variable data to interpolate.
        interp_coords: Tuple of coordinate arrays to interpolate to.
        missing_value (optional): Value denoting cells of missing data in ``var``.
            Is replaced by `NaN` before interpolating. Defaults to `None`, which
            means no replacement takes place.
        fill (bool, optional): Whether `NaN` values should be replaced by the
            nearest finite value after interpolating. Defaults to ``True``.
        kind (str, optional): Order of interpolation. Supported are `nearest`
            and `linear` (default).

    Returns:
        :obj:`ndarray` containing the interpolated values on the grid spanned
        by ``interp_coords``.
    """
    if len(coords) != len(interp_coords) or len(coords) != var.ndim:
        raise ValueError("Dimensions of coordinates and values do not match")
    var = np.array(var)
    if missing_value is not None:
        invalid_mask = np.isclose(var, missing_value)
        var[invalid_mask] = np.nan
    if var.ndim > 1 and coords[0].ndim == 1:
        interp_grid = np.rollaxis(np.array(np.meshgrid(
            *interp_coords, indexing="ij", copy=False)), 0, len(interp_coords) + 1)
    else:
        interp_grid = coords
    var = scipy.interpolate.interpn(coords, var, interp_grid,
                                    bounds_error=False, fill_value=np.nan, method=kind)

    if fill:
        var = fill_holes(var)
    return var
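A rough usage sketch for the wrapper above, with coordinates and data invented for illustration; fill=False is passed to avoid depending on the external fill_holes helper.

# hypothetical usage sketch -- regrid a coarse 2-D field onto a finer grid
lat = np.linspace(-80., 80., 40)
lon = np.linspace(0., 360., 90, endpoint=False)
field = np.random.rand(lon.size, lat.size)

new_lat = np.linspace(-80., 80., 180)
new_lon = np.linspace(0., 355., 72)  # stays inside the source longitude range
field_fine = interpolate((lon, lat), field, (new_lon, new_lat), fill=False)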
def get_periodic_interval(current_time, cycle_length, rec_spacing, n_rec):
    """Used for linear interpolation between periodic time intervals.

    One common application is the interpolation of external forcings that are
    defined at discrete times (e.g. one value per month of a standard year) to
    the current time step.

    Arguments:
        current_time (float): Time to interpolate to.
        cycle_length (float): Total length of one periodic cycle.
        rec_spacing (float): Time spacing between each data record.
        n_rec (int): Total number of records available.

    Returns:
        :obj:`tuple` containing (n1, f1), (n2, f2): Indices and weights for the
        interpolated record array.

    Example:
        The following interpolates a record array ``data`` containing 12 monthly
        values to the current time step:

        >>> year_in_seconds = 60. * 60. * 24. * 365.
        >>> current_time = 60. * 60. * 24. * 45.  # mid-february
        >>> print(data.shape)
        (360, 180, 12)
        >>> (n1, f1), (n2, f2) = get_periodic_interval(current_time, year_in_seconds, year_in_seconds / 12, 12)
        >>> data_at_current_time = f1 * data[..., n1] + f2 * data[..., n2]
    """
    locTime = current_time - rec_spacing * 0.5 + \
        cycle_length * (2 - round(current_time / cycle_length))
    tmpTime = locTime % cycle_length
    tRec1 = 1 + int(tmpTime / rec_spacing)
    tRec2 = 1 + tRec1 % int(n_rec)
    wght2 = (tmpTime - rec_spacing * (tRec1 - 1)) / rec_spacing
    wght1 = 1.0 - wght2
    return (tRec1 - 1, wght1), (tRec2 - 1, wght2)
def map_profile_onto_turb_grid(self, profileTango):
    """Since Tango's domain is larger than GENE's in both directions, we can use a simple
    interpolating spline to resample the profile on GENE's grid.
    """
    interpolate = scipy.interpolate.InterpolatedUnivariateSpline(self.psiTango, profileTango)
    profileGene = interpolate(self.psiGene)
    return profileGene
def map_profile_onto_turb_grid(self, profileTango):
    """Since Tango's domain is larger than GENE's in both directions, we can use a simple
    interpolating spline to resample the profile on GENE's grid.
    """
    interpolate = scipy.interpolate.InterpolatedUnivariateSpline(self.xTango, profileTango)
    profileTurb = interpolate(self.xTurb)
    return profileTurb
def extend_with_zeros_both_sides(xSmall, fSmall, xLarge, enforcePositive=False):
    """Extending a function to a larger domain, with zeros where it was not originally defined.

    The domain xSmall should be fully contained within xLarge.  That is, xLarge extends
    farther outward on both sides of the domain.

    This function operates by resampling within the overlapping region xSmall, and then
    extending with zeros.

    Sometimes, interpolation might produce negative values when zero is the minimum for
    physical reasons.  The diffusion coefficient is one example where one wants to maintain
    positivity.  In this case, one can optionally enforce positivity of the returned value
    by zeroing out negative values.

    Inputs:
      xSmall            independent variable on the smaller domain (array)
      fSmall            dependent variable on the smaller domain (array)
      xLarge            independent variable on the larger domain (array)
      enforcePositive   (optional) If True, set any negative values to zero before returning (boolean)

    Outputs:
      fLarge            dependent variable on the larger domain (array)
    """
    assert xLarge[0] <= xSmall[0] and xLarge[-1] >= xSmall[-1]
    # resample within the overlapping region
    fLarge = np.zeros_like(xLarge)  # initialize with zeros
    ind = np.where(xLarge > xSmall[0])
    indstart = ind[0][0]
    ind = np.where(xLarge < xSmall[-1])
    indfinal = ind[0][-1]
    xLargeTemp = xLarge[indstart : indfinal + 1]

    interpolate = scipy.interpolate.InterpolatedUnivariateSpline(xSmall, fSmall)
    fLarge[indstart : indfinal+1] = interpolate(xLargeTemp)

    # extend with zeros -- automatically performed because fLarge was initialized with zeros
    if enforcePositive == True:
        ind = fLarge < 0
        fLarge[ind] = 0

    return fLarge
def extend_with_zeros_left_side(xIn, fIn, xOut, enforcePositive=False):
    """Extending a function to another domain, with zeros where it was not originally defined.

    The domains xIn and xOut should satisfy xOut[0] <= xIn[0] and xOut[-1] <= xIn[-1].
    The output domain is "to the left" of the input domain.

    This function operates by resampling within the overlapping region, and then extending
    with zeros.

    Sometimes, interpolation might produce negative values when zero is the minimum for
    physical reasons.  The diffusion coefficient is one example where one wants to maintain
    positivity.  In this case, one can optionally enforce positivity of the returned value
    by zeroing out negative values.

    Inputs:
      xIn               independent variable on the input domain (array)
      fIn               dependent variable on the input domain (array)
      xOut              independent variable on the new domain (array)
      enforcePositive   (optional) If True, set any negative values to zero before returning (boolean)

    Outputs:
      fOut              dependent variable on the new domain (array)
    """
    assert xOut[0] <= xIn[0] and xOut[-1] <= xIn[-1]
    fOut = np.zeros_like(xOut)  # initialize with zeros

    # resample within the overlapping region
    ind = np.where(xOut > xIn[0])
    indstart = ind[0][0]
    xOutTemp = xOut[indstart:]

    interpolate = scipy.interpolate.InterpolatedUnivariateSpline(xIn, fIn)
    fOut[indstart:] = interpolate(xOutTemp)

    # extend with zeros -- automatically performed because fOut was initialized with zeros
    if enforcePositive == True:
        ind = fOut < 0
        fOut[ind] = 0

    return fOut

###################################################
#### Functions for extrapolation ####
def make_extrapolator_fixed_slope(xSmall, fSmall, outwardSlope):
    """Create an extrapolator that uses cubic interpolation within the given domain xSmall,
    and an imposed linear fit with imposed slope outside the given domain xSmall.  Data must
    be sorted.

    Inputs:
      xSmall        independent variable on the smaller domain (array)
      fSmall        dependent variable on the smaller domain (array)
      outwardSlope  imposed slope outside the domain xSmall

    Outputs:
      extrapolator  function that can be evaluated on a domain, like interpolators
    """
    def extrapolator(xLarge):
        fLarge = np.zeros_like(xLarge, dtype=float)
        # exterior region: left side
        indLeftExterior = xLarge < xSmall[0]
        fLarge[indLeftExterior] = outwardSlope * (xLarge[indLeftExterior] - xSmall[0]) + fSmall[0]
        # exterior region: right side
        indRightExterior = xLarge > xSmall[-1]
        fLarge[indRightExterior] = outwardSlope * (xLarge[indRightExterior] - xSmall[-1]) + fSmall[-1]
        # interpolated points in the interior using cubic interpolation
        interpolatorInterior = scipy.interpolate.InterpolatedUnivariateSpline(xSmall, fSmall, k=3)  # cubic
        indInterior = (xLarge >= xSmall[0]) & (xLarge <= xSmall[-1])
        fLarge[indInterior] = interpolatorInterior(xLarge[indInterior])
        return fLarge
    return extrapolator
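A small usage sketch, with data invented for illustration: build an extrapolator from samples of x**2 on [0, 1], then evaluate it on a wider domain. Inside [0, 1] values come from the cubic spline; outside they follow the imposed slope.

# hypothetical usage sketch -- data invented for illustration
xSmall = np.linspace(0.0, 1.0, 20)
fSmall = xSmall**2
extrap = make_extrapolator_fixed_slope(xSmall, fSmall, outwardSlope=2.0)

xLarge = np.linspace(-0.5, 1.5, 50)
fLarge = extrap(xLarge)  # spline inside [0, 1], linear with slope 2.0 outside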
def interp1d(*args, **kwargs):
    kwargs.pop('assume_sorted', None)
    return scipy.interpolate.interp1d(*args, **kwargs)
def interpolate(self):
    if self._interpolate is None:
        try:
            import scipy.interpolate as interpolate
        except ImportError:
            interpolate = NotAModule(self._name)
        self._interpolate = interpolate
    return self._interpolate
def interpolate(self, stellar_photosphere, transitions):
    """
    Interpolate non-LTE corrections to the equivalent widths of many
    transitions for a single stellar photosphere.

    :param stellar_photosphere:
        A stellar atmosphere model.

    :param transitions:
        A table of atomic transitions.
    """
    # A convenience function.
    raise NotImplementedError
def __init__(self, cqt, Ls):
    from scipy.interpolate import interp1d
    self.intp = [interp1d(np.linspace(0, Ls, len(r)), r) for r in cqt]
def __init__(self, cqt, Ls):
    from scipy.interpolate import interp1d
    self.intp = interp1d(np.linspace(0, Ls, len(cqt)), cqt)
def moveout_correction(self):
    ####################################################################################
    '''
    Moveout correction relative to a reference ray parameter of 6.4 s/deg.  This stretches
    the time axis for smaller ray parameters (larger epicentral distances), and shrinks
    the time axis for larger ray parameters (smaller epicentral distances).

    #NOTE 3-16-16, Moveout correction doesn't work properly... The time axis seems to be
    stretching in the opposite way that it should.
    '''
    p = self.slowness_table[:, 0]
    s = self.slowness_table[:, 1]

    # interpolate with np.interp. make sure values in the first vector are increasing
    scale = np.interp(self.ray_param, p[::-1], s[::-1])
    # print "ray parameter, scale = ", self.ray_param, scale

    # scale the receiver function and interpolate new time axis
    new_time = self.time * scale

    f = interp1d(new_time, self.rf_st[0].data, bounds_error=False, fill_value=0)
    self.rf_st[0].data = f(self.time)
    f = interp1d(new_time, self.rf_st[1].data, bounds_error=False, fill_value=0)
    self.rf_st[1].data = f(self.time)
    f = interp1d(new_time, self.rf_st[2].data, bounds_error=False, fill_value=0)
    self.rf_st[2].data = f(self.time)
####################################################################################
def rf_moveout_correction(rf_trace, table='None'):
    ####################################################################################
    '''
    takes a receiver function trace in the rfh5 format.  if table = 'None', the slowness
    lookup table will be read.  alternatively, if calling this function repeatedly, pass
    the table as an argument to avoid repetitive i/o.
    '''
    if table == 'None':
        slowness_table = np.loadtxt('/geo/work10/romaguir/seismology/seis_tools/seispy/slowness_table.dat')
    else:
        slowness_table = table

    p = slowness_table[:, 0]
    s = slowness_table[:, 1]

    # interpolate with np.interp. make sure values in the first vector are increasing
    scale = np.interp(rf_trace.stats.ray_param, p[::-1], s[::-1])

    # scale the receiver function and interpolate new time axis
    time = np.linspace(0, rf_trace.stats.delta * rf_trace.stats.npts, rf_trace.stats.npts)
    new_time = time * scale

    f = interp1d(new_time, rf_trace.data, bounds_error=False, fill_value=0)
    rf_mvc = f(time)
    rf_trace.data = rf_mvc

    return rf_trace
####################################################################################
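For the batch case the docstring mentions, here is a hypothetical sketch (rf_traces is an assumed iterable of rfh5-format traces; the path comes from the snippet above): load the slowness table once and reuse it across calls.

# hypothetical usage sketch -- load the table once, reuse it for many traces
table = np.loadtxt('/geo/work10/romaguir/seismology/seis_tools/seispy/slowness_table.dat')
for tr in rf_traces:  # rf_traces: an assumed iterable of receiver function traces
    tr = rf_moveout_correction(tr, table=table)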
def _skip_if_no_pchip():
    try:
        from scipy.interpolate import pchip_interpolate  # noqa
    except ImportError:
        raise nose.SkipTest('scipy.interpolate.pchip missing')

# ----------------------------------------------------------------------
# Generic types test cases
def test_interp_regression(self):
    tm._skip_if_no_scipy()
    _skip_if_no_pchip()

    ser = Series(np.sort(np.random.uniform(size=100)))

    # interpolate at new_index
    new_index = ser.index.union(Index([49.25, 49.5, 49.75, 50.25, 50.5, 50.75]))
    interp_s = ser.reindex(new_index).interpolate(method='pchip')

    # does not blow up, GH5977
    interp_s[49:51]
def test_interpolate_corners(self):
    s = Series([np.nan, np.nan])
    assert_series_equal(s.interpolate(), s)

    s = Series([]).interpolate()
    assert_series_equal(s.interpolate(), s)

    tm._skip_if_no_scipy()
    s = Series([np.nan, np.nan])
    assert_series_equal(s.interpolate(method='polynomial', order=1), s)

    s = Series([]).interpolate()
    assert_series_equal(s.interpolate(method='polynomial', order=1), s)
def test_nan_interpolate(self):
    s = Series([0, 1, np.nan, 3])
    result = s.interpolate()
    expected = Series([0., 1., 2., 3.])
    assert_series_equal(result, expected)

    tm._skip_if_no_scipy()
    result = s.interpolate(method='polynomial', order=1)
    assert_series_equal(result, expected)
def test_nan_irregular_index(self):
    s = Series([1, 2, np.nan, 4], index=[1, 3, 5, 9])
    result = s.interpolate()
    expected = Series([1., 2., 3., 4.], index=[1, 3, 5, 9])
    assert_series_equal(result, expected)
def test_nan_str_index(self):
    s = Series([0, 1, 2, np.nan], index=list('abcd'))
    result = s.interpolate()
    expected = Series([0., 1., 2., 2.], index=list('abcd'))
    assert_series_equal(result, expected)
def test_interp_quad(self):
    tm._skip_if_no_scipy()
    sq = Series([1, 4, np.nan, 16], index=[1, 2, 3, 4])
    result = sq.interpolate(method='quadratic')
    expected = Series([1., 4., 9., 16.], index=[1, 2, 3, 4])
    assert_series_equal(result, expected)
def test_interp_scipy_basic(self):
    tm._skip_if_no_scipy()

    s = Series([1, 3, np.nan, 12, np.nan, 25])

    # slinear
    expected = Series([1., 3., 7.5, 12., 18.5, 25.])
    result = s.interpolate(method='slinear')
    assert_series_equal(result, expected)

    result = s.interpolate(method='slinear', downcast='infer')
    assert_series_equal(result, expected)

    # nearest
    expected = Series([1, 3, 3, 12, 12, 25])
    result = s.interpolate(method='nearest')
    assert_series_equal(result, expected.astype('float'))

    result = s.interpolate(method='nearest', downcast='infer')
    assert_series_equal(result, expected)

    # zero
    expected = Series([1, 3, 3, 12, 12, 25])
    result = s.interpolate(method='zero')
    assert_series_equal(result, expected.astype('float'))

    result = s.interpolate(method='zero', downcast='infer')
    assert_series_equal(result, expected)

    # quadratic
    expected = Series([1, 3., 6.769231, 12., 18.230769, 25.])
    result = s.interpolate(method='quadratic')
    assert_series_equal(result, expected)

    result = s.interpolate(method='quadratic', downcast='infer')
    assert_series_equal(result, expected)

    # cubic
    expected = Series([1., 3., 6.8, 12., 18.2, 25.])
    result = s.interpolate(method='cubic')
    assert_series_equal(result, expected)
def test_interp_limit_forward(self):
    s = Series([1, 3, np.nan, np.nan, np.nan, 11])

    # Provide 'forward' (the default) explicitly here.
    expected = Series([1., 3., 5., 7., np.nan, 11.])

    result = s.interpolate(method='linear', limit=2,
                           limit_direction='forward')
    assert_series_equal(result, expected)

    result = s.interpolate(method='linear', limit=2,
                           limit_direction='FORWARD')
    assert_series_equal(result, expected)
def test_interp_limit_bad_direction(self):
    s = Series([1, 3, np.nan, np.nan, np.nan, 11])

    self.assertRaises(ValueError, s.interpolate, method='linear', limit=2,
                      limit_direction='abc')

    # raises an error even if no limit is specified.
    self.assertRaises(ValueError, s.interpolate, method='linear',
                      limit_direction='abc')
def test_interp_limit_direction(self):
    # These tests are for issue #9218 -- fill NaNs in both directions.
    s = Series([1, 3, np.nan, np.nan, np.nan, 11])

    expected = Series([1., 3., np.nan, 7., 9., 11.])
    result = s.interpolate(method='linear', limit=2,
                           limit_direction='backward')
    assert_series_equal(result, expected)

    expected = Series([1., 3., 5., np.nan, 9., 11.])
    result = s.interpolate(method='linear', limit=1,
                           limit_direction='both')
    assert_series_equal(result, expected)

    # Check that this works on a longer series of nans.
    s = Series([1, 3, np.nan, np.nan, np.nan, 7, 9, np.nan, np.nan, 12,
                np.nan])

    expected = Series([1., 3., 4., 5., 6., 7., 9., 10., 11., 12., 12.])
    result = s.interpolate(method='linear', limit=2,
                           limit_direction='both')
    assert_series_equal(result, expected)

    expected = Series([1., 3., 4., np.nan, 6., 7., 9., 10., 11., 12., 12.])
    result = s.interpolate(method='linear', limit=1,
                           limit_direction='both')
    assert_series_equal(result, expected)
def test_interp_limit_to_ends(self):
    # These tests are for issue #10420 -- flow back to beginning.
    s = Series([np.nan, np.nan, 5, 7, 9, np.nan])

    expected = Series([5., 5., 5., 7., 9., np.nan])
    result = s.interpolate(method='linear', limit=2,
                           limit_direction='backward')
    assert_series_equal(result, expected)

    expected = Series([5., 5., 5., 7., 9., 9.])
    result = s.interpolate(method='linear', limit=2,
                           limit_direction='both')
    assert_series_equal(result, expected)
def test_interp_multiIndex(self):
    idx = MultiIndex.from_tuples([(0, 'a'), (1, 'b'), (2, 'c')])
    s = Series([1, 2, np.nan], index=idx)

    expected = s.copy()
    expected.loc[2] = 2
    result = s.interpolate()
    assert_series_equal(result, expected)

    tm._skip_if_no_scipy()
    with tm.assertRaises(ValueError):
        s.interpolate(method='polynomial', order=1)
def test_interp_nonmono_raise(self):
    tm._skip_if_no_scipy()
    s = Series([1, np.nan, 3], index=[0, 2, 1])
    with tm.assertRaises(ValueError):
        s.interpolate(method='krogh')
def test_interp_datetime64(self):
    tm._skip_if_no_scipy()
    df = Series([1, np.nan, 3], index=date_range('1/1/2000', periods=3))
    result = df.interpolate(method='nearest')
    expected = Series([1., 1., 3.], index=date_range('1/1/2000', periods=3))
    assert_series_equal(result, expected)
def test_interp_limit_no_nans(self):
    # GH 7173
    s = pd.Series([1., 2., 3.])
    result = s.interpolate(limit=1)
    expected = s
    assert_series_equal(result, expected)
def test_interp_combo(self):
    df = DataFrame({'A': [1., 2., np.nan, 4.],
                    'B': [1, 4, 9, np.nan],
                    'C': [1, 2, 3, 5],
                    'D': list('abcd')})

    result = df['A'].interpolate()
    expected = Series([1., 2., 3., 4.], name='A')
    assert_series_equal(result, expected)

    result = df['A'].interpolate(downcast='infer')
    expected = Series([1, 2, 3, 4], name='A')
    assert_series_equal(result, expected)