Python numpy 模块,ma() 实例源码
我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用numpy.ma()。
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def fn_getma(fn, bnum=1):
"""Get masked array from input filename
Parameters
----------
fn : str
Input filename string
bnum : int, optional
Band number
Returns
-------
np.ma.array
Masked array containing raster values
"""
#Add check for filename existence
ds = fn_getds(fn)
return ds_getma(ds, bnum=bnum)
#Given input dataset, return a masked array for the input band
def ds_getma(ds, bnum=1):
"""Get masked array from input GDAL Dataset
Parameters
----------
ds : gdal.Dataset
Input GDAL Datset
bnum : int, optional
Band number
Returns
-------
np.ma.array
Masked array containing raster values
"""
b = ds.GetRasterBand(bnum)
return b_getma(b)
#Given input band, return a masked array
def b_getma(b):
"""Get masked array from input GDAL Band
Parameters
----------
b : gdal.Band
Input GDAL Band
Returns
-------
np.ma.array
Masked array containing raster values
"""
b_ndv = get_ndv_b(b)
#bma = np.ma.masked_equal(b.ReadAsArray(), b_ndv)
#This is more appropriate for float, handles precision issues
bma = np.ma.masked_values(b.ReadAsArray(), b_ndv)
return bma
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def test_testPut(self):
# Test of put
with suppress_warnings() as sup:
sup.filter(
np.ma.core.MaskedArrayFutureWarning,
"setting an item on a masked array which has a "
"shared mask will not copy")
d = arange(5)
n = [0, 0, 0, 1, 1]
m = make_mask(n)
x = array(d, mask=m)
self.assertTrue(x[3] is masked)
self.assertTrue(x[4] is masked)
x[[1, 4]] = [10, 40]
self.assertTrue(x.mask is not m)
self.assertTrue(x[3] is masked)
self.assertTrue(x[4] is not masked)
self.assertTrue(eq(x, [0, 10, 2, -1, 40]))
x = array(d, mask=m)
x.put([0, 1, 2], [-1, 100, 200])
self.assertTrue(eq(x, [-1, 100, 200, 0, 0]))
self.assertTrue(x[3] is masked)
self.assertTrue(x[4] is masked)
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def timer(s, v='', nloop=500, nrep=3):
units = ["s", "ms", "µs", "ns"]
scaling = [1, 1e3, 1e6, 1e9]
print("%s : %-50s : " % (v, s), end=' ')
varnames = ["%ss,nm%ss,%sl,nm%sl" % tuple(x*4) for x in 'xyz']
setup = 'from __main__ import numpy, ma, %s' % ','.join(varnames)
Timer = timeit.Timer(stmt=s, setup=setup)
best = min(Timer.repeat(nrep, nloop)) / nloop
if best > 0.0:
order = min(-int(numpy.floor(numpy.log10(best)) // 3), 3)
else:
order = 3
print("%d loops, best of %d: %.*g %s per loop" % (nloop, nrep,
3,
best * scaling[order],
units[order]))
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
'floor', 'ceil',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_functions_1v(func, nloop=500,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
'floor', 'ceil',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_functions_1v(func, nloop=500,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def ds_getma_sub(src_ds, bnum=1, scale=None, maxdim=1024.):
"""Load a subsampled array, rather than full resolution
This is useful when working with large rasters
Uses buf_xsize and buf_ysize options from GDAL ReadAsArray method.
Parameters
----------
ds : gdal.Dataset
Input GDAL Datset
bnum : int, optional
Band number
scale : int, optional
Scaling factor
maxdim : int, optional
Maximum dimension along either axis, in pixels
Returns
-------
np.ma.array
Masked array containing raster values
"""
#print src_ds.GetFileList()[0]
b = src_ds.GetRasterBand(bnum)
b_ndv = get_ndv_b(b)
ns, nl = get_sub_dim(src_ds, scale, maxdim)
#The buf_size parameters determine the final array dimensions
b_array = b.ReadAsArray(buf_xsize=ns, buf_ysize=nl)
bma = np.ma.masked_values(b_array, b_ndv)
return bma
#Note: need to consolidate with warplib.writeout (takes ds, not ma)
#Add option to build overviews when writing GTiff
#Input proj must be WKT
def replace_ndv(b, new_ndv):
b_ndv = get_ndv_b(b)
bma = np.ma.masked_values(b.ReadAsArray(), b_ndv)
bma.set_fill_value(new_ndv)
b.WriteArray(bma.filled())
b.SetNoDataValue(new_ndv)
return b
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
# 'nonzero', 'around',
'floor', 'ceil',
# 'sometrue', 'alltrue',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_functions_1v(func, nloop=500,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
# 'nonzero', 'around',
'floor', 'ceil',
# 'sometrue', 'alltrue',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_functions_1v(func, nloop=500,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def _get_stblock(self, data_input, hnd, mdlt, start_frame=None):
goodness = False
if start_frame is None:
start_frame = random.randint(0, len(data_input['min_length'])-self.step*(self.nframes-1)-1)
stblock = numpy.zeros([self.nframes, self.block_size, self.block_size])
for ii in xrange(self.nframes):
v = data_input[hnd][mdlt][start_frame + ii * self.step]
mm = abs(numpy.ma.maximum(v))
if mm > 0.:
# normalize to zero mean, unit variance,
# concatenate in spatio-temporal blocks
stblock[ii] = self.prenormalize(v)
goodness = True
return stblock, goodness
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
'floor', 'ceil',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_functions_1v(func, nloop=500,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
'floor', 'ceil',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def compare_functions_2v(func, nloop=500, test=True,
xs=xs, nmxs=nmxs,
ys=ys, nmys=nmys,
xl=xl, nmxl=nmxl,
yl=yl, nmyl=nmyl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs,nmys"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl,nmyl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
f_invalid_ignore = [
'sqrt', 'arctanh', 'arcsin', 'arccos',
'arccosh', 'arctanh', 'log', 'log10', 'divide',
'true_divide', 'floor_divide', 'remainder', 'fmod']
for f in ['sqrt', 'log', 'log10', 'exp', 'conjugate',
'sin', 'cos', 'tan',
'arcsin', 'arccos', 'arctan',
'sinh', 'cosh', 'tanh',
'arcsinh',
'arccosh',
'arctanh',
'absolute', 'fabs', 'negative',
'floor', 'ceil',
'logical_not',
'add', 'subtract', 'multiply',
'divide', 'true_divide', 'floor_divide',
'remainder', 'fmod', 'hypot', 'arctan2',
'equal', 'not_equal', 'less_equal', 'greater_equal',
'less', 'greater',
'logical_and', 'logical_or', 'logical_xor']:
try:
uf = getattr(umath, f)
except AttributeError:
uf = getattr(fromnumeric, f)
mf = getattr(np.ma, f)
args = self.d[:uf.nin]
with np.errstate():
if f in f_invalid_ignore:
np.seterr(invalid='ignore')
if f in ['arctanh', 'log', 'log10']:
np.seterr(divide='ignore')
ur = uf(*args)
mr = mf(*args)
self.assertTrue(eq(ur.filled(0), mr.filled(0), f))
self.assertTrue(eqmask(ur.mask, mr.mask))
def compare_functions_1v(func, nloop=500,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
funcname = func.__name__
print("-"*50)
print("%s on small arrays" % funcname)
module, data = "numpy.ma", "nmxs"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
print("%s on large arrays" % funcname)
module, data = "numpy.ma", "nmxl"
timer("%(module)s.%(funcname)s(%(data)s)" % locals(), v="%11s" % module, nloop=nloop)
return
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
print("-"*50)
print("%s on small arrays" % methodname)
data, ver = "nm%ss" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
print("%s on large arrays" % methodname)
data, ver = "nm%sl" % vars, 'numpy.ma'
timer("%(data)s.%(methodname)s(%(args)s)" % locals(), v=ver, nloop=nloop)
return
def test_testCopySize(self):
# Tests of some subtle points of copying and sizing.
with suppress_warnings() as sup:
sup.filter(
np.ma.core.MaskedArrayFutureWarning,
"setting an item on a masked array which has a "
"shared mask will not copy")
n = [0, 0, 1, 0, 0]
m = make_mask(n)
m2 = make_mask(m)
self.assertTrue(m is m2)
m3 = make_mask(m, copy=1)
self.assertTrue(m is not m3)
x1 = np.arange(5)
y1 = array(x1, mask=m)
self.assertTrue(y1._data is not x1)
self.assertTrue(allequal(x1, y1._data))
self.assertTrue(y1.mask is m)
y1a = array(y1, copy=0)
self.assertTrue(y1a.mask is y1.mask)
y2 = array(x1, mask=m, copy=0)
self.assertTrue(y2.mask is m)
self.assertTrue(y2[2] is masked)
y2[2] = 9
self.assertTrue(y2[2] is not masked)
self.assertTrue(y2.mask is not m)
self.assertTrue(allequal(y2.mask, 0))
y3 = array(x1 * 1.0, mask=m)
self.assertTrue(filled(y3).dtype is (x1 * 1.0).dtype)
x4 = arange(4)
x4[2] = masked
y4 = resize(x4, (8,))
self.assertTrue(eq(concatenate([x4, x4]), y4))
self.assertTrue(eq(getmask(y4), [0, 0, 1, 0, 0, 0, 1, 0]))
y5 = repeat(x4, (2, 2, 2, 2), axis=0)
self.assertTrue(eq(y5, [0, 0, 1, 1, 2, 2, 3, 3]))
y6 = repeat(x4, 2, axis=0)
self.assertTrue(eq(y5, y6))
def composite(reducers, *rasters, normalize='sum', nodata=-9999.0, dtype=np.float32):
'''
NOTE: Uses masked arrays in NumPy and therefore is MUCH slower than the
`composite2()` function, which is equivalent in output.
Creates a multi-image (multi-date) composite from input rasters. The
reducers argument specifies, in the order of the bands (endmembers), how
to pick a value for that band in each pixel. If None is given, then the
median value of that band from across the images is used for that pixel
value. If None is specified as a reducer, the corresponding band(s) will
be dropped. Combining None reducer(s) with a normalized sum effectively
subtracts an endmember under the unity constraint. Arguments:
reducers One of ('min', 'max', 'mean', 'median', None) for each endmember
rasters One or more raster files to composite
normalize True (by default) to normalize results by their sum
nodata The NoData value (defaults to -9999)
dtype The data type to coerce in the output array; very important if the desired output is float but NoData value is integer
'''
shp = rasters[0].shape
num_non_null_bands = shp[0] - len([b for b in reducers if b is None])
assert all(map(lambda x: x == shp, [r.shape for r in rasters])), 'Rasters must have the same shape'
assert len(reducers) == shp[0], 'Must provide a reducer for each band (including None to drop the band)'
# Swap the sequence of rasters for a sequence of bands, then collapse the X-Y axes
stack = np.array(rasters).swapaxes(0, 1).reshape(shp[0], len(rasters), shp[-1]*shp[-2])
# Mask out NoData values
stack_masked = np.ma.masked_where(stack == nodata, stack)
# For each band (or endmember)...
band_arrays = []
for i in range(shp[0]):
if reducers[i] in ('min', 'max', 'median', 'mean'):
band_arrays.append(getattr(np.ma, reducers[i])(stack_masked[i, ...], axis=0))
# Stack each reduced band (and reshape to multi-band image)
final_stack = np.ma.vstack(band_arrays).reshape((num_non_null_bands, shp[-2], shp[-1]))
# Calculate a normalized sum (e.g., fractions must sum to one)
if normalize is not None:
constant = getattr(final_stack, normalize)(axis=0) # The sum across the bands
constant.set_fill_value(1.0) # NaNs will be divided by 1.0
constant = np.ma.repeat(constant, num_non_null_bands, axis=0).reshape(final_stack.shape)
# Divide the values in each band by the normalized sum across the bands
if num_non_null_bands > 1:
final_stack = final_stack / constant.swapaxes(0, 1)
else:
final_stack = final_stack / constant
# NOTE: Essential to cast type, e.g., to float in case first pixel (i.e. top-left) is all NoData of an integer type
final_stack.set_fill_value(dtype(nodata)) # Fill NoData for NaNs
return final_stack.filled()