我们从 Python 开源项目中提取了以下 8 个代码示例，用于说明如何使用 astropy.io.fits.Column()。
def simple_generic_writer(data, file_name, **kwargs):
    """
    Write a spectrum object to ``<file_name>.fits`` as a FITS binary table.

    The table carries four columns, in this order: FLUX (``data.data``),
    WAVE (``data.dispersion``), UNCERT (``data.uncertainty.array``) and
    MASK (``data.mask``).  The primary header is ``data.meta['header']``
    when that key exists, otherwise ``data.wcs.to_header()``.  An existing
    output file is overwritten.  Extra keyword arguments are accepted but
    not used.
    """
    # (column name, FITS format code, attribute supplying the values)
    column_specs = (
        ('FLUX', 'E', data.data),
        ('WAVE', 'E', data.dispersion),
        ('UNCERT', 'E', data.uncertainty.array),
        ('MASK', 'L', data.mask),
    )
    table_columns = fits.ColDefs([
        fits.Column(name=label, format=code, array=values)
        for label, code, values in column_specs
    ])
    table_hdu = fits.BinTableHDU.from_columns(table_columns)

    # The WCS header is always evaluated; it is only used when the
    # metadata has no 'header' entry.
    primary_hdu = fits.PrimaryHDU(
        header=data.meta.get('header', data.wcs.to_header()))

    output_path = "{}.fits".format(file_name)
    fits.HDUList([primary_hdu, table_hdu]).writeto(output_path, overwrite=True)
def write(self, filename, clobber=False):
    """
    Write the updated/modified spectrum block to file.

    A binary table HDU is built from ``self.channel``,
    ``self.counts_squeezed``, ``self.stat_err_adjusted``,
    ``self.grouping`` and ``self.quality`` (columns CHANNEL, COUNTS,
    STAT_ERR, GROUPING, QUALITY), using ``self.header`` as its header.

    Arguments:
        filename: path of the FITS file to write.
        clobber (boolean): if true, an existing file is overwritten.
    """
    channel_col = fits.Column(name='CHANNEL', format='J',
                              array=self.channel)
    counts_col = fits.Column(name='COUNTS', format='J',
                             array=self.counts_squeezed)
    stat_err_col = fits.Column(name='STAT_ERR', format='D',
                               array=self.stat_err_adjusted)
    grouping_col = fits.Column(name='GROUPING', format='I',
                               array=self.grouping)
    quality_col = fits.Column(name='QUALITY', format='I',
                              array=self.quality)
    spec_cols = fits.ColDefs([channel_col, counts_col, stat_err_col,
                              grouping_col, quality_col])
    spechdu = fits.BinTableHDU.from_columns(spec_cols, header=self.header)
    # Newer astropy releases only accept ``overwrite``; older ones only
    # know ``clobber`` and raise TypeError on the unknown keyword.
    try:
        spechdu.writeto(filename, overwrite=clobber)
    except TypeError:
        spechdu.writeto(filename, clobber=clobber)
def add_time_column(fitsfile, blockname="EVENTS"):
    """
    Append a randomly generated "time" column to one block of a FITS file.

    ``fitsfile`` may be a path (opened with ``fits.open``) or an already
    opened FITS object.  One time value per existing row is drawn from a
    uniform distribution between the block's TSTART and TSTOP header
    values.

    Return:
        The FITS object, with the block's data and header updated to
        include the new column.
    """
    hdulist = fits.open(fitsfile) if isinstance(fitsfile, str) else fitsfile
    block = hdulist[blockname]
    t_begin = block.header["TSTART"]
    t_end = block.header["TSTOP"]
    n_rows = len(block.data)
    time_col = fits.Column(name="time", format="1D", unit="s",
                           array=np.random.uniform(t_begin, t_end, n_rows))
    # NOTE: the new column must go *last*.  Otherwise the TLMIN??/TLMAX??
    # keyword pairs, which record the minimum/maximum values of the
    # corresponding columns, end up out of order, and the resulting file
    # causes weird problems with DS9 and DM tools.
    extended = fits.BinTableHDU.from_columns(
        block.columns + fits.ColDefs([time_col]))
    target = hdulist[blockname]
    target.data = extended.data
    # Bring the header in line with the extended table.
    target.header.update(extended.header)
    return hdulist
def write(self, outfile, clobber=False):
    """
    Create a new "SPECTRUM" table/extension and replace the original
    one, then write to output file.

    The table always has a CHANNEL column and a spectrum column named
    ``self.spec_type``; GROUPING, QUALITY and STAT_ERR columns are added
    only when the corresponding attribute is not None.  The file named
    by ``self.filename`` serves as the base: its "SPECTRUM" extension is
    replaced and the result is written with ``checksum=True``.

    Arguments:
        outfile: path of the FITS file to write.
        clobber (boolean): if true, an existing file is overwritten.
    """
    columns = [
        fits.Column(name="CHANNEL", format="I", array=self.channel),
        fits.Column(name=self.spec_type, format=self.spec_fits_format,
                    unit=self.spec_unit, array=self.spec_data),
    ]
    if self.grouping is not None:
        columns.append(fits.Column(name="GROUPING", format="I",
                                   array=self.grouping))
    if self.quality is not None:
        columns.append(fits.Column(name="QUALITY", format="I",
                                   array=self.quality))
    if self.stat_err is not None:
        columns.append(fits.Column(name="STAT_ERR", unit=self.spec_unit,
                                   format=self.spec_fits_format,
                                   array=self.stat_err))
    ext_spec_cols = fits.ColDefs(columns)
    ext_spec = fits.BinTableHDU.from_columns(ext_spec_cols,
                                             header=self.header)
    # Open the original input spectrum as the base.  The context manager
    # closes the input file once the output is written, also on errors.
    with fits.open(self.filename) as fitsobj:
        # Replace the original spectrum data
        fitsobj["SPECTRUM"] = ext_spec
        # Newer astropy releases take ``overwrite``; older ones only
        # know ``clobber`` and raise TypeError on the unknown keyword.
        try:
            fitsobj.writeto(outfile, overwrite=clobber, checksum=True)
        except TypeError:
            fitsobj.writeto(outfile, clobber=clobber, checksum=True)
# class Spectrum }}}
def array2fits(table, extname):
    """
    Turn a structured array into a FITS table through ``wraptable2fits``.

    Each field listed in ``table.dtype.names`` becomes one column with
    FITS format code 'E'; the column definitions and ``extname`` are
    handed to ``wraptable2fits`` and its result is returned.
    """
    field_columns = []
    for field in table.dtype.names:
        field_columns.append(
            pyfits.Column(name=field, format='E', array=table[field]))
    return wraptable2fits(pyfits.ColDefs(field_columns), extname)
def savepysyn(self, wave, flux, fname, units=None):
    """ Save a wavelength/value table to a ``*_pysyn.fits`` file.

    Cannot ever use the .writefits() method, because the array is
    frequently just sampled at the synphot waveset; plus, writefits
    is smart and does things like tapering.

    Arguments:
        wave: values for the 'wavelength' column.
        flux: values for the second column.
        fname: base file name; '.fits' is replaced by '_pysyn.fits'
            to form the output name.
        units: unit string stored in TUNIT2.  When None, the second
            column is named 'throughput' and TUNIT2 is a single blank;
            otherwise the column is named 'flux'.
    """
    if units is None:
        ytype = 'throughput'
        units = ' '
    else:
        ytype = 'flux'
    col1 = pyfits.Column(name='wavelength', format='D', array=wave)
    col2 = pyfits.Column(name=ytype, format='D', array=flux)
    tbhdu = pyfits.BinTableHDU.from_columns(pyfits.ColDefs([col1, col2]))
    # Item assignment instead of the legacy ``header.update(key, value)``
    # call form, which current FITS header implementations reject.
    tbhdu.header['tunit1'] = 'angstrom'
    tbhdu.header['tunit2'] = units
    tbhdu.writeto(fname.replace('.fits', '_pysyn.fits'))
def _aipscc(self, threshold=None, relative=True, istokes=0, ifreq=0):
    '''
    Make AIPS CC table

    Arguments:
        istokes (integer):
            index for Stokes Parameter at which the image will be saved
        ifreq (integer):
            index for Frequency at which the image will be saved
        threshold (float):
            pixels with an intensity below this value are left out.
            When None, the float32 machine epsilon is used.
        relative (boolean):
            If true, the threshold value is multiplied by the peak
            intensity of the image.

    Returns:
        A binary table HDU with the columns FLUX, DELTAX, DELTAY,
        MAJOR AX, MINOR AX, POSANGLE and TYPE OBJ; the last four are
        filled with zeros.
    '''
    npix = self.header["nx"] * self.header["ny"]

    # Flatten the coordinate grids and the selected image plane.
    xgrid, ygrid = self.get_xygrid(angunit="deg")
    xx, yy = np.meshgrid(xgrid, ygrid)
    xx = xx.reshape(npix)
    yy = yy.reshape(npix)
    intensity = self.data[istokes, ifreq].reshape(npix)

    # Resolve the cutoff value.
    if threshold is None:
        cutoff = np.finfo(np.float32).eps
    elif relative:
        cutoff = self.peak(istokes=istokes, ifreq=ifreq) * threshold
    else:
        cutoff = threshold
    cutoff = np.abs(cutoff)

    # NOTE(review): the comparison uses the signed intensity, so negative
    # pixels are always dropped; an earlier description spoke of the
    # "absolute intensity" -- confirm which is intended.
    keep = intensity >= cutoff
    xx = xx[keep]
    yy = yy[keep]
    intensity = intensity[keep]

    # Columns carrying actual values.
    table_columns = [
        pyfits.Column(name='FLUX', array=intensity, format='1E', unit='JY'),
        pyfits.Column(name='DELTAX', array=xx, format='1E', unit='DEGREES'),
        pyfits.Column(name='DELTAY', array=yy, format='1E', unit='DEGREES'),
    ]
    # Remaining columns are zero-filled, one fresh array per column.
    zero_filled = (
        ('MAJOR AX', 'DEGREES'),
        ('MINOR AX', 'DEGREES'),
        ('POSANGLE', 'DEGREES'),
        ('TYPE OBJ', 'CODE'),
    )
    for label, unit_name in zero_filled:
        table_columns.append(
            pyfits.Column(name=label, array=np.zeros(len(intensity)),
                          format='1E', unit=unit_name))

    return pyfits.BinTableHDU.from_columns(table_columns)
def fits2ldac(header4ext2, data4ext3, fits_ldac_out, doSort=True):
    """This function converts the binary FITS table from Astrometry.net
    to a binary FITS_LDAC table that can be read by PSFex.

    [header4ext2] is what will be recorded as a single long string in
    the data part of the 2nd extension of the output table
    [fits_ldac_out], and [data4ext3] is the data part of an HDU that
    will define both the header and data parts of extension 3 of
    [fits_ldac_out].

    Arguments:
        header4ext2: FITS header; must provide the EXPTIME, FILTNAME
            and SEXBKDEV keywords, which are copied to the primary
            header (SEXBKDEV is stored as BKGSIG).
        data4ext3: table data for the LDAC_OBJECTS extension.
        fits_ldac_out: output file name; an existing file is
            overwritten.
        doSort (boolean): if true, the object table is sorted by its
            NUMBER column.
    """

    # convert header to single (very) long string
    ext2_str = header4ext2.tostring(endcard=False, padding=False)

    # If the following trailer is not added, the very end of the data
    # part of extension 2 is written such that PSFex runs into a
    # segmentation fault when attempting to read it.
    # NOTE(review): the literal is kept exactly as found; padding blanks
    # between the two END tokens may have been lost -- confirm against
    # the upstream source.
    ext2_str += 'END END'

    # read into string array
    ext2_data = np.array([ext2_str])

    # determine format string for header of extension 2
    formatstr = str(len(ext2_str)) + 'A'
    # create table 1
    col1 = fits.Column(name='Field Header Card', array=ext2_data,
                       format=formatstr)
    cols = fits.ColDefs([col1])
    ext2 = fits.BinTableHDU.from_columns(cols)
    # make sure these keywords are in the header
    ext2.header['EXTNAME'] = 'LDAC_IMHEAD'
    # Floor division keeps the card count an integer on Python 3 as
    # well; true division would write e.g. '(80, 36.0)'.
    ext2.header['TDIM1'] = '(80, {0})'.format(len(ext2_str) // 80)

    # simply create extension 3 from [data4ext3]
    ext3 = fits.BinTableHDU(data4ext3)
    # extname needs to be as follows
    ext3.header['EXTNAME'] = 'LDAC_OBJECTS'

    # sort output table by number column if needed
    if doSort:
        index_sort = np.argsort(ext3.data['NUMBER'])
        ext3.data = ext3.data[index_sort]

    # create primary HDU
    prihdr = fits.Header()
    prihdu = fits.PrimaryHDU(header=prihdr)
    prihdu.header['EXPTIME'] = header4ext2['EXPTIME']
    prihdu.header['FILTNAME'] = header4ext2['FILTNAME']
    # prihdu.header['SEEING'] = header4ext2['SEEING']  # still needs to be calculated and added
    prihdu.header['BKGSIG'] = header4ext2['SEXBKDEV']

    # write hdulist to output LDAC fits table
    hdulist = fits.HDUList([prihdu, ext2, ext3])
    # Newer astropy releases take ``overwrite``; older ones only know
    # ``clobber`` and raise TypeError on the unknown keyword.
    try:
        hdulist.writeto(fits_ldac_out, overwrite=True)
    except TypeError:
        hdulist.writeto(fits_ldac_out, clobber=True)
    hdulist.close()

################################################################################