我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用StringIO.StringIO()(Python 2 的 StringIO 模块;在 Python 3 中对应 io.StringIO / io.BytesIO)。
def _render(self, mode='human', close=False):
    """Render the grid world as text, highlighting the agent's cell in red.

    Writes to an in-memory buffer for mode 'ansi', to stdout otherwise,
    and returns the stream that was written to.
    """
    if close:
        return
    target = StringIO.StringIO() if mode == 'ansi' else sys.stdout
    # Current state index -> (row, col) on the ncol-wide grid.
    agent_row, agent_col = divmod(self.s, self.ncol)
    grid = self.desc.tolist()
    grid[agent_row][agent_col] = utils.colorize(
        grid[agent_row][agent_col], "red", highlight=True)
    target.write("\n".join("".join(line) for line in grid) + "\n")
    if self.lastaction is None:
        target.write("\n")
    else:
        target.write(" ({})\n".format(self.get_action_meanings()[self.lastaction]))
    return target
def testRun(self):
    """Round-trip person/pet tables through CSV export/import.

    Populates both tables, exports to CSV, empties them, re-imports, and
    checks the row counts survive the round trip.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'))
    db.define_table('pet', Field('friend', db.person), Field('name'))
    # Two passes: the second import runs against freshly emptied tables.
    for n in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for k in range(10):
            person_id = db.person.insert(name=str(k))  # renamed: don't shadow builtin id()
            db.pet.insert(friend=person_id, name=str(k))
        db.commit()
        stream = StringIO.StringIO()
        db.export_to_csv_file(stream)
        db(db.pet).delete()
        db(db.person).delete()
        # Re-wrap the exported text so the import reads from position 0.
        stream = StringIO.StringIO(stream.getvalue())
        db.import_from_csv_file(stream)
        assert db(db.person).count() == 10
        assert db(db.pet.name).count() == 10
    drop(db.pet)
    drop(db.person)
    db.commit()
def testRun(self):
    """Round-trip person/pet tables (person has a uuid column) through CSV.

    Exports, empties both tables, re-imports, and checks the counts.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'), Field('uuid'))
    db.define_table('pet', Field('friend', db.person), Field('name'))
    # Two passes: the second import runs against freshly emptied tables.
    for n in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for k in range(10):
            person_id = db.person.insert(name=str(k), uuid=str(k))  # renamed: don't shadow builtin id()
            db.pet.insert(friend=person_id, name=str(k))
        db.commit()
        stream = StringIO.StringIO()
        db.export_to_csv_file(stream)
        db(db.person).delete()
        db(db.pet).delete()
        # Re-wrap the exported text so the import reads from position 0.
        stream = StringIO.StringIO(stream.getvalue())
        db.import_from_csv_file(stream)
        assert db(db.person).count() == 10
        assert db(db.pet).count() == 10
    drop(db.pet)
    drop(db.person)
    db.commit()
def testRun(self):
    """Round-trip person/pet through CSV and verify the foreign-key links.

    After export, wipe, and re-import, each pet must still join to the
    person with the matching name.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'))
    db.define_table('pet', Field('friend', db.person), Field('name'))
    # Two passes: the second import runs against freshly emptied tables.
    for n in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for k in range(10):
            person_id = db.person.insert(name=str(k))  # renamed: don't shadow builtin id()
            db.pet.insert(friend=person_id, name=str(k))
        db.commit()
        stream = StringIO.StringIO()
        db.export_to_csv_file(stream)
        db(db.pet).delete()
        db(db.person).delete()
        # Re-wrap the exported text so the import reads from position 0.
        stream = StringIO.StringIO(stream.getvalue())
        db.import_from_csv_file(stream)
        # Each imported pet must still reference its same-named person.
        assert db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count() == 10
    db.pet.drop()
    db.person.drop()
    db.commit()
def testRun(self):
    """Import exported CSV on top of existing rows.

    Person count stays at 10 after re-import (rows with the same uuid are
    not duplicated) while the person/pet name join grows to 20 (pet rows
    accumulate).
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'), Field('uuid'))
    db.define_table('pet', Field('friend', db.person), Field('name'))
    for n in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for k in range(10):
            person_id = db.person.insert(name=str(k), uuid=str(k))  # renamed: don't shadow builtin id()
            db.pet.insert(friend=person_id, name=str(k))
        db.commit()
        stream = StringIO.StringIO()
        db.export_to_csv_file(stream)
        # NOTE: tables are deliberately NOT cleared before the import.
        stream = StringIO.StringIO(stream.getvalue())
        db.import_from_csv_file(stream)
        assert db(db.person).count() == 10
        assert db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count() == 20
    db.pet.drop()
    db.person.drop()
    db.commit()
def __init__(self):
    """Initialize with an empty in-memory text buffer as the body."""
    self.body = StringIO.StringIO()
def retrieve(self, name, path=None, nameonly=False):
    """
    if nameonly==True return (filename, fullfilename) instead of
    (filename, stream)
    """
    # Cache the attribute: it is tested several times below.
    self_uploadfield = self.uploadfield
    # A custom retrieval hook, when set, replaces all built-in handling.
    if self.custom_retrieve:
        return self.custom_retrieve(name, path)
    import http
    # Authorization checks (and DB-stored blobs) require the row first.
    if self.authorize or isinstance(self_uploadfield, str):
        row = self.db(self == name).select().first()
        if not row:
            raise http.HTTP(404)
        if self.authorize and not self.authorize(row):
            raise http.HTTP(403)
    # Reject names that don't match the expected upload-name pattern.
    m = REGEX_UPLOAD_PATTERN.match(name)
    if not m or not self.isattachment:
        raise TypeError('Can\'t retrieve %s' % name)
    file_properties = self.retrieve_file_properties(name,path)
    filename = file_properties['filename']
    if isinstance(self_uploadfield, str):
        # ## if file is in DB
        stream = StringIO.StringIO(row[self_uploadfield] or '')
    elif isinstance(self_uploadfield,Field):
        # Blob lives in another table's field, keyed by the upload name.
        blob_uploadfield_name = self_uploadfield.uploadfield
        query = self_uploadfield == name
        data = self_uploadfield.table(query)[blob_uploadfield_name]
        stream = StringIO.StringIO(data)
    elif self.uploadfs:
        # ## if file is on pyfilesystem
        stream = self.uploadfs.open(name, 'rb')
    else:
        # ## if file is on regular filesystem
        # this is intentially a sting with filename and not a stream
        # this propagates and allows stream_file_or_304_or_206 to be called
        fullname = pjoin(file_properties['path'],name)
        if nameonly:
            return (filename, fullname)
        stream = open(fullname,'rb')
    return (filename, stream)
def __str__(self):
    """Serialize the table to CSV and return it as a string."""
    out = StringIO.StringIO()
    self.export_to_csv_file(out)
    return out.getvalue()
def getText(self, program_name, interval):
    """Render the token stream for *interval*, applying the rewrite
    operations registered under *program_name*.

    :type interval: Interval.Interval
    :param program_name: name of the rewrite program to apply
    :param interval: token-index range to render
    :return: the rewritten text as a string
    """
    rewrites = self.programs.get(program_name)
    start = interval.start
    stop = interval.stop

    # ensure start/end are in range
    if stop > len(self.tokens.tokens) - 1:
        stop = len(self.tokens.tokens) - 1
    if start < 0:
        start = 0

    # if no instructions to execute
    if not rewrites:
        return self.tokens.getText(interval)
    buf = StringIO()
    # Collapse overlapping/duplicate ops to at most one per token index.
    indexToOp = self._reduceToSingleOperationPerIndex(rewrites)
    i = start
    while all((i <= stop, i < len(self.tokens.tokens))):
        op = indexToOp.get(i)
        token = self.tokens.get(i)
        if op is None:
            # No op at this index: emit the token verbatim (skip EOF).
            if token.type != Token.EOF:
                buf.write(token.text)
            i += 1
        else:
            # The op writes its own text and returns the next index to visit.
            i = op.execute(buf)
    # Flush insert-after-last-token ops when rendering up to stream end.
    if stop == len(self.tokens.tokens)-1:
        for op in indexToOp.values():
            if op.index >= len(self.tokens.tokens)-1:
                buf.write(op.text)
    return buf.getvalue()
def execute(self, buf):
    """Base rewrite operation: emit nothing.

    :type buf: StringIO.StringIO
    :param buf: output buffer (unused by the base operation)
    :return: this operation's token index, i.e. the next index to visit
    """
    return self.index
def compress_results(res):
    """Serialize *res* to JSON and return it gzip-compressed.

    Uses io.BytesIO rather than StringIO.StringIO: gzip produces binary
    data, so the buffer must hold bytes.  The StringIO version only worked
    on Python 2 (where str is bytes) and raises TypeError on Python 3.

    :param res: any JSON-serializable object
    :return: gzip-compressed JSON as a byte string
    """
    import io
    out = io.BytesIO()
    payload = json.dumps(res).encode('utf-8')
    with gzip.GzipFile(fileobj=out, mode="wb") as f:
        f.write(payload)
    return out.getvalue()
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Wrap stream *f* with a CSV writer that re-encodes its output.

    Rows are first written to an in-memory queue by the csv writer; the
    incremental encoder is used to push them to *f* in *encoding*.
    """
    self.stream = f
    self.queue = StringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.encoder = codecs.getincrementalencoder(encoding)()
def retrieve(self, name, path=None, nameonly=False):
    """
    If `nameonly==True` return (filename, fullfilename) instead of
    (filename, stream)
    """
    # Cache the attribute: it is tested several times below.
    self_uploadfield = self.uploadfield
    # A custom retrieval hook, when set, replaces all built-in handling.
    if self.custom_retrieve:
        return self.custom_retrieve(name, path)
    import http
    # Authorization checks (and DB-stored blobs) require the row first.
    if self.authorize or isinstance(self_uploadfield, str):
        row = self.db(self == name).select().first()
        if not row:
            raise http.HTTP(404)
        if self.authorize and not self.authorize(row):
            raise http.HTTP(403)
    file_properties = self.retrieve_file_properties(name, path)
    filename = file_properties['filename']
    if isinstance(self_uploadfield, str):
        # ## if file is in DB
        stream = StringIO.StringIO(row[self_uploadfield] or '')
    elif isinstance(self_uploadfield, Field):
        # Blob lives in another table's field, keyed by the upload name.
        blob_uploadfield_name = self_uploadfield.uploadfield
        query = self_uploadfield == name
        data = self_uploadfield.table(query)[blob_uploadfield_name]
        stream = StringIO.StringIO(data)
    elif self.uploadfs:
        # ## if file is on pyfilesystem
        stream = self.uploadfs.open(name, 'rb')
    else:
        # ## if file is on regular filesystem
        # this is intentially a sting with filename and not a stream
        # this propagates and allows stream_file_or_304_or_206 to be called
        fullname = pjoin(file_properties['path'], name)
        if nameonly:
            return (filename, fullname)
        stream = open(fullname, 'rb')
    return (filename, stream)
def __str__(self):
    """Serialize the table into CSV text and return it."""
    buf = StringIO.StringIO()
    self.export_to_csv_file(buf)
    return buf.getvalue()
def detect_face_task(img): """Detect faces from image @input: image @output: - all faces information """ # paramter for detect # image_size = 160 # margin = 44 minsize = 20 # minimum size of face threshold = [0.6, 0.7, 0.7] # three steps's threshold factor = 0.709 # scale factor # caffe model pnet = caffe_model.get_pnet() rnet = caffe_model.get_rnet() onet = caffe_model.get_onet() bounding_boxes, _ = detect_face.detect_face(img, minsize, pnet, rnet, onet, threshold, factor) print('detect bounding: ', bounding_boxes) print('Find faces: ', bounding_boxes.shape[0]) # all_faces is faces information list, include face bytes, face position all_faces = [] for face_position in bounding_boxes: face_position = face_position.astype(int) print('face position: ', face_position) # each face information, include position, face image head_rect = face_position[:4].tolist() # numpy array to python list head_img = misc.toimage(img).crop(head_rect) head_img_io = StringIO.StringIO() head_img.save(head_img_io, format='JPEG') head_img_b64 = base64.b64encode(head_img_io.getvalue()) # construct response face_info = {} face_info['rect'] = head_rect face_info['image'] = head_img_b64 all_faces.append(face_info) return all_faces
def draw_label(label, img, n_class, label_titles, bg_label=0):
    """Convert label to rgb with label titles.

    @param label_title: label title for each labels.
    @type label_title: dict
    """
    from PIL import Image
    from scipy.misc import fromimage
    from skimage.color import label2rgb
    from skimage.transform import resize
    colors = labelcolormap(n_class)
    # Overlay label regions on the image; color 0 is reserved for background.
    label_viz = label2rgb(label, img, colors=colors[1:], bg_label=bg_label)
    # label 0 color: (0, 0, 0, 0) -> (0, 0, 0, 255)
    label_viz[label == 0] = 0

    # plot label titles on image using matplotlib
    # Strip all margins/ticks so the saved figure is just the image.
    plt.subplots_adjust(left=0, right=1, top=1, bottom=0, wspace=0, hspace=0)
    plt.margins(0, 0)
    plt.gca().xaxis.set_major_locator(plt.NullLocator())
    plt.gca().yaxis.set_major_locator(plt.NullLocator())
    plt.axis('off')
    # plot image
    plt.imshow(label_viz)
    # plot legend: one colored patch per label value present in `label`
    plt_handlers = []
    plt_titles = []
    for label_value in np.unique(label):
        # only labels that have a title are shown in the legend
        if label_value not in label_titles:
            continue
        fc = colors[label_value]
        p = plt.Rectangle((0, 0), 1, 1, fc=fc)
        plt_handlers.append(p)
        plt_titles.append(label_titles[label_value])
    plt.legend(plt_handlers, plt_titles, loc='lower right', framealpha=0.5)
    # convert plotted figure to np.ndarray
    # NOTE(review): savefig into StringIO.StringIO holds binary PNG data —
    # this only works on Python 2 where str is bytes; Python 3 needs BytesIO.
    f = StringIO.StringIO()
    plt.savefig(f, bbox_inches='tight', pad_inches=0)
    result_img_pil = Image.open(f)
    result_img = fromimage(result_img_pil, mode='RGB')
    # Resize the rendered figure back to the input image's shape/dtype.
    result_img = resize(result_img, img.shape, preserve_range=True)
    result_img = result_img.astype(img.dtype)
    return result_img