Python flask.request module — request.content_length example source code
We extracted the following 8 code examples from open-source Python projects to illustrate how to use the flask.request.content_length property.
def capture_request(self):
    """Capture the current request's JSON body onto ``self.request_body``.

    The body is captured only when all of these hold:
    - the app is running in debug mode,
    - ``self.options.include_request_body`` is truthy (``True`` or an int
      size limit),
    - the declared content length does not exceed the configured limit,
    - the body parses as JSON.
    """
    if not current_app.debug:
        # only capture request body on debug
        return
    if not self.options.include_request_body:
        # only capture request body if requested
        return
    if (
        request.content_length and
        self.options.include_request_body is not True and
        request.content_length >= self.options.include_request_body
    ):
        # include_request_body doubles as a size limit when it is an int;
        # don't capture request bodies that are too large
        return
    # Parse exactly once. The original called get_json twice (the second
    # time without silent=True) and its truthiness check wrongly skipped
    # valid-but-falsy JSON bodies such as {} / 0 / false.
    body = request.get_json(force=True, silent=True)
    if body is None:
        # only capture the request body if it parsed as JSON
        return
    self.request_body = body
def upload_zip():
    """Collect the expected form parameters for an uploaded zip file.

    Pre-fills ``param_dict`` with every name in ``upload_request_param_list``
    (missing form fields stay ``None``) and reads each value from the posted
    form data.
    """
    # Pre-fill so missing form fields yield None instead of a KeyError later.
    param_dict = dict.fromkeys(upload_request_param_list, None)
    start_time = time.time()
    file_size = request.content_length
    try:
        # Extract each expected parameter from the posted form data.
        parameter = request.form
        for param in upload_request_param_list:
            param_dict[param] = parameter.get(param)
    except Exception as exc:
        # The original used a bare `except:` (which also swallows
        # KeyboardInterrupt/SystemExit) and re-raised a message-less
        # Exception, destroying the traceback context. Chain the cause.
        raise Exception('failed to read upload form parameters') from exc
def post_file() -> JSONResponse[str]:
    """Temporarily store some data on the server.

    .. :quickref: File; Safe a file temporarily on the server.

    .. note::
        The posted data will be removed after 60 seconds.

    :returns: A response with the JSON serialized name of the file as content
        and return code 201.

    :raises APIException: If the request is bigger than the maximum upload
        size. (REQUEST_TOO_LARGE)
    :raises PermissionException: If there is no logged in user. (NOT_LOGGED_IN)
    """
    max_size = app.config['MAX_UPLOAD_SIZE']
    declared = request.content_length
    if declared and declared > max_size:
        raise APIException(
            'Uploaded file is too big.',
            'Request is bigger than maximum upload size of {}.'.format(
                max_size
            ), APICodes.REQUEST_TOO_LARGE, 400
        )

    path, name = psef.files.random_file_path('MIRROR_UPLOAD_DIR')
    FileStorage(request.stream).save(path)
    return jsonify(name, status_code=201)
def before_request():
    """Reject any request whose declared body length exceeds 1024 bytes."""
    declared = request.content_length
    if declared is None or declared <= 1024:
        return
    raise ApiError(
        message='FROG CANNOT EXCEED MAXIMUM SIZE IN GIRTH, WIDTH OR LENGTH.',
        status_code=413)
def validate_filesize(size):
    '''
    Abort the request when the file is too large (413) or empty (411).
    size: size of file in bytes
    '''
    # config.max_file_size is expressed in megabytes; convert to bytes.
    limit_bytes = config.max_file_size * 1024 * 1024
    if size > limit_bytes:
        abort(413)
    empty_body = not request.content_length
    if empty_body or not size:
        logger.error('Request {} {} with empty file.'.format(request.method,
                                                             request.path))
        abort(411)
def webpage_upload():
    """Handle a manifest upload from the web page.

    Validates the declared request size, reads the uploaded file, feeds its
    contents to ManifestDestiny, reloads the data and re-renders the page
    with a success or error message.
    """
    file = None  # so the except handler can tell whether the file was fetched
    try:
        # Explicit validation instead of `assert`: asserts are stripped under
        # python -O, and request.content_length is None when no Content-Length
        # header was sent (the original int(None) raised TypeError).
        if request.content_length is None or request.content_length >= 20000:
            raise ValueError('Too big')
        file = request.files['file[]']
        # file is a werkzeug FileStorage mixin; read the whole payload as text.
        contentstr = file.read().decode()
        m = ManifestDestiny('', '', contentstr)
        msg = m.response.data.decode()
        _load_data()
        return render_all(okmsg=msg + ': ' + file.filename)
    except Exception as e:
        # Guard against `file` being unbound when the failure happened before
        # the upload was fetched (the original raised NameError here).
        fname = file.filename if file is not None else '<unknown>'
        return render_all(errmsg='Upload("%s") failed: %s' % (
            fname, str(e)))
    # NOTE: the original had two more statements here after the try/except,
    # but both branches return, so they were unreachable and have been removed.
###########################################################################
# API
# See blueprint registration in manifest_api.py, these are relative paths
def stream_to_storage(project_id: str):
    """Stream an uploaded file into storage for the given project.

    Verifies the project exists, enforces the size limits that can be checked
    before the body is fully read, requires a content type, and — for images
    and videos — buffers the upload into a local temp file so that local
    thumbnailing/ffprobe can re-read it.
    """
    project_oid = utils.str2id(project_id)
    projects_coll = current_app.data.driver.db['projects']
    if not projects_coll.find_one(project_oid, projection={'_id': 1}):
        raise wz_exceptions.NotFound('Project %s does not exist' % project_id)

    log.info('Streaming file to bucket for project=%s user_id=%s', project_id,
             current_user.user_id)
    log.info('request.headers[Origin] = %r', request.headers.get('Origin'))
    log.info('request.content_length = %r', request.content_length)

    # Check the declared content length before touching request.files[] so
    # the upload can be aborted early. The whole body is always a bit larger
    # than the file itself, so acceptance here implies the later checks pass.
    declared_length = request.content_length
    if declared_length:
        assert_file_size_allowed(declared_length)

    uploaded_file = request.files['file']

    # Not every upload part carries its own Content-Length. When present,
    # reject oversize files before requiring the user to send everything.
    # (At least I hope this runs before the body is read in its entirety.)
    if uploaded_file.content_length:
        assert_file_size_allowed(uploaded_file.content_length)

    override_content_type(uploaded_file)
    if not uploaded_file.content_type:
        log.warning('File uploaded to project %s without content type.',
                    project_oid)
        raise wz_exceptions.BadRequest('Missing content type.')

    if uploaded_file.content_type.startswith(('image/', 'video/')):
        # Local thumbnailing and ffprobe need a seekable local copy in
        # addition to the copy written to Google Cloud Storage.
        local_file = tempfile.NamedTemporaryFile(
            dir=current_app.config['STORAGE_DIR'])
        uploaded_file.save(local_file)
        local_file.seek(0)  # re-reads must start from the beginning
    else:
        local_file = uploaded_file.stream

    result = upload_and_process(local_file, uploaded_file, project_id)
    resp = jsonify(result)
    resp.status_code = result['status_code']
    add_access_control_headers(resp)
    return resp
def create_file_doc_for_upload(project_id, uploaded_file):
    """Creates a secure filename and a document in MongoDB for the file.

    The (project_id, filename) tuple should be unique. If such a document
    already exists, it is updated with the new file.

    :param uploaded_file: file from request.files['form-key']
    :type uploaded_file: werkzeug.datastructures.FileStorage
    :returns: a tuple (file_id, filename, status), where 'filename' is the
        internal filename used on GCS.
    """
    project_id = ObjectId(project_id)

    # Derive a random, per-project-unique internal name from a UUID, keeping
    # only the extension of the user-supplied filename.
    _, ext = os.path.splitext(uploaded_file.filename)
    internal_filename = uuid.uuid4().hex + ext

    # Overwriting existing files is not supported yet, so no pre-existing
    # document is looked up; a fresh document is created on every upload.
    file_doc = None

    # TODO: at some point do name-based and content-based content-type sniffing.
    new_props = dict(filename=uploaded_file.filename,
                     content_type=uploaded_file.mimetype,
                     length=uploaded_file.content_length,
                     project=project_id,
                     status='uploading')

    if file_doc is None:
        # Create a file document on MongoDB for this file.
        file_doc = create_file_doc(name=internal_filename, **new_props)
        file_fields, _, _, status = current_app.post_internal('files', file_doc)
    else:
        # Kept for when overwrite support returns: update the existing doc.
        file_doc.update(new_props)
        file_fields, _, _, status = current_app.put_internal(
            'files', remove_private_keys(file_doc))

    if status not in (200, 201):
        log.error('Unable to create new file document in MongoDB, status=%i: %s',
                  status, file_fields)
        raise wz_exceptions.InternalServerError()

    log.debug('Created file document %s for uploaded file %s; internal name %s',
              file_fields['_id'], uploaded_file.filename, internal_filename)
    return file_fields['_id'], internal_filename, status