我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ansible.module_utils.basic.AnsibleModule()。
def main (): spec = dict (provider = dict (required = True)) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) # make sure "terminal length 0" is set on XR console cmds = [ "show platform", "show version", "show inventory all", "show memory summary", "show install active", "show filesystem", "show media", "show route", "show running-config", "show arp", "show ipv4 int brief", "show ipv6 int brief" ] result = dict (changed = False) for cmd in cmds: result[cmd] = str (run_commands (module, cmd)).split (r"\n") return module.exit_json (**result)
def main (): spec = dict (provider = dict (required = True), cfgname = dict (required = True)) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) args = module.params cfg_name = args["cfgname"] command = "configure terminal" rc, out, err = exec_command (module, command) if rc != 0: module.fail_json (msg="unable to enter configuration mode", err = to_text (err, errors="surrogate_or_strict")) commands = ["load " + cfg_name] commands.append ("show commit changes diff") response = run_commands (module, commands) result = dict (changed = False) result["stdout"] = response result["stdout_lines"] = str (result["stdout"]).split (r"\n") module.exit_json (**result)
def main(): global module module = AnsibleModule( argument_spec={ 'filename': {'required': True, 'aliases': ['name']}, 'cred_type': {'required': False, 'aliases': ['credential_type']}, 'cred_path': {'required': True, 'aliases': ['credential_store']}, 'driver': {'required': True, 'aliases': ['driver_type']}, }, required_one_of=[], supports_check_mode=True ) filename = module.params["filename"] cred_type = module.params["cred_type"] # noqa F841 cred_path = module.params["cred_path"] driver_type = module.params["driver"] # noqa F841 output, path = get_cred(filename, cred_path) changed = True module.exit_json(changed=changed, output=output, params=module.params, path=path)
def main(): module = AnsibleModule( argument_spec=dict( sessions=dict(type='list', required=True), asn=dict(type='int', required=True), ), supports_check_mode=True ) sessions = module.params['sessions'] asn = module.params['asn'] pinder = hammock.Hammock('http://127.0.0.1:8000', append_slash=True) result = [] for session in sessions: if session['sender']['isp']['asn'] == asn: session['sender_is_ready'] = True elif session['receiver']['asn'] == asn: session['receiver_is_ready'] = True else: raise Exception("Couldn't figure out if I was the sender or the receiver") r = pinder.api.requests(session['id']).PUT(data=session) result.append(r.json()) module.exit_json(sessions=result, msg=str(result))
def __init__(self): self.module = AnsibleModule( argument_spec = dict( config = dict(required = True, type = 'path'), state = dict(type = 'str', default = 'started'), name = dict(required = True, type = 'str'), template = dict(type = 'str', default = 'gentoo-overlay'), template_args = dict(type = 'str') ), supports_check_mode = False ) self.args = { 'name': self.module.params.get('name'), 'config': self.module.params.get('config'), 'state': self.module.params.get('state'), 'template': self.module.params.get('template'), 'templateArgs': self.module.params.get('template_args') } self.xl = XenXlClient(self.args['name'], self.args['config'], self.args['template'])
def __init__(self): self.module = AnsibleModule( argument_spec = dict( user = dict(required = True, type = 'str') ), supports_check_mode = False ) self.args = { 'user': self.module.params.get('user') } try: self.userInfo = { 'home': pwd.getpwnam(self.args['user']).pw_dir } except KeyError: self.module.fail_json( msg = 'user (%s) not found' % (self.args['user']) )
def __init__(self): self.module = AnsibleModule( argument_spec = dict( crt = dict(required = True, type = 'path'), ca = dict(required = True, type = 'path'), dest = dict(required = True, type = 'path'), ), add_file_common_args = True, supports_check_mode = False ) self.args = { 'crt': self.module.params.get('crt'), 'ca': self.module.params.get('ca'), 'dest': self.module.params.get('dest') } self.changed = False
def test__clone_resource__happy(self, has_type, clone_resource_expected_count, clone_resource_current_count): mod_cls = create_autospec(AnsibleModule) mod = mod_cls.return_value mod.params = dict( resource="haproxy", max_wait="5" ) has_type.return_value = pacemaker_is_active.Clone(mod, 'haproxy') clone_resource_expected_count.return_value = 3 clone_resource_current_count.return_value = 3 pacemaker_is_active.is_resource_active(mod) self.assertEqual(0, mod.fail_json.call_count) self.assertEqual(1, mod.exit_json.call_count)
def test__master_resource__happy(self, has_type, master_resource_expected_count, master_resource_current_count): mod_cls = create_autospec(AnsibleModule) mod = mod_cls.return_value mod.params = dict( resource="galera", max_wait="5" ) has_type.return_value = pacemaker_is_active.Master(mod, 'galera') master_resource_expected_count.return_value = 3 master_resource_current_count.return_value = 3 pacemaker_is_active.is_resource_active(mod) self.assertEqual(0, mod.fail_json.call_count) self.assertEqual(1, mod.exit_json.call_count)
def test__primitive_resource__happy(self, has_type, primitive_resource_current_count): mod_cls = create_autospec(AnsibleModule) mod = mod_cls.return_value mod.params = dict( resource="openstack-cinder-volume", max_wait="5" ) has_type.return_value = pacemaker_is_active.Primitive( mod, "openstack-cinder-volume") primitive_resource_current_count.return_value = 1 pacemaker_is_active.is_resource_active(mod) self.assertEqual(0, mod.fail_json.call_count) self.assertEqual(1, mod.exit_json.call_count)
def test__primitive_resource__happy(self, has_type, primitive_resource_expected_count, primitive_resource_current_count): mod_cls = create_autospec(AnsibleModule) mod = mod_cls.return_value mod.params = dict( resource="openstack-cinder-volume", max_wait="3" ) has_type.return_value = pacemaker_is_active.Primitive( mod, 'openstack-cinder-volume') primitive_resource_expected_count.side_effect = [1, 1, 1] primitive_resource_current_count.side_effect = [0, 0, 0] pacemaker_is_active.is_resource_active(mod) self.assertEqual(1, mod.fail_json.call_count) self.assertEqual(0, mod.exit_json.call_count)
def main(): # Parsing argument file module = AnsibleModule( argument_spec = dict( user = dict(required=True) ) ) user = module.params.get('user') chkusr = User(user) success, ret_msg = chkusr.check_if_user_exists() # Error handling and JSON return if success: module.exit_json(msg=ret_msg, uid=uid, gid=gid) else: module.fail_json(msg=ret_msg)
def main(): # Parsing argument file module = AnsibleModule( argument_spec = dict( user = dict(required=True) ) ) user = module.params.get('user') chkusr = CheckUser(user) success, ret_msg, uid, gid = chkusr.check_user() # Error handling and JSON return if success: module.exit_json(msg=ret_msg, uid=uid, gid=gid) else: module.fail_json(msg=ret_msg)
def main(): # Parsing argument file module = AnsibleModule( argument_spec = dict( user = dict(required=True) ) ) user = module.params.get('user') # Check if user exists try: pwd.getpwnam(user) success = True ret_msg = 'User %s exists' % user except KeyError: success = False ret_msg = 'User %s does not exists' % user # Error handling and JSON return if success: module.exit_json(msg=ret_msg) else: module.fail_json(msg=ret_msg)
def main(): module = AnsibleModule( argument_spec = dict( state = dict(choices=['present', 'absent'], default='present'), api_token = dict(aliases=['API_TOKEN'], no_log=True), name = dict(type='str'), id = dict(aliases=['droplet_id'], type='int'), ip = dict(type='str'), ), required_one_of = ( ['id', 'name'], ), ) if not HAS_DOPY: module.fail_json(msg='dopy required for this module') try: core(module) except (DoError, Exception) as e: module.fail_json(msg=str(e), exception=traceback.format_exc())
def main(): module = AnsibleModule( argument_spec=dict( state = dict(choices=['present', 'absent'], required=True), command = dict(choices=['create', 'attach'], required=True), api_token = dict(aliases=['API_TOKEN'], no_log=True), block_size = dict(type='int'), volume_name = dict(type='str', required=True), description = dict(type='str'), region = dict(type='str', required=True), droplet_id = dict(type='int'), timeout = dict(type='int', default=10), ), ) try: handle_request(module) except DOBlockStorageException: e = get_exception() module.fail_json(msg=e.message) except KeyError: e = get_exception() module.fail_json(msg='Unable to load %s' % e.message)
def main(): module = AnsibleModule( argument_spec = dict( state = dict(choices=['present', 'absent'], default='present'), client_id = dict(aliases=['CLIENT_ID'], no_log=True), api_key = dict(aliases=['API_KEY'], no_log=True), name = dict(type='str'), id = dict(aliases=['droplet_id'], type='int'), ssh_pub_key = dict(type='str'), ), required_one_of = ( ['id', 'name'], ), ) if not HAS_DOPY: module.fail_json(msg='dopy required for this module') try: core(module) except (DoError, Exception) as e: module.fail_json(msg=str(e), exception=traceback.format_exc())
def main(): module = AnsibleModule( argument_spec = dict( user = dict(required=True, type='str'), key = dict(required=True, type='str'), path = dict(required=False, type='str'), manage_dir = dict(required=False, type='bool', default=True), state = dict(default='present', choices=['absent','present']), key_options = dict(required=False, type='str'), unique = dict(default=False, type='bool'), exclusive = dict(default=False, type='bool'), validate_certs = dict(default=True, type='bool'), ), supports_check_mode=True ) results = enforce_state(module, module.params) module.exit_json(**results)
def main(): argument_spec = ec2_argument_spec() argument_spec.update( dict( duration_seconds = dict(required=False, default=None, type='int'), mfa_serial_number = dict(required=False, default=None), mfa_token = dict(required=False, default=None) ) ) module = AnsibleModule(argument_spec=argument_spec) if not HAS_BOTO3: module.fail_json(msg='boto3 and botocore are required.') region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True) if region: connection = boto3_conn(module, conn_type='client', resource='sts', region=region, endpoint=ec2_url, **aws_connect_kwargs) else: module.fail_json(msg="region must be specified") get_session_token(connection, module)
def main(): argument_spec = ec2_argument_spec() argument_spec.update( dict( filters = dict(default=None, type='dict') ) ) module = AnsibleModule(argument_spec=argument_spec) if not HAS_BOTO: module.fail_json(msg='boto required for this module') region, ec2_url, aws_connect_params = get_aws_connection_info(module) if region: try: connection = connect_to_aws(boto.vpc, region, **aws_connect_params) except (boto.exception.NoAuthHandlerFound, StandardError) as e: module.fail_json(msg=str(e)) else: module.fail_json(msg="region must be specified") list_ec2_vpcs(connection, module)
def main(): argument_spec = ec2_argument_spec() argument_spec.update( dict( filters = dict(default=None, type='dict') ) ) module = AnsibleModule(argument_spec=argument_spec) if not HAS_BOTO: module.fail_json(msg='boto required for this module') region, ec2_url, aws_connect_params = get_aws_connection_info(module) if region: try: connection = connect_to_aws(boto.vpc, region, **aws_connect_params) except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as e: module.fail_json(msg=str(e)) else: module.fail_json(msg="region must be specified") list_ec2_vpc_route_tables(connection, module)
def main(): argument_spec = ec2_argument_spec() argument_spec.update( dict( filters = dict(default=None, type='dict') ) ) module = AnsibleModule(argument_spec=argument_spec) if not HAS_BOTO: module.fail_json(msg='boto required for this module') region, ec2_url, aws_connect_params = get_aws_connection_info(module) if region: try: connection = connect_to_aws(boto.vpc, region, **aws_connect_params) except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as e: module.fail_json(msg=str(e)) else: module.fail_json(msg="region must be specified") list_ec2_vpc_subnets(connection, module)
def main(): argument_spec = ec2_argument_spec() argument_spec.update( dict( filters = dict(default=None, type='dict') ) ) module = AnsibleModule(argument_spec=argument_spec) if not HAS_BOTO: module.fail_json(msg='boto required for this module') region, ec2_url, aws_connect_params = get_aws_connection_info(module) if region: try: connection = connect_to_aws(boto.ec2, region, **aws_connect_params) except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as e: module.fail_json(msg=str(e)) else: module.fail_json(msg="region must be specified") list_ec2_instances(connection, module)
def __new__(cls, module): """Return the platform-specific subclass. It does not use load_platform_subclass() because it need to judge based on whether the `timedatectl` command exists. Args: module: The AnsibleModule. """ if get_platform() == 'Linux': if module.get_bin_path('timedatectl') is not None: return super(Timezone, SystemdTimezone).__new__(SystemdTimezone) else: return super(Timezone, NosystemdTimezone).__new__(NosystemdTimezone) else: # Not supported yet return super(Timezone, Timezone).__new__(Timezone)
def __init__(self, module): """Initialize of the class. Args: module: The AnsibleModule. """ super(Timezone, self).__init__() self.msg = [] # `self.value` holds the values for each params on each phases. # Initially there's only info of "planned" phase, but the # `self.check()` function will fill out it. self.value = dict() for key in module.argument_spec: value = module.params[key] if value is not None: self.value[key] = dict(planned=value) self.module = module
def main(): '''Main function, dispatches logic''' module = AnsibleModule( argument_spec=dict( start=dict(required=False, type='int'), stop=dict(required=False, type='int'), category=dict(required=True), title=dict(required=True), description=dict(required=True), duration=dict(required=False, type='int'), api_key=dict(required=True, no_log=True) ) ) annotation = create_annotation(module) try: resp = post_annotation(annotation, module.params['api_key']) except requests.exceptions.RequestException: err_str = get_exception() module.fail_json(msg='Request Failed', reason=err_str) module.exit_json(changed=True, annotation=resp.json())
def config_absent(self, module, proposed, existing): """ This function is used to determine the appropriate configuration to remove from the FortiManager when the "state" parameter is set to "absent" and to collect the dictionary data that will be returned by the Ansible Module. :param module: The AnsibleModule instance. :param proposed: The proposed config to send to the FortiManager. :param existing: The existing configuration for the item on the FortiManager (using the "name" key to get item). :return: A dictionary containing the module exit values. """ changed = False config = {} if existing: config = self.config_delete(module, proposed["policyid"]) changed = True return {"changed": changed, "config": config, "existing": existing}
def config_absent(self, module, proposed, existing): """ This function is used to determine the appropriate configuration to remove from the FortiManager when the "state" parameter is set to "absent" and to collect the dictionary data that will be returned by the Ansible Module. :param module: The AnsibleModule instance. :param proposed: The proposed config to send to the FortiManager. :param existing: The existing configuration for the item on the FortiManager (using the "name" key to get item). :return: A dictionary containing the module exit values. """ changed = False config = {} # proposed length of 1 is just the sequence number of the route if existing: config = self.config_delete(module, existing["seq-num"]) changed = True else: existing = {} return {"changed": changed, "config": config, "existing": existing}
def main(): module = AnsibleModule( argument_spec={ 'jenkins_home': {'default': '/var/lib/jenkins'}, 'update_center_id': {'required': True}, 'update_center_url': {'required': True} } ) update_ctr_config = os.path.join(module.params['jenkins_home'], 'hudson.model.UpdateCenter.xml') tree = ET.parse(update_ctr_config) if module.params['update_center_id'] == 'default': changed = set_default(tree, module.params['update_center_url']) else: changed = append(tree, module.params['update_center_id'], module.params['update_center_url']) if changed: tree.write(update_ctr_config, encoding='UTF-8') module.exit_json(changed=changed)
def main(): argument_spec = ovirt_full_argument_spec( pattern=dict(default='', required=False), ) module = AnsibleModule(argument_spec) check_sdk(module) try: connection = create_connection(module.params.pop('auth')) templates_service = connection.system_service().templates_service() templates = templates_service.list(search=module.params['pattern']) module.exit_json( changed=False, ansible_facts=dict( ovirt_templates=[ get_dict_of_struct(c) for c in templates ], ), ) except Exception as e: module.fail_json(msg=str(e), exception=traceback.format_exc()) finally: connection.close(logout=False)
def main(): argument_spec = ovirt_full_argument_spec( pattern=dict(default='', required=False), ) module = AnsibleModule(argument_spec) check_sdk(module) try: connection = create_connection(module.params.pop('auth')) clusters_service = connection.system_service().clusters_service() clusters = clusters_service.list(search=module.params['pattern']) module.exit_json( changed=False, ansible_facts=dict( ovirt_clusters=[ get_dict_of_struct(c) for c in clusters ], ), ) except Exception as e: module.fail_json(msg=str(e), exception=traceback.format_exc()) finally: connection.close(logout=False)
def main(): argument_spec = ovirt_full_argument_spec( pattern=dict(default='', required=False), ) module = AnsibleModule(argument_spec) check_sdk(module) try: connection = create_connection(module.params.pop('auth')) storage_domains_service = connection.system_service().storage_domains_service() storage_domains = storage_domains_service.list(search=module.params['pattern']) module.exit_json( changed=False, ansible_facts=dict( ovirt_storage_domains=[ get_dict_of_struct(c) for c in storage_domains ], ), ) except Exception as e: module.fail_json(msg=str(e), exception=traceback.format_exc()) finally: connection.close(logout=False)
def main(): argument_spec = ovirt_full_argument_spec( pattern=dict(default='', required=False), ) module = AnsibleModule(argument_spec) check_sdk(module) try: connection = create_connection(module.params.pop('auth')) vmpools_service = connection.system_service().vm_pools_service() vmpools = vmpools_service.list(search=module.params['pattern']) module.exit_json( changed=False, ansible_facts=dict( ovirt_vm_pools=[ get_dict_of_struct(c) for c in vmpools ], ), ) except Exception as e: module.fail_json(msg=str(e), exception=traceback.format_exc()) finally: connection.close(logout=False)
def main(): argument_spec = ovirt_full_argument_spec( pattern=dict(default='', required=False), ) module = AnsibleModule(argument_spec) check_sdk(module) try: connection = create_connection(module.params.pop('auth')) users_service = connection.system_service().users_service() users = users_service.list(search=module.params['pattern']) module.exit_json( changed=False, ansible_facts=dict( ovirt_users=[ get_dict_of_struct(c) for c in users ], ), ) except Exception as e: module.fail_json(msg=str(e), exception=traceback.format_exc()) finally: connection.close(logout=False)
def main(): argument_spec = ovirt_full_argument_spec( pattern=dict(default='', required=False), ) module = AnsibleModule(argument_spec) check_sdk(module) try: connection = create_connection(module.params.pop('auth')) networks_service = connection.system_service().networks_service() networks = networks_service.list(search=module.params['pattern']) module.exit_json( changed=False, ansible_facts=dict( ovirt_networks=[ get_dict_of_struct(c) for c in networks ], ), ) except Exception as e: module.fail_json(msg=str(e), exception=traceback.format_exc()) finally: connection.close(logout=False)
def main(): from ansible.module_utils.basic import AnsibleModule module = AnsibleModule({ "egg_path": {"required": True, "type": "path"} }) sys.path.append(module.params["egg_path"]) import falafel module.exit_json(nvr=falafel.get_nvr())
def main (): spec = dict (provider = dict (required = True)) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) command = {"command": "clear logging", "prompt": CLI_PROMPT_RE, "answer": "y"} response = run_commands (module, command) result = dict (changed = True) result["stdout"] = response result["stdout_lines"] = response return module.exit_json (**result)
def main (): spec = dict (provider = dict (required = True), confirm= dict (required = True), location= dict (required = False, default = None), force= dict (required = False, type="bool", default = False)) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) args = module.params force = args["force"] location = args["location"] result = dict (changed = False) if args["confirm"] != "yes": result["stdout"] = "reload aborted" module.exit_json (**result) # setup reload command expecting specific prompt to return reload_command = "reload " if location != None: reload_command = reload_command + "location %s " % location if force is True: reload_command = reload_command + "force " command = {"command": reload_command, "prompt": CLI_PROMPT_RE, "answer": "y"} response = run_commands (module, command) result["stdout"] = response result["stdout_lines"] = str (result["stdout"]).split (r"\n") module.exit_json (**result)
def main (): spec = dict (provider = dict (requreid = True), pkgpath= dict (required = False, default = None), pkgname = dict (required = True, default = None), state = dict (required = False, default = "present", choices = ["present", "absent", "updated", "activated", "deactivated", "committed"])) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) args = module.params state = args["state"] # cannot run on classic XR if is_legacy_iosxr (module): module.fail_json (msg = "use 'xr32_install_package' on 32-bit IOS-XR instead") # make sure no other install in progress if is_install_in_progress (module): module.fail_json (msg = "other install operation in progress") install = { "present": install_add, "absent": install_remove, "updated": install_update, "activated": install_activate, "deactivated": install_deactivate, "committed": install_commit } result = install[state] (module, args["pkgpath"], args["pkgname"]) module.exit_json (**result)
def main (): spec = dict (provider = dict (required = True), keyfile = dict (required = True)) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) args = module.params keyfile = args["keyfile"] show_command = "show crypto key authentication rsa" response = run_commands (module, show_command) cmd = "crypto key import authentication rsa " + keyfile if "authentication" in response[0]: crypto_command = {"command": cmd, "prompt": CLI_PROMPT_RE, "answer": "yes"} else: crypto_command = cmd response = run_commands (module, crypto_command) # now show the new key response = run_commands (module, show_command) result = dict (changed = True) result["stdout"] = response result["stdout_lines"] = str (result["stdout"]).split (r"\n") module.exit_json (**result)
def main (): spec = dict (provider = dict (required = True), confirm = dict (required = True)) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) result = dict (changed = False) if module.params["confirm"] != "yes": result["stdout"] = "clear configs aborted" module.exit_json (**result) command = "configure terminal" rc, out, err = exec_command (module, command) if rc != 0: module.fail_json (msg = "unable to enter configuration mode", err = to_text (err, errors = "surrogate_or_strict")) command = {"command": "commit replace", "prompt": CLI_PROMPT_RE, "answer": "yes"} run_commands (module, command, check_rc = False) result = dict (changed = False) result["stdout"] = "all configs cleared" result["stdout_lines"] = str (result["stdout"]).split (r"\n") return module.exit_json (**result)
def main(): spec = dict (provider = dict (required = True), component = dict (required = False, default = "all")) spec.update (iosxr_argument_spec) module = AnsibleModule (argument_spec = spec) args = module.params comp = args["component"] command = "show running-config " + comp response = run_commands (module, command) result = dict (changed = False) result["stdout"] = response result["stdout_lines"] = str (result["stdout"]).split (r"\n") return module.exit_json (**result)
def main(): module = AnsibleModule(argument_spec={ 'whiteboard': {'required': False, 'type': 'str'}, 'recipesets': {'required': True, 'type': 'list'}, 'job_group': {'default': '', 'type': 'str'}, 'state': {'default': 'present', 'choices': ['present', 'absent']}, 'cancel_message': {'default': 'Job canceled by Ansible'} }) params = type('Args', (object,), module.params) factory = BkrFactory() # logs = [] job_ids = [] for recipeset in params.recipesets: for x in range(0, recipeset['count']): if params.state == 'present': extra_params = {} if params.job_group: extra_params['job_group'] = params.job_group # Make provision job_id = factory.provision(debug=True, wait=True, recipesets=[recipeset], whiteboard=params.whiteboard, **extra_params) job_ids.extend(job_id) else: factory.cancel_jobs(recipeset['ids'], params.cancel_message) if params.state == 'present': parsed_ids = [] for id_string in job_ids: parsed_ids.append(id_string[2:]) module.exit_json(success=False, ids=parsed_ids, changed=True) else: module.exit_json(success=True, changed=True)
def main(): mod = AnsibleModule(argument_spec={ 'ids': {'type': 'list'}, 'skip_no_system': {'type': 'bool', 'default': False}, 'max_attempts': {'type': 'int', 'default': 60} }) beaker = BeakerTargets(mod.params) try: results = beaker.get_system_statuses() mod.exit_json(hosts=results, changed=True, success=True) except Exception as ex: mod.fail_json(msg=str(ex)) # import module snippets
def main(): module = AnsibleModule( argument_spec=dict( name=dict(type='str'), count=dict(default=1, type='int'), state=dict(choices=['present', 'absent']), ), ) try: d = Dummy() execute_output = d.execute(module) json_output = {} host = execute_output.get('host') changed = execute_output.get('changed') if host or changed is not None: json_output['changed'] = True json_output.update(execute_output) else: json_output['changed'] = False module.exit_json(**json_output) except Exception as e: module.fail_json(msg=str(e)) # ---- Import Ansible Utilities (Ansible Framework) -------------------#
def main(): module = AnsibleModule( argument_spec={ 'data': {'required': True, 'aliases': ['topology']}, 'schema': {'required': True}, 'data_format': {'required': False, 'choices': ['json', 'yaml', 'yml']}, }, required_one_of=[], supports_check_mode=True ) data_file_path = os.path.expanduser(module.params['data']) schema_file_path = os.path.expanduser(module.params['schema']) check_file_paths(module, data_file_path, schema_file_path) validate_values(module, data_file_path) schema_obj = JSONSchema(data_file_path, schema_file_path) status, out = schema_obj.validate() # module.fail_json(msg=out) if status: changed = True module.exit_json(isvalid=changed, out=out) else: resp = {"path": data_file_path, "schema": schema_file_path, "output": out} module.fail_json(msg=resp)
def main(): module = AnsibleModule( argument_spec=dict( accepted_requests=dict(type='list', required=True), ), supports_check_mode=True ) accepted_requests = module.params['accepted_requests'] peering_info = mocked_data module.exit_json(peering_info=peering_info, msg=str(peering_info))
def main(): module = AnsibleModule( argument_spec=dict( state=dict(type='str', required=True, choices=['in-progress']), ), supports_check_mode=True ) state = module.params['state'] pinder = hammock.Hammock('http://127.0.0.1:8000') sessions = pinder.api.requests.GET(params='state={}'.format(state)).json()['results'] module.exit_json(sessions=sessions, msg=str(sessions))
def main(): argument_spec = { "repos": {"required": True, "type": "list"}, "state": { "default": "present", "choices": ['present', 'absent'], "type": 'str' }, "only": {"default": 'no', "required": False, "type": "str", "choices": ['yes', 'no']}, } module = AnsibleModule(argument_spec=argument_spec) repos = module.params['repos'] state = module.params['state'] only = module.params['only'] if state == 'present': if only == 'yes': os.system("subscription-manager repos --disable='*'" % repos) repos = ' '.join(['--enable=' + repo for repo in repos]) # result = os.system("subscription-manager repos %s" % repos) result = os.popen("subscription-manager repos %s" % repos).read() if 'Error' in result: module.fail_json(msg=result) meta = {'result': result} changed = True skipped = False else: repos = ' '.join(['--disable=' + repo for repo in repos]) result = os.popen("subscription-manager repos %s" % repos).read() if 'Error' in result: module.fail_json(msg=result) meta = {'result': result} changed = True skipped = False module.exit_json(changed=changed, skipped=skipped, meta=meta)