Python io.StringIO 模块,StringIO() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用StringIO.StringIO()

项目:tf-tutorial    作者:zchen0211    | 项目源码 | 文件源码
def _render(self, mode='human', close=False):
    """Draw the grid with the current cell highlighted in red.

    Output goes to a fresh in-memory buffer when ``mode`` is ``'ansi'`` and
    to ``sys.stdout`` for every other mode.  The grid is followed by the
    ``get_action_meanings()`` entry for ``self.lastaction`` in parentheses,
    or by an empty line when ``self.lastaction`` is ``None``.

    Returns the stream that was written to, or ``None`` (without rendering
    anything) when ``close`` is truthy.
    """
    if close:
        return

    if mode == 'ansi':
        outfile = StringIO.StringIO()
    else:
        outfile = sys.stdout

    # self.s is a flat cell index; split it into grid coordinates
    cur_row = self.s // self.ncol
    cur_col = self.s % self.ncol
    grid = self.desc.tolist()
    grid[cur_row][cur_col] = utils.colorize(
        grid[cur_row][cur_col], "red", highlight=True)

    rendered_rows = ["".join(cells) for cells in grid]
    outfile.write("\n".join(rendered_rows) + "\n")

    if self.lastaction is None:
        outfile.write("\n")
    else:
        action_name = self.get_action_meanings()[self.lastaction]
        outfile.write("  ({})\n".format(action_name))

    return outfile
项目:deep-rl-tensorflow    作者:carpedm20    | 项目源码 | 文件源码
def _render(self, mode='human', close=False):
    """Write a text picture of the grid and return the stream used.

    With ``mode == 'ansi'`` the text is collected in a new in-memory
    buffer; any other mode writes straight to ``sys.stdout``.  The cell at
    flat index ``self.s`` is passed through ``utils.colorize`` (red,
    highlighted) before the rows are joined.  A trailing line shows the
    last action's meaning, or is left empty when no action is recorded.

    Nothing is rendered and ``None`` is returned when ``close`` is truthy.
    """
    if close:
        return

    if mode == 'ansi':
        outfile = StringIO.StringIO()
    else:
        outfile = sys.stdout

    r = self.s // self.ncol
    c = self.s % self.ncol
    cells = self.desc.tolist()
    marked = utils.colorize(cells[r][c], "red", highlight=True)
    cells[r][c] = marked

    text_rows = ["".join(line) for line in cells]
    outfile.write("\n".join(text_rows) + "\n")

    if self.lastaction is None:
        outfile.write("\n")
    else:
        meanings = self.get_action_meanings()
        outfile.write("  ({})\n".format(meanings[self.lastaction]))

    return outfile
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def testRun(self):
    """Round-trip the ``person`` and ``pet`` tables through a CSV export.

    Ten people and ten pets are inserted, the database is exported to an
    in-memory CSV stream, both tables are emptied, and the CSV is imported
    again.  The test then expects ten rows in each table.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'))
    db.define_table('pet', Field('friend', db.person), Field('name'))

    # Two passes; each one clears both tables before inserting.
    for _ in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for number in range(10):
            owner_id = db.person.insert(name=str(number))
            db.pet.insert(friend=owner_id, name=str(number))
    db.commit()

    exported = StringIO.StringIO()
    db.export_to_csv_file(exported)

    db(db.pet).delete()
    db(db.person).delete()

    # Re-read the exported text from the start via a fresh stream.
    reloaded = StringIO.StringIO(exported.getvalue())
    db.import_from_csv_file(reloaded)

    assert db(db.person).count() == 10
    assert db(db.pet.name).count() == 10

    for table in (db.pet, db.person):
        drop(table)
    db.commit()
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def testRun(self):
    """Round-trip tables through CSV when ``person`` carries a ``uuid``.

    Ten people (each with a ``uuid``) and ten pets are inserted, the
    database is exported to an in-memory CSV stream, both tables are
    emptied, and the CSV is imported again.  Ten rows are expected in each
    table afterwards.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'), Field('uuid'))
    db.define_table('pet', Field('friend', db.person), Field('name'))

    # Two passes; each one clears both tables before inserting.
    for _ in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for number in range(10):
            owner_id = db.person.insert(name=str(number), uuid=str(number))
            db.pet.insert(friend=owner_id, name=str(number))
    db.commit()

    exported = StringIO.StringIO()
    db.export_to_csv_file(exported)

    db(db.person).delete()
    db(db.pet).delete()

    # Re-read the exported text from the start via a fresh stream.
    reloaded = StringIO.StringIO(exported.getvalue())
    db.import_from_csv_file(reloaded)

    assert db(db.person).count() == 10
    assert db(db.pet).count() == 10

    for table in (db.pet, db.person):
        drop(table)
    db.commit()
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def testRun(self):
    """Check that pet-to-person references survive a CSV round trip.

    After exporting, emptying both tables and importing again, the join of
    ``person`` and ``pet`` on ``pet.friend`` with equal names is expected
    to contain ten rows.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'))
    db.define_table('pet', Field('friend', db.person), Field('name'))

    # Two passes; each one clears both tables before inserting.
    for _ in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for number in range(10):
            owner_id = db.person.insert(name=str(number))
            db.pet.insert(friend=owner_id, name=str(number))
    db.commit()

    exported = StringIO.StringIO()
    db.export_to_csv_file(exported)

    db(db.pet).delete()
    db(db.person).delete()

    # Re-read the exported text from the start via a fresh stream.
    reloaded = StringIO.StringIO(exported.getvalue())
    db.import_from_csv_file(reloaded)

    linked = db(db.person.id == db.pet.friend)
    same_name = linked(db.person.name == db.pet.name)
    assert same_name.count() == 10

    db.pet.drop()
    db.person.drop()
    db.commit()
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def testRun(self):
    """Import a CSV export on top of the rows it was exported from.

    Unlike a plain round trip, the tables are NOT emptied before the
    import.  The test expects the ``person`` table to still hold ten rows
    while the person/pet join on ``pet.friend`` with equal names yields
    twenty rows.
    """
    db = DAL(DEFAULT_URI, check_reserved=['all'])
    db.define_table('person', Field('name'), Field('uuid'))
    db.define_table('pet', Field('friend', db.person), Field('name'))

    # Two passes; each one clears both tables before inserting.
    for _ in range(2):
        db(db.pet).delete()
        db(db.person).delete()
        for number in range(10):
            owner_id = db.person.insert(name=str(number), uuid=str(number))
            db.pet.insert(friend=owner_id, name=str(number))
    db.commit()

    exported = StringIO.StringIO()
    db.export_to_csv_file(exported)

    # Re-read the exported text from the start via a fresh stream.
    reloaded = StringIO.StringIO(exported.getvalue())
    db.import_from_csv_file(reloaded)

    assert db(db.person).count() == 10
    linked = db(db.person.id == db.pet.friend)
    same_name = linked(db.person.name == db.pet.name)
    assert same_name.count() == 20

    db.pet.drop()
    db.person.drop()
    db.commit()
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def __init__(self):
    """Give the new instance an empty in-memory buffer as ``body``."""
    empty_buffer = StringIO.StringIO()
    self.body = empty_buffer
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def __init__(self):
    """Start with an empty in-memory buffer stored in ``self.body``."""
    body_buffer = StringIO.StringIO()
    self.body = body_buffer
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def retrieve(self, name, path=None, nameonly=False):
    """
    Locate an uploaded file by its stored ``name``.

    Returns ``(filename, stream)``.  When the file lives on the regular
    filesystem and ``nameonly`` is true, returns
    ``(filename, fullfilename)`` instead, without opening the file.
    If ``self.custom_retrieve`` is set, its result for ``(name, path)`` is
    returned directly and nothing else is done.

    Raises ``http.HTTP(404)`` when the record lookup finds no row,
    ``http.HTTP(403)`` when ``self.authorize`` rejects the row, and
    ``TypeError`` when ``name`` does not match ``REGEX_UPLOAD_PATTERN`` or
    ``self.isattachment`` is false.
    """
    upload_target = self.uploadfield
    if self.custom_retrieve:
        return self.custom_retrieve(name, path)

    import http
    stored_in_db = isinstance(upload_target, str)

    # The row is only fetched when it is needed for authorization or
    # because the file content itself is stored in this table.
    if self.authorize or stored_in_db:
        row = self.db(self == name).select().first()
        if not row:
            raise http.HTTP(404)
    if self.authorize and not self.authorize(row):
        raise http.HTTP(403)

    if not (REGEX_UPLOAD_PATTERN.match(name) and self.isattachment):
        raise TypeError('Can\'t retrieve %s' % name)

    properties = self.retrieve_file_properties(name, path)
    filename = properties['filename']

    if stored_in_db:
        # file content is held in a column of this table's row
        return (filename, StringIO.StringIO(row[upload_target] or ''))

    if isinstance(upload_target, Field):
        # file content is held in the table that owns the upload field
        blob_column = upload_target.uploadfield
        lookup = upload_target == name
        content = upload_target.table(lookup)[blob_column]
        return (filename, StringIO.StringIO(content))

    if self.uploadfs:
        # file is on pyfilesystem
        return (filename, self.uploadfs.open(name, 'rb'))

    # File is on the regular filesystem.  With nameonly the full path is
    # returned as a string on purpose (not a stream); this propagates and
    # allows stream_file_or_304_or_206 to be called.
    fullname = pjoin(properties['path'], name)
    if nameonly:
        return (filename, fullname)
    return (filename, open(fullname, 'rb'))
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def __str__(self):
    """
    Return the text that ``export_to_csv_file`` writes for this object.
    """
    csv_buffer = StringIO.StringIO()
    self.export_to_csv_file(csv_buffer)
    csv_text = csv_buffer.getvalue()
    return csv_text
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def __init__(self):
    """Create ``self.body`` as a new, empty in-memory buffer."""
    fresh_body = StringIO.StringIO()
    self.body = fresh_body
项目:blender-fractals    作者:corrodedHash    | 项目源码 | 文件源码
def getText(self, program_name, interval):
    """
    Return the text for ``interval`` with the named program's rewrite
    operations applied.

    :type interval: Interval.Interval
    :param program_name: key used to look up the operations in
        ``self.programs``
    :param interval: token index range; ``start``/``stop`` are clamped to
        the available tokens before rewriting
    :return: the rewritten text, or ``self.tokens.getText(interval)``
        (called with the interval as given) when the program has no
        operations
    """
    rewrites = self.programs.get(program_name)
    start, stop = interval.start, interval.stop

    # ensure start/end are in range
    stop = min(stop, len(self.tokens.tokens) - 1)
    start = max(start, 0)

    # if no instructions to execute
    if not rewrites:
        return self.tokens.getText(interval)

    buf = StringIO()
    indexToOp = self._reduceToSingleOperationPerIndex(rewrites)
    position = start
    while position <= stop and position < len(self.tokens.tokens):
        op = indexToOp.get(position)
        token = self.tokens.get(position)
        if op is not None:
            # the operation is handed the buffer and returns the index at
            # which to continue
            position = op.execute(buf)
            continue
        if token.type != Token.EOF:
            buf.write(token.text)
        position += 1

    # When the range reaches the last token, also emit the text of every
    # operation whose index is at or beyond that last token.
    last_index = len(self.tokens.tokens) - 1
    if stop == last_index:
        for op in indexToOp.values():
            if op.index < last_index:
                continue
            buf.write(op.text)

    return buf.getvalue()
项目:blender-fractals    作者:corrodedHash    | 项目源码 | 文件源码
def execute(self, buf):
    """
    Return this operation's own index; nothing is written to ``buf``.

    :type buf: StringIO.StringIO
    :param buf: output buffer (unused here)
    :return: ``self.index``
    """
    own_index = self.index
    return own_index
项目:web3py    作者:web2py    | 项目源码 | 文件源码
def __init__(self):
    """Attach a new, empty in-memory buffer to the instance as ``body``."""
    new_body = StringIO.StringIO()
    self.body = new_body
项目:python-lambda-inspector    作者:ThreatResponse    | 项目源码 | 文件源码
def compress_results(res):
    """Serialize ``res`` to JSON and return it gzip-compressed.

    :param res: any object that ``json.dumps`` can serialize
    :return: the complete gzip stream as a byte string
    """
    # Local import so this block does not rely on a file-level ``io`` import.
    import io

    # gzip output is binary, so it is collected in a bytes buffer; a text
    # buffer cannot hold the compressed bytes on Python 3.
    out = io.BytesIO()
    # json.dumps returns text, while GzipFile.write expects bytes.
    payload = json.dumps(res).encode("utf-8")
    with gzip.GzipFile(fileobj=out, mode="w") as f:
        f.write(payload)
    # The ``with`` block has closed the gzip member, so the trailer is
    # already in ``out`` at this point.
    return out.getvalue()
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def __init__(self):
    """Set ``self.body`` to a newly created, empty in-memory buffer."""
    initial_body = StringIO.StringIO()
    self.body = initial_body
项目:inge    作者:cwolfram    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Prepare a CSV writer that stages its rows in an in-memory queue.

    :param f: target stream, stored unchanged as ``self.stream``
    :param dialect: CSV dialect passed to ``csv.writer``
    :param encoding: codec name used to build an incremental encoder
    :param kwds: extra keyword arguments forwarded to ``csv.writer``
    """
    staging = StringIO.StringIO()
    self.queue = staging
    # the csv writer targets the staging queue, not ``f``
    row_writer = csv.writer(staging, dialect=dialect, **kwds)
    self.writer = row_writer
    self.stream = f
    make_encoder = codecs.getincrementalencoder(encoding)
    self.encoder = make_encoder()
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def __init__(self):
    """Initialise ``self.body`` with an empty in-memory buffer."""
    blank_body = StringIO.StringIO()
    self.body = blank_body
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def retrieve(self, name, path=None, nameonly=False):
    """
    Locate an uploaded file by its stored ``name``.

    Returns ``(filename, stream)``.  If `nameonly==True` and the file is
    on the regular filesystem, returns ``(filename, fullfilename)``
    instead, without opening the file.  If ``self.custom_retrieve`` is
    set, its result for ``(name, path)`` is returned directly.

    Raises ``http.HTTP(404)`` when the record lookup finds no row and
    ``http.HTTP(403)`` when ``self.authorize`` rejects the row.
    """
    upload_target = self.uploadfield
    if self.custom_retrieve:
        return self.custom_retrieve(name, path)

    import http
    kept_in_row = isinstance(upload_target, str)

    # The row is only fetched when it is needed for authorization or
    # because the file content itself is stored in this table.
    if self.authorize or kept_in_row:
        row = self.db(self == name).select().first()
        if not row:
            raise http.HTTP(404)
    if self.authorize and not self.authorize(row):
        raise http.HTTP(403)

    info = self.retrieve_file_properties(name, path)
    filename = info['filename']

    if kept_in_row:
        # file content is held in a column of this table's row
        return (filename, StringIO.StringIO(row[upload_target] or ''))

    if isinstance(upload_target, Field):
        # file content is held in the table that owns the upload field
        blob_column = upload_target.uploadfield
        lookup = upload_target == name
        content = upload_target.table(lookup)[blob_column]
        return (filename, StringIO.StringIO(content))

    if self.uploadfs:
        # file is on pyfilesystem
        return (filename, self.uploadfs.open(name, 'rb'))

    # File is on the regular filesystem.  With nameonly the full path is
    # returned as a string on purpose (not a stream); this propagates and
    # allows stream_file_or_304_or_206 to be called.
    fullname = pjoin(info['path'], name)
    if nameonly:
        return (filename, fullname)
    return (filename, open(fullname, 'rb'))
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def __str__(self):
    """
    Return whatever ``export_to_csv_file`` writes for this object, as one
    string.
    """
    sink = StringIO.StringIO()
    self.export_to_csv_file(sink)
    serialized = sink.getvalue()
    return serialized
项目:face    作者:xpzouying    | 项目源码 | 文件源码
def detect_face_task(img):
    """Detect faces in an image and return a cropped JPEG for each one.

    @input: image
    @output:
        - list with one dict per detected face:
            'rect':  first four values of the face's bounding box, as a
                     list of ints
            'image': base64-encoded JPEG of that region cropped from img
    """
    # Local import so this block does not rely on a file-level ``io`` import.
    import io

    # parameters for detection
    # image_size = 160
    # margin = 44
    minsize = 20  # minimum size of face
    threshold = [0.6, 0.7, 0.7]  # thresholds of the three steps
    factor = 0.709  # scale factor

    # caffe model
    pnet = caffe_model.get_pnet()
    rnet = caffe_model.get_rnet()
    onet = caffe_model.get_onet()

    bounding_boxes, _ = detect_face.detect_face(img, minsize, pnet, rnet, onet, threshold, factor)
    print('detect bounding: ', bounding_boxes)
    print('Find faces: ', bounding_boxes.shape[0])

    # all_faces is the list of face information: face bytes and position
    all_faces = []
    for face_position in bounding_boxes:
        face_position = face_position.astype(int)
        print('face position: ', face_position)

        # each face's information: position and face image
        head_rect = face_position[:4].tolist()  # numpy array to python list
        head_img = misc.toimage(img).crop(head_rect)

        # JPEG data is binary, so it is collected in a bytes buffer; a
        # text buffer cannot hold it on Python 3.
        head_img_io = io.BytesIO()
        head_img.save(head_img_io, format='JPEG')
        head_img_b64 = base64.b64encode(head_img_io.getvalue())

        # construct response
        face_info = {}
        face_info['rect'] = head_rect
        face_info['image'] = head_img_b64

        all_faces.append(face_info)

    return all_faces
项目:Semantic-Segmentation-using-Adversarial-Networks    作者:oyam    | 项目源码 | 文件源码
def draw_label(label, img, n_class, label_titles, bg_label=0):
    """Convert label to rgb with label titles.

    The label map is overlaid on ``img``, a legend with the titles of the
    labels that occur is drawn with matplotlib, and the rendered figure is
    read back and resized to ``img.shape``.

    @param label: label map; its unique values select the legend entries
    @param img: image the labels are overlaid on; its shape and dtype are
        used for the result
    @param n_class: number of classes, passed to ``labelcolormap``
    @param label_titles: label title for each label; values missing from
        it get no legend entry
    @type label_titles: dict
    @param bg_label: background label value passed to ``label2rgb``
    @return: the rendered visualization, with ``img``'s shape and dtype
    """
    # Local import so this block does not rely on a file-level ``io`` import.
    import io

    from PIL import Image
    from scipy.misc import fromimage
    from skimage.color import label2rgb
    from skimage.transform import resize
    colors = labelcolormap(n_class)
    label_viz = label2rgb(label, img, colors=colors[1:], bg_label=bg_label)
    # label 0 color: (0, 0, 0, 0) -> (0, 0, 0, 255)
    label_viz[label == 0] = 0

    # plot label titles on image using matplotlib
    # NOTE(review): this draws on pyplot's current figure and never clears
    # or closes it, so repeated calls appear to keep adding to the same
    # axes - confirm whether callers rely on that before changing it.
    plt.subplots_adjust(left=0, right=1, top=1, bottom=0,
                        wspace=0, hspace=0)
    plt.margins(0, 0)
    plt.gca().xaxis.set_major_locator(plt.NullLocator())
    plt.gca().yaxis.set_major_locator(plt.NullLocator())
    plt.axis('off')
    # plot image
    plt.imshow(label_viz)
    # plot legend
    plt_handlers = []
    plt_titles = []
    for label_value in np.unique(label):
        if label_value not in label_titles:
            continue
        fc = colors[label_value]
        p = plt.Rectangle((0, 0), 1, 1, fc=fc)
        plt_handlers.append(p)
        plt_titles.append(label_titles[label_value])
    plt.legend(plt_handlers, plt_titles, loc='lower right', framealpha=0.5)

    # convert plotted figure to np.ndarray
    # The saved image is binary, so it goes into a bytes buffer; a text
    # buffer cannot hold it on Python 3.
    f = io.BytesIO()
    plt.savefig(f, bbox_inches='tight', pad_inches=0)
    # savefig leaves the position at the end of the buffer; rewind so the
    # image is read from the start regardless of the PIL version.
    f.seek(0)
    result_img_pil = Image.open(f)
    result_img = fromimage(result_img_pil, mode='RGB')
    result_img = resize(result_img, img.shape, preserve_range=True)
    result_img = result_img.astype(img.dtype)
    return result_img