Python numpy 模块,ndim() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.ndim()。
def apply(self, old_values, step):
    """Apply the boundary.

    Args:
        old_values: Old values of the points in the boundary.
        step: Time step of the simulation (required if signals are to be applied).

    Returns:
        New values for the points in the boundary.
    """
    boundary = self.value
    rank = np.ndim(boundary)

    # A single value, or a flat list holding one value per index.
    if rank == 0 or (rank == 1 and type(boundary) is list):
        return self.additive * old_values + boundary

    # One signal stored as an array: pick the sample for this time step.
    if type(boundary) is np.ndarray:
        return self.additive * old_values + boundary[step]

    # Otherwise: one signal per index, each sampled at this time step.
    updated = []
    for index, signal in enumerate(boundary):
        updated.append(self.additive * old_values[index] + signal[step])
    return updated
def forward(self, input):
    r""":math:`\varphi(\mathbf{x})_j =
    \frac{e^{\mathbf{x}_j}}{\sum_{k=1}^K e^{\mathbf{x}_k}}`
    where :math:`K` is the total number of neurons in the layer. This
    activation function gets applied row-wise.

    Parameters
    ----------
    input : float32
        The activation (the summed, weighted input of a neuron).
        Must be 2-D; the softmax is taken along axis 1.

    Returns
    -------
    float32 where the sum of the row is 1 and each single value is in [0, 1]
        The output of the softmax function applied to the activation.
    """
    assert np.ndim(input) == 2
    self.last_forward = input
    # Subtracting the row maximum leaves the result unchanged but keeps
    # the exponentials from overflowing.
    shifted = input - np.max(input, axis=1, keepdims=True)
    exp_shifted = np.exp(shifted)
    return exp_shifted / np.sum(exp_shifted, axis=1, keepdims=True)
def forward(self, input, *args, **kwargs):
    """Run the recurrent layer over a batch of sequences.

    ``input`` must be 3-D; its shape is unpacked as
    ``(nb_batch, nb_timestep, nb_in)``.  One activation object per
    timestep is created the first time ``self.activations`` is empty.

    Returns the full output when ``self.return_sequence`` is truthy,
    otherwise only the slice for the last timestep.
    """
    assert np.ndim(input) == 3, 'Only support batch training.'
    self.last_input = input

    nb_batch, nb_timestep, nb_in = input.shape
    states = _zero((nb_batch, nb_timestep, self.n_out))

    if len(self.activations) == 0:
        self.activations = [self.activation_cls() for _ in range(nb_timestep)]

    # The first step has no previous output, so only the input term is used.
    first_pre = np.dot(input[:, 0, :], self.W) + self.b
    states[:, 0, :] = self.activations[0].forward(first_pre)

    for t in range(1, nb_timestep):
        from_input = np.dot(input[:, t, :], self.W)
        from_previous = np.dot(states[:, t - 1, :], self.U)
        states[:, t, :] = self.activations[t].forward(
            from_input + from_previous + self.b)

    self.last_output = states
    if not self.return_sequence:
        return self.last_output[:, -1, :]
    return self.last_output
def test_MeanPooling():
    """Check output shape, rejected ranks and result ranks of MeanPooling."""
    from npdl.layers import MeanPooling

    pool = MeanPooling((2, 2))
    pool.connect_to(PreLayer((10, 1, 20, 30)))
    assert pool.out_shape == (10, 1, 10, 15)

    # 2-D arrays must be rejected by both passes.
    with pytest.raises(ValueError):
        pool.forward(np.random.rand(10, 10))
    with pytest.raises(ValueError):
        pool.backward(np.random.rand(10, 20))

    # 3-D and 4-D arrays must come back with the same rank.
    for shape in ((10, 20, 30), (10, 1, 20, 30)):
        assert np.ndim(pool.forward(np.random.rand(*shape))) == len(shape)
        assert np.ndim(pool.backward(np.random.rand(*shape))) == len(shape)
def test_MaxPooling():
    """Check output shape, rejected ranks and result ranks of MaxPooling."""
    from npdl.layers import MaxPooling

    pool = MaxPooling((2, 2))
    pool.connect_to(PreLayer((10, 1, 20, 30)))
    assert pool.out_shape == (10, 1, 10, 15)

    # 2-D arrays must be rejected by both passes.
    with pytest.raises(ValueError):
        pool.forward(np.random.rand(10, 10))
    with pytest.raises(ValueError):
        pool.backward(np.random.rand(10, 20))

    # 3-D and 4-D arrays must come back with the same rank.
    for shape in ((10, 20, 30), (10, 1, 20, 30)):
        assert np.ndim(pool.forward(np.random.rand(*shape))) == len(shape)
        assert np.ndim(pool.backward(np.random.rand(*shape))) == len(shape)
def test_LSTM():
    """Exercise the LSTM layer with and without ``return_sequence``."""
    for seq in (True, False):
        expected_rank = 3 if seq else 2

        layer = LSTM(n_out=200, n_in=100, return_sequence=seq)
        assert layer.out_shape is None
        layer.connect_to()
        assert len(layer.out_shape) == expected_rank

        batch = np.random.rand(10, 50, 100)
        mask = np.random.randint(0, 2, (10, 50))
        assert np.ndim(layer.forward(batch, mask)) == expected_rank

        with pytest.raises(NotImplementedError):
            layer.backward(None)

        assert len(layer.params) == 12
        assert len(layer.grads) == 12
def add(self, outputs, targets):
    """Accumulate top-k hit counts for one batch.

    ``outputs`` must convert to a 2-D array and ``targets`` to a 1-D array
    (a 2-D ``targets`` is reduced with ``argmax`` along axis 1 first).
    ``self.size`` grows by the number of targets and ``self.corrects[k]``
    by the number of hits within the first ``k`` ranked predictions, for
    every ``k`` in ``self.top_k``.
    """
    scores = to_numpy(outputs)
    labels = to_numpy(targets)
    if np.ndim(labels) == 2:
        labels = np.argmax(labels, 1)

    assert np.ndim(scores) == 2, 'wrong output size (2D expected)'
    assert np.ndim(labels) == 1, 'wrong target size (1D or 2D expected)'
    assert labels.shape[0] == scores.shape[0], 'number of outputs and targets do not match'

    ks = self.top_k
    # The last entry of top_k is used as the number of predictions to rank.
    largest_k = int(ks[-1])
    _, ranked = torch.from_numpy(scores).topk(largest_k, 1, True, True)
    ranked = ranked.numpy()

    # Broadcasting the label column compares it with every ranked column.
    hits = ranked == labels[:, np.newaxis]

    self.size += labels.shape[0]
    for k in ks:
        self.corrects[k] += hits[:, :k].sum()
def hessian(self, x, d=None):
    """
    Computes Hessian matrix

    ``x`` is indexed per point and each difference ``x[i] - x[j]`` is
    combined with a 3x3 identity, so the result has shape
    ``(3 * len(x), 3 * len(x))``.  ``d`` is computed with
    ``calc_distances(x)`` when omitted; a 1-D ``d`` is expanded with
    ``squareform``.
    """
    if d is None:
        d = calc_distances(x)
    if d.ndim == 1:
        d = squareform(d)

    n_points = len(x)
    power = self.n
    H = np.zeros((3 * n_points, 3 * n_points))

    for i in range(n_points):
        rows = slice(3 * i, 3 * (i + 1))
        for j in range(n_points):
            if i == j:
                continue
            cols = slice(3 * j, 3 * (j + 1))
            delta = x[i] - x[j]
            dist = d[i, j]
            pair = power / dist ** (0.5 * power + 2) * (
                (power + 2) * np.multiply.outer(delta, delta) - np.eye(3) * dist)
            # Off-diagonal block gets -pair; the diagonal block accumulates it.
            H[rows, cols] = -pair
            H[rows, rows] += pair
    return H
def rdf(coords, bins=100, r_max=None):
    """
    Radial distribution function

    Parameters
    ----------
    coords :
        list of coordinate arrays (a single 2-D array is also accepted)
    bins : int or numpy array
        distance bins
    r_max : positive float or None
        maximum distance; distances not below it are dropped

    Returns
    -------
    r : bin centres of the distance histogram
    g : histogram counts divided by ``r**2``
    """
    if np.ndim(coords) == 2:
        coords = [coords]

    # np.concatenate needs a real sequence; a lazy ``map`` object (Python 3)
    # is rejected, so the per-array results are collected in a list.
    # NOTE(review): the square root suggests calc_distances returns squared
    # distances - confirm against its definition.
    d = np.sqrt(np.concatenate([calc_distances(c) for c in coords], 0))

    if r_max is not None:
        d = d[d < r_max]

    g, bins = np.histogram(d, bins=bins)
    r = 0.5 * (bins[1:] + bins[:-1])
    return r, g / r ** 2
def add(self, output, target):
    """Append a batch of scores and binary targets to the stored arrays.

    Tensors are moved to the CPU, squeezed and converted to numpy; a plain
    number given as ``target`` becomes a one-element array.  Both inputs
    must then be 1-D with equal length, and every target must be 0 or 1.
    """
    if torch.is_tensor(output):
        output = output.cpu().squeeze().numpy()

    if torch.is_tensor(target):
        target = target.cpu().squeeze().numpy()
    elif isinstance(target, numbers.Number):
        target = np.asarray([target])

    assert np.ndim(output) == 1, 'wrong output size (1D expected)'
    assert np.ndim(target) == 1, 'wrong target size (1D expected)'
    assert output.shape[0] == target.shape[0], 'number of outputs and targets does not match'

    is_binary = np.equal(target, 1) | np.equal(target, 0)
    assert np.all(is_binary), 'targets should be binary (0, 1)'

    self.scores = np.append(self.scores, output)
    self.targets = np.append(self.targets, target)
def outer(self, a, b):
    """
    Return the function applied to the outer product of a and b.

    """
    data_a = getdata(a)
    data_b = getdata(b)
    result = self.f.outer(data_a, data_b)

    if getmask(a) is nomask and getmask(b) is nomask:
        mask = nomask
    else:
        # An entry is masked as soon as either operand is masked there.
        mask = umath.logical_or.outer(getmaskarray(a), getmaskarray(b))

    # A zero-dimensional, set mask means the whole result is masked.
    if mask.ndim == 0 and mask:
        return masked
    if mask is not nomask:
        np.copyto(result, data_a, where=mask)
    if not result.shape:
        return result

    masked_result = result.view(get_masked_subclass(a, b))
    masked_result._mask = mask
    return masked_result
def round(self, decimals=0, out=None):
    """
    Return an array rounded to the given number of decimals.

    Refer to `numpy.around` for full documentation.

    See Also
    --------
    numpy.around : equivalent function

    """
    rounded = self._data.round(decimals=decimals, out=out).view(type(self))
    if rounded.ndim > 0:
        rounded._mask = self._mask
        rounded._update_from(self)
    elif self._mask:
        # Return masked when the scalar is masked
        rounded = masked

    # With an explicit output, propagate the mask to it and hand it back.
    if out is not None:
        if isinstance(out, MaskedArray):
            out.__setmask__(self._mask)
        return out
    return rounded
def _infer_interval_breaks(coord, kind=None):
    """
    Interpolate the bounds from the data in coord

    Parameters
    ----------
    %(CFDecoder.get_plotbounds.parameters.no_ignore_shape)s

    Returns
    -------
    %(CFDecoder.get_plotbounds.returns)s

    Notes
    -----
    this currently only works for rectilinear grids"""

    def _breaks_1d(values):
        # Midpoints between neighbours, extended by half a step at each end,
        # so n values yield n + 1 breaks.
        values = np.asarray(values)
        half_steps = 0.5 * np.diff(values)
        first = values[0] - half_steps[0]
        last = values[-1] + half_steps[-1]
        return np.r_[[first], values[:-1] + half_steps, [last]]

    if coord.ndim == 1:
        # Calling this function again with the same 1-D argument would never
        # terminate, so the 1-D case is computed directly.
        # NOTE(review): if the surrounding module provides a separate 1-D
        # helper under this name, that is presumably what was meant - confirm.
        return _breaks_1d(coord)
    elif coord.ndim == 2:
        from scipy.interpolate import interp2d
        kind = kind or rcParams['decoder.interp_kind']
        # Index grids along both axes; their breaks are the evaluation points.
        y, x = map(np.arange, coord.shape)
        new_x, new_y = map(_breaks_1d, [x, y])
        coord = np.asarray(coord)
        return interp2d(x, y, coord, kind=kind, copy=False)(new_x, new_y)
def cov2corr(cov):
    """Calculate the correlation matrix based on a
    covariance matrix

    Parameters
    ----------
    cov: 2D array

    Returns
    -------
    corr: 2D array
        correlation converted from the covariance matrix
    """
    assert cov.ndim == 2, 'covariance matrix should be 2D array'
    # Reciprocal standard deviations taken from the diagonal.
    inv_sd = 1 / np.sqrt(np.diag(cov))
    scaled_columns = cov * inv_sd[np.newaxis, :]
    return scaled_columns * inv_sd[:, np.newaxis]
def _crop_roi(fullframe, roisz):
    """Cut the region ``roisz = (xpos, ypos, xlen, ylen)`` out of ``fullframe``.

    An ``xpos`` of -1 yields a 36x36 array of zeros instead of a crop.
    Raises ``Exception`` when ``fullframe`` is neither 2-D nor 3-D.
    """
    xpos, ypos, xlen, ylen = roisz[0], roisz[1], roisz[2], roisz[3]
    rank = np.ndim(fullframe)

    if xpos == -1:
        return np.zeros((36, 36))

    # numpy array indexing: lines are the first index => y direction goes first
    rows = slice(ypos, ypos + ylen)
    cols = slice(xpos, xpos + xlen)
    if rank == 2:
        return fullframe[rows, cols]
    if rank == 3:
        return fullframe[rows, cols, :]
    raise Exception('unsupported nb of channels')
def Energy_Estimate(data, pauli_list):
    """Compute expectation value of a list of diagonal Paulis with
    coefficients given measurement data. If some Paulis are non-diagonal
    appropriate post-rotations had to be performed in the collection of data

    Args:
        data : output of the execution of a quantum program
        pauli_list : list of [coeff, Pauli]

    Returns:
        The expectation value
    """
    # A 1-D ``pauli_list`` is treated as one [coeff, Pauli] pair.
    if np.ndim(pauli_list) == 1:
        coeff, pauli = pauli_list[0], pauli_list[1]
        return coeff * measure_pauli_z(data, pauli)

    energy = 0
    for term in pauli_list:
        energy += term[0] * measure_pauli_z(data, term[1])
    return energy
def outer(self, a, b):
    """
    Return the function applied to the outer product of a and b.

    """
    lhs_data, rhs_data = getdata(a), getdata(b)
    values = self.f.outer(lhs_data, rhs_data)

    no_masks = getmask(a) is nomask and getmask(b) is nomask
    if no_masks:
        combined = nomask
    else:
        lhs_mask = getmaskarray(a)
        rhs_mask = getmaskarray(b)
        combined = umath.logical_or.outer(lhs_mask, rhs_mask)

    # Scalar result that is masked: return the masked constant.
    if combined.ndim == 0 and combined:
        return masked
    if combined is not nomask:
        # Masked positions are overwritten with the data of ``a``.
        np.copyto(values, lhs_data, where=combined)
    if not values.shape:
        return values

    wrapped = values.view(get_masked_subclass(a, b))
    wrapped._mask = combined
    return wrapped
def round(self, decimals=0, out=None):
    """
    Return an array rounded to the given number of decimals.

    Refer to `numpy.around` for full documentation.

    See Also
    --------
    numpy.around : equivalent function

    """
    data_rounded = self._data.round(decimals=decimals, out=out)
    result = data_rounded.view(type(self))

    if result.ndim > 0:
        result._mask = self._mask
        result._update_from(self)
    elif self._mask:
        # Return masked when the scalar is masked
        result = masked

    # No explicit output: we're done
    if out is None:
        return result

    # Explicit output: copy the mask over when it can carry one.
    if isinstance(out, MaskedArray):
        out.__setmask__(self._mask)
    return out
def capInf(x, copy=False):
    """Clamp values beyond the finite range of ``x``'s float dtype.

    Entries below ``np.finfo(dtype).min`` are set to that minimum and
    entries above ``np.finfo(dtype).max`` to that maximum.

    Notes: If copy is False and x is a numpy array,
    then x is modified in place.

    Returns the (possibly new) array.
    """
    # ``np.array(x, copy=False)`` raises under NumPy 2 whenever a copy is
    # unavoidable (lists, Python scalars); ``np.asarray`` copies only if needed.
    x = np.array(x, copy=True) if copy else np.asarray(x)
    limits = np.finfo(x.dtype)
    lowest, highest = limits.min, limits.max

    if x.ndim == 0:
        # Zero-dimensional arrays cannot be indexed with a boolean mask.
        if x < lowest:
            x[...] = lowest
        if x > highest:
            x[...] = highest
    else:
        x[x < lowest] = lowest
        x[x > highest] = highest
    return x
def capZero(x, copy=False):
    """Raise every value below ``np.finfo(dtype).tiny`` up to ``tiny``.

    Notes: If copy is False and x is a numpy array,
    then x is modified in place.

    Returns the (possibly new) array.
    """
    # ``np.array(x, copy=False)`` raises under NumPy 2 whenever a copy is
    # unavoidable (lists, Python scalars); ``np.asarray`` copies only if needed.
    x = np.array(x, copy=True) if copy else np.asarray(x)
    tiny = np.finfo(x.dtype).tiny

    if x.ndim == 0:
        # Zero-dimensional arrays cannot be indexed with a boolean mask.
        if x < tiny:
            x[...] = tiny
    else:
        x[x < tiny] = tiny
    return x
def image_preprocess(obs, resize_width, resize_height, to_gray):
    """Applies basic preprocessing for image observations.

    Args:
        obs (numpy.ndarray): 2-D or 3-D uint8 type image.
        resize_width (int): Resize width. To disable resize, pass None.
        resize_height (int): Resize height. To disable resize, pass None.
        to_gray (bool): Converts image to grayscale.

    Returns (numpy.ndarray):
        Processed 3-D image. No dtype conversion is performed here.
    """
    processed_obs = np.squeeze(obs)
    if to_gray:
        processed_obs = cv2.cvtColor(processed_obs, cv2.COLOR_RGB2GRAY)
    if resize_height and resize_width:
        # cv2.resize expects the target size as (width, height).
        processed_obs = cv2.resize(processed_obs, (resize_width, resize_height))
    if np.ndim(processed_obs) == 2:
        # Append a channel axis so the result is always 3-D.
        processed_obs = np.expand_dims(processed_obs, 2)
    return processed_obs
def add(self, other, idx):
    """Add ``other`` into the entries of ``self`` selected by ``idx``.

    A 1-D ``self`` receiving a 2-D ``other`` is first replaced by a
    ``KernelMatrix`` built from ``np.diag(self)``.  The (possibly new)
    object is returned.
    """
    if other.ndim == 2 and self.ndim == 1:
        self = KernelMatrix(np.diag(self))

    if self.ndim == 1:
        self[idx] += other
    elif other.ndim == 1:
        self[idx, idx] += other
    else:
        self._setcliques(idx)
        if isinstance(idx, slice):
            block_index = (idx, idx)
        else:
            # Column/row index pair selecting the full sub-block.
            block_index = (idx[:, None], idx)
        self[block_index] += other
    return self
def inv(self, logdet=False):
    """Invert ``self``; optionally also return a log-determinant value.

    A 1-D ``self`` is inverted element-wise.  Otherwise a Cholesky
    factorisation is tried first, with an SVD-based fallback when it
    raises ``LinAlgError``.

    Returns ``inv`` or, when ``logdet`` is truthy, ``(inv, logdet)``.
    """
    if self.ndim == 1:
        inverse = 1.0 / self
        if not logdet:
            return inverse
        return inverse, np.sum(np.log(self))

    try:
        factor = sl.cho_factor(self)
        inverse = sl.cho_solve(factor, np.identity(factor[0].shape[0]))
        if logdet:
            ld = 2.0 * np.sum(np.log(np.diag(factor[0])))
    except np.linalg.LinAlgError:
        u, s, _ = np.linalg.svd(self)
        inverse = np.dot(u / s, u.T)
        if logdet:
            ld = np.sum(np.log(s))

    if not logdet:
        return inverse
    return inverse, ld
def solve(self, other, left_array=None, logdet=False):
    """Dispatch to the solver matching the ranks of ``other`` and ``left_array``.

    Raises ``TypeError`` for unsupported rank combinations, including a
    2-D ``other`` without ``left_array``.

    Returns the solver result or, when ``logdet`` is truthy, the pair
    ``(result, self._get_logdet())``.
    """
    if other.ndim == 1:
        if left_array is None:
            ret = self._solve_D1(other)
        elif left_array.ndim == 1:
            ret = self._solve_1D1(other, left_array)
        elif left_array.ndim == 2:
            ret = np.dot(left_array.T, self._solve_D1(other))
        else:
            raise TypeError
    elif other.ndim == 2:
        if left_array is None:
            raise TypeError
        if left_array.ndim == 2:
            ret = self._solve_2D2(other, left_array)
        elif left_array.ndim == 1:
            ret = np.dot(other.T, self._solve_D1(left_array))
        else:
            raise TypeError
    else:
        raise TypeError

    if logdet:
        return ret, self._get_logdet()
    return ret
def outer(self, a, b):
    """
    Return the function applied to the outer product of a and b.

    """
    data_a = getdata(a)
    data_b = getdata(b)
    computed = self.f.outer(data_a, data_b)

    if getmask(a) is nomask and getmask(b) is nomask:
        out_mask = nomask
    else:
        # An entry is masked as soon as either operand is masked there.
        out_mask = umath.logical_or.outer(getmaskarray(a), getmaskarray(b))

    if out_mask.ndim == 0 and out_mask:
        return masked
    if out_mask is not nomask:
        np.copyto(computed, data_a, where=out_mask)
    if not computed.shape:
        return computed

    as_masked = computed.view(get_masked_subclass(a, b))
    as_masked._mask = out_mask
    as_masked._update_from(computed)
    return as_masked
def __call__(self, *args, **params):
    """Call the ndarray method named ``self.__name__`` on the data of a masked array.

    The array is ``self.obj`` or, when that is None, the first positional
    argument.  The result is viewed as the array's own type and then
    given a mask (see the branches below).
    """
    method = self.__name__
    target = self.obj
    # Fallback : if the instance has not been initialized, use the first
    # arg
    if target is None:
        args = list(args)
        target = args.pop(0)

    mask = target._mask
    raw = getattr(target._data, method)(*args, **params)
    result = raw.view(type(target))
    result._update_from(target)

    if result.ndim:
        if not self._onmask:
            # Reuse the original mask unchanged.
            result.__setmask__(mask)
        elif mask is not nomask:
            # Apply the same method to the mask.
            result.__setmask__(getattr(mask, method)(*args, **params))
    elif mask.ndim and (not mask.dtype.names and mask.all()):
        return masked
    return result
def test_valid_fit(self):
    """``fit`` must run both callbacks on the first call, only ``fit_callback`` on the second."""
    obs = [np.array([1, 1]), np.array([[1, 1], [2, 2]])]
    fired = {'init': False, 'fit': False}

    def recorder(name):
        # Build a callback that checks its argument and records that it ran.
        def callback(x):
            self.assertEqual(len(x), len(obs))
            self.assertEqual(np.ndim(x[0]), 2)
            fired[name] = True
        return callback

    self.hmm.init_callback = recorder('init')
    self.hmm.fit_callback = recorder('fit')

    self.hmm.fit(obs)
    self.assertEqual(self.hmm.n_features_, 2)
    self.assertTrue(fired['init'])
    self.assertTrue(fired['fit'])

    fired['init'] = fired['fit'] = False
    self.hmm.fit(obs)
    self.assertFalse(fired['init'])
    self.assertTrue(fired['fit'])
def ensure_ndarray(A, shape=None, uniform=None, ndim=None, size=None, dtype=None, kind=None):
    r""" Ensures A is an ndarray and does an assert_array with the given parameters

    The keyword arguments are passed to ``assert_array`` unchanged.

    Returns
    -------
    A : ndarray
        If A is already an ndarray, it is just returned. Otherwise this is an independent copy as an ndarray

    Raises
    ------
    AssertionError
        If A cannot be converted to an ndarray.

    """
    if not isinstance(A, np.ndarray):
        try:
            A = np.array(A)
        # ``Exception`` rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not turned into an AssertionError.
        except Exception:
            raise AssertionError('Given argument cannot be converted to an ndarray:\n'+str(A))
    assert_array(A, shape=shape, uniform=uniform, ndim=ndim, size=size, dtype=dtype, kind=kind)
    return A
def ensure_ndarray_or_sparse(A, shape=None, uniform=None, ndim=None, size=None, dtype=None, kind=None):
    r""" Ensures A is an ndarray or a scipy sparse matrix and does an assert_array with the given parameters

    The keyword arguments are passed to ``assert_array`` unchanged.

    Returns
    -------
    A : ndarray or scipy sparse matrix
        If A is already an ndarray or sparse matrix, it is just returned.
        Otherwise this is an independent copy as an ndarray

    Raises
    ------
    AssertionError
        If A cannot be converted to an ndarray.

    """
    if not isinstance(A, np.ndarray) and not scisp.issparse(A):
        try:
            A = np.array(A)
        # ``Exception`` rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not turned into an AssertionError.
        except Exception:
            raise AssertionError('Given argument cannot be converted to an ndarray:\n'+str(A))
    assert_array(A, shape=shape, uniform=uniform, ndim=ndim, size=size, dtype=dtype, kind=kind)
    return A
def outer(self, a, b):
    """
    Return the function applied to the outer product of a and b.

    """
    left, right = getdata(a), getdata(b)
    product = self.f.outer(left, right)

    both_unmasked = getmask(a) is nomask and getmask(b) is nomask
    if both_unmasked:
        joint_mask = nomask
    else:
        left_mask = getmaskarray(a)
        right_mask = getmaskarray(b)
        joint_mask = umath.logical_or.outer(left_mask, right_mask)

    # Scalar result that is masked: return the masked constant.
    if joint_mask.ndim == 0 and joint_mask:
        return masked
    if joint_mask is not nomask:
        # Masked positions are overwritten with the data of ``a``.
        np.copyto(product, left, where=joint_mask)
    if not product.shape:
        return product

    view = product.view(get_masked_subclass(a, b))
    view._mask = joint_mask
    view._update_from(product)
    return view
def __call__(self, *args, **params):
    """Call the ndarray method named ``self.__name__`` on the data of a masked array.

    The array is ``self.obj`` or, when that is None, the first positional
    argument.  The result is viewed as the array's own type and then
    given a mask (see the branches below).
    """
    name = self.__name__
    owner = self.obj
    # Fallback : if the instance has not been initialized, use the first
    # arg
    if owner is None:
        args = list(args)
        owner = args.pop(0)

    owner_mask = owner._mask
    data_method = getattr(owner._data, name)
    result = data_method(*args, **params).view(type(owner))
    result._update_from(owner)

    if not result.ndim:
        # Scalar result: it is masked only if a non-structured mask is all set.
        if owner_mask.ndim and (not owner_mask.dtype.names and owner_mask.all()):
            return masked
        return result

    if not self._onmask:
        result.__setmask__(owner_mask)
    elif owner_mask is not nomask:
        result.__setmask__(getattr(owner_mask, name)(*args, **params))
    return result
def _sort(group_idx, a, size, fill_value, dtype=None, reversed_=False):
    """Return the values of ``a`` sorted within each group of ``group_idx``.

    The result is an object array with one entry per group.
    ``fill_value`` must be a scalar or an empty sequence; a scalar is
    passed on to ``fill_untouched``.  ``dtype`` is not used.
    """
    if np.iscomplexobj(a):
        raise NotImplementedError("a must be real, could use np.lexsort or "
                                  "sort with recarray for complex.")
    if not np.isscalar(fill_value) and len(fill_value) != 0:
        raise ValueError("fill_value must be scalar or an empty sequence")

    # Group index goes in the real part and the value in the imaginary part,
    # so one stable sort orders by group first and by value second.
    imag_unit = -1j if reversed_ else 1j
    order_group_idx = np.argsort(group_idx + imag_unit * a, kind='mergesort')

    counts = np.bincount(group_idx, minlength=size)
    if np.ndim(a) == 0:
        a = np.full(size, a, dtype=type(a))

    boundaries = np.cumsum(counts)[:-1]
    pieces = np.split(a[order_group_idx], boundaries)
    ret = np.asarray(pieces, dtype=object)
    if np.isscalar(fill_value):
        fill_untouched(group_idx, ret, fill_value)
    return ret
def _sum(group_idx, a, size, fill_value, dtype=None):
    """Sum the values of ``a`` per group of ``group_idx``.

    A scalar ``a`` is treated as the same value for every element, so the
    result is the group counts times ``a``.  A non-zero ``fill_value`` is
    passed on to ``fill_untouched``.
    """
    dtype = minimum_dtype_scalar(fill_value, dtype, a)

    if np.ndim(a) == 0:
        ret = np.bincount(group_idx, minlength=size).astype(dtype)
        if a != 1:
            ret *= a
    elif np.iscomplexobj(a):
        # The two parts are accumulated separately via ``weights``.
        ret = np.empty(size, dtype=dtype)
        ret.real = np.bincount(group_idx, weights=a.real, minlength=size)
        ret.imag = np.bincount(group_idx, weights=a.imag, minlength=size)
    else:
        weighted = np.bincount(group_idx, weights=a, minlength=size)
        ret = weighted.astype(dtype)

    if fill_value != 0:
        fill_untouched(group_idx, ret, fill_value)
    return ret
def _mean(group_idx, a, size, fill_value, dtype=np.dtype(np.float64)):
    """Average the values of ``a`` per group of ``group_idx``.

    Groups without any element receive ``fill_value`` unless it is NaN
    (they are already NaN from the 0/0 division).

    Raises ``ValueError`` when ``a`` is a scalar.
    """
    if np.ndim(a) == 0:
        raise ValueError("cannot take mean with scalar a")

    counts = np.bincount(group_idx, minlength=size)
    if np.iscomplexobj(a):
        dtype = a.dtype  # TODO: this is a bit clumsy
        sums = np.empty(size, dtype=dtype)
        sums.real = np.bincount(group_idx, weights=a.real,
                                minlength=size)
        sums.imag = np.bincount(group_idx, weights=a.imag,
                                minlength=size)
    else:
        sums = np.bincount(group_idx, weights=a,
                           minlength=size).astype(dtype)

    # Empty groups divide 0 by 0, which numpy reports as "invalid", not
    # "divide"; both are silenced because those slots are filled below.
    with np.errstate(divide='ignore', invalid='ignore'):
        ret = sums.astype(dtype) / counts
    if not np.isnan(fill_value):
        ret[counts == 0] = fill_value
    return ret
def outer(self, a, b):
    """
    Return the function applied to the outer product of a and b.

    """
    first = getdata(a)
    second = getdata(b)
    out_data = self.f.outer(first, second)

    if getmask(a) is nomask and getmask(b) is nomask:
        out_mask = nomask
    else:
        # An entry is masked as soon as either operand is masked there.
        out_mask = umath.logical_or.outer(getmaskarray(a), getmaskarray(b))

    if out_mask.ndim == 0 and out_mask:
        return masked
    if out_mask is not nomask:
        np.copyto(out_data, first, where=out_mask)
    if not out_data.shape:
        return out_data

    out_array = out_data.view(get_masked_subclass(a, b))
    out_array._mask = out_mask
    return out_array
def round(self, decimals=0, out=None):
    """
    Return each element rounded to the given number of decimals.

    Refer to `numpy.around` for full documentation.

    See Also
    --------
    ndarray.around : corresponding function for ndarrays
    numpy.around : equivalent function

    """
    answer = self._data.round(decimals=decimals, out=out).view(type(self))
    if answer.ndim > 0:
        answer._mask = self._mask
        answer._update_from(self)
    elif self._mask:
        # Return masked when the scalar is masked
        answer = masked

    # With an explicit output, propagate the mask to it and hand it back.
    if out is not None:
        if isinstance(out, MaskedArray):
            out.__setmask__(self._mask)
        return out
    # No explicit output: we're done
    return answer
def after_run(self, _run_context, run_values):
    """Decode each fetched prediction into a sentence and print it.

    For every item of ``unbatch_dict(run_values.results)`` the predicted
    and source tokens are decoded as UTF-8 (and written back into the
    item).  The tokens are then joined with ``self.params["delimiter"]``,
    cut at the first "SEQUENCE_END", optionally post-processed, stripped
    and printed.
    """
    for fetches in unbatch_dict(run_values.results):
        # Convert to unicode
        predicted_tokens = np.char.decode(
            fetches["predicted_tokens"].astype("S"), "utf-8")
        fetches["predicted_tokens"] = predicted_tokens

        # If we're using beam search we take the first beam
        if np.ndim(predicted_tokens) > 1:
            predicted_tokens = predicted_tokens[:, 0]

        source_tokens = np.char.decode(
            fetches["features.source_tokens"].astype("S"), "utf-8")
        fetches["features.source_tokens"] = source_tokens
        source_len = fetches["features.source_len"]

        if self._unk_replace_fn is not None:
            # We slice the attention scores so that we do not
            # accidentally replace UNK with a SEQUENCE_END token
            attention_scores = fetches["attention_scores"][:, :source_len - 1]
            predicted_tokens = self._unk_replace_fn(
                source_tokens=source_tokens,
                predicted_tokens=predicted_tokens,
                attention_scores=attention_scores)

        joined = self.params["delimiter"].join(predicted_tokens)
        sent = joined.split("SEQUENCE_END")[0]

        # Apply postproc
        if self._postproc_fn:
            sent = self._postproc_fn(sent)

        print(sent.strip())
def forward(self, input, *args, **kwargs):
    """Run the gated recurrent layer over a batch of sequences.

    ``input`` must be 3-D; its shape is unpacked as
    ``(nb_batch, nb_timesteps, nb_in)``.  The state before the first
    timestep is all zeros.

    Returns the full output when ``self.return_sequence`` is truthy,
    otherwise only the slice for the last timestep.
    """
    assert np.ndim(input) == 3, 'Only support batch training.'
    self.last_input = input

    nb_batch, nb_timesteps, nb_in = input.shape
    output = _zero((nb_batch, nb_timesteps, self.n_out))

    for t in range(nb_timesteps):
        if t == 0:
            prev_state = _zero((nb_batch, self.n_out))
        else:
            prev_state = output[:, t - 1, :]
        x_t = input[:, t, :]

        update_gate = self.gate_activation.forward(
            np.dot(x_t, self.U_z) + np.dot(prev_state, self.W_z) + self.b_z)
        reset_gate = self.gate_activation.forward(
            np.dot(x_t, self.U_r) + np.dot(prev_state, self.W_r) + self.b_r)
        # The candidate state sees the previous state scaled by the reset gate.
        candidate = self.activation.forward(
            np.dot(x_t, self.U_h) + np.dot(prev_state * reset_gate, self.W_h) + self.b_h)

        # Blend candidate and previous state according to the update gate.
        output[:, t, :] = (1 - update_gate) * candidate + update_gate * prev_state

    self.last_output = output
    if not self.return_sequence:
        return self.last_output[:, -1, :]
    return self.last_output
def forward(self, input, *args, **kwargs):
    """Index ``self.embed_words`` with the 2-D ``input`` and return the result.

    ``input`` is also stored on ``self.last_input``.
    """
    assert np.ndim(input) == 2
    self.last_input = input
    embedded = self.embed_words[input]
    return embedded
def backward(self, pre_grad, *args, **kwargs):
    """Spread each incoming gradient evenly over its pooling window.

    ``pre_grad`` must be 3-D or 4-D, otherwise ``ValueError`` is raised.
    The result has shape ``self.input_shape``.
    """
    new_h, new_w = self.out_shape[-2:]
    pool_h, pool_w = self.pool_size
    length = np.prod(self.pool_size)
    layer_grads = _zero(self.input_shape)

    rank = np.ndim(pre_grad)
    if rank == 4:
        nb_batch, nb_axis, _, _ = pre_grad.shape
        for a in np.arange(nb_batch):
            for b in np.arange(nb_axis):
                for h in np.arange(new_h):
                    rows = slice(h * pool_h, h * pool_h + pool_h)
                    for w in np.arange(new_w):
                        cols = slice(w * pool_w, w * pool_w + pool_w)
                        layer_grads[a, b, rows, cols] = pre_grad[a, b, h, w] / length
    elif rank == 3:
        nb_batch, _, _ = pre_grad.shape
        for a in np.arange(nb_batch):
            for h in np.arange(new_h):
                rows = slice(h * pool_h, h * pool_h + pool_h)
                for w in np.arange(new_w):
                    cols = slice(w * pool_w, w * pool_w + pool_w)
                    layer_grads[a, rows, cols] = pre_grad[a, h, w] / length
    else:
        raise ValueError()
    return layer_grads
def forward(self, input, *args, **kwargs):
    """Take the maximum over each pooling window.

    ``input`` must be 3-D or 4-D, otherwise ``ValueError`` is raised; its
    last two axes are pooled.  ``input`` and its shape are stored on
    ``self.last_input`` / ``self.input_shape``.

    Returns an array whose last two axes have the sizes given by
    ``self.out_shape[-2:]``.
    """
    # shape
    self.input_shape = input.shape
    pool_h, pool_w = self.pool_size
    new_h, new_w = self.out_shape[-2:]

    # forward
    self.last_input = input
    outputs = _zero(self.input_shape[:-2] + self.out_shape[-2:])

    if np.ndim(input) == 4:
        nb_batch, nb_axis, _, _ = input.shape
        for a in np.arange(nb_batch):
            for b in np.arange(nb_axis):
                for h in np.arange(new_h):
                    # Output row h covers input rows starting at h * pool_h,
                    # not at h.
                    h_start = h * pool_h
                    for w in np.arange(new_w):
                        w_start = w * pool_w
                        outputs[a, b, h, w] = np.max(
                            input[a, b, h_start:h_start + pool_h, w_start:w_start + pool_w])
    elif np.ndim(input) == 3:
        nb_batch, _, _ = input.shape
        for a in np.arange(nb_batch):
            for h in np.arange(new_h):
                h_start = h * pool_h
                for w in np.arange(new_w):
                    w_start = w * pool_w
                    outputs[a, h, w] = np.max(
                        input[a, h_start:h_start + pool_h, w_start:w_start + pool_w])
    else:
        raise ValueError()

    return outputs
def backward(self, pre_grad, *args, **kwargs):
    """Route each incoming gradient to the maximum of its pooling window.

    ``pre_grad`` must be 3-D or 4-D, otherwise ``ValueError`` is raised.
    The windows are read from ``self.last_input``; the result has shape
    ``self.input_shape`` and is zero everywhere except at the window maxima.
    """
    new_h, new_w = self.out_shape[-2:]
    pool_h, pool_w = self.pool_size

    layer_grads = _zero(self.input_shape)

    if np.ndim(pre_grad) == 4:
        nb_batch, nb_axis, _, _ = pre_grad.shape
        for a in np.arange(nb_batch):
            for b in np.arange(nb_axis):
                for h in np.arange(new_h):
                    # Output row h covers input rows starting at h * pool_h.
                    h_start = h * pool_h
                    for w in np.arange(new_w):
                        w_start = w * pool_w
                        patch = self.last_input[
                            a, b, h_start:h_start + pool_h, w_start:w_start + pool_w]
                        max_idx = np.unravel_index(patch.argmax(), patch.shape)
                        # The gradient for output (h, w) goes to the argmax
                        # position; the row index is h, not the batch index.
                        layer_grads[a, b, h_start + max_idx[0], w_start + max_idx[1]] = \
                            pre_grad[a, b, h, w]
    elif np.ndim(pre_grad) == 3:
        nb_batch, _, _ = pre_grad.shape
        for a in np.arange(nb_batch):
            for h in np.arange(new_h):
                h_start = h * pool_h
                for w in np.arange(new_w):
                    w_start = w * pool_w
                    patch = self.last_input[
                        a, h_start:h_start + pool_h, w_start:w_start + pool_w]
                    max_idx = np.unravel_index(patch.argmax(), patch.shape)
                    layer_grads[a, h_start + max_idx[0], w_start + max_idx[1]] = \
                        pre_grad[a, h, w]
    else:
        raise ValueError()

    return layer_grads
def test_MeanSquaredError():
    """The loss must be a scalar and the gradient 2-D."""
    from npdl.objectives import MeanSquaredError

    objective = MeanSquaredError()
    shape = (10, 20)
    outputs = np.random.rand(*shape)
    targets = np.random.rand(*shape)

    loss = objective.forward(outputs, targets)
    grad = objective.backward(outputs, targets)

    assert np.ndim(loss) == 0
    assert np.ndim(grad) == 2
def test_HellingerDistance():
    """The loss must be a scalar and the gradient 2-D."""
    from npdl.objectives import HellingerDistance

    objective = HellingerDistance()
    shape = (10, 20)
    outputs = np.random.random(shape)
    targets = np.random.random(shape)

    loss = objective.forward(outputs, targets)
    grad = objective.backward(outputs, targets)

    assert np.ndim(loss) == 0
    assert np.ndim(grad) == 2
def test_BinaryCrossEntropy():
    """The loss must be a scalar and the gradient 2-D."""
    from npdl.objectives import BinaryCrossEntropy

    objective = BinaryCrossEntropy()
    shape = (10, 1)
    outputs = np.random.randint(0, 2, shape)
    targets = np.random.randint(0, 2, shape)

    loss = objective.forward(outputs, targets)
    grad = objective.backward(outputs, targets)

    assert np.ndim(loss) == 0
    assert np.ndim(grad) == 2
def test_SoftmaxCategoricalCrossEntropy():
    """The loss must be a scalar and the gradient 2-D."""
    from npdl.objectives import SoftmaxCategoricalCrossEntropy

    objective = SoftmaxCategoricalCrossEntropy()
    shape = (10, 20)
    outputs = np.random.random(shape)
    targets = np.random.random(shape)

    loss = objective.forward(outputs, targets)
    grad = objective.backward(outputs, targets)

    assert np.ndim(loss) == 0
    assert np.ndim(grad) == 2
def toscalar(arg):
    """Return the first element of a 1-D or 2-D ``arg``, or ``arg`` itself otherwise.

    ``arg`` is first passed through ``npp.checksize(arg, 1)``.
    """
    arg = npp.checksize(arg, 1)
    rank = np.ndim(arg)
    if rank == 1:
        return arg[0]
    if rank == 2:
        return arg[0, 0]
    return arg
def tondim2(arg, ndim1tocolumn=False, copy=False):
    """Return ``arg`` as an array, promoting scalars and 1-D input to 2-D.

    A scalar becomes a 1x1 array.  1-D input becomes a single row, or a
    single column when ``ndim1tocolumn`` is truthy.  Input of any other
    rank is only converted to an array.

    With ``copy`` falsy an existing array is returned without copying.
    """
    r = np.ndim(arg)
    if r == 0:
        arg = np.array(((arg,),))
    elif r == 1:
        arg = np.array((arg,))
        if ndim1tocolumn:
            arg = arg.T
    # ``np.array(arg, copy=False)`` raises under NumPy 2 when a copy is
    # unavoidable (e.g. nested lists); ``np.asarray`` copies only if needed.
    return np.array(arg, copy=True) if copy else np.asarray(arg)
def checker(input_var, desire_size):
    '''
    Debugging aid: print a message for every check that ``input_var``
    fails against ``desire_size``.

    Checks: both arguments are given, ``input_var`` holds no NaN, it has
    at least as many dimensions as ``desire_size`` has entries, its
    leading dimensions equal ``desire_size``, and its total size is a
    multiple of the product of ``desire_size``.

    ``desire_size`` may be a scalar or any sequence of sizes.
    Nothing is returned.
    '''
    inputs_missing = False
    if input_var is None:
        print('input_variable does not exist!')
        inputs_missing = True
    if desire_size is None:
        print('desire_size does not exist!')
        inputs_missing = True
    if inputs_missing:
        # The remaining checks cannot run without both arguments.
        return

    # Normalise to a tuple of plain numbers so that scalars, lists and
    # tuples all compare correctly against the shape tuple.
    desired = tuple(numpy.ravel(desire_size).tolist())
    dd = len(desired)
    dims = numpy.shape(input_var)
    # print('dd=',dd,'dims=',dims)
    if numpy.isnan(numpy.sum(input_var)):
        print('input has NaN')

    if numpy.ndim(input_var) < dd:
        print('input signal has too few dimensions')

    if dd > 1:
        if dims[0:dd] != desired:
            print(dims[0:dd])
            print(desire_size)
            print('input signal has wrong size1')
    elif dd == 1:
        if dims[0:1] != desired:
            print(dims[0] if dims else dims)
            print(desire_size)
            print('input signal has wrong size2')

    if numpy.mod(numpy.prod(dims), numpy.prod(desire_size)) != 0:
        print('input signal shape is not multiples of desired size!')
def _create_kspace_sampling_density(nufft):
    """
    Compute kspace sampling density from the nufft object

    A vector of ``nufft.st['M']`` complex ones is sent to the device and
    stored on ``nufft.y``, ``nufft._y2k()`` is run, and the magnitude of
    ``nufft.k_Kd2`` is stored in ``nufft.st['w']`` and returned.
    """
    ones = numpy.ones((nufft.st['M'],), dtype=numpy.complex64)
    nufft.y = nufft.thr.to_device(ones)
    nufft._y2k()

    density = numpy.abs(nufft.k_Kd2.get())
    nufft.st['w'] = density
    return nufft.st['w']  # see __init__() in class "nufft"
# def _create_laplacian_kernel(nufft):
# #===============================================================================
# # # # Laplacian operator, convolution kernel in spatial domain
# # # related to constraint
# #===============================================================================
# uker = numpy.zeros(nufft.st['Kd'][:],dtype=numpy.complex64,order='C')
# n_dims= numpy.size(nufft.st['Nd'])
#
# if n_dims == 1:
# uker[0] = -2.0
# uker[1] = 1.0
# uker[-1] = 1.0
# elif n_dims == 2:
# uker[0,0] = -4.0
# uker[1,0] = 1.0
# uker[-1,0] = 1.0
# uker[0,1] = 1.0
# uker[0,-1] = 1.0
# elif n_dims == 3:
# uker[0,0,0] = -6.0
# uker[1,0,0] = 1.0
# uker[-1,0,0] = 1.0
# uker[0,1,0] = 1.0
# uker[0,-1,0] = 1.0
# uker[0,0,1] = 1.0
# uker[0,0,-1] = 1.0
#
# uker =numpy.fft.fftn(uker) #, self.nufftobj.st['Kd'], range(0,numpy.ndim(uker)))
# return uker