Python botocore.exceptions 模块,ParamValidationError() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用botocore.exceptions.ParamValidationError()。
def validate_parameters(params, shape):
"""Validates input parameters against a schema.
This is a convenience function that validates parameters against a schema.
You can also instantiate and use the ParamValidator class directly if you
want more control.
If there are any validation errors then a ParamValidationError
will be raised. If there are no validation errors than no exception
is raised and a value of None is returned.
:param params: The user provided input parameters.
:type shape: botocore.model.Shape
:param shape: The schema which the input parameters should
adhere to.
:raise: ParamValidationError
"""
validator = ParamValidator()
report = validator.validate(params, shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
def retry(attempts=3):
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
tries = attempts
while True:
tries -= 1
try:
return func(*args, **kwargs)
except (ClientError, ParamValidationError) as error:
if tries > 0:
print('[ssha] {}'.format(error))
else:
raise
return wrapped
return wrapper
def validate_parameters(params, shape):
"""Validates input parameters against a schema.
This is a convenience function that validates parameters against a schema.
You can also instantiate and use the ParamValidator class directly if you
want more control.
If there are any validation errors then a ParamValidationError
will be raised. If there are no validation errors than no exception
is raised and a value of None is returned.
:param params: The user provided input parameters.
:type shape: botocore.model.Shape
:param shape: The schema which the input parameters should
adhere to.
:raise: ParamValidationError
"""
validator = ParamValidator()
report = validator.validate(params, shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
def start_query_execution(self, db, query):
try:
if not db:
raise ValueError('Schema must be specified when session schema is not set')
result_configuration = {
'OutputLocation': self.bucket,
}
if self.encryption:
result_configuration['EncryptionConfiguration'] = {
'EncryptionOption': 'SSE_S3'
}
return self.athena.start_query_execution(
QueryString=query,
ClientRequestToken=str(uuid.uuid4()),
QueryExecutionContext={
'Database': db
},
ResultConfiguration=result_configuration
)['QueryExecutionId']
except (ClientError, ParamValidationError, ValueError) as e:
print(e)
return
def get_lambda_alias(module, aws):
"""
Returns the lambda function alias if it exists.
:param module: Ansible module reference
:param aws: AWS client connection
:return:
"""
client = aws.client('lambda')
# set API parameters
api_params = set_api_params(module, ('function_name', 'name'))
# check if alias exists and get facts
try:
results = client.get_alias(**api_params)
except (ClientError, ParamValidationError, MissingParametersError) as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
results = None
else:
module.fail_json(msg='Error retrieving function alias: {0}'.format(e))
return results
def validate_parameters(params, shape):
"""Validates input parameters against a schema.
This is a convenience function that validates parameters against a schema.
You can also instantiate and use the ParamValidator class directly if you
want more control.
If there are any validation errors then a ParamValidationError
will be raised. If there are no validation errors than no exception
is raised and a value of None is returned.
:param params: The user provided input parameters.
:type shape: botocore.model.Shape
:param shape: The schema which the input parameters should
adhere to.
:raise: ParamValidationError
"""
validator = ParamValidator()
report = validator.validate(params, shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
def validate_parameters(params, shape):
"""Validates input parameters against a schema.
This is a convenience function that validates parameters against a schema.
You can also instantiate and use the ParamValidator class directly if you
want more control.
If there are any validation errors then a ParamValidationError
will be raised. If there are no validation errors than no exception
is raised and a value of None is returned.
:param params: The user provided input parameters.
:type shape: botocore.model.Shape
:param shape: The schema which the input parameters should
adhere to.
:raise: ParamValidationError
"""
validator = ParamValidator()
report = validator.validate(params, shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
def validate_parameters(params, shape):
"""Validates input parameters against a schema.
This is a convenience function that validates parameters against a schema.
You can also instantiate and use the ParamValidator class directly if you
want more control.
If there are any validation errors then a ParamValidationError
will be raised. If there are no validation errors than no exception
is raised and a value of None is returned.
:param params: The user provided input parameters.
:type shape: botocore.model.Shape
:param shape: The schema which the input parameters should
adhere to.
:raise: ParamValidationError
"""
validator = ParamValidator()
report = validator.validate(params, shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
def info(self, path, refresh=False, **kwargs):
""" Detail on the specific file pointed to by path.
Gets details only for a specific key, directories/buckets cannot be
used with info.
"""
parent = path.rsplit('/', 1)[0]
files = self._lsdir(parent, refresh=refresh)
files = [f for f in files if f['Key'] == path and f['StorageClass'] not
in ['DIRECTORY', 'BUCKET']]
if len(files) == 1:
return files[0]
else:
try:
bucket, key = split_path(path)
out = self._call_s3(self.s3.head_object,
kwargs, Bucket=bucket, Key=key, **self.req_kw)
out = {'ETag': out['ETag'], 'Key': '/'.join([bucket, key]),
'LastModified': out['LastModified'],
'Size': out['ContentLength'], 'StorageClass': "STANDARD"}
return out
except (ClientError, ParamValidationError):
raise FileNotFoundError(path)
def touch(self, path, acl="", **kwargs):
"""
Create empty key
If path is a bucket only, attempt to create bucket.
"""
bucket, key = split_path(path)
acl = acl or self.s3_additional_kwargs.get('ACL', '')
if key:
if acl and acl not in key_acls:
raise ValueError('ACL not in %s', key_acls)
self._call_s3(
self.s3.put_object, kwargs,
Bucket=bucket, Key=key, ACL=acl)
self.invalidate_cache(path)
else:
if acl and acl not in buck_acls:
raise ValueError('ACL not in %s', buck_acls)
try:
self.s3.create_bucket(Bucket=bucket, ACL=acl)
self.invalidate_cache('')
self.invalidate_cache(bucket)
except (ClientError, ParamValidationError):
raise IOError('Bucket create failed', path)
def create_rest_api(client, module, title, description):
"""
Creates a new API with 'default' model and root resource node.
:param client:
:param module:
:param title:
:param description:
:return: API Id
"""
rest_api = None
try:
rest_api = client.create_rest_api(
name=title,
description=description
)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg='Error creating REST API: {0}'.format(e))
return rest_api['id']
def delete_rest_api(client, module, rest_api_id):
"""
Deletes the entire API, including associated models.
:param client:
:param module:
:param rest_api_id:
:return:
"""
try:
client.delete_rest_api(restApiId=rest_api_id)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg='Error deleting REST API: {0}'.format(e))
return
def put_integration_response(client, module, rest_api_id, resource_id, http_method, selection_pattern, integration_response):
response = None
api_params = dict(
restApiId=rest_api_id,
resourceId=resource_id,
httpMethod=http_method,
statusCode=integration_response['statusCode'],
selectionPattern=selection_pattern
)
for optional_params in ('responseParameters', 'responseTemplates', ):
if optional_params in integration_response:
api_params[optional_params] = integration_response[optional_params]
try:
response = client.put_integration_response(**api_params)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating integration response '{0}' for method '{1}', rid: {2}: {3}".format(selection_pattern, http_method, resource_id, e))
return response
def validate_parameters(params, shape):
"""Validates input parameters against a schema.
This is a convenience function that validates parameters against a schema.
You can also instantiate and use the ParamValidator class directly if you
want more control.
If there are any validation errors then a ParamValidationError
will be raised. If there are no validation errors than no exception
is raised and a value of None is returned.
:param params: The user provided input parameters.
:type shape: botocore.model.Shape
:param shape: The schema which the input parameters should
adhere to.
:raise: ParamValidationError
"""
validator = ParamValidator()
report = validator.validate(params, shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def can_retry(self, ex):
"""
Tests if a retry can be done based on the exception of an earlier call
:param ex: Execution raise by earlier call of the boto3 method
:return: True if any of the call_retry_strategy returns True, else False
"""
if type(ex) == ParamValidationError:
return False
return AwsApiServiceRetry.can_retry(self, ex)
def __init__(self, ansible_obj, resources, boto3=True):
try:
self.region, self.endpoint, aws_connect_kwargs = get_aws_connection_info(ansible_obj, boto3=boto3)
self.resource_client = dict()
if not resources:
resources = ['lambda']
resources.append('iam')
for resource in resources:
aws_connect_kwargs.update(dict(region=self.region,
endpoint=self.endpoint,
conn_type='client',
resource=resource
))
self.resource_client[resource] = boto3_conn(ansible_obj, **aws_connect_kwargs)
# if region is not provided, then get default profile/session region
if not self.region:
self.region = self.resource_client['lambda'].meta.region_name
except (ClientError, ParamValidationError, MissingParametersError) as e:
ansible_obj.fail_json(msg="Unable to connect, authorize or access resource: {0}".format(e))
try:
self.account_id = self.resource_client['iam'].get_user()['User']['Arn'].split(':')[4]
except (ClientError, ValueError, KeyError, IndexError):
self.account_id = ''
def __init__(self, ansible_obj, resources, use_boto3=True):
try:
self.region, self.endpoint, aws_connect_kwargs = get_aws_connection_info(ansible_obj, boto3=use_boto3)
self.resource_client = dict()
if not resources:
resources = ['lambda']
resources.append('iam')
for resource in resources:
aws_connect_kwargs.update(dict(region=self.region,
endpoint=self.endpoint,
conn_type='client',
resource=resource
))
self.resource_client[resource] = boto3_conn(ansible_obj, **aws_connect_kwargs)
# if region is not provided, then get default profile/session region
if not self.region:
self.region = self.resource_client['lambda'].meta.region_name
except (ClientError, ParamValidationError, MissingParametersError) as e:
ansible_obj.fail_json(msg="Unable to connect, authorize or access resource: {0}".format(e))
# set account ID
try:
self.account_id = self.resource_client['iam'].get_user()['User']['Arn'].split(':')[4]
except (ClientError, ValueError, KeyError, IndexError):
self.account_id = ''
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def destroy_role(connection, module):
params = dict()
params['RoleName'] = module.params.get('name')
if get_role(connection, params['RoleName']):
# We need to remove any instance profiles from the role before we delete it
try:
instance_profiles = connection.list_instance_profiles_for_role(RoleName=params['RoleName'])['InstanceProfiles']
except ClientError as e:
module.fail_json(msg=e.message, **camel_dict_to_snake_dict(e.response))
# Now remove the role from the instance profile(s)
for profile in instance_profiles:
try:
connection.remove_role_from_instance_profile(InstanceProfileName=profile['InstanceProfileName'], RoleName=params['RoleName'])
except ClientError as e:
module.fail_json(msg=e.message, **camel_dict_to_snake_dict(e.response))
# Now remove any attached policies otherwise deletion fails
try:
for policy in get_attached_policy_list(connection, params['RoleName']):
connection.detach_role_policy(RoleName=params['RoleName'], PolicyArn=policy['PolicyArn'])
except (ClientError, ParamValidationError) as e:
module.fail_json(msg=e.message, **camel_dict_to_snake_dict(e.response))
try:
connection.delete_role(**params)
except ClientError as e:
module.fail_json(msg=e.message, **camel_dict_to_snake_dict(e.response))
else:
module.exit_json(changed=False)
module.exit_json(changed=True)
def test_scan_invalid(self):
db = await self.client.db
db.scan = asynctest.CoroutineMock(side_effect=ParamValidationError(report="Exception raised"))
res = await self.client.scan()
db.scan.assert_called_once_with(Limit=5, Select="ALL_ATTRIBUTES", TableName="livebridge_test")
assert res == []
def test_insert_post(self):
api_res = {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'd92c4314-439a-4bc7-90f5-125a615dfaa2'}}
db = await self.client.db
db.put_item = asynctest.CoroutineMock(return_value=api_res)
date = datetime.utcnow()
date_str = datetime.strftime(date, self.client.date_fmt)
kwargs = {"target_id": "target-id",
"post_id": "post-id",
"source_id": "source-id",
"text": "Text",
"sticky": True,
"created": date,
"updated": date,
"target_doc": {"foo": "doc"}}
res = await self.client.insert_post(**kwargs)
assert res is True
assert type(res) == bool
db.put_item.assert_called_once_with(
Item={
'source_id': {'S': 'source-id'},
'text': {'S': 'Text'},
'target_id': {'S': 'target-id'},
'post_id': {'S': 'post-id'},
'created': {'S': date_str},
'updated': {'S': date_str},
'target_doc': {'S': '{"foo": "doc"}'},
'sticky': {'N': '1'}},
TableName='livebridge_test')
# insert_post failing(self):
db.put_item = asynctest.CoroutineMock(side_effect=ParamValidationError(report="Exception raised"))
res = await self.client.insert_post(**kwargs)
assert res is False
assert type(res) == bool
def _validate_keys(dynamodb_data):
"""Helper method to check if query key empty or duplicated"""
result = []
if not dynamodb_data:
err_msg = {'Error': {'Code': 403, 'Message': 'Empty query keys'}}
raise ParamValidationError(report=err_msg)
deserializer = TypeDeserializer()
for raw_data in dynamodb_data:
for _, val in raw_data.iteritems():
python_data = deserializer.deserialize(val).lower()
if not python_data or python_data in result:
err_msg = {'Error': {'Code': 403, 'Message': 'Parameter Validation Error'}}
raise ParamValidationError(report=err_msg)
result.append(python_data)
def invoke(cls, **kwargs):
"""Mocked invoke function that returns a reponse mimicking boto3's reponse
Keyword Arguments:
FuncitonName (str): The AWS Lambda function name being invoked
InvocationType (str): Type of invocation (typically 'Event')
Payload (str): Payload in string or file format to send to lambda
Qualifier (str): Alias for fully qualified AWS ARN
Returns:
dict: Response dictionary containing a fake RequestId
"""
if cls._raise_exception:
# Turn of the raise exception boolean so we don't do this next time
cls._raise_exception = not cls._raise_exception
err = {'Error': {'Code': 400, 'Message': 'raising test exception'}}
raise ClientError(err, 'invoke')
req_keywords = {'FunctionName', 'InvocationType', 'Payload'}
key_diff = req_keywords.difference(set(kwargs))
if key_diff:
message = 'required keyword missing: {}'.format(', '.join(key_diff))
err = {'Error': {'Code': 400, 'Message': message}}
raise ClientError(err, 'invoke')
if not isinstance(kwargs['Payload'], (str, bytearray)):
if not hasattr(kwargs['Payload'], 'read'):
err = ('Invalid type for parameter Payload, value: {}, type: {}, '
'valid types: <type \'str\'>, <type \'bytearray\'>, '
'file-like object').format(kwargs['Payload'], type(kwargs['Payload']))
raise ParamValidationError(response=err)
return {'ResponseMetadata': {'RequestId': '9af88643-7b3c-43cd-baae-addb73bb4d27'}}
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def copy(self, path1, path2, **kwargs):
""" Copy file between locations on S3 """
buc1, key1 = split_path(path1)
buc2, key2 = split_path(path2)
try:
self._call_s3(
self.s3.copy_object,
kwargs,
Bucket=buc2, Key=key2, CopySource='/'.join([buc1, key1])
)
except (ClientError, ParamValidationError):
raise IOError('Copy failed', (path1, path2))
self.invalidate_cache(path2)
def get_rest_api(client, module, swagger_spec):
rest_api = None
info_title = None
rest_api_id = module.params['rest_api_id']
try:
info_title = swagger_spec['info']['title']
except KeyError:
module.fail_json(msg="Missing required value in swagger spec: info.title")
if rest_api_id == '*':
try:
rest_apis = client.get_rest_apis(limit=500)['items']
choices = [api for api in rest_apis if api['name'] == info_title]
except ClientError as e:
choices = None
module.fail_json(msg="Error retrieving REST APIs: {0}".format(e))
if len(choices) > 1:
module.fail_json(msg="More than one API found: {0}".format(choices))
elif len(choices) > 0:
try:
rest_api_id = choices[0]['id']
rest_api = client.get_rest_api(restApiId=rest_api_id)
except (ClientError, ParamValidationError, MissingParametersError) as e:
if not e.response['Error']['Code'] == 'NotFoundException':
module.fail_json(msg='Error retrieving REST API: {0}'.format(e))
return rest_api
def create_models(client, module, rest_api_id, schemas):
"""
Creates models based on schemas.
"""
models = None
try:
for model in schemas.keys():
schema = schemas[model]
schema.update({
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"title": "{0} schema".format(model)
})
models = client.create_model(
restApiId=rest_api_id,
name=model,
description='added by Ansible module',
contentType='application/json',
schema=json.dumps(schema)
)
except (ClientError, ParamValidationError, MissingParametersError) as e:
#TODO: should report warning or update existing model
if not e.response['Error']['Code'] == 'ConflictException':
module.fail_json(msg='Error creating API model {0}: {1}'.format(model, e))
return models
def create_resource(client, module, rest_api_id, parent_id, path_part):
resource = None
try:
resource = client.create_resource(
restApiId=rest_api_id,
pathPart=path_part,
parentId=parent_id
)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating API resource {0} pid: {1}: {2}".format(path_part, parent_id, e))
return resource
def put_method_response(client, module, rest_api_id, resource_id, http_method, status_code, response):
method_response = None
if not re.match(r'^[2-6]\d\d$', str(status_code)):
module.fail_json(msg="Error creating response {0} for method {1} rid: {2}: invalid response code.".format(status_code, http_method, resource_id))
api_params = dict(
restApiId=rest_api_id,
resourceId=resource_id,
httpMethod=http_method,
statusCode=str(status_code)
)
if 'headers' in response:
response_parameters = dict()
for header in response['headers'].keys():
destination = 'method.response.header.{0}'.format(header)
response_parameters[destination] = True
if response_parameters:
api_params['responseParameters'] = response_parameters
try:
method_response = client.put_method_response(**api_params)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating response {0} for method {1} rid: {2}: {3}".format(status_code, http_method, resource_id, e))
return method_response
def put_integration(client, module, rest_api_id, resource_id, http_method, integration):
method_integration = None
api_params = dict(
restApiId=rest_api_id,
resourceId=resource_id,
httpMethod=http_method,
type=integration['type'].upper(),
)
if 'httpMethod' in integration:
api_params['integrationHttpMethod'] = integration['httpMethod']
for optional_params in ('uri', 'credentials', 'requestParameters', 'requestTemplates', 'cacheNameSpace'):
if optional_params in integration:
api_params[optional_params] = integration[optional_params]
if 'cacheKeyParameters' in integration:
cache_key_parameters = []
for parameter in integration['cacheKeyParameters']:
cache_key_parameters.append('method.request.querystring.{0}'.format(parameter.split('.')[-1]))
if cache_key_parameters:
api_params['cacheKeyParameters'] = cache_key_parameters
try:
method_integration = client.put_integration(**api_params)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating integration for method {0} rid: {1}: {2}".format(http_method, resource_id, e))
return method_integration
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final