The following 50 code examples, extracted from open-source Python projects, illustrate how to use unicodecsv.reader().
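Before the harvested examples, here is a minimal sketch of the typical calling pattern. The file name 'people.csv' and its contents are hypothetical; the key points are that the file is opened in binary mode and that an encoding is passed so each cell comes back as a unicode string.

import unicodecsv

# Minimal usage sketch (hypothetical file 'people.csv'): unicodecsv.reader
# wraps the stdlib csv reader but decodes every cell with the given encoding,
# so each row is a list of unicode strings.
with open('people.csv', 'rb') as f:          # binary mode: unicodecsv handles decoding
    reader = unicodecsv.reader(f, encoding='utf-8', delimiter=',')
    header = next(reader)                    # first row used as the header
    for row in reader:
        print(dict(zip(header, row)))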
def loadRecord(line):
    """Parse a single CSV line."""
    input_line = StringIO.StringIO(line)
    #row = unicodecsv.reader(input_line, encoding="utf-8")
    #return row.next()
    #reader = csv.DictReader(input_line, fieldnames=["id", "qid1", "qid2", "question1", "question2", "is_duplicate"])
    reader = csv.reader(input_line)
    return reader.next()
    #data = []
    #for row in reader:
    #    print row
    #    data.append([unicode(cell, "utf-8") for cell in row])
    #return data[0]
    #return reader.next()

#raw_data = sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
def read_notes_file_to_dict(ifname):
    """Reads a notes file to a dict.

    Returns a dictionary where the key is the reference name and the value is the note.
    """
    notes_dict = {}
    # csvfile = codecs.open(ifname, 'r', encoding='latin1')
    csvfile = open(ifname, 'r')
    # with open(ifname, 'rU') as csvfile:
    csv_reader = csv.reader(csvfile, delimiter=";")
    for row in csv_reader:
        row_text = row[2].strip()
        notes_dict[row[1].strip()] = row_text
    csvfile.close()
    return notes_dict
def get_kanji(level, current_pos=1):
    """
    get_kanji returns a single record of the current_pos line position
    level: 1 - 4 (N1 to N4)
    current_pos: up to number of records
    """
    kanji = {}
    with open(KANJI_FILENAMES[level], 'rb') as fobj:
        reader = csv.reader(fobj, delimiter=',', encoding='utf-8')
        num_of_lines = 0
        for line in reader:
            num_of_lines += 1
            if num_of_lines == current_pos:
                kanji = dict(zip(KANJI_FIELDS, line))
                break

    # Convert to UTF-8
    for key, value in kanji.iteritems():
        kanji[key] = value.encode("utf-8")
    return kanji
def get_vocabulary(current_pos=1):
    """
    get_vocabulary returns a single record of the current_pos line position
    current_pos: up to number of records
    """
    vocabulary = {}
    with open(VOCABULARY_FILENAME, 'rb') as fobj:
        reader = csv.reader(fobj, delimiter=',', encoding='utf-8')
        num_of_lines = 0
        for line in reader:
            num_of_lines += 1
            if num_of_lines == current_pos:
                vocabulary = dict(zip(VOCABULARY_FIELDS, line))
                break

    # Convert to UTF-8
    for key, value in vocabulary.iteritems():
        vocabulary[key] = value.encode("utf-8")
    return vocabulary
def load_csv_dataset(filename):
    """
    Loads a csv filename as a dataset
    :param str filename: name of the file
    :return List[DataSample]: a list of DataSample
    """
    dataset = []
    with open(os.path.join(DIR_GENERATED_DATA, filename), 'rb') as file:
        reader = csv.reader(file, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL,
                            errors='ignore')
        for row in reader:
            id = int(row[0])
            text = row[1]
            gene = row[2]
            variation = row[3]
            try:
                real_class = int(row[4])
            except:
                real_class = None
            dataset.append(DataSample(id, text, gene, variation, real_class))
    return dataset
def load(self):
    global dictionaries
    if dictionaries == {}:
        for file in os.listdir('./dictionaries'):
            metadata_name = re.sub(r'.dic', r'', file)
            print "Loading dictionary for %s" % metadata_name
            with open('./dictionaries/' + file, 'rb') as concepts_dictionary:
                Tag = namedtuple('Tag', 'concept, pos, semanticType')
                dictionary = []
                for tag in map(Tag._make, unicodecsv.reader(concepts_dictionary, delimiter='\t', encoding='utf-8')):
                    dictionary.append(tag)
                dictionaries[metadata_name] = dictionary
    return dictionaries
def restart_harvest(args):
    harvest = get_harvest(args)
    data_dir = os.path.join(os.getcwd(), 'data', harvest)
    meta = get_metadata(data_dir)
    if meta:
        try:
            with open(os.path.join(data_dir, 'results.csv'), 'rb') as csv_file:
                reader = csv.reader(csv_file, delimiter=',', encoding='utf-8')
                rows = list(reader)
            if len(rows) > 1:
                start = len(rows) - 2
                # Remove the last row in the CSV just in case there was a problem
                rows = rows[:-1]
                with open(os.path.join(data_dir, 'results.csv'), 'wb') as csv_file:
                    writer = csv.writer(csv_file, delimiter=',', encoding='utf-8')
                    for row in rows:
                        writer.writerow(row)
            else:
                start = 0
        except IOError:
            # Nothing's been harvested
            start = 0
        start_harvest(data_dir=data_dir, key=meta['key'], query=meta['query'],
                      pdf=meta['pdf'], text=meta['text'], start=start, max=meta['max'])
def cbsa_lookup():
    """
    Construct a County->CBSA Lookup table from NBER data
    Returns: dict
        each key is a (State Code, County FIPS code) tuple
        each value is a (CBSA FIPS code, CBSA Name) tuple
    """
    logging.info("Beginning CBSA lookup")
    cbsa_lookup = defaultdict(dict)
    download = requests.get(URL)
    decoded_content = download.content.decode('latin-1').encode('utf-8')
    reader = csv.reader(decoded_content.splitlines(), delimiter=',')
    # skip header line
    next(reader)
    for row in reader:
        state_code = row[1]
        fipscounty = row[3][-3:]
        cbsa = row[4]
        cbsaname = row[5]
        cbsa_lookup[state_code][fipscounty] = (cbsa, cbsaname)
    return cbsa_lookup
def _skills_lookup(self):
    """Create skills lookup

    Reads the object's filename containing skills into a lookup

    Returns: (set) skill names
    """
    logging.info('Creating skills lookup from %s', self.skill_lookup_path)
    lookup = defaultdict(set)
    with smart_open(self.skill_lookup_path) as infile:
        reader = csv.reader(infile, delimiter='\t')
        header = next(reader)
        ksa_index = header.index(self.nlp.transforms[0])
        soc_index = header.index('O*NET-SOC Code')
        for row in reader:
            lookup[row[soc_index]].add(row[ksa_index])
        return lookup
def fetch_from_datapackage(self, **kwargs):
    if not self._skip_resource(**kwargs):
        # IMPORTANT!
        # after this point - kwargs are ignored as we are fetching from previously prepared csv data
        if self.csv_path and os.path.exists(self.csv_path):
            with open(self.csv_path, 'rb') as csv_file:
                csv_reader = unicodecsv.reader(csv_file)
                header_row = None
                for row in csv_reader:
                    if not header_row:
                        header_row = row
                    else:
                        csv_row = OrderedDict(zip(header_row, row))
                        parsed_row = []
                        for field in self.descriptor["schema"]["fields"]:
                            try:
                                parsed_row.append((field["name"],
                                                   self._get_field_original_value(csv_row[field["name"]], field)))
                            except Exception as e:
                                import logging
                                message = "error parsing field %s in file %s : %s" % (field["name"], self.csv_path, str(e))
                                logging.exception(message)
                                raise Exception(message)
                        yield OrderedDict(parsed_row)
def _assert_no_duplicates(self, input_path, encoding, sep, quotechar):
    if input_path.endswith('.csv'):
        with open(input_path, 'r') as csvfile:
            reader = unicodecsv.reader(csvfile, encoding=encoding,
                                       delimiter=sep, quotechar=quotechar)
            fields = reader.next()
            for col in fields:
                if fields.count(col) > 1:
                    raise DuplicatedField(col)
    # TODO: implement the no-duplicates check for XLSX files
    elif input_path.endswith('.xlsx'):
        pass
def _load_reromanizer(self, table, decompose):
    path = os.path.join('data', 'reromanize', table + '.csv')
    try:
        path = pkg_resources.resource_filename(__name__, path)
    except:
        print('Could not locate {}.'.format(path), file=sys.stderr)
    if os.path.isfile(path):
        mapping = {}
        with open(path, 'rb') as f:
            reader = csv.reader(f, encoding='utf-8')
            next(reader)
            for ipa, rom in reader:
                rom = normalize('NFD', rom) if decompose else normalize('NFC', rom)
                mapping[ipa] = rom
        return mapping
    else:
        print('File {} does not exist.'.format(path), file=sys.stderr)
        return {}
def _read_bases(self, fn, weights):
    fn = pkg_resources.resource_filename(__name__, fn)
    segments = []
    with open(fn, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        header = next(reader)
        names = header[1:]
        for row in reader:
            ipa = row[0]
            vals = [{'-': -1, '0': 0, '+': 1}[x] for x in row[1:]]
            vec = Segment(names,
                          {n: v for (n, v) in zip(names, vals)},
                          weights=weights)
            segments.append((ipa, vec))
    seg_dict = dict(segments)
    return segments, seg_dict, names
def _read_table(self, filename):
    """Read the data from data/ipa_all.csv into self.segments, a
    list of 2-tuples of unicode strings and sets of feature tuples
    and self.seg_dict, a dictionary mapping from unicode segments
    and sets of feature tuples.
    """
    filename = pkg_resources.resource_filename(__name__, filename)
    segments = []
    with open(filename, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        header = next(reader)
        names = header[1:]
        for row in reader:
            seg = row[0]
            vals = row[1:]
            specs = set(zip(vals, names))
            segments.append((seg, specs))
    seg_dict = dict(segments)
    return segments, seg_dict, names
def _create_filtered_index(self, source=dir_path + '../data/character_index.csv',
                           destination=dir_path + '../data/character_index_filtered.csv'):
    with io.open(source, 'rb') as fin_index, io.open(destination, 'w', encoding='utf8') as fout:
        total_lines_relations = line_counting.cached_counter.count_lines(self.path_relations)
        self.logger.print_info('Collecting important entities...')
        important_articles = set()
        nt_reader = NTReader(self.path_relations)
        for subject, predicate, object in tqdm(nt_reader.yield_cleaned_entry_names(),
                                               total=total_lines_relations):
            important_articles.add(subject)

        total_lines_index = line_counting.cached_counter.count_lines(source)
        self.logger.print_info('Filtering important entities...')
        index_reader = csv.reader(fin_index, delimiter=self.delimiter, encoding='utf-8',
                                  quoting=csv.QUOTE_NONE)
        for line in tqdm(index_reader, total=total_lines_index):
            subject, character_offset = line
            if subject in important_articles:
                fout.write(subject + self.delimiter + character_offset + '\n')
def __init__(self, file_handle, delimiter='\t'):
    self.reader = csv.reader(file_handle, delimiter=delimiter, encoding='utf-8')
    self.fields = list(six.next(self.reader))
def __next__(self):
    return KbartRecord(six.next(self.reader), fields=self.fields)
def positive_and_negative_to_full():
    fpos = open('positive.csv')
    positive_units = [row for row in csv.reader(fpos)]
    fneg = open('negative.csv')
    negative_units = [row for row in csv.reader(fneg)]
    for item in positive_units:
        item.append('positive')
    for item in negative_units:
        item.append('negative')
    del negative_units[0]
    positive_units[0][0] = 'review_content'
    positive_units[0][1] = 'sentiment'
    full = positive_units
    full.extend(negative_units)
    with open('positiveandnegative.csv', 'wb') as csvfile:
        writer = csv.writer(csvfile, dialect='excel')
        writer.writerows(full)

#this will open the review scraped data and write two files from that info:
#positive.csv, containing positive opinion units
#negative.csv, containing negative opinion units
def load_csv_wikipedia_gen(filename):
    """
    Loads a csv filename as a wikipedia genes dataset
    :param str filename: name of the file
    :return List[WikipediaGene]: a list of WikipediaGene
    """
    dataset = []
    with open(os.path.join(DIR_GENERATED_DATA, filename)) as file:
        reader = csv.reader(file, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for row in reader:
            dataset.append(WikipediaGene(row[0], row[1]))
    return dataset
def convert(input_file_name, **kwargs):
    """Convert CSV file to HTML table"""
    delimiter = kwargs["delimiter"] or ","
    quotechar = kwargs["quotechar"] or "|"

    if six.PY2:
        delimiter = delimiter.encode("utf-8")
        quotechar = quotechar.encode("utf-8")

    # Read CSV and form a header and rows list
    with open(input_file_name, "rb") as input_file:
        reader = csv.reader(input_file,
                            encoding="utf-8",
                            delimiter=delimiter,
                            quotechar=quotechar)

        csv_headers = []
        if not kwargs.get("no_header"):
            # Read header from first line
            csv_headers = next(reader)

        csv_rows = [row for row in reader if row]

        # Set default column name if header is not present
        if not csv_headers and len(csv_rows) > 0:
            end = len(csv_rows[0]) + 1
            csv_headers = ["Column {}".format(n) for n in range(1, end)]

    # Render csv to HTML
    html = render_template(csv_headers, csv_rows, **kwargs)

    # Freeze all JS files in template
    return freeze_js(html)
def compareMelodicSimiResults(path_largerPyin, list_lessNRank_phrase_name):
    '''
    compare with the results of melodic similarity
    find the intersection set, melodic similarity ranking > N, phonetic similarity ranking < N
    :param path_largerPyin: path of the melodic similarity csv
    :param list_lessNRank_phrase_name: ranking less than N phrase name by phonetic similarity
    :return: intersection set of the phrase name
    '''
    phrase_names_largerN = []
    with open(path_largerPyin, 'r') as openfile:
        csv_reader = csv.reader(openfile, delimiter=',')
        for row in csv_reader:
            phrase_names_largerN.append(row[0])

    return set.intersection(set(phrase_names_largerN), set(list_lessNRank_phrase_name))
def parse_csv(self):
    if not getattr(self, 'csv', None):
        with codecs.open(self.filename) as f:
            self.csv = list(unicodecsv.reader(f))
    return self.csv
def load_accounts(self):
    filename = self.get_accounts_filename()
    if not os.path.exists(filename):
        return []
    with codecs.open(filename) as f:
        return map(self._csv_row_to_account, unicodecsv.reader(f))
def load_transactions(self, filename):
    if not os.path.exists(filename):
        return []
    with codecs.open(filename) as f:
        return map(self._csv_row_to_transaction, unicodecsv.reader(f))
def get_results(data_dir):
    results = {}
    try:
        with open(os.path.join(data_dir, 'results.csv'), 'rb') as csv_file:
            reader = csv.reader(csv_file, delimiter=',', encoding='utf-8')
            rows = list(reader)
            results['num_rows'] = len(rows) - 1
            results['last_row'] = rows[-1]
    except IOError:
        results['num_rows'] = 0
        results['last_row'] = None
    return results
def negative_positive_dict():
    """
    Construct a dictionary of terms that are considered not to be in job title,
    including states, states abv, cities

    Returns: dictionary of set
    """
    logging.info("Beginning negative dictionary build")
    states = []
    states.extend(list(map(lambda x: x.lower(), list(us.states.mapping('name', 'abbr').keys()))))
    states.extend(list(map(lambda x: x.lower(), list(us.states.mapping('name', 'abbr').values()))))

    places = []
    download = requests.get(PLACEURL)
    reader = csv.reader(download.content.decode('latin-1').encode('utf-8').splitlines(), delimiter=',')
    next(reader)
    for row in reader:
        cleaned_placename = re.sub(r'\([^)]*\)', '', row[4]).rstrip()
        for suffix in SUFFIXES:
            if cleaned_placename.endswith(suffix):
                cleaned_placename = cleaned_placename.replace(suffix, '').rstrip()
        places.append(cleaned_placename.lower())
    places = list(set(places))
    places.remove('not in a census designated place or incorporated place')

    onetjobs = []
    download = requests.get(ONETURL)
    reader = csv.reader(download.content.splitlines(), delimiter='\t')
    next(reader)
    for row in reader:
        onetjobs.append(row[2].lower())
        onetjobs.append(row[3].lower())
    onetjobs = list(set(onetjobs))

    return {'states': states, 'places': places, 'onetjobs': onetjobs}
def ua_cbsa():
    """
    Construct a UA->CBSA Lookup table from Census data
    Returns: dict
        { UA Fips: [(CBSA FIPS, CBSA Name)] }
    """
    logging.info("Beginning CBSA lookup")
    lookup = defaultdict(list)
    download = requests.get(URL)
    reader = csv.reader(
        download.content.decode('latin-1').encode('utf-8').splitlines(),
        delimiter=','
    )
    not_designated = 0
    total = 0
    # skip header line
    next(reader)
    for row in reader:
        total += 1
        ua_fips = row[0]
        cbsa_fips = row[2]
        cbsa_name = row[3]
        if cbsa_fips == '99999' or ua_fips == '99999':
            not_designated += 1
            continue
        lookup[ua_fips].append((cbsa_fips, cbsa_name))

    logging.info(
        'Done extracting CBSAs %s total rows, %s not designated, %s found',
        total,
        not_designated,
        total - not_designated
    )
    return lookup
def _skills_lookup(self):
    """Create skills lookup

    Reads the object's filename containing skills into a lookup

    Returns: (set) skill names
    """
    with smart_open(self.skill_lookup_path) as infile:
        reader = csv.reader(infile, delimiter='\t')
        next(reader)
        index = 3
        generator = (self.reg_ex(row[index]) for row in reader)
        return set(generator)
def test_committees(self):
    # fetching directly
    self.assertEqual(list(MockCommitteesResource().fetch()),
                     [dict(COMMITTEE_EXPECTED_DATA, id=3)])
    self.assertEqual(list(MockCommitteesResource().fetch(committee_ids=[4])),
                     [dict(COMMITTEE_EXPECTED_DATA, id=4)])
    self.assertEqual(list(MockCommitteesResource().fetch(all_committees=True)),
                     [dict(COMMITTEE_EXPECTED_DATA, id=1),
                      dict(COMMITTEE_EXPECTED_DATA, id=2),
                      dict(COMMITTEE_EXPECTED_DATA, id=3),
                      dict(COMMITTEE_EXPECTED_DATA, id=4)])
    self.assertEqual(list(MockCommitteesResource().fetch(main_committees=True)),
                     [dict(COMMITTEE_EXPECTED_DATA, id=1),
                      dict(COMMITTEE_EXPECTED_DATA, id=2),])
    # making the resource
    data_root = self.given_temporary_data_root()
    MockCommitteesResource("committees", data_root).make()
    with open(os.path.join(data_root, "committees.csv")) as f:
        lines = unicodecsv.reader(f.readlines())
        self.assertEqual(list(lines), [
            ['id', 'type_id', 'parent_id', 'name', 'name_eng', 'name_arb', 'begin_date', 'end_date',
             'description', 'description_eng', 'description_arb', 'note', 'note_eng', 'portal_link',
             'scraper_errors'],
            ['3', '4', '', 'hebrew name', 'string', 'string', '1950-01-01T00:00:00', '',
             'hebrew description', 'string', 'string', 'string', 'string',
             'can be used to link to the dedicated page in knesset website', '']
        ])
    # fetching from the made resource
    fetched_items = MockCommitteesResource("committees", data_root).fetch_from_datapackage()
    fetched_items = [dict(oredered_dict.items()) for oredered_dict in fetched_items]
    self.assertEqual(fetched_items, [dict(COMMITTEE_EXPECTED_DATA, id=3)])
def test_committee_meeting_protocols(self):
    # protocols only support appending
    resource = CommitteeMeetingProtocolsResource("committee-meeting-protocols",
                                                 self.given_temporary_data_root())
    committee_id, meeting_id, meeting_datetime = 6, 7, datetime.datetime(1953, 5, 4)

    # a contextmanager for mock protocol
    @contextlib.contextmanager
    def meeting_protocol():
        yield type("MockProtocol", (object,), {
            "text": "Hello World!",
            "parts": [type("MockProtocolPart", (object,), {"header": "mock header", "body": "mock body"}),
                      type("MockProtocolPart", (object,), {"header": "mock header 2", "body": "mock body 2"})],
            "file_name": ""})

    # appending using the fake protocol
    resource.append_for_meeting(committee_id, meeting_id, meeting_datetime, meeting_protocol(),
                                skip_exceptions=True)

    # checking the created files
    with open(resource.get_file_path(".csv")) as f:
        self.assertEqual(list(unicodecsv.reader(f.readlines())),
                         [['committee_id', 'meeting_id', 'text', 'parts', 'original', 'scraper_errors'],
                          ['6', '7',
                           'committee_6/7_1953-05-04_00-00-00/protocol.txt',
                           'committee_6/7_1953-05-04_00-00-00/protocol.csv',
                           '',
                           "error getting original file: [Errno 2] No such file or directory: ''"]])
    with open(resource.get_path("committee_6", "7_1953-05-04_00-00-00", "protocol.txt")) as f:
        self.assertEqual(f.readlines(), ["Hello World!"])
    with open(resource.get_path("committee_6", "7_1953-05-04_00-00-00", "protocol.csv")) as f:
        self.assertEqual(f.readlines(), ['header,body\r\n',
                                         'mock header,mock body\r\n',
                                         'mock header 2,mock body 2\r\n'])
def run(self, filename):
    with open(filename, 'r') as f:
        source = csv.reader(f, delimiter='\t')
        header = next(source)
        pywikibot.output("Header of the input table: " + ', '.join(header))
        titles = namedtuple('titles', ', '.join(header))
        titles = [titles._make(row) for row in source]

    if not titles:
        pywikibot.output("We were not able to extract the data to work on. Exiting.")
        return

    for row in titles:
        commons = "%s - Musei del cibo - %s - %s.jpg" % (row.nome, row.museo, row.inventario)
        description = u"""
{{Musei del cibo
| museo = %s
| inventario = %s
| nome = %s
| ambito = %s
| epoca = %s
| dimensioni = %s
| materia = %s
| descrizione = %s
| provenienza = %s
| note = %s
| bibliografia = %s
}}
""" % (row.museo, row.inventario, row.nome, row.ambito, row.epoca, row.dimensioni,
       row.materia, row.descrizione, row.provenienza, row.note, row.biblio)

        try:
            upload = UploadRobot(row.inventario + ".jpg", description=description,
                                 useFilename=commons, keepFilename=True,
                                 verifyDescription=False, ignoreWarning=False, aborts=True)
            upload.run()
        except:
            pywikibot.output("ERROR: The upload could not be completed.")
def _assert_correct_csv(self, actual_csv, expected_rows):
    """
    Asserts that CSV file ``actual_csv`` contains ``expected_rows``
    """
    reader = unicodecsv.reader(actual_csv.getvalue().splitlines(), encoding="utf-8")
    # preprocess expected - convert everything to strings
    expected_rows = [
        [str(item) for item in row]
        for row in expected_rows
    ]
    actual_rows = list(reader)
    self.assertEqual(actual_rows, expected_rows)
def _load_punc_norm_map(self):
    """Load the map table for normalizing 'down' punctuation."""
    path = pkg_resources.resource_filename(__name__, 'data/puncnorm.csv')
    with open(path, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8', delimiter=str(','), quotechar=str('"'))
        next(reader)
        return {punc: norm for (punc, norm) in reader}
def _read_arpabet(self, arpabet):
    arpa_map = {}
    with open(arpabet, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        for arpa, ipa in reader:
            arpa_map[arpa] = ipa
    return arpa_map
def _load_g2p_map(self, code):
    """Load the code table for the specified language.

    Args:
        code (str): ISO 639-3 code plus "-" plus ISO 15924 code for the
                    language/script to be loaded
    """
    g2p = defaultdict(list)
    gr_by_line = defaultdict(list)
    try:
        path = os.path.join('data', 'map', code + '.csv')
        path = pkg_resources.resource_filename(__name__, path)
    except IndexError:
        raise DatafileError('Add an appropriately-named mapping to the data/maps directory.')
    with open(path, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        next(reader)
        for (i, fields) in enumerate(reader):
            try:
                graph, phon = fields
            except ValueError:
                raise DatafileError('Map file is not well formed at line {}.'.format(i + 2))
            graph = unicodedata.normalize('NFC', graph)
            phon = unicodedata.normalize('NFC', phon)
            g2p[graph].append(phon)
            gr_by_line[graph].append(i)
    if self._one_to_many_gr_by_line_map(g2p):
        graph, lines = self._one_to_many_gr_by_line_map(gr_by_line)
        lines = [l + 2 for l in lines]
        raise MappingError('One-to-many G2P mapping for "{}" on lines {}'.format(graph, ', '.join(map(str, lines))).encode('utf-8'))
    return g2p
def _load_punc_norm_map(self):
    """Load the map table for normalizing 'down' punctuation."""
    path = os.path.join('data', 'puncnorm.csv')
    path = pkg_resources.resource_filename(__name__, path)
    with open(path, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8', delimiter=str(','), quotechar=str('"'))
        next(reader)
        return {punc: norm for (punc, norm) in reader}
def main(fn):
    ft = panphon.FeatureTable()
    xs = epitran.xsampa.XSampa()
    with open(fn, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        next(reader)
        phones = set()
        for orth, phon in reader:
            phones = phones.union(set(ft.segs_safe(phon)))
    print(len(phones))
    print(sorted(list(map(xs.ipa2xs, phones))))
def main():
    for csv in glob.glob('*.csv'):
        txt = re.match('[A-Za-z-]+', csv).group(0) + '.txt'
        with open(csv, 'rb') as f, io.open(txt, 'w', encoding='utf-8') as g:
            reader = unicodecsv.reader(f, encoding='utf-8')
            next(reader)
            for fields in reader:
                if re.match('\s*%', fields[0]):
                    print(','.join([x for x in fields if x]), file=g)
                else:
                    rule = build_rule(fields)
                    rule = re.sub('[ ]+', ' ', rule)
                    rule = re.sub('[ ]$', '', rule)
                    print(rule, file=g)
def main(fns, fnn):
    punc = set()
    for fn in fns:
        print fn
        with open(fn, 'rb') as f:
            reader = csv.reader(f, encoding='utf-8')
            for _, s in reader:
                if len(s) == 1 and unicodedata.category(s)[0] == u'P':
                    punc.add(s)
    with open(fnn, 'wb') as f:
        writer = csv.writer(f, encoding='utf-8')
        for mark in sorted(list(punc)):
            writer.writerow([mark])
def read_map(fn):
    with open(fn, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        next(reader)
        return [(a, b) for [a, b] in reader]
def _read_ipa2xs(self):
    path = os.path.join('data', self.ipa2xs_fn)
    path = pkg_resources.resource_filename(__name__, path)
    pairs = []
    with open(path, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        next(reader)
        for ipa, xs, _ in reader:
            pairs.append((ipa, xs.encode('utf-8'),))
    trie = marisa_trie.BytesTrie(pairs)
    return trie
def csv_data(csv_path, skip_header=True):
    """Pass in the path to a CSV file, returns a CSV Reader object."""
    csv_file = open(csv_path, 'r')
    # Determine the CSV dialect.
    dialect = unicodecsv.Sniffer().sniff(csv_file.read(1024))
    csv_file.seek(0)
    data = unicodecsv.reader(csv_file, dialect)
    if skip_header:
        data.next()
    return data
def _read_weights(self, weights_fn):
    weights_fn = pkg_resources.resource_filename(__name__, weights_fn)
    with open(weights_fn, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        next(reader)
        weights = [float(x) for x in next(reader)]
    return weights
def _read_weights(self, filename=os.path.join('data', 'feature_weights.csv')):
    filename = pkg_resources.resource_filename(__name__, filename)
    with open(filename, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        next(reader)
        weights = [float(x) for x in next(reader)]
    return weights
def write_ipa_all(ipa_bases, ipa_all, all_segments, sort_order):
    with open(ipa_bases, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        fieldnames = next(reader)
    with open(ipa_all, 'wb') as f:
        writer = csv.DictWriter(f, encoding='utf-8', fieldnames=fieldnames)
        writer.writerow({k: k for k in fieldnames})
        all_segments_list = sort_all_segments(sort_order, all_segments)
        for segment in all_segments_list:
            fields = copy.copy(segment.features)
            fields['ipa'] = segment.form
            writer.writerow(fields)
def read_xsampa_table(self):
    filename = os.path.join('data', 'ipa-xsampa.csv')
    filename = pkg_resources.resource_filename(__name__, filename)
    with open(filename, 'rb') as f:
        xs2ipa = {x[1]: x[0] for x in csv.reader(f, encoding='utf-8')}
    xs = sorted(xs2ipa.keys(), key=len, reverse=True)
    xs_regex = re.compile('|'.join(map(re.escape, xs)))
    return xs_regex, xs2ipa
def _read_ipa_bases(self, fn):
    fn = pkg_resources.resource_filename(__name__, fn)
    with open(fn, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8', delimiter=str(','))
        names = next(reader)[1:]
        bases = {}
        for row in reader:
            seg, vals = row[0], row[1:]
            bases[seg] = (set(zip(vals, names)))
    return bases, names
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    # csv.py doesn't do Unicode; encode temporarily as UTF-8:
    csv_reader = csv.reader(utf_8_encoder(unicode_csv_data),
                            dialect=dialect, **kwargs)
    for row in csv_reader:
        # decode UTF-8 back to Unicode, cell by cell:
        try:
            yield [unicode(cell, 'utf-8') for cell in row]
        except:
            yield [unicode(cell, 'latin-1') for cell in row]
def itervoters(self):
    if self.voter_file_content:
        if type(self.voter_file_content) == unicode:
            content = self.voter_file_content.encode('utf-8')
        else:
            content = self.voter_file_content

        # now we have to handle non-universal-newline stuff
        # we do this in a simple way: replace all \r with \n
        # then, replace all double \n with single \n
        # this should leave us with only \n
        content = content.replace('\r', '\n').replace('\n\n', '\n')

        voter_stream = io.BytesIO(content)
    else:
        voter_stream = open(self.voter_file.path, "rU")

    #reader = unicode_csv_reader(voter_stream)
    reader = unicodecsv.reader(voter_stream, encoding='utf-8')

    for voter_fields in reader:
        # bad line
        if len(voter_fields) < 1:
            continue

        return_dict = {'voter_id': voter_fields[0].strip()}

        if len(voter_fields) > 1:
            return_dict['email'] = voter_fields[1].strip()
        else:
            # assume single field means the email is the same field
            return_dict['email'] = voter_fields[0].strip()

        if len(voter_fields) > 2:
            return_dict['name'] = voter_fields[2].strip()
        else:
            return_dict['name'] = return_dict['email']

        yield return_dict