The following 19 code examples, extracted from open-source Python projects, illustrate how to use scipy.linalg.cho_factor().
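Before the project examples, here is a minimal self-contained sketch of the pattern most of the snippets below rely on: factor a symmetric positive-definite matrix once with cho_factor, then reuse the factor for solves (cho_solve) and for the log-determinant. The matrix A and vector b are made-up illustrative data, not taken from any of the projects.

import numpy as np
from scipy import linalg as sl

# Build a random symmetric positive-definite matrix (illustrative data only).
rng = np.random.default_rng(0)
X = rng.standard_normal((5, 5))
A = X.dot(X.T) + 5 * np.eye(5)
b = rng.standard_normal(5)

# Factor once; cho_factor returns (c, lower), where the relevant triangle of c
# holds the Cholesky factor (the other triangle contains arbitrary values).
cf = sl.cho_factor(A)

# Reuse the factorization to solve A x = b ...
x = sl.cho_solve(cf, b)
print(np.allclose(A.dot(x), b))  # True

# ... and to get the log-determinant from the factor's diagonal,
# the same trick used in several of the examples below.
logdet_A = 2.0 * np.sum(np.log(np.diag(cf[0])))
print(np.isclose(logdet_A, np.linalg.slogdet(A)[1]))  # True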
def wrap_constr_jacob_func(constr_jacob):
    """Convenience function to wrap a function calculating the constraint Jacobian.

    Produces a function which returns a dictionary with an entry keyed
    `dc_dpos` holding the calculated constraint Jacobian and, if the keyword
    argument `calc_gram_chol` is True, an additional entry keyed `gram_chol`
    holding the Cholesky decomposition of the Gram matrix.
    """
    def constr_jacob_wrapper(pos, calc_gram_chol=True):
        jacob = constr_jacob(pos)
        cache = {'dc_dpos': jacob}
        if calc_gram_chol:
            cache['gram_chol'] = la.cho_factor(jacob.dot(jacob.T))
        return cache
    return constr_jacob_wrapper
def project_onto_nullspace(mom, cache):
    """Projects a momentum on to the nullspace of the constraint Jacobian.

    Parameters
    ----------
    mom : vector
        Momentum state to project.
    cache : dictionary
        Dictionary of cached constraint Jacobian results.

    Returns
    -------
    mom : vector
        Momentum state projected into nullspace of constraint Jacobian.
    """
    dc_dpos = cache['dc_dpos']
    if 'gram_chol' not in cache:
        cache['gram_chol'] = la.cho_factor(dc_dpos.dot(dc_dpos.T))
    gram_chol = cache['gram_chol']
    dc_dpos_mom = dc_dpos.dot(mom)
    gram_inv_dc_dpos_mom = la.cho_solve(gram_chol, dc_dpos_mom)
    dc_dpos_pinv_dc_dpos_mom = dc_dpos.T.dot(gram_inv_dc_dpos_mom)
    mom -= dc_dpos_pinv_dc_dpos_mom
    return mom
def nci(x, m, P):
    # dimension of state, # time steps, # MC simulations, # inference algorithms (filters/smoothers)
    d, time, mc_sims, algs = m.shape
    dx = x[..., na] - m
    # Mean Square Error matrix
    MSE = np.empty((d, d, time, mc_sims, algs))
    for k in range(time):
        for s in range(mc_sims):
            for alg in range(algs):
                MSE[..., k, s, alg] = np.outer(dx[..., k, s, alg], dx[..., k, s, alg])
    MSE = MSE.mean(axis=3)  # average over MC simulations

    # dx_iP_dx = np.empty((1, time, mc_sims, algs))
    NCI = np.empty((1, time, mc_sims, algs))
    for k in range(1, time):
        for s in range(mc_sims):
            for alg in range(algs):
                # iP_dx = cho_solve(cho_factor(P[:, :, k, s, alg]), dx[:, k, s, alg])
                # dx_iP_dx[:, k, s, alg] = dx[:, k, s, alg].dot(iP_dx)
                # iMSE_dx = cho_solve(cho_factor(MSE[..., k, fi]), dx[:, k, s, alg])
                # NCI[..., k, s, fi] = 10*np.log10(dx_iP_dx[:, k, s, fi]) - 10*np.log10(dx[:, k, s, alg].dot(iMSE_dx))
                dx_iP_dx = dx[:, k, s, alg].dot(np.linalg.inv(P[..., k, s, alg])).dot(dx[:, k, s, alg])
                dx_iMSE_dx = dx[:, k, s, alg].dot(np.linalg.inv(MSE[..., k, alg])).dot(dx[:, k, s, alg])
                NCI[..., k, s, alg] = 10 * np.log10(dx_iP_dx) - 10 * np.log10(dx_iMSE_dx)
    return NCI[:, 1:, ...].mean(axis=1)  # average over time steps (ignore the 1st)
def check_pd(A, lower=True):
    """
    Checks if A is PD.
    If so returns True and Cholesky decomposition,
    otherwise returns False and None
    """
    try:
        return True, np.tril(cho_factor(A, lower=lower)[0])
    except LinAlgError as err:
        if 'not positive definite' in str(err):
            return False, None
def kalman_filter(y, H, R, F, Q, mu, PI, z=None):
    """
    Given the following sequences (one item of given dimension for each
    time step):

    - *y*: measurements (M)
    - *H*: measurement operator (MxN)
    - *R*: measurement noise covariance (MxM)
    - *F*: time update operator (NxN)
    - *Q*: time update noise covariance (NxN)
    - *mu*: initial state (N)
    - *PI*: initial state covariance (NxN)
    - *z*: (optional) systematic time update input (N)

    Return the :class:`FilterResult` containing lists of posterior state
    estimates and error covariances.
    """
    x_hat = []
    P = []
    x_hat_prior = mu
    P_prior = PI
    if z is None:
        z = repeat(None)
    for i, (y_i, H_i, R_i, F_i, Q_i, z_i) in enumerate(izip(y, H, R, F, Q, z)):
        # measurement update
        A = cho_factor(NP.matmul(H_i, NP.matmul(P_prior, H_i.T)) + R_i)
        B = cho_solve(A, NP.matmul(H_i, P_prior))
        b = cho_solve(A, y_i - NP.matmul(H_i, x_hat_prior))
        C = NP.matmul(P_prior, H_i.T)
        x_hat.append(x_hat_prior + NP.matmul(C, b))
        P.append(P_prior - NP.matmul(C, B))
        # time update
        x_hat_prior = NP.matmul(F_i, x_hat[-1])
        if z_i is not None:
            x_hat_prior += z_i
        P_prior = NP.matmul(F_i, NP.matmul(P[-1], F_i.T)) + Q_i
    return FilterResult(x_hat, P)
def solve(self, other):
    if other.ndim == 1:
        Nx = np.array(other / self.N)
    elif other.ndim == 2:
        Nx = np.array(other / self.N[:, None])

    UNx = np.dot(self.U.T, Nx)

    Sigma = np.diag(1/self.J) + np.dot(self.U.T, self.U/self.N[:, None])
    cf = sl.cho_factor(Sigma)

    if UNx.ndim == 1:
        tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N
    else:
        tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N[:, None]

    return Nx - tmp
def logdet(self):
    Sigma = np.diag(1/self.J) + np.dot(self.U.T, self.U/self.N[:, None])
    cf = sl.cho_factor(Sigma)

    ld = np.sum(np.log(self.N)) + np.sum(np.log(self.J))
    ld += np.sum(2*np.log(np.diag(cf[0])))

    return ld
def __init__(self, x):
    if sps.issparse(x):
        x = x.toarray()

    self.cf = sl.cho_factor(x)
def _solve_ZNX(self, X, Z):
    """Solves :math:`Z^T N^{-1}X`, where :math:`X`
    and :math:`Z` are 1-d or 2-d arrays.
    """
    if X.ndim == 1:
        X = X.reshape(X.shape[0], 1)
    if Z.ndim == 1:
        Z = Z.reshape(Z.shape[0], 1)

    n, m = Z.shape[1], X.shape[1]
    ZNX = np.zeros((n, m))
    if len(self._idx) > 0:
        ZNXr = np.dot(Z[self._idx, :].T,
                      X[self._idx, :] / self._nvec[self._idx, None])
    else:
        ZNXr = 0
    for slc, block in zip(self._slices, self._blocks):
        Zblock = Z[slc, :]
        Xblock = X[slc, :]

        if slc.stop - slc.start > 1:
            cf = sl.cho_factor(block + np.diag(self._nvec[slc]))
            bx = sl.cho_solve(cf, Xblock)
        else:
            bx = Xblock / self._nvec[slc][:, None]
        ZNX += np.dot(Zblock.T, bx)
    ZNX += ZNXr
    return ZNX.squeeze() if len(ZNX) > 1 else float(ZNX)
def _solve_NX(self, X):
    """Solves :math:`N^{-1}X`, where :math:`X`
    is a 1-d or 2-d array.
    """
    if X.ndim == 1:
        X = X.reshape(X.shape[0], 1)

    NX = X / self._nvec[:, None]
    for slc, block in zip(self._slices, self._blocks):
        Xblock = X[slc, :]
        if slc.stop - slc.start > 1:
            cf = sl.cho_factor(block + np.diag(self._nvec[slc]))
            NX[slc] = sl.cho_solve(cf, Xblock)
    return NX.squeeze()
def _get_logdet(self):
    """Returns log determinant of :math:`N+UJU^{T}` where
    :math:`U` is a quantization matrix.
    """
    if len(self._idx) > 0:
        logdet = np.sum(np.log(self._nvec[self._idx]))
    else:
        logdet = 0
    for slc, block in zip(self._slices, self._blocks):
        if slc.stop - slc.start > 1:
            cf = sl.cho_factor(block + np.diag(self._nvec[slc]))
            logdet += np.sum(2*np.log(np.diag(cf[0])))
        else:
            logdet += np.sum(np.log(self._nvec[slc]))
    return logdet
def __init__(self, Sigma):
    self._cholesky = la.cho_factor(Sigma)
def _measurement_update(self, y):
    gain = cho_solve(cho_factor(self.z_cov_pred), self.xz_cov).T
    self.x_mean_filt = self.x_mean_pred + gain.dot(y - self.z_mean_pred)
    self.x_cov_filt = self.x_cov_pred - gain.dot(self.z_cov_pred).dot(gain.T)
def _smoothing_update(self):
    gain = cho_solve(cho_factor(self.x_cov_pred), self.xx_cov).T
    self.x_mean_smooth = self.x_mean_filt + gain.dot(self.x_mean_smooth - self.x_mean_pred)
    self.x_cov_smooth = self.x_cov_filt + gain.dot(self.x_cov_smooth - self.x_cov_pred).dot(gain.T)
def weights_rbf(self, unit_sp, hypers):
    # BQ weights for RBF kernel with given hypers, computations adopted from the GP-ADF code [Deisenroth]
    # with the following assumptions:
    #   (A1) the uncertain input is zero-mean with unit covariance
    #   (A2) one set of hyper-parameters is used for all output dimensions (one GP models all outputs)
    d, n = unit_sp.shape
    # GP kernel hyper-parameters
    alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
    assert len(el) == d
    # pre-allocation for convenience
    eye_d, eye_n = np.eye(d), np.eye(n)
    iLam1 = np.atleast_2d(np.diag(el ** -1))  # sqrt(Lambda^-1)
    iLam2 = np.atleast_2d(np.diag(el ** -2))

    inp = unit_sp.T.dot(iLam1)  # sigmas / el[:, na] (x - m)^T*sqrt(Lambda^-1)  # (numSP, xdim)
    K = np.exp(2 * np.log(alpha) - 0.5 * maha(inp, inp))
    iK = cho_solve(cho_factor(K + jitter * eye_n), eye_n)
    B = iLam2 + eye_d  # (D, D)
    c = alpha ** 2 / np.sqrt(det(B))
    t = inp.dot(inv(B))  # inn*(P + Lambda)^-1
    l = np.exp(-0.5 * np.sum(inp * t, 1))  # (N, 1)
    zet = 2 * np.log(alpha) - 0.5 * np.sum(inp * inp, 1)
    inp = inp.dot(iLam1)
    R = 2 * iLam2 + eye_d
    t = 1 / np.sqrt(det(R))
    L = np.exp((zet[:, na] + zet[:, na].T) + maha(inp, -inp, V=0.5 * inv(R)))
    q = c * l  # evaluations of the kernel mean map (from the viewpoint of RKHS methods)
    # mean weights
    wm_f = q.dot(iK)
    iKQ = iK.dot(t * L)
    # covariance weights
    wc_f = iKQ.dot(iK)
    # cross-covariance "weights"
    wc_fx = np.diag(q).dot(iK)  # used for self.D.dot(x - mean).dot(wc_fx).dot(fx)
    self.D = inv(eye_d + np.diag(el ** 2))  # S(S+Lam)^-1; for S=I, (I+Lam)^-1
    # model variance; to be added to the covariance
    # this diagonal form assumes independent GP outputs (cov(f^a, f^b) = 0 for all a, b: a neq b)
    self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones((d, 1)))
    return wm_f, wc_f, wc_fx
def plot_gp_model(self, f, unit_sp, args, test_range=(-5, 5, 50), plot_dims=(0, 0)):
    # plot out_dim vs. in_dim
    in_dim, out_dim = plot_dims
    # test input must have the same dimension as specified in kernel
    test = np.linspace(*test_range)
    test_pts = np.zeros((self.d, len(test)))
    test_pts[in_dim, :] = test
    # function value observations at training points (unit sigma-points)
    y = np.apply_along_axis(f, 0, unit_sp, args)
    fx = np.apply_along_axis(f, 0, test_pts, args)  # function values at test points
    K = self.kern.K(unit_sp.T)  # covariances between sigma-points
    k = self.kern.K(test_pts.T, unit_sp.T)  # covariance between test inputs and sigma-points
    kxx = self.kern.Kdiag(test_pts.T)  # prior predictive variance
    k_iK = cho_solve(cho_factor(K), k.T).T
    gp_mean = k_iK.dot(y[out_dim, :])  # GP mean
    gp_var = np.diag(np.diag(kxx) - k_iK.dot(k.T))  # GP predictive variance
    # plot the GP mean, predictive variance and the true function
    plt.figure()
    plt.plot(test, fx[out_dim, :], color='r', ls='--', lw=2, label='true')
    plt.plot(test, gp_mean, color='b', ls='-', lw=2, label='GP mean')
    plt.fill_between(test, gp_mean + 2 * np.sqrt(gp_var), gp_mean - 2 * np.sqrt(gp_var),
                     color='b', alpha=0.25, label='GP variance')
    plt.plot(unit_sp[in_dim, :], y[out_dim, :],
             color='k', ls='', marker='o', ms=8, label='data')
    plt.legend()
    plt.show()
def __call__(self, xs, phiinv_method='partition'):
    # map parameter vector if needed
    params = xs if isinstance(xs, dict) else self.pta.map_params(xs)

    # phiinvs will be a list or may be a big matrix if spatially
    # correlated signals
    TNrs = self.pta.get_TNr(params)
    TNTs = self.pta.get_TNT(params)
    phiinvs = self.pta.get_phiinv(params, logdet=True, method=phiinv_method)

    # get -0.5 * (rNr + logdet_N) piece of likelihood
    loglike = -0.5 * np.sum([l for l in self.pta.get_rNr_logdet(params)])

    # red noise piece
    if self.pta._commonsignals:
        phiinv, logdet_phi = phiinvs
        Sigma = self._make_sigma(TNTs, phiinv)
        TNr = np.concatenate(TNrs)

        cf = cholesky(Sigma)
        expval = cf(TNr)
        logdet_sigma = cf.logdet()

        loglike += 0.5*(np.dot(TNr, expval) - logdet_sigma - logdet_phi)
    else:
        for TNr, TNT, (phiinv, logdet_phi) in zip(TNrs, TNTs, phiinvs):
            Sigma = TNT + (np.diag(phiinv) if phiinv.ndim == 1 else phiinv)

            try:
                cf = sl.cho_factor(Sigma)
                expval = sl.cho_solve(cf, TNr)
            except:
                return -np.inf

            logdet_sigma = np.sum(2 * np.log(np.diag(cf[0])))

            loglike += 0.5*(np.dot(TNr, expval) - logdet_sigma - logdet_phi)

    return loglike
def get_phiinv_byfreq_cliques(self, params, logdet=False, cholesky=False):
    phi = self.get_phi(params, cliques=True)

    if isinstance(phi, list):
        return [None if phivec is None else phivec.inv(logdet)
                for phivec in phi]
    else:
        ld = 0

        # first invert all the cliques
        for clcount in range(self._clcount):
            idx = (self._cliques == clcount)

            if np.any(idx):
                idx2 = np.ix_(idx, idx)

                if cholesky:
                    cf = sl.cho_factor(phi[idx2])

                    if logdet:
                        ld += 2.0*np.sum(np.log(np.diag(cf[0])))

                    phi[idx2] = sl.cho_solve(
                        cf, np.identity(cf[0].shape[0]))
                else:
                    phi2 = phi[idx2]

                    if logdet:
                        ld += np.linalg.slogdet(phi2)[1]

                    phi[idx2] = np.linalg.inv(phi2)

        # then do the pure diagonal terms
        idx = (self._cliques == -1)

        if logdet:
            ld += np.sum(np.log(phi[idx, idx]))

        phi[idx, idx] = 1.0/phi[idx, idx]

        return (phi, ld) if logdet else phi

    # we use "cliques" to account for sparse non-diagonal Phi matrices
    # for each value in self._cliques, the matrix indices with that value form
    # an independent submatrix that can be inverted separately

    # reset clique index
def weights_rbf(self, unit_sp, hypers):
    d, n = unit_sp.shape
    # GP kernel hyper-parameters
    alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
    assert len(el) == d
    # pre-allocation for convenience
    eye_d, eye_n, eye_y = np.eye(d), np.eye(n), np.eye(n + d * n)

    K = self.kern_eq_der(unit_sp, hypers)  # evaluate kernel matrix BOTTLENECK
    iK = cho_solve(cho_factor(K + jitter * eye_y), eye_y)  # invert kernel matrix BOTTLENECK
    Lam = np.diag(el ** 2)
    iLam = np.diag(el ** -1)  # sqrt(Lambda^-1)
    iiLam = np.diag(el ** -2)  # Lambda^-1

    inn = iLam.dot(unit_sp)  # (x-m)^T*iLam  # (N, D)
    B = iiLam + eye_d  # P*Lambda^-1+I, (P+Lam)^-1 = Lam^-1*(P*Lam^-1+I)^-1  # (D, D)
    cho_B = cho_factor(B)
    t = cho_solve(cho_B, inn)  # dot(inn, inv(B))  # (x-m)^T*iLam*(P+Lambda)^-1  # (D, N)
    l = np.exp(-0.5 * np.sum(inn * t, 0))  # (N, 1)
    q = (alpha ** 2 / np.sqrt(det(B))) * l  # (N, 1)
    Sig_q = cho_solve(cho_B, eye_d)  # B^-1*I
    eta = Sig_q.dot(unit_sp)  # (D,N) Sig_q*x
    mu_q = iiLam.dot(eta)  # (D,N)
    r = q[na, :] * iiLam.dot(mu_q - unit_sp)  # -t.dot(iLam) * q  # (D, N)
    q_tilde = np.hstack((q.T, r.T.ravel()))  # (1, N+N*D)

    # weights for mean
    wm = q_tilde.dot(iK)

    # quantities for cross-covariance "weights"
    iLamSig = iiLam.dot(Sig_q)  # (D,D)
    r_tilde = (q[na, na, :] * iLamSig[..., na] +
               mu_q[na, ...] * r[:, na, :]).T.reshape(n * d, d).T  # (D, N*D)
    R_tilde = np.hstack((q[na, :] * mu_q, r_tilde))  # (D, N+N*D)

    # input-output covariance (cross-covariance) "weights"
    Wcc = R_tilde.dot(iK)  # (D, N+N*D)

    # quantities for covariance weights
    zet = 2 * np.log(alpha) - 0.5 * np.sum(inn * inn, 0)  # (D,N) 2log(alpha) - 0.5*(x-m)^T*Lambda^-1*(x-m)
    inn = iiLam.dot(unit_sp)  # inp / el[:, na]**2
    R = 2 * iiLam + eye_d  # 2P*Lambda^-1 + I  # (N,N)
    Q = (1.0 / np.sqrt(det(R))) * np.exp((zet[:, na] + zet[:, na].T) +
                                         maha(inn.T, -inn.T, V=0.5 * solve(R, eye_d)))
    cho_LamSig = cho_factor(Lam + Sig_q)
    Sig_Q = cho_solve(cho_LamSig, Sig_q).dot(iiLam)  # (D,D) Lambda^-1 (Lambda*(Lambda+Sig_q)^-1*Sig_q) Lambda^-1
    eta_tilde = iiLam.dot(cho_solve(cho_LamSig, eta))  # Lambda^-1(Lambda+Sig_q)^-1*eta
    ETA = eta_tilde[..., na] + eta_tilde[:, na, :]  # (D,N,N) pairwise sum of pre-multiplied eta's
    # mu_Q = ETA + in_mean[:, na]  # (D,N,N)
    xnmu = inn[..., na] - ETA  # (D,N,N) x_n - mu^Q_nm
    # xmmu = sigmas[:, na, :] - mu_Q  # x_m - mu^Q_nm
    E_dff = (-Q[na, ...] * xnmu).swapaxes(0, 1).reshape(d * n, n)
    # (D,D,N,N) (x_n - mu^Q_nm)(x_m - mu^Q_nm)^T + Sig_Q
    T = xnmu[:, na, ...] * xnmu.swapaxes(1, 2)[na, ...] + Sig_Q[..., na, na]
    E_dffd = (Q[na, na, ...] * T).swapaxes(0, 3).reshape(d * n, -1)  # (N*D, N*D)
    Q_tilde = np.vstack((np.hstack((Q, E_dff.T)), np.hstack((E_dff, E_dffd))))  # (N+N*D, N+N*D)

    # weights for covariance
    iKQ = iK.dot(Q_tilde)
    Wc = iKQ.dot(iK)
    # model variance
    self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones((d, 1)))
    return wm, Wc, Wcc