我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用Bio.Seq.Seq()。
def cast_to_str(obj): """Return a string representation of a Seq or SeqRecord. Args: obj (str, Seq, SeqRecord): Biopython Seq or SeqRecord Returns: str: String representation of the sequence """ if isinstance(obj, str): return obj if isinstance(obj, Seq): return str(obj) if isinstance(obj, SeqRecord): return str(obj.seq) else: raise ValueError('Must provide a string, Seq, or SeqRecord object.')
def cast_to_seq(obj, alphabet=IUPAC.extended_protein): """Return a Seq representation of a string or SeqRecord object. Args: obj (str, Seq, SeqRecord): Sequence string or Biopython SeqRecord object alphabet: See Biopython SeqRecord docs Returns: Seq: Seq representation of the sequence """ if isinstance(obj, Seq): return obj if isinstance(obj, SeqRecord): return obj.seq if isinstance(obj, str): obj = obj.upper() return Seq(obj, alphabet) else: raise ValueError('Must provide a string, Seq, or SeqRecord object.')
def seq(self): """Seq: Dynamically loaded Seq object from the sequence file""" if self.sequence_file: file_to_load = copy(self.sequence_path) log.debug('{}: reading sequence from sequence file {}'.format(self.id, file_to_load)) tmp_sr = SeqIO.read(file_to_load, 'fasta') return tmp_sr.seq else: if not self._seq: log.debug('{}: no sequence stored in memory'.format(self.id)) else: log.debug('{}: reading sequence from memory'.format(self.id)) return self._seq
def to_biopython_record(self): """ Example ------- from Bio import SeqIO gr_record = GraphicRecord(features=features, sequence_length=len(seq), sequence=seq) bio_record = gr_record.to_biopython_record() with open("example.gb", "w+") as f: SeqIO.write(record, f, "genbank") """ if not BIOPYTHON_AVAILABLE: raise ImportError(".to_biopython_record requires Biopython") features = [ SeqFeature(FeatureLocation(f.start, f.end, f.strand), type=f.feature_type, qualifiers={"label": f.label}) for f in self.features ] sequence = Seq(self.data["sequence"], alphabet=DNAAlphabet()) return SeqRecord(sequence=sequence, features=features)
def _sequence(v, deparse=False): if not deparse: try: return Seq(v, IUPAC.ambiguous_dna) except: return None else: try: return str(v) except: return '' # Initializer # # Arguments: row = dictionary of {field:value} data # genotyped = if True assign v_call from genotyped field # Returns: IgRecord
def getSeqField(self, field): if field in IgRecord._field_map: v = getattr(self, IgRecord._field_map[field]) elif field in self.annotations: v = self.annotations[field] else: return None if isinstance(v, Seq): return v elif isinstance(v, str): return Seq(v, IUPAC.ambiguous_dna) else: return None # Returns: dictionary of the namespace
def load_dataframe(db_file, dialect='excel-tab'): df = pd.read_csv(db_file, dialect=dialect) df = df.rename(columns=dict(zip(df.columns, df.columns.str.lower()))) # df = df[df['functional'] == 'F'] # parse v and j genes to speed up computation later df['v_gene_set'] = [set( parseAllele(x, gene_regex, 'set')) for x in df.v_call] df['v_gene_set_str'] = [str(set( parseAllele(x, gene_regex, 'set'))) for x in df.v_call] df['j_gene_set'] = [set( parseAllele(x, gene_regex, 'set')) for x in df.j_call] df['junc'] = [junction_re(x) for x in df.junction] df['aa_junc'] = [str(Seq(x, generic_dna).translate()) for x in df.junc] df['aa_junction_length'] = [len(x) for x in df.aa_junc] return df
def _parse_gff(file, file_name): records = [] with open(file, "r") as infile: for i, rec in enumerate(GFF.parse(infile)): # Enumerates the contigs (can be chromosome, plasmid and unidentified) # based on total number of contigs (not type) rec_id = rec.id + "_" + str(i + 1) if len(rec_id) > 15: rec_id = "contig_" + "_" + str(i + 1) seq_record = SeqRecord(Seq(str(rec.seq), IUPAC.unambiguous_dna), id=rec_id, description=os.path.basename(file_name), features=rec.features) records.append(seq_record) return records
def get_cds_sequence(rna,c_df,chrom_seq): pr_df = c_df[c_df['rna_id'].values==rna] strand = list(set(pr_df[6].tolist())) if len(strand) == 2: assert False, rna+' has both strands' # seqeunce merge chr_seq = Seq('') for start,end in zip(pr_df[3],pr_df[4]): if strand == ['-']: chr_seq += chrom_seq[start-1:end].reverse_complement() else: chr_seq += chrom_seq[start-1:end] # consider the frame information in 7th column frame = int(pr_df[7].tolist()[0]) rna_seq = chr_seq[frame:] return str(rna_seq.translate())
def translation(self): """ Returns the amino acid sequence of self B = "Asx"; Aspartic acid (R) or Asparagine (N) X = "Xxx"; Unknown or 'other' amino acid Z = "Glx"; Glutamic acid (E) or Glutamine (Q) J = "Xle"; Leucine (L) or Isoleucine (I), used in mass-spec (NMR) U = "Sec"; Selenocysteine O = "Pyl"; Pyrrolysine """ codon_table = CodonTable.ambiguous_dna_by_id[self.transl_table] seq = Seq(str(self.sequence()),IUPACAmbiguousDNA()) translated_seq = seq.translate(codon_table).tostring().replace('B','X').replace('Z','X').replace('J','X') if '*' in translated_seq[:-1]: # check if premature stop codon in the translation logging.error('Stop codon found within the CDS. It will rise an error submiting the data to ENA. Please fix your gff file.') # remove the stop character. It's not accepted by embl if translated_seq[-1:] == "*": translated_seq = translated_seq[:-1] return translated_seq
def main(args): for record in SeqIO.parse(args.infile, 'fasta'): if args.discard: if sum([1 for rx in args.discard if re.match(rx, record.id)]) > 0: continue subseqcounter = 0 printlog(args.debug, "DEBUG: convert to upper case", record.id) sequence = str(record.seq).upper() printlog(args.debug, "DEBUG: split seq by Ns", record.id) subseqs = [ss for ss in re.split('[^ACGT]+', sequence) if len(ss) > args.minlength] printlog(args.debug, "DEBUG: print subseqs", record.id) for subseq in subseqs: subseqcounter += 1 subid = '{:s}_chunk_{:d}'.format(record.id, subseqcounter) subrecord = SeqRecord(Seq(subseq), subid, '', '') SeqIO.write(subrecord, args.outfile, 'fasta')
def get_reconstructed_alignment(self): """ Get the multiple sequence alignment including reconstructed sequences for the internal nodes. """ from Bio.Align import MultipleSeqAlignment from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord self.logger("TreeAnc.get_reconstructed_alignment ...",2) if not hasattr(self.tree.root, 'sequence'): self.logger("TreeAnc.reconstructed_alignment... reconstruction not yet done",3) self.reconstruct_anc('ml') new_aln = MultipleSeqAlignment([SeqRecord(id=n.name, seq=Seq("".join(n.sequence)), description="") for n in self.tree.find_clades()]) return new_aln
def make_RefCmap(fasta_file, enz=None, min_len=20, min_nsite=5, path=None): name = fasta_file.rsplit('.',1)[0].split('/')[-1] index = 0 enzymes = {'BspQI':'GCTCTTC', 'BbvCI':'CCTCAGC', 'Bsml':'GAATGC', 'BsrDI':'GCAATG', 'bseCI':'ATCGAT', 'BssSI':'CACGAG'} try: cmap_file='%s/%s_%s.cmap'%(path,name,enz) forwards = enzymes[enz] reverse = str(Seq(forwards).reverse_complement()) with open (cmap_file,'a') as ref_cmap: ref_cmap.write('# CMAP File Version:\t0.1\n') ref_cmap.write('# Label Channels:\t1\n') ref_cmap.write('# Nickase Recognition Site 1:\t%s\n'%forwards) ref_cmap.write('# Enzyme1:\tNt.%s\n'%enz) ref_cmap.write('# Number of Consensus Nanomaps:\tN/A\n') ref_cmap.write('#h CMapId\tContigLength\tNumSites\tSiteID\tLabelChannel\tPosition\tStdDev\tCoverage\tOccurrence\n') ref_cmap.write('#f int\tfloat\tint\tint\tint\tfloat\tfloat\tint\tint\n') for seqs in SeqIO.parse(fasta_file,'fasta'): seq = str(seqs.seq.upper()) seq_len = len(seq) index+=1 if seq_len >= min_len*1000: nsites = len(re.findall('%s|%s'%(forwards,reverse),seq)) if nsites >=min_nsite: j=1 for o in re.finditer('%s|%s'%(forwards,reverse),seq): ref_cmap.write('%s\t%.1f\t%d\t%d\t1\t%.1f\t1.0\t1\t1\n'%(index,seq_len,nsites,j,o.start()+1)) j+=1 ref_cmap.write('%s\t%.1f\t%d\t%d\t0\t%.1f\t0.0\t1\t0\n'%(index,seq_len,nsites,j,seq_len)) except: pass
def generateSeqHandles(anIndexCfg): """ The YAML config file to parse is like: handles: prefix: "TTAGTCTCCGACGGCAGGCTTCAAT" postfix: "ACGCACCCACCGGGACTCAG" indexes: [ "ACAGTC", "TGATGC", "TCTCAG" ] There is a handle at one end of each sequence which is as follows: TTAGTCTCCGACGGCAGGCTTCAAT-ACAGTC-ACGCACCCACCGGGACTCAG prefix -index - postfix """ forwardIdx= [] # the result array to collect handle sequence strings handlePrefix = anIndexCfg["handles"]["prefix"] handlePostfix = anIndexCfg["handles"]["postfix"] for index in anIndexCfg["indexes"]: forwardIdx.append(handlePrefix + index + handlePostfix) reverseIdx = [] # to collect reverse complements for handle in forwardIdx: seq = Seq(handle) rc = str(seq.reverse_complement()) reverseIdx.append(rc) return (forwardIdx,reverseIdx)
def seq_record_example(): """Dummy SeqRecord to load""" return SeqRecord(Seq("MKQHKAMIVALIVICITAVVAALVTRKDLCEVHIRTGQTEVAVF", IUPAC.protein), id="YP_025292.1", name="HokC", description="toxic membrane protein, small", annotations={'hello':'world'})
def test_set_seq_with_str(self, seqprop_with_i, seq_str_example): """Test setting the seq attribute with a sequence string""" seqprop_with_i.seq = seq_str_example assert type(seqprop_with_i.seq) == Seq assert str(seqprop_with_i.seq) == seq_str_example
def test_set_seq_with_seqrecord(self, seqprop_with_i, seq_record_example): """Test setting the seq attribute with a SeqRecord""" seqprop_with_i.seq = seq_record_example assert type(seqprop_with_i.seq) == Seq assert seqprop_with_i.seq == seq_record_example.seq assert seqprop_with_i.name == seq_record_example.name assert seqprop_with_i.description == seq_record_example.description assert seqprop_with_i.annotations == seq_record_example.annotations
def test_write_fasta_file(self, seqprop_with_i, tmpdir, test_files_outputs, seq_record_example): """Test that everything behaves properly when writing the SeqProp to a FASTA file""" # Add dummy annotations to the SeqProp - check to see if they stay in the SeqProp even after Seq is written seqprop_with_i.letter_annotations.update({'test_la_key': 'X' * len(seqprop_with_i.seq)}) seqprop_with_i.features.append(SeqFeature(FeatureLocation(1, 3))) # Write the Seq to a FASTA file outpath = tmpdir.join('test_seqprop_with_i_write_fasta_file.fasta').strpath seqprop_with_i.write_fasta_file(outfile=outpath, force_rerun=True) # Test that the file was written assert op.exists(outpath) assert op.getsize(outpath) > 0 # Test that file paths are correct assert seqprop_with_i.sequence_path == outpath assert seqprop_with_i.sequence_file == 'test_seqprop_with_i_write_fasta_file.fasta' assert seqprop_with_i.sequence_dir == tmpdir # Once a file is written, the annotations should not be lost, even though the sequence now # loads from the written file as a Seq assert seqprop_with_i.description == seq_record_example.description assert seqprop_with_i.annotations == seq_record_example.annotations assert seqprop_with_i.letter_annotations == {'test_la_key': 'X' * len(seq_record_example.seq)} assert len(seqprop_with_i.features) == 1 # Test that sequence cannot be changed with pytest.raises(ValueError): seqprop_with_i.seq = 'THISWILLNOTBETHESEQ' assert seqprop_with_i.seq == seq_record_example.seq
def cast_to_seq_record(obj, alphabet=IUPAC.extended_protein, id="<unknown id>", name="<unknown name>", description="<unknown description>", dbxrefs=None, features=None, annotations=None, letter_annotations=None): """Return a SeqRecord representation of a string or Seq object. Args: obj (str, Seq, SeqRecord): Sequence string or Biopython Seq object alphabet: See Biopython SeqRecord docs id: See Biopython SeqRecord docs name: See Biopython SeqRecord docs description: See Biopython SeqRecord docs dbxrefs: See Biopython SeqRecord docs features: See Biopython SeqRecord docs annotations: See Biopython SeqRecord docs letter_annotations: See Biopython SeqRecord docs Returns: SeqRecord: SeqRecord representation of the sequence """ if isinstance(obj, SeqRecord): return obj if isinstance(obj, Seq): return SeqRecord(obj, id, name, description, dbxrefs, features, annotations, letter_annotations) if isinstance(obj, str): obj = obj.upper() return SeqRecord(Seq(obj, alphabet), id, name, description, dbxrefs, features, annotations, letter_annotations) else: raise ValueError('Must provide a string, Seq, or SeqRecord object.')
def write_fasta_file(self, outfile, force_rerun=False): """Write a FASTA file for the protein sequence, ``seq`` will now load directly from this file. Args: outfile (str): Path to new FASTA file to be written to force_rerun (bool): If an existing file should be overwritten """ if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile): SeqIO.write(self, outfile, "fasta") # The Seq as it will now be dynamically loaded from the file self.sequence_path = outfile
def createRCs(inputFile, outNameVal): """Creates a .bed file with the reverse complements of the given set of sequences.""" # Determine the stem of the input filename. fileName = str(inputFile).split('.')[0] # Open input file for reading. with open(inputFile, 'r') as f: file_read = [line.strip() for line in f] # Create list to hold output. outList = [] # Parse out probe info, flip sequence to RC, and write to output list. for i in range(0, len(file_read), 1): chrom = file_read[i].split('\t')[0] start = file_read[i].split('\t')[1] stop = file_read[i].split('\t')[2] probeSeq = file_read[i].split('\t')[3] RevSeq = Seq(probeSeq, IUPAC.unambiguous_dna).reverse_complement() Tm = file_read[i].split('\t')[4] outList.append('%s\t%s\t%s\t%s\t%s' % (chrom, start, stop, RevSeq, Tm)) # Determine the name of the output file. if outNameVal is None: outName = '%s_RC' % fileName else: outName = outNameVal # Create the output file. output = open('%s.bed' % outName, 'w') # Write the output file output.write('\n'.join(outList)) output.close()
def _write_fasta(self, target="allele_seq_ref"): """ Write fasta file of sequences with SNP IDs for CD-HIT. """ file_name = os.path.join(self.tmp_path, self.project + "_Seqs") seqs = [SeqRecord(Seq(data[target], IUPAC.unambiguous_dna), id=snp_id, name="", description="") for snp_id, data in self.data.items()] file_name += ".fasta" with open(file_name, "w") as fasta_file: SeqIO.write(seqs, fasta_file, "fasta") return file_name
def reverse_complement(seq): return Seq(seq).reverse_complement()
def consensus_seqrecord(consensus, ref_id): return SeqRecord(Seq(consensus), id=ref_id + '_cns', description='')
def addSegmentToJunctionFileSE(vSeg,jSeg,cSeg,out,fastaDict, bases, idNameDict): vSeq = fastaDict[vSeg] if jSeg != 'NA': jName = idNameDict[jSeg] jSeq = fastaDict[jSeg] else: jSeq = '' jName = 'NoJ' if cSeg != 'NA': cName = idNameDict[cSeg] cSeq = fastaDict[cSeg] else: cName = 'NoC' cSeq = '' jcSeq = jSeq + cSeq lenSeg = min(len(vSeq),len(jcSeq)) if bases != -10: if lenSeg < bases: sys.stdout.write(str(datetime.datetime.now()) + ' Bases parameter is bigger than the length of the V or J segment, taking the length' \ 'of the V/J segment instead, which is: ' + str(lenSeg) + '\n') sys.stdout.flush() else: lenSeg = bases jTrim = jcSeq[:lenSeg] vTrim = vSeq[-1*lenSeg:] junc = vTrim + jTrim recordName = vSeg + '.' + jSeg + '.' + cSeg + '(' + idNameDict[vSeg] + '-' + jName + '-' + cName + ')' record = SeqRecord(Seq(junc,IUPAC.ambiguous_dna), id = recordName, description = '') SeqIO.write(record,out,'fasta')
def findCDR3(fasta, aaDict, vdjFaDict): f = open(fasta, 'rU') fDict = dict() for record in SeqIO.parse(f, 'fasta'): if record.id in fDict: sys.stderr.write(str(datetime.datetime.now()) + ' Error! same name for two fasta entries %s\n' % record.id) sys.stderr.flush() else: idArr = record.id.split('.') vSeg = idArr[0] jSeg = idArr[1] if ((vSeg in aaDict) & (jSeg in aaDict)): currDict = findVandJaaMap(aaDict[vSeg],aaDict[jSeg],record.seq) else: if vSeg in aaDict: newVseg = aaDict[vSeg] else: vId = idArr[3] currSeq = vdjFaDict[vId] newVseg = getBestVaa(Seq(currSeq)) if jSeg in aaDict: newJseg = aaDict[jSeg] else: jId = idArr[4] currSeq= vdjFaDict[jId] newJseg = getBestJaa(Seq(currSeq)) currDict = findVandJaaMap(newVseg,newJseg,record.seq) fDict[record.id] = currDict f.close() return fDict
def extract_genes(output_folder, refs_folder, gff_folder, di): tp = 'gene' refs_files = sorted([join(refs_folder, f) for f in listdir(refs_folder) if isfile(join(refs_folder, f))]) gff_files = sorted([join(gff_folder, f) for f in listdir(gff_folder) if isfile(join(gff_folder, f))]) limit_info = dict( gff_type =['gene']) sequences = [] for ind in range(len(refs_files)): f_fasta = refs_files[ind] f_gff = gff_files[ind] nm = f_gff.split("/")[-1][:-4] record = SeqIO.to_dict(SeqIO.parse(f_fasta, "fasta")) new_record = {} p = re.compile("N\w_\w+\.\d") for it in record: new_id = p.findall(it)[0] new_record[new_id] = record[it] record = new_record in_handle = open(f_gff) for rec in GFF.parse(in_handle, limit_info=limit_info, base_dict=record): for f in rec.features: if f.qualifiers['ID'][0] in di.get(f_gff, []): subrecord = SeqRecord.SeqRecord(Seq.Seq(re.sub(r'[^ATGC]', 'A', str(f.extract(rec.seq)))), id=nm + "_" + rec.id + "_" + str(f.location), name=nm + "_" + str(f.location)+ "_" + tp, description= tp) sequences.append(subrecord) in_handle.close() with open(output_folder+'sc_genes_seq.fasta', "w") as output_handle: SeqIO.write(sequences, output_handle, "fasta")
def partial_genes_extract(contigs_path, outdir): contigs = SeqIO.to_dict(SeqIO.parse(contigs_path, "fasta")) partial_genes = [] with open(outdir+'prodigal_output.gff') as f: for rec in GFF.parse(f, base_dict=contigs): for feature in rec.features: extract_seq = str(feature.extract(rec.seq)) if feature.qualifiers['partial'][0] in ['11', '01', '10'] and len(extract_seq) > 110: subrecord = SeqRecord.SeqRecord(Seq.Seq(extract_seq), id='gene_'+str(len(partial_genes))+'_len_'+str(len(extract_seq)) + '_conf_'+str(feature.qualifiers['conf'][0]), description='patrial gene') partial_genes.append(subrecord) return partial_genes
def write_orfs(self, outdir): """ Write ORFs from self.orfs and create files ORFs.[fasta, path] with sequences and paths :param outdir: saves path (with "/" in the end) """ with open(outdir + 'ORFs.fasta', 'w') as f, open(outdir + 'ORFs.path', 'w') as path: counter = 0 for o in self.orfs: SeqIO.write(SeqRecord(Seq(o), id='ORF_{0}'.format(counter), description=self.orfs[o][1]), f, 'fasta') path.write('ORF_{0} '.format(counter) + 'max_edge_len: {0}\n'.format(self.max_edge_len(o)) + str(self.orfs[o][0][0]) + ',' + ''.join([i[0] + i[1] + ',' for i in self.orfs[o][0][1:-1]]) + str(self.orfs[o][0][-1]) + '\n') counter += 1
def getVSeq(self): # _seq_vdj = self.getField('SEQUENCE_VDJ') _seq_vdj = self.getField('SEQUENCE_IMGT') # _idx_v = int(self.getField('V_SEQ_LENGTH')) return _seq_vdj[:312] # 312: V length without cdr3 # Get a field value converted to a Seq object by column name # # Arguments: field = column name # Returns: value in the field as a Seq object
def pad_nucleotide_sequences(aln_aa, seq_nuc): ''' introduce gaps of 3 (---) into nucleotide sequences corresponding to aligned DNA sequences. Parameters: - aln_aa: amino acid alignment - seq_nuc: unaligned nucleotide sequences. Returns: - aligned nucleotide sequences with all gaps length 3 ''' from Bio.Align import MultipleSeqAlignment from Bio.SeqRecord import SeqRecord from Bio.Seq import Seq aln_nuc = MultipleSeqAlignment([]) for aa_seq in aln_aa: try: tmp_nuc_seq = str(seq_nuc[aa_seq.id].seq) except KeyError as e: print aa_seq.id print 'Key not found, continue with next sequence' continue tmpseq = '' nuc_pos = 0 for aa in aa_seq: if aa=='-': tmpseq+='---' else: tmpseq+=tmp_nuc_seq[nuc_pos:(nuc_pos+3)] nuc_pos+=3 aln_nuc.append(SeqRecord(seq=Seq(tmpseq),id=aa_seq.id)) return aln_nuc
def add_translations(self): ''' translate the nucleotide sequence into the proteins specified in self.proteins. these are expected to be SeqFeatures ''' from Bio import Seq # Sort proteins by start position of the corresponding SeqFeature entry. sorted_proteins = sorted(self.proteins.items(), key=lambda protein_pair: protein_pair[1].start) for node in self.tree.find_clades(order='preorder'): if not hasattr(node, "translations"): # Maintain genomic order of protein translations for easy # assembly by downstream functions. node.translations=OrderedDict() node.aa_mutations = {} for prot, feature in sorted_proteins: node.translations[prot] = Seq.translate(str(feature.extract(Seq.Seq("".join(node.sequence)))).replace('-', 'N')) if node.up is None: node.aa_mutations[prot] = [] else: node.aa_mutations[prot] = [(a,pos,d) for pos, (a,d) in enumerate(zip(node.up.translations[prot], node.translations[prot])) if a!=d] self.dump_attr.append('translations')
def __init__(self, logger, sequences, reference, dateFormat): super(sequence_set, self).__init__() self.log = logger # load sequences from the (parsed) JSON - don't forget to sort out dates self.seqs = {} for name, data in sequences.iteritems(): self.seqs[name] = SeqRecord(Seq(data["seq"], generic_dna), id=name, name=name, description=name) self.seqs[name].attributes = data["attributes"] # tidy up dates date_struc = parse_date(self.seqs[name].attributes["raw_date"], dateFormat) self.seqs[name].attributes["num_date"] = date_struc[1] self.seqs[name].attributes["date"] = date_struc[2] # if the reference is to be analysed it'll already be in the (filtered & subsampled) # sequences, so no need to add it here, and no need to care about attributes etc # we do, however, need it for alignment self.reference_in_dataset = reference["included"] name = reference["strain"] self.reference_seq = SeqRecord(Seq(reference["seq"], generic_dna), id=name, name=name, description=name) if "genes" in reference and len(reference["genes"]): self.proteins = {k:FeatureLocation(start=v["start"], end=v["end"], strand=v["strand"]) for k, v in reference["genes"].iteritems()} else: self.proteins = None # other things: self.run_dir = '_'.join(['temp', time.strftime('%Y%m%d-%H%M%S',time.gmtime()), str(random.randint(0,1000000))]) self.nthreads = 2 # should load from config file
def strip_non_reference(self): ''' remove insertions relative to the reference from the alignment ''' ungapped = np.array(self.reference_aln)!='-' for seq in self.aln: seq.seq = Seq("".join(np.array(seq)[ungapped]))
def make_gaps_ambiguous(self): ''' replace all gaps by 'N' in all sequences in the alignment. TreeTime will treat them as fully ambiguous and replace then with the most likely state ''' for seq in self.aln: seq_array = np.array(seq) gaps = seq_array=='-' seq_array[gaps]='N' seq.seq = Seq("".join(seq_array))
def translate(self): ''' make alignments of translations. ''' from Bio.Seq import CodonTable codon_table = CodonTable.ambiguous_dna_by_name['Standard'].forward_table self.translations={} if not hasattr(self, "proteins"): # ensure dictionary to hold annotation self.proteins={} # add a default translation of the entire sequence unless otherwise specified if len(self.proteins)==0: self.proteins.update({'cds':FeatureLocation(start=0, end=self.aln.get_alignment_length(), strand=1)}) # loop over all proteins and create one MSA for each for prot in self.proteins: aa_seqs = [] for seq in self.aln: tmpseq = self.proteins[prot].extract(seq) translated_seq, translation_exception = safe_translate(str(tmpseq.seq), report_exceptions=True) if translation_exception: self.log.notify("Trouble translating because of invalid codons %s" % seq.id) tmpseq.seq = Seq(translated_seq) # copy attributes tmpseq.attributes = seq.attributes aa_seqs.append(tmpseq) self.translations[prot] = MultipleSeqAlignment(aa_seqs)
def get_aa_seq(chain): ''' Extract amino acid sequence from a PDB chain object and return sequence as Bio.SeqRecord object. ''' aa_list = [] residue_numbers = [] for residue in chain: if is_aa(residue): aa_list.append(SCOPData.protein_letters_3to1[residue.resname]) residue_numbers.append(str(residue.get_id()[1]) + \ residue.get_id()[2].strip()) aa_seq = SeqRecord(Seq(''.join(aa_list)), id='pdb_seq', description='') return aa_seq, residue_numbers
def add_fixed_errors(input_iter, nr_errors, error_type): """Simulate sequencing errors for each SeqRecord object in the input iterator. :param input_iter: Iterator of SeqRecord objects. :para nr_errors: Number of errors to introduce. :param error_type: Error type: substitution, insertion or deletion. :returns: Generator of SeqRecord objects. :rtype: generator """ for record in input_iter: mutated_seq = sim_seq.add_errors(record.seq, nr_errors, error_type) record.seq = Seq(mutated_seq) yield record
def simulate_errors(input_iter, error_rate, error_weights): """Simulate sequencing errors for each SeqRecord object in the input iterator. :param input_iter: Iterator of SeqRecord objects. :para error_rate: Total error rate of substitutions, insertions and deletions. :param error_weights: Relative frequency of substitutions,insertions,deletions. :returns: Generator of SeqRecord objects. :rtype: generator """ for record in input_iter: mutated_seq = sim_seq.simulate_sequencing_errors(record.seq, error_rate, error_weights).seq record.seq = Seq(mutated_seq) yield record
def build_record(id, classification): description = (classification['classifier'] + '|' + str(classification['taxid']) + '|' + classification['sciname'] + '|' + classification['rank'] + '|' + ';'.join(classification['lineage'])) record = SeqRecord(Seq(classification['sequence'], IUPAC.ambiguous_dna), id=id, description=description) return record
def digest(fasta_records, enzyme): """ Divide a genome into restriction fragments. Parameters ---------- fasta_records : OrderedDict Dictionary of chromosome names to sequence records. enzyme: str Name of restriction enzyme. Returns ------- Dataframe with columns: 'chrom', 'start', 'end'. """ import Bio.Restriction as biorst import Bio.Seq as bioseq # http://biopython.org/DIST/docs/cookbook/Restriction.html#mozTocId447698 chroms = fasta_records.keys() try: cut_finder = getattr(biorst, enzyme).search except AttributeError: raise ValueError('Unknown enzyme name: {}'.format(enzyme)) def _each(chrom): seq = bioseq.Seq(str(fasta_records[chrom])) cuts = np.r_[0, np.array(cut_finder(seq)) + 1, len(seq)].astype(int) n_frags = len(cuts) - 1 frags = pd.DataFrame({ 'chrom': [chrom] * n_frags, 'start': cuts[:-1], 'end': cuts[1:]}, columns=['chrom', 'start', 'end']) return frags return pd.concat(map(_each, chroms), axis=0, ignore_index=True)
def sequence(self): """ Returns the nucleotide sequence of self """ if self.location.strand > 0: return SeqFeature(location = self.location).extract(self.seq) seq = Seq("") for part in reversed(self.location.parts): seq += SeqFeature(location = part).extract(self.seq) return seq
def add_str(self, seq, name=None, description=""): self.add_seq(SeqRecord(Seq(seq), id=name, description=description))
def reduce_exons(iterable_of_seqrecords): """Reduce the exons by sequence identity Build a dict whose keys are the sequence in str format and the values are lists in which the first element is the exon_id and the remaining are the coordinates in loci:start-end format """ # Initialize seq_to_data = dict() # Go over each record for record in iterable_of_seqrecords: seq = str(record.seq) if seq in seq_to_data: # Just append seq_to_data[seq].append(record.description) else: # Enter values in both dicts seq_to_data[seq] = [ 'EXON{:011d}'.format(len(seq_to_data) + 1), record.description ] # Collect data from both dicts into a SeqRecord and return for seq in seq_to_data.keys(): identifier, *description = seq_to_data[seq] record = SeqRecord( id=identifier, description=" ".join(description), seq=Seq(seq) ) yield record
def _segments_to_exon_dict(segments): """(list of lists) -> dict Conver a list of ["S", "ident", "seq", *whatever] to a dict {ident: SeqRecord} """ exon_dict = {} for segment in segments: exon_id, sequence = segment[1:3] exon_dict[exon_id] = SeqRecord( id=exon_id, description="", seq=Seq(sequence) ) return exon_dict
def _compose_paths(exon_dict, path_dict, number_of_ns): ns = "N" * number_of_ns for transcript_id, exon_list in sorted(path_dict.items()): exon_seqs = [str(exon_dict[exon_id].seq) for exon_id in exon_list] yield SeqRecord( id=transcript_id, description=",".join(exon_list), seq=Seq(ns.join(exon_seqs)) )
def COMPL_STRING(line): my_seq = Seq(line) compl_line=str(my_seq.reverse_complement()) return compl_line
def gen_sequence(comp, length) : seq_string = '' ##### Part 3 : Your random sequence generating code goes here # Assertion is advised assert abs( sum(comp.values())-1 ) < 0.01 , 'Probabilities do not add up to 1.' # We generate a sequence of given length for i in range(length) : # Get a random number in [0,1) dice = random() limit=0 # We divide [0,1) interval according to probabilities of each nucleotide for nuc in comp : limit += comp[nuc] # We add the letter that dice hits if dice<limit : seq_string += nuc limit = 0 # Roll another dice for the next nucleotide break ##### sequence = Seq(seq_string) return SeqRecord(sequence, id='Random Sequence', description=comp.__repr__()) ##### ALL MODIFICATIONS GO ABOVE THIS LINE ##### ### Part 0 : Argument Parsing ### We want out program to have easy-to-use parameters ### We are using the argparse library for this
def gen_sequence(comp, length) : seq_string = '' ##### Part 3 : Your random sequence generating code goes here ##### Goal : Fill in seq_string with a random sequence of given composition sequence = Seq(seq_string) return SeqRecord(sequence, id='Random Sequence', description=comp.__repr__()) ##### ALL MODIFICATIONS GO ABOVE THIS LINE ##### ### Part 0 : Argument Parsing ### We want out program to have easy-to-use parameters ### We are using the argparse library for this
def cds_seq(self): """Lookup transcript, exon and UTR info from Ensembl Rest API.""" if not self._cds_seq: seq_info = client.perform_rest_action( '/sequence/id/{0}'.format(self.transcript_id), params={'type': 'cds'}) seq_str = seq_info['seq'] if seq_str.startswith('N') or seq_str.endswith('N'): seq_str = seq_str.strip('N') seq = Seq(seq_str, IUPAC.unambiguous_dna) self._cds_seq = seq self._aa_seq = seq.translate() else: seq = self._cds_seq return seq