我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用socket.getfqdn()。
def __init__(self, server, conn, addr):
    """Initialise per-connection state and send the 220 greeting.

    :param server: stored on the instance.
    :param conn: connected socket; passed to ``asynchat.async_chat`` and
        queried for its peer name.
    :param addr: stored on the instance.
    """
    asynchat.async_chat.__init__(self, conn)

    # Collaborators supplied by the caller.
    self.__server, self.__conn, self.__addr = server, conn, addr

    # Empty session state.
    self.__line = []
    self.__state = self.COMMAND
    self.__greeting = 0
    self.__mailfrom = None
    self.__rcpttos = []
    self.__data = ''

    # The local FQDN goes into the greeting banner below.
    local_fqdn = socket.getfqdn()
    self.__fqdn = local_fqdn
    remote_peer = conn.getpeername()
    self.__peer = remote_peer
    print >> DEBUGSTREAM, 'Peer:', repr(remote_peer)

    banner = '220 %s %s' % (local_fqdn, __version__)
    self.push(banner)
    self.set_terminator('\r\n')
    # Overrides base class for convenience
def __init__(self, server_address):
    """Create, bind and activate the listening socket.

    :param server_address: address passed to ``bind()``.

    Records the listening socket, the FQDN of the bound host, the bound
    port, and an empty ``headers_set`` list on the instance.
    """
    sock = socket.socket(self.address_family, self.socket_type)
    self.listen_socket = sock

    # SO_REUSEADDR lets the same address be bound again.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(server_address)
    sock.listen(self.request_queue_size)

    # Only the first two items of the socket name (host, port) are used.
    bound_host, bound_port = sock.getsockname()[:2]
    self.server_name = socket.getfqdn(bound_host)
    self.server_port = bound_port

    # Return headers set by Web framework/Web application
    self.headers_set = []
def generate_comment(self):
    """Render ``self.HELP`` with extension docs, host and project details.

    Class docstrings of the bot's extensions are cleaned with
    ``inspect.cleandoc`` and joined with blank lines; extensions without a
    docstring are skipped.

    :return: ``self.HELP`` %-formatted with the collected values.
    """
    class_docs = (ext.__class__.__doc__ for ext in self.bot.extensions)
    help_text = '\n\n'.join(
        inspect.cleandoc(text) for text in class_docs if text
    )

    values = {
        'extensions': ','.join(sorted(self.bot.extensions_map.keys())),
        'help': help_text,
        'host': socket.getfqdn(),
        'me': self.current.head.repository.SETTINGS.NAME,
        'mentions': ', '.join(
            sorted('@' + name for name in self.current.help_mentions)
        ),
        'software': self.DISTRIBUTION.project_name,
        'version': self.DISTRIBUTION.version,
    }
    return self.HELP % values
def report_failure(self, server_uuid, errno):
    """Report a server error to Fabric.

    Does nothing when ``self._report_errors`` is false or when ``errno``
    (converted with ``int``) is in neither ``REPORT_ERRORS`` nor
    ``REPORT_ERRORS_EXTRA``.  Otherwise the failure is sent together with
    this host's FQDN; ``Fault`` and ``socket.error`` raised while
    reporting are logged at debug level and not propagated.

    :param server_uuid: identifier of the MySQL server being reported.
    :param errno: error number; anything ``int()`` accepts.
    """
    if not self._report_errors:
        return

    errno = int(errno)
    current_host = socket.getfqdn()

    if errno not in REPORT_ERRORS and errno not in REPORT_ERRORS_EXTRA:
        return

    _LOGGER.debug("Reporting error %d of server %s", errno, server_uuid)
    fabric = self.get_instance()
    try:
        response = fabric.execute('threat', 'report_failure',
                                  server_uuid, current_host, errno)
        FabricResponse(response)
    except (Fault, socket.error) as exc:
        _LOGGER.debug("Failed reporting server to Fabric (%s)", str(exc))
        # Not requiring further action
def override_system_resolver(resolver=None):
    """Replace the socket module's resolver routines with dnspython-backed ones.

    Handy in tests that need to control name resolution without touching
    the system resolver configuration (e.g. /etc/resolv.conf).  When no
    resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    if resolver is None:
        resolver = get_default_resolver()
    _resolver = resolver

    # (socket attribute, replacement) pairs, installed in this order.
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
def __init__(self, regr, key, meta=None):
    """Store the registration and key, build metadata and derive the id.

    :param regr: stored as ``self.regr``.
    :param key: stored as ``self.key``; its public key is fingerprinted.
    :param meta: metadata to use as-is; when ``None`` a fresh ``Meta`` is
        built with the current UTC time and this host's FQDN.
    """
    self.key = key
    self.regr = regr

    if meta is None:
        # pyrfc3339 drops microseconds, make sure __eq__ is sane
        created_at = datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0)
        meta = self.Meta(creation_dt=created_at,
                         creation_host=socket.getfqdn())
    self.meta = meta

    # The id is the MD5 hex digest of the PEM-encoded
    # SubjectPublicKeyInfo of the account's public key.
    public_pem = self.key.key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)
    self.id = hashlib.md5(public_pem).hexdigest()
    # Implementation note: Email? Multiple accounts can have the
    # same email address. Registration URI? Assigned by the
    # server, not guaranteed to be stable over time, nor
    # canonical URI can be generated. ACME protocol doesn't allow
    # account key (and thus its fingerprint) to be updated...
def get_host_ip(hostIP=None):
    """Resolve the address this host should advertise.

    :param hostIP: ``None`` or ``'auto'`` behave like ``'ip'``; ``'dns'``
        returns the FQDN; ``'ip'`` returns the IPv4 address the FQDN (or,
        failing that, the hostname) resolves to; any other value is
        returned unchanged.
    :return: a hostname or IP address string.
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        hostIP = socket.getfqdn()
    elif hostIP == 'ip':
        try:
            hostIP = socket.gethostbyname(socket.getfqdn())
        except socket.gaierror:
            logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
            hostIP = socket.gethostbyname(socket.gethostname())
        # NOTE(review): the loopback check is treated as part of the 'ip'
        # branch; confirm against the original layout.
        if hostIP.startswith("127."):
            # A loopback result is replaced by the local address of a UDP
            # socket connected towards a private address.
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # doesn't have to be reachable
                s.connect(('10.255.255.255', 1))
                hostIP = s.getsockname()[0]
            finally:
                # Always release the probe socket, even if connect() fails.
                s.close()
    return hostIP
def __init__(self, client, path, num_clients, identifier=None):
    """Create a Double Barrier

    :param client: A :class:`~kazoo.client.KazooClient` instance.
    :param path: The barrier path to use.
    :param num_clients: How many clients must enter the barrier to
                        proceed.
    :type num_clients: int
    :param identifier: An identifier to use for this member of the
                       barrier when participating. Defaults to the
                       hostname + process id.

    """
    self.client = client
    self.path = path
    self.num_clients = num_clients

    # A falsy identifier falls back to "<fqdn>-<pid>".
    if not identifier:
        identifier = '%s-%s' % (socket.getfqdn(), os.getpid())
    self._identifier = identifier

    self.participating = False
    self.assured_path = False

    # Each instance gets its own random node name under the barrier path.
    node_name = uuid.uuid4().hex
    self.node_name = node_name
    self.create_path = self.path + "/" + node_name
def __init__(self, swf=None, identity=None):
    """Base class for deciders.

    Parameters
    ----------
    swf: floto.api.Swf
        The SWF client, if swf is None an instance is created
    identity:
        Identity to use; when falsy, the FQDN of this host's name is used.
    """
    # State that starts out empty.
    self.task_token = None
    self.last_response = None
    self.history = None
    self.decisions = []
    self.run_id = None
    self.workflow_id = None
    self.domain = None

    if not identity:
        identity = socket.getfqdn(socket.gethostname())
    self.identity = identity

    if not swf:
        swf = floto.api.Swf()
    self.swf = swf

    self.task_list = None
    self.terminate_workflow = False
    self.terminate_decider = False
    self._separate_process = None
def select_playbook(self, path):
    """Pick the playbook to run from ``path``.

    If a first argument is present it names the playbook; it is returned
    when ``try_playbook`` gives 0, otherwise a warning is displayed and
    ``None`` is returned.  Without an argument, ``<fqdn>.yml``,
    ``<short hostname>.yml`` and the default playbook are tried in that
    order and the first one that ``try_playbook`` accepts is returned; if
    none is accepted, all errors are displayed and ``None`` is returned.

    :param path: directory the playbook paths are joined onto.
    """
    if len(self.args) > 0 and self.args[0] is not None:
        requested = os.path.join(path, self.args[0])
        status = self.try_playbook(requested)
        if status == 0:
            return requested
        display.warning("%s: %s" % (requested, self.PLAYBOOK_ERRORS[status]))
        return None

    fqdn = socket.getfqdn()
    candidates = [
        os.path.join(path, fqdn + '.yml'),
        os.path.join(path, fqdn.split('.')[0] + '.yml'),
        os.path.join(path, self.DEFAULT_PLAYBOOK),
    ]

    problems = []
    for candidate in candidates:
        status = self.try_playbook(candidate)
        if status == 0:
            return candidate
        problems.append("%s: %s" % (candidate, self.PLAYBOOK_ERRORS[status]))

    # No candidate was usable.
    display.warning("\n".join(problems))
    return None
def _check_address(address):
    """Return an ``(ipv4, hostname)`` pair for ``address``.

    When ``is_valid_address`` accepts ``address`` it is used as the IP and
    a reverse lookup supplies the hostname.  Otherwise ``address`` is
    treated as a name: its FQDN is the hostname and a forward lookup
    supplies the IP.  A failed lookup leaves that element as ``""``.
    """
    if is_valid_address(address):
        hostname = ""
        try:
            hostname = socket.gethostbyaddr(address)[0]
        except Exception:
            pass  # no DNS
        return address, hostname

    hostname = socket.getfqdn(address)
    ipv4 = ""
    try:
        ipv4 = socket.gethostbyname(hostname)
    except Exception:
        pass  # host not valid or offline
    return ipv4, hostname
def post(content, token=None, username=None, public=False, debug=False):
    '''
    Post a gist on GitHub.

    The gist holds one file named ``message``; its description embeds a
    random SHA-1 hex string, this host's FQDN, the user name and the
    current UTC time.

    Returns ``(gist id, random string)`` when the response contains an
    ``id``, otherwise ``(None, None)``.
    '''
    nonce = hashlib.sha1(os.urandom(16)).hexdigest()
    if username is None:
        username = getuser()
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    template = ('{hash} (twitter-message-bus); from {host} by {user} '
                'at {time} UTC.')
    description = template.format(host=getfqdn(), user=username,
                                  time=timestamp, hash=nonce)

    # A missing content is posted as an empty string.
    body = content if content is not None else ''
    payload = json.dumps({
        'files': {'message': {'content': body}},
        'public': public,
        'description': description,
    })

    response = github(http='post', uri='gists', token=token,
                      payload=payload, debug=debug)
    if 'id' in response:
        return (response['id'], nonce)
    return (None, None)
def welcome(run=0):
    """Show the welcome form, or store it and move on once it is complete.

    On run 0 the form values come from the wizard's ``General`` config;
    on later runs they come from the submitted request form.  When every
    required field is present the values are saved and the client is
    redirected to ``determine_ssh_status``; otherwise the template is
    rendered, with ``host`` defaulting to this machine's FQDN.
    """
    form = wizard.config.get('General', {}) if run == 0 else _delist(request.form)

    required = ['host', 'country', 'state', 'locality',
                'orgname', 'orgunit', 'commonname', 'email']
    if _has_all_parameters(form, required):
        wizard.change_config('General', **form)
        return redirect(url_for('determine_ssh_status', run=0))

    if 'host' not in form:
        form['host'] = socket.getfqdn()

    # Form values win over 'run' if the form itself has a 'run' key.
    template_args = {'run': run}
    template_args.update(form)
    return render_template('welcome.html',
                           menu_options=wizard.get_available_options(),
                           **template_args)
def testHostnameRes(self):
    # Testing hostname resolution mechanisms
    local_name = socket.gethostname()
    try:
        ip = socket.gethostbyname(local_name)
    except socket.error:
        # Probably name lookup wasn't set up right; skip this test
        return
    self.assertTrue('.' in ip, "Error resolving host to ip.")

    try:
        primary, aliases, ipaddrs = socket.gethostbyaddr(ip)
    except socket.error:
        # Probably a similar problem as above; skip this test
        return

    # The FQDN for the address must be one of the names seen so far.
    known_names = [local_name, primary] + aliases
    fqhn = socket.getfqdn(ip)
    if fqhn not in known_names:
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
def setUp(self):
    # Swap in the mock getfqdn; the real one is kept on the instance.
    self.real_getfqdn = socket.getfqdn
    socket.getfqdn = mock_socket.getfqdn

    # temporarily replace sys.stdout to capture DebuggingServer output
    self.old_stdout = sys.stdout
    sys.stdout = self.output = io.StringIO()

    self.serv_evt = threading.Event()
    self.client_evt = threading.Event()

    # Capture SMTPChannel debug output
    self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
    smtpd.DEBUGSTREAM = io.StringIO()

    # Pick a random unused port by passing 0 for the port number
    self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
    # Keep a note of what port was assigned
    self.port = self.serv.socket.getsockname()[1]

    self.thread = threading.Thread(
        target=debugging_server,
        args=(self.serv, self.serv_evt, self.client_evt),
    )
    self.thread.start()

    # wait until server thread has assigned a port number
    self.serv_evt.wait()
    self.serv_evt.clear()
def get_fqdn():
    """
    Return this machine's fqdn, trying hard to return a meaningful name.

    This works around a NetworkManager bug which seems to make
    C{getfqdn} return localhost6.localdomain6 for machines without a
    domain since Maverick: any candidate containing "localhost" is
    rejected in favour of the next source.
    """
    candidate = socket.getfqdn()
    if "localhost" not in candidate:
        return candidate

    # Try the heavy artillery
    addrinfo = socket.getaddrinfo(socket.gethostname(), None,
                                  socket.AF_INET, socket.SOCK_DGRAM,
                                  socket.IPPROTO_IP, socket.AI_CANONNAME)
    candidate = addrinfo[0][3]
    if "localhost" not in candidate:
        return candidate

    # Another fallback
    return socket.gethostname()
def get_unique_code():
    """Return this machine's unique code, generating and caching it if needed.

    Sources, in order:

    1. the ``STRACK_UNIQUE_CODE`` environment variable;
    2. the ``unique`` file under ``STRACK_USER_PATH`` (its content is
       also copied into the environment variable);
    3. the MD5 hex digest of the IP address this machine's FQDN resolves
       to, which is then stored in the environment variable and in the
       cache file.

    :return: the unique code string.
    """
    # get code from env
    code_from_env = os.environ.get("STRACK_UNIQUE_CODE")
    if code_from_env:
        return code_from_env

    # read code from cache
    unique_code_cache_file = os.path.join(STRACK_USER_PATH, "unique")
    if os.path.isfile(unique_code_cache_file):
        with open(unique_code_cache_file) as f:
            code_from_file = f.read()
        os.environ["STRACK_UNIQUE_CODE"] = code_from_file
        return code_from_file

    # make dir
    if not os.path.isdir(STRACK_USER_PATH):
        os.makedirs(STRACK_USER_PATH)

    # generate the code
    computer_name = socket.getfqdn(socket.gethostname())
    ip = socket.gethostbyname(computer_name)
    # md5() needs bytes; passing the str raises TypeError on Python 3.
    unique_code = md5(ip.encode("utf-8")).hexdigest()

    # save cache
    os.environ["STRACK_UNIQUE_CODE"] = unique_code
    with open(unique_code_cache_file, "w") as f:
        f.write(unique_code)
    return unique_code
def perform_krb181_workaround():
    """Renew the Kerberos ticket cache to work around a Kerberos 1.8.1 issue.

    Runs ``<kinit_path> -c <ccache> -R`` using the ``kerberos``
    configuration section.  If the command exits non-zero, an error
    explaining how to check the ticket is logged and the process exits
    with that return code.
    """
    cmdv = [configuration.get('kerberos', 'kinit_path'),
            "-c", configuration.get('kerberos', 'ccache'),
            "-R"]  # Renew ticket_cache

    log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
             " ".join(cmdv))

    ret = subprocess.call(cmdv, close_fds=True)

    if ret != 0:
        principal = "%s/%s" % (configuration.get('kerberos', 'principal'),
                               socket.getfqdn())
        # The suggested `kinit -c` command must show the credential cache
        # path, so read the 'ccache' setting (not 'principal').
        fmt_dict = dict(princ=principal,
                        ccache=configuration.get('kerberos', 'ccache'))
        log.error("Couldn't renew kerberos ticket in order to work around "
                  "Kerberos 1.8.1 issue. Please check that the ticket for "
                  "'%(princ)s' is still renewable:\n"
                  "  $ kinit -f -c %(ccache)s\n"
                  "If the 'renew until' date is the same as the 'valid starting' "
                  "date, the ticket cannot be renewed. Please check your KDC "
                  "configuration, and the ticket renewal policy (maxrenewlife) "
                  "for the '%(princ)s' and `krbtgt' principals." % fmt_dict)
        sys.exit(ret)
def update_warc_info_from_spider(record, spider):
    """update a WARC warcinfo record from a scrapy Spider

    The record's payload is replaced by a header block holding the
    software name, this host's FQDN and the spider's name, run id,
    revision and JSON-encoded parameters.
    """
    # make empty header object to use for fields
    # XXX WARCHeader messes up capitalization here
    fields = warc.WARCHeader({}, defaults=False)

    entries = (
        ('software', 'osp_scraper'),
        ('hostname', socket.getfqdn()),
        ('x-spider-name', spider.name),
        ('x-spider-run-id', spider.run_id),
        ('x-spider-revision', git_revision),
        ('x-spider-parameters', json.dumps(spider.get_parameters())),
    )
    for field_name, field_value in entries:
        fields[field_name] = field_value

    # Serialise the fields on their own: no version line, no extra CRLF.
    payload = BytesIO()
    fields.write_to(payload, version_line=False, extra_crlf=False)
    record.update_payload(payload.getvalue())
def test_login(self):
    servers = config['lustre_servers']
    self.hosts = self.add_hosts([servers[0]['address'],
                                 servers[1]['address']])

    # Chroma puts its FQDN in the manager certificate, but the test config may
    # be pointing to localhost: if this is the case, substitute the FQDN in the
    # URL so that the client can validate the certificate.
    manager = config['chroma_managers'][0]
    url = manager['server_http_url']
    parsed = urlparse.urlparse(url)
    if parsed.hostname == 'localhost':
        parts = list(parsed)
        # Index 1 of the parsed URL is the network location.
        parts[1] = parts[1].replace("localhost", socket.getfqdn())
        url = urlparse.urlunparse(tuple(parts))

    example_api_client.setup_ca(url)
    user = manager['users'][0]
    hosts = example_api_client.list_hosts(url,
                                          user['username'],
                                          user['password'])
    self.assertListEqual(hosts, [h['fqdn'] for h in self.hosts])
def testHostnameRes(self):
    # Testing hostname resolution mechanisms
    this_host = socket.gethostname()

    try:
        ip = socket.gethostbyname(this_host)
    except socket.error:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    has_dot = ip.find('.') >= 0
    self.assertTrue(has_dot, "Error resolving host to ip.")

    try:
        reverse = socket.gethostbyaddr(ip)
    except socket.error:
        # Probably a similar problem as above; skip this test
        self.skipTest('address lookup failure')
    hname, aliases, ipaddrs = reverse

    all_host_names = [this_host, hname]
    all_host_names += aliases
    fqhn = socket.getfqdn(ip)
    if fqhn in all_host_names:
        return
    self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def save(self, *args, **kwargs):
    """Normalise the status fields, fill creation defaults, then save.

    For an object without a primary key, a missing creation date is set
    to the current time and a missing source hostname to this host's
    FQDN.  After saving, a newly created object whose status is DROPPED
    or BOUNCED triggers ``should_optout(create=True)``.
    """
    # Remember whether this is a new object before anything else runs.
    created = not self.pk
    self.update_mail_status()

    # Keep only the part of status_code matched by rfc3463_regex.
    matched = rfc3463_regex.match(self.status_code) if self.status_code else None
    if matched:
        self.status_code = matched.group(0)

    if not self.pk:
        if not self.creation_date:
            self.creation_date = timezone.now()
        if not self.source_hostname:
            self.source_hostname = socket.getfqdn()

    super().save(*args, **kwargs)

    if created and self.status in [self.DROPPED, self.BOUNCED]:
        self.should_optout(create=True)
def store_process_system_document(_process_id, _name, _parent_id=None):
    """
    Creates a process instance structure, automatically sets systemPid, spawnedBy, host and spawnedWhen

    :param _process_id: value stored under ``_id``.
    :param _name: value stored under ``name``.
    :param _parent_id: optional; when truthy it is stored under
        ``parent_id``.
    :return: the process document as a dict.
    """
    _struct = {
        "_id": _process_id,
        "systemPid": os.getpid(),
        "spawnedBy": get_current_login(),
        "name": _name,
        "host": socket.getfqdn(),
        "spawnedWhen": str(datetime.datetime.utcnow()),
        "schemaRef": "ref://of.process.system"
    }
    if _parent_id:
        # Store the id itself; a trailing comma here would store a 1-tuple.
        _struct["parent_id"] = _parent_id
    return _struct
def __init__(self, server_address):
    """Set up the listening socket and record the server name and port.

    :param server_address: address the listening socket is bound to.
    """
    # create a listening socket
    self.listen_socket = listener = socket.socket(
        self.address_family, self.socket_type)
    # allow the same socket address
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # bind to the address
    listener.bind(server_address)
    # activate to listening
    listener.listen(self.request_queue_size)

    # get server host name and port
    sock_name = self.listen_socket.getsockname()[:2]
    host, port = sock_name
    self.server_name = socket.getfqdn(host)
    self.server_port = port

    # return headers set by web frameworks/applications
    self.headers_set = []
def register(args, config):
    """Register this host with the CMDB and print the API key it returns.

    Posts ``{"fqdn": <lower-cased FQDN>}`` as JSON to the
    ``/clients/register/`` URL derived from ``config``, checks the decoded
    response for an error, then prints the API key and a reminder of
    where to save it.

    :param args: not used by this function.
    :param config: passed to ``lib.lib.get_config_url`` to build the URL.
    """
    path = '/clients/register/'
    my_hostname = socket.getfqdn().lower()
    # config.py is expected one directory above this file's directory.
    agent_root_dir = os.path.dirname(os.path.dirname(__file__))

    url = lib.lib.get_config_url(config, path)
    req = requests.post(url, data=json.dumps({'fqdn': my_hostname}))
    resp_body = lib.lib.get_response_body(req)
    obj = json.loads(resp_body)
    lib.lib.check_for_error(obj)

    print('Host `{}` successfully registered with CMDB!'.format(my_hostname))
    print()
    print('API Key: {}'.format(obj.get('api_key')))
    print()
    # Message typo fixed: "conifg" -> "config".
    print('!!! WARNING: save this API Key in your config file: `{}`'.format(
        os.path.join(agent_root_dir, 'config.py')
    ))
def setUp(self):
    # Keep the real getfqdn on the instance and install the mock one.
    self.real_getfqdn = socket.getfqdn
    socket.getfqdn = mock_socket.getfqdn

    self.serv_evt = threading.Event()
    self.client_evt = threading.Event()

    # Pick a random unused port by passing 0 for the port number
    server = SimSMTPServer((HOST, 0), ('nowhere', -1))
    self.serv = server
    # Keep a note of what port was assigned
    self.port = server.socket.getsockname()[1]

    self.thread = threading.Thread(
        target=debugging_server,
        args=(server, self.serv_evt, self.client_evt),
    )
    self.thread.start()

    # wait until server thread has assigned a port number
    self.serv_evt.wait()
    self.serv_evt.clear()
def __init__(self):
    """Initialise election state and start the periodic leader check."""
    super(TransformService, self).__init__()

    self.coordinator = None
    self.group = CONF.service.coordinator_group

    # A unique name used for establishing election candidacy
    self.my_host_name = socket.getfqdn()

    # periodic check; its interval is the configured polling frequency
    # converted to float.
    leader_check = loopingcall.FixedIntervalLoopingCall(
        self.periodic_leader_check)
    polling_interval = float(CONF.service.election_polling_frequency)
    leader_check.start(interval=polling_interval)
def _connect(self):
    """Open the paramiko transport and create the SFTP client on it.

    The port falls back to ``self._DEFAULT_PORT`` when the URL has none;
    the GSS host is the FQDN of the URL's host.
    """
    target = (self.url.host, self.url.port or self._DEFAULT_PORT)
    self._transport = paramiko.Transport(target)

    hostkey = self._get_hostkey()
    self._transport.connect(
        hostkey,
        self.url.user,
        self.url.password,
        gss_host=socket.getfqdn(self.url.host),
        gss_auth=self.GSS_AUTH,
        gss_kex=self.GSS_KEX,
    )

    self.__client = paramiko.SFTPClient.from_transport(self._transport)
    self.__client.chdir()
def __init__(self, server, conn, addr): asynchat.async_chat.__init__(self, conn) self.__server = server self.__conn = conn self.__addr = addr self.__line = [] self.__state = self.COMMAND self.__greeting = 0 self.__mailfrom = None self.__rcpttos = [] self.__data = '' self.__fqdn = socket.getfqdn() try: self.__peer = conn.getpeername() except socket.error, err: # a race condition may occur if the other end is closing # before we can get the peername self.close() if err[0] != errno.ENOTCONN: raise return print >> DEBUGSTREAM, 'Peer:', repr(self.__peer) self.push('220 %s %s' % (self.__fqdn, __version__)) self.set_terminator('\r\n') # Overrides base class for convenience