The following code examples, extracted from open-source Python projects, illustrate how to use unicodecsv.DictReader().
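For orientation, here is a minimal sketch (not one of the extracted examples) of the basic pattern. unicodecsv mirrors the standard-library csv module but reads a byte stream and decodes each field to unicode; the file name and column name below are hypothetical.

import unicodecsv

# unicodecsv consumes bytes, so the file is opened in binary mode
with open('people.csv', 'rb') as f:
    reader = unicodecsv.DictReader(f, encoding='utf-8')
    for row in reader:
        # each row is a dict keyed by the CSV header line; values are unicode
        print row['name']
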
def test_node_csv_download(self, node, testapp):
    import unicodecsv as csv
    node.enrolled_on = dt.datetime.utcnow()
    node.last_checkin = dt.datetime.utcnow()
    node.last_ip = '1.1.1.1'
    node.node_info = {'hardware_vendor': "Honest Achmed's Computer Supply"}
    node.save()

    resp = testapp.get(url_for('manage.nodes_csv'))

    assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
    assert resp.headers['Content-Disposition'] == 'attachment; filename=nodes.csv'

    reader = csv.DictReader(io.BytesIO(resp.body))
    row = next(reader)

    assert row['Display Name'] == node.display_name
    assert row['Host Identifier'] == node.host_identifier
    assert row['Enrolled On'] == str(node.enrolled_on)
    assert row['Last Check-In'] == str(node.last_checkin)
    assert row['Last Ip Address'] == node.last_ip
    assert row['Is Active'] == 'True'
    assert row['Make'] == node.node_info['hardware_vendor']

def upload_recipients(self, request):
    if not request.method == 'POST':
        raise PermissionDenied
    if not self.has_change_permission(request):
        raise PermissionDenied
    reader = unicodecsv.DictReader(request.FILES['file'])
    for lineno, line in enumerate(reader, 1):
        group = line.pop('group', None)
        if 'slug' not in line:
            line['slug'] = slugify(line['name'])
        recipient = Recipient.objects.create(**line)
        if group is not None:
            rg = RecipientGroup.objects.get(slug=group)
            recipient.groups.add(rg)
    return redirect('admin:confrontiv_recipient_changelist')

def dump_grammar():
    with open(OPTS.filename) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if OPTS.worker and row['WorkerId'] != OPTS.worker:
                continue
            if row['AssignmentStatus'] == 'Rejected':
                continue
            print 'HIT %s' % row['HITId']
            print 'WorkerId: %s' % row['WorkerId']
            print 'Time: %s s' % row['WorkTimeInSeconds']
            input_qids = row['Input.qids'].split('\t')
            input_sents = row['Input.sents'].split('\t')
            ans_is_good = row['Answer.is-good'].split('\t')
            ans_responses = row['Answer.responses'].split('\t')
            for qid, s, is_good, response in zip(input_qids, input_sents, ans_is_good, ans_responses):
                print (' Example %s' % qid)
                print (' Sentence: %s' % s).encode('utf-8')
                print (' Is good? %s' % is_good)
                print (' Response: %s' % colored(response, 'cyan')).encode('utf-8')

def dump_verify():
    with open(OPTS.filename) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if OPTS.worker and row['WorkerId'] != OPTS.worker:
                continue
            if row['AssignmentStatus'] == 'Rejected':
                continue
            print 'HIT %s' % row['HITId']
            print 'WorkerId: %s' % row['WorkerId']
            print 'Time: %s s' % row['WorkTimeInSeconds']
            qids = row['Input.qids'].split('\t')
            questions = row['Input.questions'].split('\t')
            sents = row['Answer.sents'].split('\t')
            responses = row['Answer.responses'].split('\t')
            for qid, q, s_str, response_str in zip(qids, questions, sents, responses):
                print (' Example %s' % qid)
                print (' Question %s' % q)
                s_list = s_str.split('|')
                a_list = response_str.split('|')
                for s, a in zip(s_list, a_list):
                    print (' Sentence: %s' % sent_format(s)).encode('utf-8')
                    print (' Is good? %s' % colored(a, 'cyan'))

def pred_human_eval():
    all_preds = collections.defaultdict(list)
    with open(OPTS.filename) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            all_preds[row['Input.qid']].append(row['Answer.response'])
    preds = {}
    for qid in all_preds:
        if OPTS.ensemble:
            for a in all_preds[qid]:
                count = sum(1 for pred in all_preds[qid] if a == pred)
                if count > 1:
                    preds[qid] = a
                    break
            else:
                preds[qid] = random.sample(all_preds[qid], 1)[0]
        else:
            preds[qid] = random.sample(all_preds[qid], 1)[0]
    print json.dumps(preds)

def fetch():
    out_path = os.path.dirname(__file__)
    out_path = os.path.join(out_path, 'fingerprints', 'data', 'types.yml')
    fh = urllib.urlopen(CSV_URL)
    types = {}
    for row in unicodecsv.DictReader(fh):
        name = stringify(row.get('Name'))
        abbr = stringify(row.get('Abbreviation'))
        if name is None or abbr is None:
            continue
        if name in types and types[name] != abbr:
            print name, types[name], abbr
        types[name] = abbr
        # print abbr, name
    with open(out_path, 'w') as fh:
        yaml.safe_dump({'types': types}, fh, indent=2,
                       allow_unicode=True, canonical=False,
                       default_flow_style=False)

def parse_csv(file_stream, expected_columns=None):
    """
    Parse csv file and return a stream of dictionaries representing each row.

    First line of CSV file must contain column headers.

    Arguments:
        file_stream: input file
        expected_columns (set[unicode]): columns that are expected to be present

    Yields:
        dict: CSV line parsed into a dictionary.
    """
    reader = unicodecsv.DictReader(file_stream, encoding="utf-8")

    if expected_columns and set(expected_columns) - set(reader.fieldnames):
        raise ValidationError(ValidationMessages.MISSING_EXPECTED_COLUMNS.format(
            expected_columns=", ".join(expected_columns),
            actual_columns=", ".join(reader.fieldnames)
        ))

    # "yield from reader" would be nicer, but we're on python2.7 yet.
    for row in reader:
        yield row

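A hedged usage sketch for parse_csv above (the file name and column names are hypothetical): passing expected_columns makes missing headers fail fast with a ValidationError instead of surfacing later as KeyErrors.

with open('enrollments.csv', 'rb') as f:
    for row in parse_csv(f, expected_columns={'email', 'course_id'}):
        print row['email']
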
def _read_input(self, in_file):
    """Read a dummy file of inputs.

    :param in_file: path to tab-delimited input file with columns:
        accession, transcript_sequence, cds_start_i, cds_end_i
    :type in_file: string
    :return: dictionary mapping accession numbers to sequence data
    """
    result = {}
    with open(in_file, 'r') as f:
        reader = csv.DictReader(f, delimiter=str('\t'))
        for row in reader:
            result[row['accession']] = {
                'transcript_sequence': row['transcript_sequence'],
                'cds_start_i': int(row['cds_start_i']),
                'cds_end_i': int(row['cds_end_i']),
            }
    return result

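The delimiter=str("\t") idiom above (and in several later examples) keeps the snippet portable: Python 2's csv machinery expects the delimiter as a byte string while Python 3's expects text, so wrapping the literal in str() yields the native string type on either interpreter, which matters when unicode_literals is in effect.
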
def process(self, activate=False):
    if not self.is_questions_created:
        data = list(csv.DictReader(self.csv.file))
        for d in data:
            Question.objects.create(task=self, question=d)
        self.is_questions_created = True
    if activate:
        self.is_active = True
    self.save()

def read_test_labels(filename):
    labels = []
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile, encoding='utf-8')
    for j in reader:
        labels.append(j['id'])
    return labels

def read_depths(filename):
    depths = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile, encoding='utf-8')
    for j in reader:
        depths[j['key']] = int(j['depth'])
    return depths

def read_labels(filename):
    labels = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile, encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels[j['id']] = int(j['cancer'])
    return labels

def read_test_labels(filename):
    labels = []
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile, encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels.append(j['id'])
    return labels

def make_inquiry_requests_from_file(inquiry, file):
    reader = unicodecsv.DictReader(file)
    for lineno, line in enumerate(reader, 1):
        try:
            recipient = Recipient.objects.get(slug=line['recipient'])
        except Recipient.DoesNotExist:
            raise ValueError('Recipient on line %s not found' % lineno)
        if not recipient.groups.filter(id=inquiry.group_id).exists():
            raise ValueError('Recipient %s not in inquiry group' % recipient)
        data = json.loads(line['data'])
        InquiryRequest.objects.create_from_inquiry(inquiry, recipient, data)

def read_sentences():
    id_to_sents = collections.defaultdict(list)
    with open(OPTS.batch_file) as f:
        reader = csv.DictReader(f)
        for row in reader:
            input_qids = row['Input.qids'].split('\t')
            input_sents = row['Input.sents'].split('\t')
            ans_is_good = row['Answer.is-good'].split('\t')
            ans_responses = row['Answer.responses'].split('\t')
            for qid, s, is_good, response in zip(input_qids, input_sents, ans_is_good, ans_responses):
                if is_good == 'yes':
                    response = s
                if response not in id_to_sents[qid]:
                    id_to_sents[qid].append(response)
    return id_to_sents

def stats_grammar():
    # Read data
    worker_to_is_good = collections.defaultdict(list)
    worker_to_times = collections.defaultdict(list)
    with open(OPTS.filename) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if row['AssignmentStatus'] == 'Rejected':
                continue
            worker_id = row['WorkerId']
            ans_is_good = row['Answer.is-good'].split('\t')
            time = float(row['WorkTimeInSeconds'])
            worker_to_is_good[worker_id].extend(ans_is_good)
            worker_to_times[worker_id].append(time)
    # Aggregate by worker
    print '%d total workers' % len(worker_to_times)
    worker_stats = {}
    for worker_id in worker_to_times:
        times = sorted(worker_to_times[worker_id])
        t_median = times[len(times)/2]
        t_mean = sum(times) / float(len(times))
        is_good_list = worker_to_is_good[worker_id]
        num_qs = len(is_good_list)
        frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
        worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)
    # Print
    sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
    for worker_id in sorted_ids:
        t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
        print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
            worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)

def stats_verify():
    # Read data
    worker_to_is_good = collections.defaultdict(list)
    worker_to_times = collections.defaultdict(list)
    with open(OPTS.filename) as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if row['AssignmentStatus'] == 'Rejected':
                continue
            worker_id = row['WorkerId']
            ans_is_good = [x for s in row['Answer.responses'].split('\t')
                           for x in s.split('|')]
            time = float(row['WorkTimeInSeconds'])
            worker_to_is_good[worker_id].extend(ans_is_good)
            worker_to_times[worker_id].append(time)
    # Aggregate by worker
    print '%d total workers' % len(worker_to_times)
    worker_stats = {}
    for worker_id in worker_to_times:
        times = sorted(worker_to_times[worker_id])
        t_median = times[len(times)/2]
        t_mean = sum(times) / float(len(times))
        is_good_list = worker_to_is_good[worker_id]
        num_qs = len(is_good_list)
        frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
        worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)
    # Print
    sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
    for worker_id in sorted_ids:
        t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
        print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
            worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)

def get_csv(csv_file):
    # open in binary mode: unicodecsv decodes the bytes itself
    reader = unicodecsv.DictReader(open(csv_file, 'rb'))
    return list(reader)

def handle(self, *args, **options):
    translation.activate(settings.LANGUAGE_CODE)
    filename = options['filename']
    reader = unicodecsv.DictReader(open(filename))
    importer = CSVImporter()
    importer.run(reader)

def upload_information_objects(self, request):
    if not request.method == 'POST':
        raise PermissionDenied
    if not self.has_change_permission(request):
        raise PermissionDenied
    reader = unicodecsv.DictReader(request.FILES['file'])
    importer = CSVImporter()
    importer.run(reader)
    return redirect('admin:froide_campaign_informationobject_changelist')

def read_ipa_bases(ipa_bases):
    segments = []
    with open(ipa_bases, 'rb') as f:
        dictreader = csv.DictReader(f, encoding='utf-8')
        for record in dictreader:
            form = record['ipa']
            features = {k: v for k, v in record.items() if k != 'ipa'}
            segments.append(Segment(form, features))
    return segments

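Note that unicodecsv's encoding argument defaults to 'utf-8', so spelling it out here mostly serves as documentation.
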
def gcp_file_reader(fn):
    rdr = csv.DictReader(open(fn, "r"), delimiter=str("\t"))
    for rec in rdr:
        if rec["id"].startswith("#"):
            continue
        yield rec

def test_parser_test_completeness(self):
    """ensure that all rules in grammar have tests"""
    grammar_rule_re = re.compile("^(\w+)")
    grammar_fn = pkg_resources.resource_filename(__name__, "../hgvs/_data/hgvs.pymeta")
    with open(grammar_fn, "r") as f:
        grammar_rules = set(r.group(1) for r in filter(None, map(grammar_rule_re.match, f)))

    with open(self._test_fn, "r") as f:
        reader = csv.DictReader(f, delimiter=str("\t"))
        test_rules = set(row["Func"] for row in reader)

    untested_rules = grammar_rules - test_rules

    self.assertTrue(len(untested_rules) == 0,
                    "untested rules: {}".format(untested_rules))

def test_parser_grammar(self):
    with open(self._test_fn, "r") as f:
        reader = csv.DictReader(f, delimiter=str("\t"))

        fail_cases = []

        for row in reader:
            if row["Func"].startswith("#"):
                continue

            # setup input
            inputs = self._split_inputs(row["Test"], row["InType"])
            expected_results = self._split_inputs(row["Expected"], row["InType"]) if row["Expected"] else inputs
            expected_map = dict(zip(inputs, expected_results))

            # step through each item and check
            is_valid = True if row["Valid"].lower() == "true" else False
            for key in expected_map:
                expected_result = six.text_type(expected_map[key]).replace("u'", "'")
                function_to_test = getattr(self.p._grammar(key), row["Func"])
                row_str = u"{}\t{}\t{}\t{}\t{}".format(row["Func"], key, row["Valid"], "one", expected_result)
                try:
                    actual_result = six.text_type(function_to_test()).replace("u'", "'")
                    if not is_valid or (expected_result != actual_result):
                        print("expected: {} actual:{}".format(expected_result, actual_result))
                        fail_cases.append(row_str)
                except Exception as e:
                    if is_valid:
                        print("expected: {} Exception: {}".format(expected_result, e))
                        fail_cases.append(row_str)

    # everything should have passed - report whatever failed
    self.assertTrue(len(fail_cases) == 0, pprint.pprint(fail_cases))

def gxp_file_reader(fn):
    rdr = csv.DictReader(open(fn, "r"), delimiter=str("\t"))
    for rec in rdr:
        if rec["id"].startswith("#"):
            continue
        yield rec

def load_new_casecops(apps, schema_editor):
    infile_path = BASE_DIR + '/data/20160614_migration/casecops.csv'
    infile = open(infile_path)
    incsv = csv.DictReader(infile)
    for row in incsv:
        case_lookup = list(Case.objects.filter(case_no=row['case_no']))
        if len(case_lookup) != 1:
            print 'ambiguous case:', row['case_no'], 'has len:', len(case_lookup)
            import ipdb; ipdb.set_trace()
        else:
            # can only create casecop if we verify there's 1 matching case in cases table
            case = case_lookup[0]
            cc = CaseCop.objects.create(
                id=row['id'],
                case=case,
                case_no=row['case_no'],
                slug='',
                cop=Cop.objects.get(id=row['cop_id']) if row['cop_id'] else None,
                cop_first_name=row['cop_first_name'],
                cop_middle_initial=row['cop_middle_initial'],
                cop_last_name=row['cop_last_name'],
                badge_no=row['badge_no'],
                officer_atty=row['officer_atty'],
                officer_atty_firm=row['officer_atty_firm'],
                entered_by=row['entered_by'],
                entered_when=parse_str_date(row['entered_when']),
                fact_checked_by=row['fact_checked_by'],
                fact_checked_when=parse_str_date(row['fact_checked_when']),
                matched_by=row['matched_by'],
                matched_when=parse_str_date(row['matched_when']),
                note=row['note'],
                flag=row['flag'] == '1'
            )
            cc.save()

def load_new_cases(apps, schema_editor):
    in_file_path = BASE_DIR + '/data/20160620_migration/cases.csv'
    in_file = open(in_file_path)
    in_csv = csv.DictReader(in_file)
    for row in in_csv:
        c = Case.objects.create(
            id=row['id'],
            case_no=row['CaseNumber'],
            date_filed=parse_str_date(row['DateFiled']),
            date_closed=parse_str_date(row['DateClosed']),
            judge=row['Judge'],
            plaintiff_atty=row['PlaintiffsLeadAttorney'],
            plaintiff_firm=row['PlaintiffsAttorneyLawFirm'],
            city_atty=row['CitysLeadAttorney'],
            city_firm=row['CitysAttorneyLawFirm'],
            magistrate=row['MagistrateJudge'],
            incident_date=parse_str_date(row['DateofIncident']),
            location=row['LocationListed'],
            address=row['StreetAddress'],
            city=row['City'],
            state=row['State'],
            lat=float(row['Latitude']) if row['Latitude'] else None,
            lon=float(row['Longitude']) if row['Longitude'] else None,
            census_place=row['CensusPlaceFips'],
            census_msa=row['CensusMsaFips'],
            census_met_div=row['CensusMetDivFips'],
            census_mcd=row['CensusMcdFips'],
            census_micro=row['CensusCbsaMicro'],
            census_cbsa=row['CensusCbsaFips'],
            census_block=row['CensusBlock'],
            census_block_g=row['CensusBlockGroup'],
            census_tract=row['CensusTract'],
            census_county=row['CensusCountyFips'],
            census_state=row['CensusStateFips'],
            naaccr_cert=row['naaccrCertCode'],
            m_number=row['MNumber'],
            m_predirection=row['MPreDirectional'],
            m_name=row['MName'],
            m_suffix=row['MSuffix'],
            m_city=row['MCity'],
            m_state=row['MState'],
            narrative=row['Narrative'],
            primary_cause=row['primary_cause'],
            federal_causes=row['federal_causes'],
            state_causes=row['state_causes'],
            interaction_type=row['interaction_type'],
            officers=row['officers'],
            victims=row['victims'],
            misconduct_type=row['misconduct_type'],
            weapons_used=row['weapons_used'],
            outcome=row['outcome'],
            tags=row['tags'],
            reporter=row['EnteredBy'],
            fact_checker=row['Fact-checkedby'],
            differences=row['Differences'],
        )
        c.save()