Python flask.request 模块,stream() 实例源码
我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用flask.request.stream()。
def changenames():
result = BytesIO()
zipf = zipfile.ZipFile(result, "w")
i=0
for desc, font in _unpack(request.stream):
i += 1
print('#',i,'got oldname', font['name'].getDebugName(6))
changefont(desc, font)
filename = desc['filename']
print('changed', filename)
# write the font file to the zip
fontIO = BytesIO()
font.save(fontIO)
fontData = fontIO.getvalue()
zipf.writestr(filename, fontData)
zipf.close()
data = result.getvalue()
response = make_response(data)
response.headers['Content-Type'] = 'application/octet-stream'
response.headers['Content-Disposition'] = 'attachment; filename=fonts-with-changed-names.zip'
return response
def post_file() -> JSONResponse[str]:
"""Temporarily store some data on the server.
.. :quickref: File; Safe a file temporarily on the server.
.. note::
The posted data will be removed after 60 seconds.
:returns: A response with the JSON serialized name of the file as content
and return code 201.
:raises APIException: If the request is bigger than the maximum upload
size. (REQUEST_TOO_LARGE)
:raises PermissionException: If there is no logged in user. (NOT_LOGGED_IN)
"""
if (
request.content_length and
request.content_length > app.config['MAX_UPLOAD_SIZE']):
raise APIException(
'Uploaded file is too big.',
'Request is bigger than maximum upload size of {}.'.format(
app.config['MAX_UPLOAD_SIZE']
), APICodes.REQUEST_TOO_LARGE, 400
)
path, name = psef.files.random_file_path('MIRROR_UPLOAD_DIR')
FileStorage(request.stream).save(path)
return jsonify(name, status_code=201)
def _unpack(stream):
# L = unsignedlong 4 bytes
while True:
head = stream.read(8)
if not head:
break
jsonlen, fontlen = struct.unpack('II', head)
desc = json.loads(stream.read(jsonlen).decode('utf-8'))
font = TTFont(BytesIO(stream.read(fontlen)))
yield (desc, font)
def content_item_resource(id,resource):
if request.method == 'GET':
wrap = request.args.get('wrap')
status_code,data,contentType = model.getContentResource(id,resource);
if status_code==200:
if contentType.startswith("text/html") and wrap is not None:
blob = io.BytesIO()
for chunk in data:
blob.write(chunk)
content = blob.getvalue().decode("utf-8").strip()
if not content.startswith('<!DOCTYPE'):
editorConfig = app.config.get('EDITOR_CONFIG')
header = ''
bodyStart = ''
bodyEnd = ''
if editorConfig is not None and wrap=='preview':
wheader = editorConfig.get('wrap-header')
pheader = editorConfig.get('preview-wrap-header')
if pheader is not None:
header = pheader
elif wheader is not None:
header = wheader
wbody = editorConfig.get('wrap-body')
pbody = editorConfig.get('preview-body-main')
if pbody is not None:
bodyStart = pbody[0]
bodyEnd = pbody[1]
elif wbody is not None:
bodyStart = wbody[0]
bodyEnd = wbody[1]
elif editorConfig is not None and wrap=='formatted':
wheader = editorConfig.get('wrap-header')
if wheader is not None:
header = wheader
wbody = editorConfig.get('wrap-body')
if wbody is not None:
bodyStart = wbody[0]
bodyEnd = wbody[1]
content = """
<!DOCTYPE html>
<html>
<head><title>""" + resource + '</title>' + header + """
</head>
<body>
""" + bodyStart + content + bodyEnd + '</body></html>'
return Response(stream_with_context(content),content_type = contentType)
else:
return Response(stream_with_context(data),content_type = contentType)
else:
abort(status_code)
if request.method == 'PUT':
status_code,data,contentType = model.updateContentResource(id,resource,request.headers['Content-Type'],request.stream);
if status_code==200 or status_code==201:
return Response(stream_with_context(data),status=status_code,content_type = contentType)
else:
return Response(status=status)
if request.method == 'DELETE':
status = model.deleteContentResource(id,resource)
return Response(status=status)