我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用astropy.io.fits.Header()。
def header(self):
    """
    Assemble the FITS header for this HI simulation product.

    The header carries the data unit, the redshift range and step, the
    comoving-distance range and slice spacing, the simulation box
    geometry, the file creation date and the invoking command line (as
    a HISTORY card).  The keywords from ``self.wcs.to_header()`` are
    merged in last with ``update=True``.

    Returns
    -------
    header : `~astropy.io.fits.Header`
    """
    configs = self.configs
    slice_step = configs.Dc_cell
    dc_lower, dc_upper = configs.Dc_limit

    hdr = fits.Header()
    cards = [
        ("BUNIT", configs.unit, "Data unit"),
        ("zmin", configs.zmin, "HI simulation minimum redshift"),
        ("zmax", configs.zmax, "HI simulation maximum redshift"),
        ("dz", configs.dz, "HI simulation redshift step size"),
        ("Dc_min", dc_lower, "[cMpc] comoving distance at zmin"),
        ("Dc_max", dc_upper, "[cMpc] comoving distance at zmax"),
        ("Dc_step", slice_step, "[cMpc] comoving distance between slices"),
        ("Lside", configs.Lside, "[cMpc] Simulation side length"),
        ("Nside", configs.Nside, "Number of cells at each side"),
        ("DATE", datetime.now(timezone.utc).astimezone().isoformat(),
         "File creation date"),
    ]
    for keyword, value, comment in cards:
        hdr[keyword] = (value, comment)

    # Record how the program was invoked
    hdr.add_history(" ".join(sys.argv))
    hdr.extend(self.wcs.to_header(), update=True)
    return hdr
def write_slice(self, outfile, data, z, clobber=False):
    """
    Write a single redshift slice to a FITS file.

    The slice header copies ``BUNIT``, ``Lside`` and ``Nside`` (values
    and comments) from ``self.header`` and adds the slice redshift, the
    frequency returned by ``z2freq(z)``, the comoving distance at ``z``,
    the creation date and the invoking command line.

    Parameters
    ----------
    outfile : str
        Path of the FITS file to write.
    data : array-like
        Slice data stored in the primary HDU.
    z : float
        Redshift of the slice.
    clobber : bool, optional
        Whether an existing ``outfile`` may be overwritten.
    """
    parent = self.header
    slice_hdr = fits.Header()
    for keyword in ("BUNIT", "Lside", "Nside"):
        slice_hdr[keyword] = (parent[keyword], parent.comments[keyword])

    slice_hdr["REDSHIFT"] = (z, "redshift of this slice")
    slice_hdr["FREQ"] = (z2freq(z), "[MHz] observed HI signal frequency")
    slice_hdr["Dc"] = (cosmo.comoving_distance(z).value,  # [Mpc]
                       "[cMpc] comoving distance")
    slice_hdr["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
                         "File creation date")
    slice_hdr.add_history(" ".join(sys.argv))

    primary = fits.PrimaryHDU(data=data, header=slice_hdr)
    try:
        primary.writeto(outfile, overwrite=clobber)
    except TypeError:
        # Fall back to the ``clobber`` keyword when ``overwrite`` is rejected
        primary.writeto(outfile, clobber=clobber)
    logger.info("Wrote slice to file: %s", outfile)
def header(self):
    """
    Assemble the FITS header for this HI simulation product.

    The header carries the data unit (as a string), the redshift range,
    the comoving-distance range and slice spacing, the simulation box
    geometry, the file creation date and the invoking command line (as
    a HISTORY card).  The keywords from ``self.wcs.to_header()`` are
    merged in last with ``update=True``.

    Returns
    -------
    header : `~astropy.io.fits.Header`
    """
    slice_step = self.Dc_cell
    hdr = fits.Header()
    cards = [
        ("BUNIT", str(self.unit), "Data unit"),
        ("zmin", self.zmin, "HI simulation minimum redshift"),
        ("zmax", self.zmax, "HI simulation maximum redshift"),
        ("Dc_min", self.Dc_min, "[cMpc] comoving distance at zmin"),
        ("Dc_max", self.Dc_max, "[cMpc] comoving distance at zmax"),
        ("Dc_step", slice_step, "[cMpc] comoving distance between slices"),
        ("Lside", self.Lside, "[cMpc] Simulation side length"),
        ("Nside", self.Nside, "Number of cells at each side"),
        ("DATE", datetime.now(timezone.utc).astimezone().isoformat(),
         "File creation date"),
    ]
    for keyword, value, comment in cards:
        hdr[keyword] = (value, comment)

    # Record how the program was invoked
    hdr.add_history(" ".join(sys.argv))
    hdr.extend(self.wcs.to_header(), update=True)
    return hdr
def _update_hdr_wcs(self, hdr, wcs_hdr):
    """Merge WCS parameters into a header and drop unused WCS keywords.

    :param hdr: A pyfits.Header() instance; it is modified in place.
    :param wcs_hdr: A pyfits.Header() instance containing the new WCS
        parameters.
    :return: The same ``hdr`` object, after the update.
    """
    hdr.extend(wcs_hdr, strip=True, update=True, end=True)

    # Keywords created by pywcs that are not wanted in the result
    for unused in ('RESTFRQ', 'RESTWAV', 'LONPOLE', 'LATPOLE'):
        if unused in hdr:
            del hdr[unused]
    return hdr
def writeSVD(self, svdoutputfits='ZAP_SVD.fits'):
    """Write the SVD to an individual fits file.

    The primary HDU holds ``self.zlsky`` together with a header that
    records the ZAP version and the processing options; one image
    extension is appended per segment in ``self.pranges``, holding
    ``self.especeval[i][0]``.

    :param svdoutputfits: Path of the FITS file to create.  It is passed
        to ``check_file_exists`` first.
    """
    check_file_exists(svdoutputfits)

    header = fits.Header()
    header['ZAPvers'] = (__version__, 'ZAP version')
    header['ZAPzlvl'] = (self.run_zlevel, 'ZAP zero level correction')
    header['ZAPclean'] = (self.run_clean,
                          'ZAP NaN cleaning performed for calculation')
    header['ZAPcftyp'] = (self._cftype, 'ZAP continuum filter type')
    header['ZAPcfwid'] = (self._cfwidth, 'ZAP continuum filter size')
    header['ZAPmask'] = (self.maskfile, 'ZAP mask used to remove sources')
    nseg = len(self.pranges)
    header['ZAPnseg'] = (nseg, 'Number of segments used for ZAP SVD')

    # The header used to be built and then dropped; attach it to the
    # primary HDU so the ZAP keywords actually reach the file.
    hdu = fits.HDUList([fits.PrimaryHDU(self.zlsky, header=header)])
    for i in range(nseg):
        hdu.append(fits.ImageHDU(self.especeval[i][0]))

    # write for later use
    hdu.writeto(svdoutputfits)
    logger.info('SVD file saved to %s', svdoutputfits)
def open_image(infile):
    """
    Read a slice image and return its header with the 2D image plane.

    NOTE
    ----
    Accepted layouts of the primary HDU data:
    * NAXIS=2: [Y, X]
    * NAXIS=3: [FREQ=1, Y, X]
    * NAXIS=4: [STOKES=1, FREQ=1, Y, X]

    Only slices with exactly ONE frequency and ONE Stokes parameter are
    accepted.

    Returns
    -------
    header : `~astropy.io.fits.Header`
    image : 2D `~numpy.ndarray`
        The 2D [Y, X] image part of the slice image.

    Raises
    ------
    ValueError
        If the data do not match one of the accepted layouts.
    """
    with fits.open(infile) as hdulist:
        primary = hdulist[0]
        header = primary.header
        data = primary.data

    ndim = data.ndim
    if ndim == 2:
        # NAXIS=2: [Y, X]
        return (header, data)
    if ndim == 3 and data.shape[0] == 1:
        # NAXIS=3: [FREQ=1, Y, X]
        return (header, data[0])
    if ndim == 4 and data.shape[0] == 1 and data.shape[1] == 1:
        # NAXIS=4: [STOKES=1, FREQ=1, Y, X]
        return (header, data[0, 0])
    raise ValueError("Slice '{0}' has invalid dimensions: {1}".format(
        infile, data.shape))
def header(self):
    """Return ``self.header_``, creating an empty FITS header on first use."""
    try:
        return self.header_
    except AttributeError:
        # No header stored yet: start from an empty one and remember it
        self.header_ = fits.Header()
        return self.header_
def __init__(self, float32=True, clobber=False, checksum=False):
    """
    Store the output options and reset the image state.

    Parameters
    ----------
    float32 : bool, optional
        Stored as ``self.float32_``.
    clobber : bool, optional
        Stored as ``self.clobber_``.
    checksum : bool, optional
        Stored as ``self.checksum_``.
    """
    # Options supplied by the caller
    self.float32_ = float32
    self.clobber_ = clobber
    self.checksum_ = checksum

    # Image content and metadata start out unset
    self.type_ = self.data = None
    self.frequency_ = None  # [MHz]
    self.pixelsize_ = None  # [arcsec]

    # Creator name and an empty header
    self.creator_ = __name__
    self.header_ = fits.Header()
def write_fits_image(outfile, image, header=None, float32=False,
                     clobber=False, checksum=False):
    """
    Write the supplied image (together with header information) into
    the output FITS file.

    The written header starts with ``CREATOR`` and ``DATE`` cards; the
    cards of ``header`` (if given) are merged on top with
    ``update=True``.

    Parameters
    ----------
    outfile : str
        The path/filename of the output FITS file.
    image : 2D `~numpy.ndarray`
        The image data to be written out to the FITS file.
        NOTE: image.shape: (nrow, ncol)  <->  FITS image: (ncol, nrow)
    header : `~astropy.io.fits.Header`, optional
        The FITS header information for this image
    float32 : bool, optional
        Whether coerce the image data (generally double/float64 data type)
        into single/float32 (in order to save space)?
        Default: False (i.e., preserve the data type unchanged)
    clobber : bool, optional
        Whether to overwrite the existing output file.  It is forwarded
        to ``_check_existence``.
        Default: False
    checksum : bool, optional
        Whether to calculate the data checksum, which may cost some time?
        Default: False
    """
    _create_dir(outfile)
    _check_existence(outfile, clobber=clobber, remove=True)

    hdr = fits.Header()
    hdr["CREATOR"] = (__name__, "File creator")
    hdr["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
                   "File creation date")
    if header is not None:
        hdr.extend(header, update=True)

    if float32:
        image = np.asarray(image, dtype=np.float32)

    # Write the merged header.  Passing the raw ``header`` argument here
    # silently dropped the CREATOR/DATE cards prepared above.
    hdu = fits.PrimaryHDU(data=image, header=hdr)
    hdu.writeto(outfile, checksum=checksum)
    logger.info("Wrote image to FITS file: %s", outfile)
def _read_healpix_hdu(hdu):
    """Return ``(data, header)`` for an already available HEALPix table HDU."""
    # Hack to ignore the dtype byteorder, use native endianness
    dtype = np.dtype(hdu.data.field(0).dtype.type)
    header = hdu.header
    data = hp.read_map(hdu, nest=False, verbose=False)
    # ``astype`` copies by default, so the result does not depend on the
    # HDU (or its file) staying open.
    return (data.astype(dtype), header)


def read_fits_healpix(filename):
    """
    Read the HEALPix map from a FITS file or a BinTableHDU to 1D array
    in *RING* ordering.

    Parameters
    ----------
    filename : str or `~astropy.io.fits.BinTableHDU`
        Filename of the HEALPix FITS file,
        or an `~astropy.io.fits.BinTableHDU` instance.

    Returns
    -------
    data : 1D `~numpy.ndarray`
        HEALPix data in *RING* ordering with same dtype as input
    header : `~astropy.io.fits.Header`
        Header of the input FITS file

    NOTE
    ----
    This function wraps on `healpy.read_map()`, but set the data type
    of data array to its original value as in FITS file, as well as
    return the header of input FITS file.
    """
    if isinstance(filename, fits.BinTableHDU):
        return _read_healpix_hdu(filename)

    # Read the first extended table.  The context manager closes the
    # file once the data have been copied out; previously the handle
    # returned by ``fits.open`` was never closed.
    with fits.open(filename) as hdulist:
        return _read_healpix_hdu(hdulist[1])
def header(self):
    """
    Build a header for this frame.

    The header comes from ``self.wcs.to_header()`` when a WCS is set and
    is a new empty ``fits.Header`` otherwise.  ``NAXIS`` (always 2),
    ``NAXIS1`` (``self.xsize``) and ``NAXIS2`` (``self.ysize``) are then
    set on it.

    :return: the header object
    """
    # Start from the WCS header if there is one, else from an empty header
    header = fits.Header() if self.wcs is None else self.wcs.to_header()

    # Add the axis properties
    for keyword, value in (("NAXIS", 2),
                           ("NAXIS1", self.xsize),
                           ("NAXIS2", self.ysize)):
        header[keyword] = value

    # NOTE: see bug #4592 on the Astropy GitHub (WCS.to_header issue).  A
    # temporary workaround that removed NAXIS1/NAXIS2/CDELT1/CDELT2 and
    # renamed PC1_1/PC2_2 to CD1_1/CD2_2 used to live here, disabled.

    return header

# -----------------------------------------------------------------
def header(self):
    '''Returns an astropy header

    The (keyword name, value, comment) rows linked to this HDU are
    queried through the object's own session and handed to
    ``fits.Header``.
    '''
    session = db.Session.object_session(self)

    columns = (HeaderKeyword.name, HeaderValue.value, HeaderValue.comment)
    query = session.query(*columns).join(HeaderValue, HduToHeaderValue)
    query = query.filter(
        HduToHeaderValue.header_value_pk == HeaderValue.pk,
        HduToHeaderValue.hdu_pk == self.pk)

    return fits.Header(query.all())
def header(self):
    '''Returns an astropy header

    The (keyword label, value, comment) rows whose ``cube`` is this
    object are queried through the object's own session and handed to
    ``fits.Header``.
    '''
    session = Session.object_session(self)

    columns = (FitsHeaderKeyword.label,
               FitsHeaderValue.value,
               FitsHeaderValue.comment)
    query = session.query(*columns).join(FitsHeaderValue)
    query = query.filter(FitsHeaderValue.cube == self)

    return fits.Header(query.all())
def makeHeader(self):
    """Build a FITS header from this object's WCS columns.

    Every name in ``self.cols[2:]`` becomes a header keyword whose value
    is the attribute of the same name.  ``Decimal`` values (including
    subclasses) are converted to ``float``; other values are stored
    unchanged.

    :return: the new ``fits.Header``
    """
    wcscols = self.cols[2:]
    newhdr = fits.Header()
    for c in wcscols:
        # Look the attribute up once instead of twice
        value = getattr(self, c)
        newhdr[c] = float(value) if isinstance(value, Decimal) else value
    return newhdr
def _test_init(self, model_cube, galaxy, bintype='SPX', template_kin='GAU-MILESHC'):
    """Assert that ``model_cube`` is initialised consistently with ``galaxy``.

    Checks the release/version attributes and identifiers against the
    galaxy, the bin type and kinematic template against the expected
    values, the types of ``header`` and ``wcs``, and that ``wavelength``
    and ``redcorr`` are set.
    """
    # Private version attributes must mirror the galaxy's
    for cube_attr, galaxy_attr in (('_release', 'release'),
                                   ('_drpver', 'drpver'),
                                   ('_dapver', 'dapver')):
        assert getattr(model_cube, cube_attr) == getattr(galaxy, galaxy_attr)

    assert model_cube.bintype == bintype
    assert model_cube.template_kin == template_kin

    # Identifiers shared with the galaxy
    for attr in ('plateifu', 'mangaid'):
        assert getattr(model_cube, attr) == getattr(galaxy, attr)

    assert isinstance(model_cube.header, fits.Header)
    assert isinstance(model_cube.wcs, WCS)

    assert model_cube.wavelength is not None
    assert model_cube.redcorr is not None
def __init__(self, campaign=0, channel=1, cadenceno=1, data_store=None, shape=KEPLER_CHANNEL_SHAPE):
    """Store the identifiers and allocate an all-NaN float32 data array.

    ``campaign``, ``channel``, ``cadenceno`` and ``data_store`` are kept
    as attributes of the same name; ``header`` starts as an empty
    ``fits.Header`` and ``data`` as a float32 array of ``shape`` filled
    with NaN.
    """
    self.campaign, self.channel, self.cadenceno = campaign, channel, cadenceno
    self.data_store = data_store
    self.header = fits.Header()
    self.data = np.full(shape, np.nan, dtype=np.float32)
def name(self):
    """ Table name stored under ``header['NAME']``, or None if absent """
    table_name = self.header.get('NAME')
    return table_name
def fits_header(self) -> Header:
    """FITS header built from ``self.wcs`` (`~astropy.io.fits.Header`)."""
    wcs_header = self.wcs.to_header()
    return wcs_header
def savefits(cube, fitsname, **kwargs):
    """Save a 2D or 3D cube to a FITS file.

    The default header is taken from the packaged ``data/fitsinfo.yaml``
    (entry ``dcube_2d`` or ``dcube_3d``) and then updated with the
    coordinate information of ``cube``.

    Args:
        cube: Cube to save.  Its ``dims``, ``values``, ``coordsys``,
            ``x``, ``y`` (and, where used, ``xref``, ``yref``, ``kidid``)
            attributes are read.
        fitsname (str): Name of the output FITS file.
        kwargs: ``dropdeg`` (bool, default False) writes only the first
            plane along the last axis of a 3D cube, using the 2D header.
            All remaining keyword arguments are forwarded to
            ``fits.writeto``.

    Raises:
        TypeError: If the cube is neither 2D nor 3D.
    """
    logger = getLogger('decode.io.savefits')

    ### pick up kwargs
    dropdeg = kwargs.pop('dropdeg', False)
    ndim = len(cube.dims)

    ### load yaml
    FITSINFO = get_data('decode', 'data/fitsinfo.yaml')
    # ``yaml.load`` without a Loader can construct arbitrary objects and
    # is rejected by PyYAML >= 6; the header defaults are plain data.
    hdrdata = yaml.safe_load(FITSINFO)

    ### default header
    if ndim == 2:
        header = fits.Header(hdrdata['dcube_2d'])
        data = cube.values.T
    elif ndim == 3:
        if dropdeg:
            header = fits.Header(hdrdata['dcube_2d'])
            data = cube.values[:, :, 0].T
        else:
            header = fits.Header(hdrdata['dcube_3d'])
            data = cube.values.T
    else:
        raise TypeError(ndim)

    ### update Header
    if cube.coordsys == 'AZEL':
        header.update({'CTYPE1': 'dAZ', 'CTYPE2': 'dEL'})
    elif cube.coordsys == 'RADEC':
        header.update({'OBSRA': float(cube.xref), 'OBSDEC': float(cube.yref)})

    # NOTE(review): ``timezone`` is called with a zone name here, so it is
    # presumably pytz's ``timezone`` rather than ``datetime.timezone``.
    header.update({'CRVAL1': float(cube.x[0]),
                   'CDELT1': float(cube.x[1] - cube.x[0]),
                   'CRVAL2': float(cube.y[0]),
                   'CDELT2': float(cube.y[1] - cube.y[0]),
                   'DATE': datetime.now(timezone('UTC')).isoformat()})

    if (ndim == 3) and (not dropdeg):
        header.update({'CRVAL3': float(cube.kidid[0])})

    fits.writeto(fitsname, data, header, **kwargs)
    logger.info('{} has been created.'.format(fitsname))
def _load_map_from_db(self):
    """Initialises the Map from a ``Maps`` with ``data_origin='db'``.

    Loads the value array (and the ivar/mask arrays when the property
    defines them) from the spaxel-property table, then looks up the
    header of the HDU whose extension name matches the property name and
    finally sets the unit.

    Raises ``MarvinError`` if no DB is connected or sqlalchemy is missing.
    """
    mdb = marvin.marvindb

    if not mdb.isdbconnected:
        raise marvin.core.exceptions.MarvinError('No db connected')

    if sqlalchemy is None:
        raise marvin.core.exceptions.MarvinError('sqlalchemy required to access the local DB.')

    # DAP versions up to 1.1.1 use a different table
    is_old_dap = (version.StrictVersion(self.maps._dapver) <=
                  version.StrictVersion('1.1.1'))
    table = mdb.dapdb.SpaxelProp if is_old_dap else mdb.dapdb.SpaxelProp5

    def _fetch_plane(**fullname_kwargs):
        # Query one column for this file, ordered by spaxel index, and
        # shape it like the map.
        column_name = self.maps_property.fullname(channel=self.channel,
                                                  **fullname_kwargs)
        rows = mdb.session.query(getattr(table, column_name)).filter(
            table.file_pk == self.maps.data.pk).order_by(table.spaxel_index).all()
        return np.array(rows).reshape(self.shape).T

    self.value = _fetch_plane()

    if self.maps_property.ivar:
        self.ivar = _fetch_plane(ext='ivar')

    if self.maps_property.mask:
        self.mask = _fetch_plane(ext='mask')

    # Gets the header: first HDU whose extension name matches the property
    header_dict = next(
        (hdu.header_to_dict() for hdu in self.maps.data.hdus
         if self.maps_property.name.upper() == hdu.extname.name.upper()),
        None)

    if header_dict:
        self.header = fits.Header(header_dict)
    else:
        warnings.warn('cannot find the header for property {0}.'
                      .format(self.maps_property.name),
                      marvin.core.exceptions.MarvinUserWarning)

    self.unit = self.maps_property.unit
def _fits_generate_header(tab):
    """ Generate the corresponding fits Header that contains all necessary info

    One ``TTYPEn`` card is written per column (with its description as
    comment), a ``TUNITn`` card when the column has a unit, one
    ``ALIASn`` card per alias and an ``EXTNAME`` card when the table has
    a name.  The remaining non-empty entries of ``tab.header`` are
    copied over; multi-line ``COMMENT``/``HISTORY`` values are added
    line by line.

    Parameters
    ----------

    tab: SimpleTable instance
        table

    Returns
    -------
    hdr: pyfits.Header
        header instance
    """
    empty_values = ['', 'None', None]

    # get column cards
    cards = []

    # names units and comments
    for e, k in enumerate(tab.keys()):
        cards.append(('TTYPE{0:d}'.format(e + 1), k, tab._desc.get(k, '')))
        u = tab._units.get(k, '')
        if u not in empty_values:
            cards.append(('TUNIT{0:d}'.format(e + 1), u,
                          'unit of {0:s}'.format(k)))

    # add aliases
    for e, v in enumerate(tab._aliases.items()):
        cards.append(('ALIAS{0:d}'.format(e + 1), '='.join(v), ''))

    # ``get`` so that a table without a NAME entry does not raise KeyError
    table_name = tab.header.get('NAME')
    if table_name not in ['', 'None', None, 'No Name']:
        cards.append(('EXTNAME', table_name, ''))

    hdr = pyfits.Header(cards)

    for k, v in tab.header.items():
        if v in empty_values or k == 'NAME':
            continue
        if k == 'COMMENT':
            for line in v.split('\n'):
                hdr.add_comment(line)
        elif k == 'HISTORY':
            for line in v.split('\n'):
                hdr.add_history(line)
        else:
            # Item assignment replaces the legacy ``hdr.update(k, v)`` call
            # form, which the dict-like ``Header.update`` rejects.
            hdr[k] = v

    return hdr
def _fits_writeto(filename, data, header=None, output_verify='exception',
                  clobber=False, checksum=False):
    """
    Create a new FITS file using the supplied data/header.
    Patched version of pyfits to correctly include provided header

    Parameters
    ----------
    filename : file path, file object, or file like object
        File to write to.  If opened, must be opened in a writeable binary
        mode such as 'wb' or 'ab+'.

    data : array, record array, or groups data object
        data to write to the new file

    header : `Header` object, optional
        the header associated with ``data``. If `None`, a header
        of the appropriate type is created for the supplied data. This
        argument is optional.

    output_verify : str
        Output verification option.  Must be one of ``"fix"``, ``"silentfix"``,
        ``"ignore"``, ``"warn"``, or ``"exception"``.  May also be any
        combination of ``"fix"`` or ``"silentfix"`` with ``"+ignore"``,
        ``"+warn"``, or ``"+exception"`` (e.g. ``"fix+warn"``).  See
        :ref:`verify` for more info.

    clobber : bool, optional
        If `True`, and if filename already exists, it will overwrite
        the file.  Default is `False`.

    checksum : bool, optional
        If `True`, adds both ``DATASUM`` and ``CHECKSUM`` cards to the
        headers of all HDU's written to the file
    """
    hdu = pyfits.convenience._makehdu(data, header)
    # ``header`` defaults to None; only copy cards when one was supplied
    # (``None.cards`` used to raise AttributeError here).
    if header is not None:
        hdu.header.update(header.cards)
    if hdu.is_image and not isinstance(hdu, pyfits.PrimaryHDU):
        hdu = pyfits.PrimaryHDU(data, header=header)
    hdu.writeto(filename, clobber=clobber, output_verify=output_verify,
                checksum=checksum)
def _ascii_generate_header(tab, comments='#', delimiter=' ',
                           commentedHeader=True):
    """ Generate the corresponding ascii Header that contains all necessary info

    The result consists of the table header entries (one line per
    non-empty line of each value), a block with one line per column
    giving its unit and description, an optional alias block and
    finally the line of column names.  Tables with an empty header or
    without columns are supported, and header values, units and
    descriptions that are not strings are converted with ``str``.

    Parameters
    ----------

    tab: SimpleTable instance
        table

    comments: str
        string to prepend header lines

    delimiter: str, optional
        The string used to separate values.  By default, this is any
        whitespace.

    commentedHeader: bool, optional
        if set, the last line of the header is expected to be the column titles

    Returns
    -------
    hdr: str
        string that will be be written at the beginning of the file
    """
    hdr = []

    if comments is None:
        comments = ''

    # table header; ``or [0]`` keeps max() from failing on an empty header
    key_width = max([len(k) for k in tab.header.keys()] or [0])
    for k, v in tab.header.items():
        for vk in str(v).split('\n'):
            if len(vk) > 0:
                hdr.append('{0:s} {1:s}\t{2:s}'.format(
                    comments, k.upper().ljust(key_width), vk.strip()))

    # column metadata
    hdr.append(comments)  # add empty line
    colnames = list(tab.keys())
    col_width = max([len(name) for name in colnames] or [0])
    for colname in colnames:
        unit = str(tab._units.get(colname, 'None'))
        desc = str(tab._desc.get(colname, 'None'))
        hdr.append('{0:s}{0:s} {1:s}\t{2:s}\t{3:s}'.format(
            comments, colname.ljust(col_width), unit, desc))

    # aliases
    if len(tab._aliases) > 0:
        hdr.append(comments)  # add empty line
        for k, v in tab._aliases.items():
            hdr.append('{0:s} alias\t{1:s}={2:s}'.format(comments, k, v))

    # column names
    hdr.append(comments)
    if commentedHeader:
        hdr.append('{0:s} {1:s}'.format(comments, delimiter.join(colnames)))
    else:
        hdr.append('{0:s}'.format(delimiter.join(colnames)))

    return '\n'.join(hdr)
def make_fits_header(header,first=True,LOFAR=False):
    '''Takes .fil header into fits header format

    ``header`` is a mapping keyed by the .fil header labels.  ``first``
    selects the primary-HDU keyword list (with NAXIS2 taken from the
    number of samples) versus the IMAGE-extension keyword list (NAXIS2
    fixed to 1).  ``LOFAR`` selects the LOFAR-specific bandwidth and
    numeric RA/DEC handling.  Raises ValueError when, for non-LOFAR
    data, the bits-per-sample entry does not contain '32'.
    '''
    base_header = {
        'SIMPLE': True,
        'NAXIS': 2,
        'NAXIS1': int(header['Number of channels']),
        'DOPPLER': 0.0,
        'SNR': 0.0,
        'EXTEND': True,
        'DELTAT': float(header['Sample time (us)'])/1e6,
        'MJD': float(header['Time stamp of first sample (MJD)']),
        'XTENSION': 'IMAGE ',
        'PCOUNT': 1,
        'GCOUNT': 1,
        'TOFFSET': float(header['Sample time (us)'])/1e6,
    }

    if LOFAR:
        base_header.update({
            'BITPIX': -32,
            'DELTAF': 0.000001497456,  # LOFAR specific (project LC2_040).
            'DEC': float(header['Source DEC (J2000)']),
            'RA': float(header['Source RA (J2000)']),
            'SOURCE': header['Source Name'].replace('\xc2\xa0','_').replace(' ','_'),
        })
    else:
        if '32' not in header['Number of bits per sample']:
            raise ValueError('Check nbits per sample. Not equeal 32')
        base_header.update({
            'BITPIX': -32,
            'DELTAF': np.abs(float(header['Channel bandwidth (MHz)'])),
            'DEC': header['Source DEC (J2000)'],
            'RA': header['Source RA (J2000)'],
            'SOURCE': header['Source Name'].replace('\xc2\xa0','_').replace(' ',''),
        })

    half_band = base_header['DELTAF']*base_header['NAXIS1']/2
    base_header['FCNTR'] = float(header['Frequency of channel 1 (MHz)']) - half_band

    if first:
        base_header['NAXIS2'] = int(header['Number of samples'])
        key_list = ['SIMPLE','BITPIX','NAXIS','NAXIS1','NAXIS2','EXTEND',
                    'DELTAT','DELTAF','FCNTR','MJD','DEC','RA','DOPPLER',
                    'SNR','SOURCE']
    else:
        base_header['NAXIS2'] = 1
        key_list = ['XTENSION','BITPIX','NAXIS','NAXIS1','NAXIS2','PCOUNT',
                    'GCOUNT','DELTAT','DELTAF','FCNTR','TOFFSET','DEC','RA',
                    'DOPPLER','SNR','SOURCE']

    # One card per selected keyword, in list order
    cards = []
    for key in key_list:
        cards.append(pyfits.Card(key=key,value=base_header[key]))
    return pyfits.Header(cards=cards)
def fits2ldac(header4ext2, data4ext3, fits_ldac_out, doSort=True):

    """This function converts the binary FITS table from Astrometry.net to
    a binary FITS_LDAC table that can be read by PSFex. [header4ext2]
    is what will be recorded as a single long string in the data part
    of the 2nd extension of the output table [fits_ldac_out], and
    [data4ext3] is the data part of an HDU that will define both the
    header and data parts of extension 3 of [fits_ldac_out].

    If [doSort] is set, the rows of extension 3 are sorted by their
    'NUMBER' column.  An existing [fits_ldac_out] is overwritten.
    """

    # convert header to single (very) long string
    ext2_str = header4ext2.tostring(endcard=False, padding=False)

    # if the following line is not added, the very end of the data
    # part of extension 2 is written to a fits table such that PSFex
    # runs into a segmentation fault when attempting to read it (took
    # me ages to find out!).
    # NOTE(review): the literal is reproduced exactly as found; confirm
    # its intended padding against the upstream source.
    ext2_str += 'END END'

    # read into string array
    ext2_data = np.array([ext2_str])

    # determine format string for header of extention 2
    formatstr = str(len(ext2_str))+'A'
    # create table 1
    col1 = fits.Column(name='Field Header Card', array=ext2_data, format=formatstr)
    cols = fits.ColDefs([col1])
    ext2 = fits.BinTableHDU.from_columns(cols)
    # make sure these keywords are in the header
    ext2.header['EXTNAME'] = 'LDAC_IMHEAD'
    # floor division: plain ``/`` gives a float under Python 3 and would
    # write e.g. '(80, 37.0)' instead of '(80, 37)'
    ext2.header['TDIM1'] = '(80, {0})'.format(len(ext2_str) // 80)

    # simply create extension 3 from [data4ext3]
    ext3 = fits.BinTableHDU(data4ext3)
    # extname needs to be as follows
    ext3.header['EXTNAME'] = 'LDAC_OBJECTS'

    # sort output table by number column if needed
    if doSort:
        index_sort = np.argsort(ext3.data['NUMBER'])
        ext3.data = ext3.data[index_sort]

    # create primary HDU
    prihdr = fits.Header()
    prihdu = fits.PrimaryHDU(header=prihdr)
    prihdu.header['EXPTIME'] = header4ext2['EXPTIME']
    prihdu.header['FILTNAME'] = header4ext2['FILTNAME']
    # prihdu.header['SEEING'] = header4ext2['SEEING'] #need to calculte and add
    prihdu.header['BKGSIG'] = header4ext2['SEXBKDEV']

    # write hdulist to output LDAC fits table
    hdulist = fits.HDUList([prihdu, ext2, ext3])
    try:
        hdulist.writeto(fits_ldac_out, overwrite=True)
    except TypeError:
        # fall back to the older ``clobber`` keyword when ``overwrite``
        # is not accepted
        hdulist.writeto(fits_ldac_out, clobber=True)
    hdulist.close()

################################################################################