我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用astropy.io.fits.HDUList()。
def simple_generic_writer(data, file_name, **kwargs): """ Basic `Spectrum1DRef` FITS writer. """ # Create fits columns flux_col = fits.Column(name='FLUX', format='E', array=data.data) disp_col = fits.Column(name='WAVE', format='E', array=data.dispersion) uncert_col = fits.Column(name='UNCERT', format='E', array=data.uncertainty.array) mask_col = fits.Column(name='MASK', format='L', array=data.mask) cols = fits.ColDefs([flux_col, disp_col, uncert_col, mask_col]) # Create the bin table tbhdu = fits.BinTableHDU.from_columns(cols) # Create header prihdu = fits.PrimaryHDU(header=data.meta.get('header', data.wcs.to_header())) # Compose thdulist = fits.HDUList([prihdu, tbhdu]) thdulist.writeto("{}.fits".format(file_name), overwrite=True)
def combineFits(filenames,outFname): """ combine the data in multiple ExoSOFT fits files together. Used primarily for after multi-process runs. """ nFiles = len(filenames) (head0,dataALL) = loadFits(filenames[0]) for filename in filenames[1:]: (head,data) = loadFits(filename) dataALL = np.concatenate((dataALL,data)) hdu = pyfits.PrimaryHDU(dataALL) hdulist = pyfits.HDUList([hdu]) header = hdulist[0].header for key in head0: if key=='NSAMPLES': ##load in total number of samples for this combined file header['NSAMPLES'] = (int(head0['NSAMPLES'])*nFiles,head0.comments['NSAMPLES']) else: header[key] = (head0[key],head0.comments[key]) hdulist.writeto(outFname) hdulist.close() log.info("output file written to:below\n"+outFname)
def renameFits(filenameIn,filenameOut,killInput=True,overwrite=True): """ Load input into a new fits HDU, write to output name, close, rm input file. """ goodToGo = True if os.path.exists(filenameOut): if overwrite: rmFiles([filenameOut]) else: goodToGo=False log.critical("File already exists, either set overwrite==True, or there be more careful.") if goodToGo: (head,data) = loadFits(filenameIn) hdu = pyfits.PrimaryHDU(data) hdulist = pyfits.HDUList([hdu]) header = hdulist[0].header for key in head: header[key] = (head[key],head.comments[key]) hdulist.writeto(filenameOut) hdulist.close() log.info("output file written to:below\n"+filenameOut) if killInput: rmFiles([filenameIn])
def regrid_h2(projDir='/media/DATAPART/projects/GAS/testing/', region_list = ['L1688','NGC1333','B18','OrionA'], file_extension='DR1_rebase3', herDir = '/media/DATAPART/projects/GAS/otherData/herschel_ayushi/', herFile_list=['OphL1688','perseus','Tau_B18','orionA-N']): for region_i in range(len(region_list)): region = region_list[region_i] herFilename = herFile_list[region_i] herColFits = herDir+'{0}/Colden_temp/{1}_colden_masked.fits'.format(region,herFilename) nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region,file_extension) h2_hdu = fits.open(herColFits) nh3_hdr = fits.getheader(nh3ImFits) new_h2 = FITS_tools.hcongrid.hcongrid(h2_hdu[0].data,h2_hdu[0].header,nh3_hdr) new_h2_hdu = fits.PrimaryHDU(new_h2,nh3_hdr) new_h2_hduList = fits.HDUList([new_h2_hdu]) new_h2_hduList.writeto('nh2_regridded/{0}_NH2_regrid.fits',clobber=True)
def save_fibmodel(self): ''' Save the fibers to fits file with two extensions The first is 3-d for trace, wavelength and fiber_to_fiber The second is which fibers are good and not dead ''' try: self.fibers[0].fibmodel except: self.log.warning('Trying to save fibermodel but none exist.') return None ylims = np.linspace(-1.*self.fsize, self.fsize, self.fibmodel_nbins) fibmodel = np.zeros((len(self.fibers), self.fibmodel_nbins, self.D)) for i,fiber in enumerate(self.fibers): fibmodel[i,:,:] = fiber.fibmodel s = fits.PrimaryHDU(np.array(fibmodel,dtype='float32')) t = fits.ImageHDU(np.array(ylims,dtype='float32')) hdu = fits.HDUList([s,t]) fn = op.join(self.path, 'fibermodel_%s_%s_%s_%s.fits' %(self.specid, self.ifuslot, self.ifuid, self.amp)) self.write_to_fits(hdu, fn)
def writeSVD(self, svdoutputfits='ZAP_SVD.fits'): """Write the SVD to an individual fits file.""" check_file_exists(svdoutputfits) header = fits.Header() header['ZAPvers'] = (__version__, 'ZAP version') header['ZAPzlvl'] = (self.run_zlevel, 'ZAP zero level correction') header['ZAPclean'] = (self.run_clean, 'ZAP NaN cleaning performed for calculation') header['ZAPcftyp'] = (self._cftype, 'ZAP continuum filter type') header['ZAPcfwid'] = (self._cfwidth, 'ZAP continuum filter size') header['ZAPmask'] = (self.maskfile, 'ZAP mask used to remove sources') nseg = len(self.pranges) header['ZAPnseg'] = (nseg, 'Number of segments used for ZAP SVD') hdu = fits.HDUList([fits.PrimaryHDU(self.zlsky)]) for i in range(len(self.pranges)): hdu.append(fits.ImageHDU(self.especeval[i][0])) # write for later use hdu.writeto(svdoutputfits) logger.info('SVD file saved to %s', svdoutputfits)
def burnInStripper(mcmcFnames,burnInLengths): """ Strip the initial burn-in off each chain. """ newFnames=[] for i in range(0,len(mcmcFnames)): filename = mcmcFnames[i] burnIn = burnInLengths[i] if os.path.exists(filename): (head,data) = loadFits(filename) ##strip burn-in and write to new fits log.debug("Before stripping burn-in, file had "+str(len(data[:,0]))+" samples") hdu = pyfits.PrimaryHDU(data[burnIn:,:]) hdulist = pyfits.HDUList([hdu]) newHead = hdulist[0].header log.debug("Before stripping burn-in, file had "+str(len(hdulist[0].data[:,0]))+" samples") for key in head: newHead[key]=(head[key],head.comments[key]) n = os.path.splitext(os.path.basename(filename)) newFname = os.path.join(os.path.dirname(mcmcFnames[0]),n[0]+'_BIstripped.fits') hdulist.writeto(newFname) log.info("output file written to:below\n"+newFname) hdulist.close() newFnames.append(newFname) log.debug("burn-in stripped file written to:\n"+newFname) return newFnames
def _load_modelcube_from_file(self): """Initialises a model cube from a file.""" if self.data is not None: assert isinstance(self.data, fits.HDUList), 'data is not an HDUList object' else: try: self.data = fits.open(self.filename) except IOError as err: raise IOError('filename {0} cannot be found: {1}'.format(self.filename, err)) self.header = self.data[0].header self.shape = self.data['FLUX'].data.shape[1:] self.wcs = WCS(self.data['FLUX'].header) self.wavelength = self.data['WAVE'].data self.redcorr = self.data['REDCORR'].data self.plateifu = self.header['PLATEIFU'] self.mangaid = self.header['MANGAID'] # Checks and populates release. file_drpver = self.header['VERSDRP3'] file_drpver = 'v1_5_1' if file_drpver == 'v1_5_0' else file_drpver file_ver = marvin.config.lookUpRelease(file_drpver) assert file_ver is not None, 'cannot find file version.' if file_ver != self._release: warnings.warn('mismatch between file version={0} and object release={1}. ' 'Setting object release to {0}'.format(file_ver, self._release), marvin.core.exceptions.MarvinUserWarning) self._release = file_ver self._drpver, self._dapver = marvin.config.lookUpVersions(release=self._release)
def _load_cube_from_file(self, data=None): """Initialises a cube from a file.""" if data is not None: assert isinstance(data, fits.HDUList), 'data is not an HDUList object' else: try: self.data = fits.open(self.filename) except IOError as err: raise IOError('filename {0} cannot be found: {1}'.format(self.filename, err)) self.header = self.data[1].header self.shape = self.data['FLUX'].data.shape[1:] self.wcs = WCS(self.header) self.wavelength = self.data['WAVE'].data self.plateifu = self.header['PLATEIFU'] # Checks and populates the release. file_drpver = self.header['VERSDRP3'] file_drpver = 'v1_5_1' if file_drpver == 'v1_5_0' else file_drpver file_ver = marvin.config.lookUpRelease(file_drpver) assert file_ver is not None, 'cannot find file version.' if file_ver != self._release: warnings.warn('mismatch between file release={0} and object release={1}. ' 'Setting object release to {0}'.format(file_ver, self._release), MarvinUserWarning) self._release = file_ver # Reload NSA data from file version of drpall file self._drpver, self._dapver = marvin.config.lookUpVersions(release=self._release) self._drpall = marvin.config._getDrpAllPath(file_drpver) self.mangaid = self.header['MANGAID'] nsa_source = 'drpall' if self.nsa_source == 'auto' else self.nsa_source self._nsa = None self._nsa = get_nsa_data(self.mangaid, mode='auto', source=nsa_source, drpver=self._drpver, drpall=self._drpall) self._drpver, self._dapver = marvin.config.lookUpVersions(release=self._release)
def to_fits(self): primary_hdu = fits.PrimaryHDU() image_hdu = fits.ImageHDU(self.data, self.header) hdulist = fits.HDUList([primary_hdu, image_hdu]) return hdulist
def create_mock_fits(): x = np.ones((5, 5)) prihdu = fits.PrimaryHDU(x) # Single extension FITS img = fits.ImageHDU(data=x) singlehdu = fits.HDUList([prihdu, img]) singlehdu.writeto('image.fits', clobber=True)
def wraptable2fits(cat_columns, extname): tbhdu = fits_from_columns(pyfits.ColDefs(cat_columns)) hdu = pyfits.PrimaryHDU() import datetime, time now = datetime.datetime.fromtimestamp(time.time()) nowstr = now.isoformat() nowstr = nowstr[:nowstr.rfind('.')] hdu.header['DATE'] = nowstr hdu.header['ANALYSIS'] = 'NWAY matching' tbhdu.header['EXTNAME'] = extname hdulist = pyfits.HDUList([hdu, tbhdu]) return hdulist
def load_data(self): hdulist = fits.open(self.filename) print("MuseCube: Loading the cube fluxes and variances...") # import pdb; pdb.set_trace() self.cube = hdulist[1].data self.stat = hdulist[2].data # for ivar weighting ; consider creating it in init ; takes long # self.flux_over_ivar = self.cube / self.stat self.header_1 = hdulist[1].header # Necesito el header para crear una buena copia del white. self.header_0 = hdulist[0].header if self.filename_white is None: print("MuseCube: No white image given, creating one.") w_data = self.create_white(save=False) w_header_0 = copy.deepcopy(self.header_0) w_header_1 = copy.deepcopy(self.header_1) # These loops remove the third dimension from the header's keywords. This is neccesary in order to # create the white image and preserve the cube astrometry for i in w_header_0.keys(): if '3' in i: del w_header_0[i] for i in w_header_1.keys(): if '3' in i: del w_header_1[i] # prepare the header hdu = fits.HDUList() hdu_0 = fits.PrimaryHDU(header=w_header_0) hdu_1 = fits.ImageHDU(data=w_data, header=w_header_1) hdu.append(hdu_0) hdu.append(hdu_1) hdu.writeto('new_white.fits', clobber=True) self.filename_white = 'new_white.fits' print("MuseCube: `new_white.fits` image saved to disk.")
def spec_to_redmonster_format(spec, fitsname, n_id=None, mag=None): """ Function used to create a spectrum in the REDMONSTER software format :param spec: XSpectrum1D object :param mag: List containing 2 elements, the first is the keyword in the header that will contain the magnitud saved in the second element :param fitsname: Name of the fitsfile that will be created :return: """ from scipy import interpolate wave = spec.wavelength.value wave_log = np.log10(wave) n = len(wave) spec.wavelength = wave_log * u.angstrom new_wave_log = np.arange(wave_log[1], wave_log[n - 2], 0.0001) spec_rebined = spec.rebin(new_wv=new_wave_log * u.angstrom) flux = spec_rebined.flux.value f = interpolate.interp1d(wave_log, spec.sig.value) sig = f(new_wave_log) inv_sig = 1. / np.array(sig) ** 2 inv_sig = np.where(np.isinf(inv_sig), 0, inv_sig) inv_sig = np.where(np.isnan(inv_sig), 0, inv_sig) hdu1 = fits.PrimaryHDU([flux]) hdu2 = fits.ImageHDU([inv_sig]) hdu1.header['COEFF0'] = new_wave_log[0] hdu1.header['COEFF1'] = new_wave_log[1] - new_wave_log[0] if n_id != None: hdu1.header['ID'] = n_id if mag != None: hdu1.header[mag[0]] = mag[1] hdulist_new = fits.HDUList([hdu1, hdu2]) hdulist_new.writeto(fitsname, clobber=True)
def save(self, filename, clobber=False): hdulist = pf.HDUList() ext = '.oif.fits' if filename.find(ext)==-1: filename += ext hdu = pf.PrimaryHDU() hdu.header.set('EXT', 'MODEL', comment='Type of information in the HDU') hdu.header.set('DATE', strftime('%Y%m%dT%H%M%S'), comment='Creation Date') hdu.header.set('NOBJ', self.nobj, comment='Number of objects in the model') hdu.header.set('NPARAMS', self.nparams, comment='Total number of free parameters') for i, item in enumerate(self._objs): hdu.header.set('OBJ'+str(i), item.name, comment='Name of object '+str(i)) allmodes = ['vis2', 't3phi', 't3amp', 'visphi', 'visamp'] for mode in allmodes: number = [] if getattr(self.oidata, mode): number = [len(i) for i in getattr(self.oidata, "_input_"+mode) if i is not None] hdu.header.set('N'+mode, int(np.sum(number)), comment='Total number of '+mode+' measurements') hdu.header.set('F_'+mode, len(number), comment='Number of '+mode+' files') hdu.header.add_comment('Written by Guillaume SCHWORER') hdulist.append(hdu) hdulist.writeto(filename, clobber=clobber) for item in self._objs: item.save(filename, append=True, clobber=clobber) if self._hasdata: self.oidata.save(filename, append=True, clobber=clobber) return filename
def save(self, image_list=[], spec_list=[]): ''' Save the entire amplifier include the list of fibers. This property is not used often as "amp*.pkl" is large and typically the fibers can be loaded and the other amplifier properties quickly recalculated. ''' self.log.info('Saving images/properties to %s' %self.path) fn = op.join(self.path, 'multi_%s_%s_%s_%s.fits' %(self.specid, self.ifuslot, self.ifuid, self.amp)) fits_list = [] for i,image in enumerate(image_list): if i==0: fits_list.append(fits.PrimaryHDU(np.array(getattr(self, image), dtype='float32'))) else: fits_list.append(fits.ImageHDU(np.array(getattr(self, image), dtype='float32'))) fits_list[-1].header['EXTNAME'] = image for i, spec in enumerate(spec_list): try: s = np.array([getattr(fiber, spec) for fiber in self.fibers], dtype='float32') fits_list.append(fits.ImageHDU(s)) fits_list[-1].header['EXTNAME'] = spec except AttributeError: self.log.warning('Attribute %s does not exist to save' %spec) if fits_list: fits_list[0] = self.write_header(fits_list[0]) hdu = fits.HDUList(fits_list) self.write_to_fits(hdu, fn)
def load_data(self): hdulist = fits.open(self.filename) print("MuseCube: Loading the cube fluxes and variances...") # import pdb; pdb.set_trace() self.cube = ma.MaskedArray(hdulist[1].data) self.stat = ma.MaskedArray(hdulist[2].data) print("MuseCube: Defining master masks (this may take a while but it is for the greater good).") # masking self.mask_init = np.isnan(self.cube) | np.isnan(self.stat) self.cube.mask = self.mask_init self.stat.mask = self.mask_init # for ivar weighting ; consider creating it in init ; takes long # self.flux_over_ivar = self.cube / self.stat self.header_1 = hdulist[1].header # Necesito el header para crear una buena copia del white. self.header_0 = hdulist[0].header if self.filename_white is None: print("MuseCube: No white image given, creating one.") w_data = copy.deepcopy(self.create_white(save=False).data) w_header_0 = copy.deepcopy(self.header_0) w_header_1 = copy.deepcopy(self.header_1) # These loops remove the third dimension from the header's keywords. This is neccesary in order to # create the white image and preserve the cube astrometry for i in w_header_0.keys(): if '3' in i: del w_header_0[i] for i in w_header_1.keys(): if '3' in i: del w_header_1[i] # prepare the header hdu = fits.HDUList() hdu_0 = fits.PrimaryHDU(header=w_header_0) hdu_1 = fits.ImageHDU(data=w_data, header=w_header_1) hdu.append(hdu_0) hdu.append(hdu_1) hdu.writeto('new_white.fits', clobber=True) self.filename_white = 'new_white.fits' print("MuseCube: `new_white.fits` image saved to disk.")
def save(self, filename, append=False, clobber=False): ext = '.oif.fits' if filename.find(ext)==-1: filename += ext if append: hdulist = pf.open(filename, mode='append') else: hdulist = pf.HDUList() hdu = pf.PrimaryHDU() hdu.header.set('DATE', strftime('%Y%m%dT%H%M%S'), comment='Creation Date') hdu.header.set('EXT', 'OBJ', comment='Type of information in the HDU') hdu.header.set('NAME', str(self.name), comment='Name of the object') hdu.header.set('TYP', str(self.typ), comment='Unit-model of the object') hdu.header.set('NVALUE', len(self._vkeys), comment='Number of set parameters in the model') hdu.header.set('NPARAM', len(self._pkeys), comment='Number of free parameters in the model') hdu.header.set('NSAVE', len(getattr(self, '_save', {})), comment='Number of additional parameters in the model') for i, k in enumerate(self._vkeys): hdu.header.set('VALUE'+str(i), k, comment='Name of the set parameter '+str(i)) hdu.header.set(k, getattr(self, "_"+k), comment='Value of the set parameter '+str(i)) for i, k in enumerate(self._pkeys): hdu.header.set('PARAM'+str(i), k, comment='Name of the free parameter '+str(i)) hdu.header.set(k, getattr(self, "_"+k, 'NONE'), comment='Prior of the free parameter '+str(i)) hdu.header.set(k+'L', getattr(self, k+"_bounds")[0], comment='Value of the lower prior-bound ') hdu.header.set(k+'H', getattr(self, k+"_bounds")[1], comment='Value of the higher prior-bound ') for i, k in enumerate(getattr(self, '_save', [])): hdu.header.set('SAVE'+str(i), k, comment='Name of the additional parameter '+str(i)) if not isinstance(getattr(self, k), (np.ndarray, list, tuple)): hdu.header.set(k, getattr(self, k), comment='Value of the additional parameter '+str(i)) else: hdu.header.set(k, 'ARRAY', comment='Value of the additional parameter '+str(i)) key = 'K'+str(int(time()/100%1*10**7)) hdu.header.set('KEY'+str(i), key, comment='Unique key to find the parameter value '+str(i)) paramhdu = pf.PrimaryHDU() paramhdu.header.set('DATE', strftime('%Y%m%dT%H%M%S'), comment='Creation Date') paramhdu.header.set('NAME', str(self.name), comment='Name of the object') paramhdu.header.set('TYP', str(self.typ), comment='Unit-model of the object') paramhdu.header.set('SAVE'+str(i), k, comment='Name of the additional parameter '+str(i)) paramhdu.header.set('EXT', key, comment='Type of information in the HDU : unique key') paramhdu.data = np.asarray(getattr(self, k)) hdulist.append(paramhdu) hdu.header.add_comment('Written by Guillaume SCHWORER') hdulist.append(hdu) if append: hdulist.flush() hdulist.close() else: hdulist.writeto(filename, clobber=clobber) return filename
def fits2ldac (header4ext2, data4ext3, fits_ldac_out, doSort=True): """This function converts the binary FITS table from Astrometry.net to a binary FITS_LDAC table that can be read by PSFex. [header4ext2] is what will be recorded as a single long string in the data part of the 2nd extension of the output table [fits_ldac_out], and [data4ext3] is the data part of an HDU that will define both the header and data parts of extension 3 of [fits_ldac_out]. """ # convert header to single (very) long string ext2_str = header4ext2.tostring(endcard=False, padding=False) # if the following line is not added, the very end of the data # part of extension 2 is written to a fits table such that PSFex # runs into a segmentation fault when attempting to read it (took # me ages to find out!). ext2_str += 'END END' # read into string array ext2_data = np.array([ext2_str]) # determine format string for header of extention 2 formatstr = str(len(ext2_str))+'A' # create table 1 col1 = fits.Column(name='Field Header Card', array=ext2_data, format=formatstr) cols = fits.ColDefs([col1]) ext2 = fits.BinTableHDU.from_columns(cols) # make sure these keywords are in the header ext2.header['EXTNAME'] = 'LDAC_IMHEAD' ext2.header['TDIM1'] = '(80, {0})'.format(len(ext2_str)/80) # simply create extension 3 from [data4ext3] ext3 = fits.BinTableHDU(data4ext3) # extname needs to be as follows ext3.header['EXTNAME'] = 'LDAC_OBJECTS' # sort output table by number column if needed if doSort: index_sort = np.argsort(ext3.data['NUMBER']) ext3.data = ext3.data[index_sort] # create primary HDU prihdr = fits.Header() prihdu = fits.PrimaryHDU(header=prihdr) prihdu.header['EXPTIME'] = header4ext2['EXPTIME'] prihdu.header['FILTNAME'] = header4ext2['FILTNAME'] # prihdu.header['SEEING'] = header4ext2['SEEING'] #need to calculte and add prihdu.header['BKGSIG'] = header4ext2['SEXBKDEV'] # write hdulist to output LDAC fits table hdulist = fits.HDUList([prihdu, ext2, ext3]) hdulist.writeto(fits_ldac_out, clobber=True) hdulist.close() ################################################################################