我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用socket.getservbyname()。
def init(self):
    """
    Derive the AOS-server connection attributes from the session target.

    The target URL is read from ``self.session._target`` and parsed.  On
    success three attributes are set on ``self``:

    - ``server``: the hostname component of the target
    - ``port``: the port given in the target, or, when none is given, the
      port that ``socket.getservbyname`` reports for the URL scheme
    - ``url``: the target with ``"/api"`` appended

    Raises:
        ValueError: if the target does not carry a URL scheme.
    """
    target = self.session._target
    parsed = urlparse(target)

    # Without a scheme there is nothing to derive a default port from.
    if not parsed.scheme:
        raise ValueError("AOS-server target(%s) not a proper URL" % target)

    self.server = parsed.hostname
    explicit_port = parsed.port
    if explicit_port:
        self.port = explicit_port
    else:
        self.port = socket.getservbyname(parsed.scheme)
    self.url = target + "/api"
def heart_beat():
    """
    Send a ``b'#Hi'`` datagram to ``(HOST, PORT)`` every ten seconds, forever.

    ``PORT`` may be a number or a UDP service name; a service name is
    resolved with ``socket.getservbyname``.  Each successful send increments
    a counter and prints it.  A failed send is skipped (best effort), and
    the loop still waits for the normal delay before retrying so that a
    persistent failure cannot turn into a busy loop.

    Ctrl-C ends the loop by exiting the process with status 1.  The socket
    is closed on every exit path.
    """
    # Resolve the port before creating the socket so that a failed lookup
    # does not leak a socket.
    try:
        port = int(PORT)
    except ValueError:
        port = socket.getservbyname(PORT, 'udp')

    count = 0
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((HOST, port))
        s.settimeout(5)
        while True:
            try:
                try:
                    s.sendall(b'#Hi')
                except socket.error:
                    # Best effort: drop this beat and try again next round.
                    pass
                else:
                    count += 1
                    print('Send a heart beat', count)
                time.sleep(10)  # Set your heart beat delay
            except KeyboardInterrupt:
                sys.exit(1)
    finally:
        s.close()
def test():
    """Test program for telnetlib.

    Usage: python telnetlib.py [-d] ... [host [port]]

    Default host is localhost; default port is 23.

    Each leading ``-d`` raises the debug level by one and is removed from
    ``sys.argv``.  The port may be a number or a TCP service name.  The
    connection is closed even if opening or the interactive session fails.
    """
    debuglevel = 0
    while sys.argv[1:] and sys.argv[1] == '-d':
        debuglevel += 1
        del sys.argv[1]

    host = 'localhost'
    if sys.argv[1:]:
        host = sys.argv[1]

    # NOTE(review): 0 presumably makes Telnet.open() fall back to the
    # default telnet port (23, per the usage text above) -- confirm.
    port = 0
    if sys.argv[2:]:
        portstr = sys.argv[2]
        try:
            port = int(portstr)
        except ValueError:
            port = socket.getservbyname(portstr, 'tcp')

    tn = Telnet()
    try:
        tn.set_debuglevel(debuglevel)
        tn.open(host, port, timeout=0.5)
        tn.interact()
    finally:
        tn.close()
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """
    Build an instance from its text form read from the tokenizer ``tok``.

    The text consists of an address string, a protocol (a number, or a name
    resolved with ``socket.getprotobyname``), and then service tokens up to
    end of line / end of file.  Each service is either a number or a name;
    names are resolved with ``socket.getservbyname`` and are only accepted
    when the protocol is TCP or UDP.

    Service number N is recorded by setting bit ``0x80 >> (N % 8)`` of byte
    ``N // 8`` in a ``bytearray`` that grows on demand.

    ``origin`` and ``relativize`` are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol other
            than TCP or UDP.
    """
    address = tok.get_string()
    proto_field = tok.get_string()
    if proto_field.isdigit():
        protocol = int(proto_field)
    else:
        protocol = socket.getprotobyname(proto_field)

    bitmap = bytearray()
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)

        byte_index, bit_offset = divmod(serv, 8)
        # Zero-fill up to and including the byte that holds this service.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(bytearray(shortfall))
        bitmap[byte_index] |= 0x80 >> bit_offset

    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
    """
    Store the connection target and initialise the base connector.

    ``port`` may be a string naming a TCP service; it is then resolved to a
    port number with ``socket.getservbyname``.  ``factory``, ``timeout`` and
    ``reactor`` are forwarded to ``base.BaseConnector.__init__``.

    Raises:
        error.ServiceNameUnknownError: the service name cannot be resolved.
    """
    self.host = host
    if isinstance(port, types.StringTypes):
        try:
            port = socket.getservbyname(port, 'tcp')
        # The "as" spelling is accepted by Python 2.6+ and Python 3; the
        # older "except X, e" form is a syntax error on Python 3.
        except socket.error as e:
            raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
    self.port = port
    self.bindAddress = bindAddress
    base.BaseConnector.__init__(self, factory, timeout, reactor)
def prepareAddress(self):
    """
    Normalise ``self.addr`` so that its port is a number.

    ``self.addr`` is a ``(host, port)`` pair.  When the port is a string it
    is treated as a TCP service name and resolved with
    ``socket.getservbyname``; the pair is then stored back on ``self.addr``.
    A debug line is printed first when ``iocpdebug.debug`` is set.

    Raises:
        error.ServiceNameUnknownError: the service name cannot be resolved.
    """
    host, port = self.addr
    if iocpdebug.debug:
        # Single-argument call form prints the same text on Python 2 and 3.
        print("connecting to (%s, %s)" % (host, port))
    if isinstance(port, types.StringTypes):
        try:
            port = socket.getservbyname(port, 'tcp')
        # "as" is valid on Python 2.6+ and Python 3.
        except socket.error as e:
            raise error.ServiceNameUnknownError(string=str(e))
    self.addr = (host, port)
def testConnectByService(self):
    """
    Connect with a service name ('http') in place of a port number.

    ``socket.getservbyname`` is temporarily replaced by a fake that maps
    ('http', 'tcp') to the port the test server listens on and anything
    else to 10.  The real function is restored in ``finally``, whether or
    not the connection attempt raises.

    Returns a Deferred that waits until both factories have a protocol,
    cleans up the ports, and then asserts that the server factory was
    called.
    """
    serv = socket.getservbyname
    d = defer.succeed(None)
    try:
        s = MyServerFactory()
        port = reactor.listenTCP(0, s, interface="127.0.0.1")
        self.n = port.getHost().port

        def fakeGetServByName(name, proto, n=self.n):
            # A conditional expression, unlike "cond and n or 10", returns
            # n even when n is falsy.
            return n if (name == 'http' and proto == 'tcp') else 10

        socket.getservbyname = fakeGetServByName
        self.ports.append(port)
        cf = MyClientFactory()
        # No inner try/except needed: the finally clause below restores
        # socket.getservbyname if connectTCP raises.
        c = reactor.connectTCP('127.0.0.1', 'http', cf)
        d = loopUntil(
            lambda: (getattr(s, 'protocol', None) is not None and
                     getattr(cf, 'protocol', None) is not None))
        d.addBoth(lambda x: self.cleanPorts(port, c.transport,
                                            cf.protocol.transport))
    finally:
        socket.getservbyname = serv
    d.addCallback(lambda x: self.assert_(s.called,
                                         '%s was not called' % (s,)))
    return d
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """
    Build an instance from its text form read from the tokenizer ``tok``.

    Reads an address string, a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), and then service tokens until end of line /
    end of file.  Service names are resolved with ``socket.getservbyname``
    and are only accepted for TCP or UDP.

    The bitmap is kept as a list of one-character strings; service number N
    sets bit ``0x80 >> (N % 8)`` of element ``N // 8``.

    ``origin`` and ``relativize`` are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol other
            than TCP or UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (int(protocol) if protocol.isdigit()
                else socket.getprotobyname(protocol))

    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)

        index, shift = divmod(serv, 8)
        # Pad with NUL characters so that bitmap[index] exists.
        shortfall = index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[index] = chr(ord(bitmap[index]) | (0x80 >> shift))

    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def testGetServBy(self):
    """
    Check that getservbyname() and getservbyport() agree with each other.

    Finds one service known to the host over tcp, then verifies the lookup
    without a protocol, the optional udp entry, the reverse lookups by
    port, and that out-of-range ports are rejected.
    """
    # Candidates are ordered by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    if (sys.platform.startswith(('linux', 'freebsd', 'netbsd'))
            or sys.platform == 'darwin'):
        # avoid the 'echo' service on this platform, as there is an
        # assumption breaking non-standard port/protocol entry
        candidates = ('daytime', 'qotd', 'domain')
    else:
        candidates = ('echo', 'daytime', 'domain')

    # Settle on the first candidate that resolves over tcp.
    for service in candidates:
        try:
            port = socket.getservbyname(service, 'tcp')
        except socket.error:
            continue
        break
    else:
        raise socket.error

    # Try same call with optional protocol omitted
    port2 = socket.getservbyname(service)
    self.assertEqual(port, port2)

    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except socket.error:
        udpport = None
    else:
        self.assertEqual(udpport, port)

    # Now make sure the lookup by port returns the same service name
    self.assertEqual(socket.getservbyport(port2), service)
    self.assertEqual(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        self.assertEqual(socket.getservbyport(udpport, 'udp'), service)

    # Make sure getservbyport does not accept out of range ports.
    for bad_port in (-1, 65536):
        self.assertRaises((OverflowError, ValueError),
                          socket.getservbyport, bad_port)
def testGetServBy(self):
    """
    Check that getservbyname() and getservbyport() agree with each other.

    Finds one service known to the host over tcp, then verifies the lookup
    without a protocol, the optional udp entry, the reverse lookups by
    port, and that out-of-range ports raise OverflowError.
    """
    # Candidates are ordered by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    if (sys.platform.startswith(('linux', 'freebsd', 'netbsd'))
            or sys.platform == 'darwin'):
        # avoid the 'echo' service on this platform, as there is an
        # assumption breaking non-standard port/protocol entry
        candidates = ('daytime', 'qotd', 'domain')
    else:
        candidates = ('echo', 'daytime', 'domain')

    # Settle on the first candidate that resolves over tcp.
    for service in candidates:
        try:
            port = socket.getservbyname(service, 'tcp')
        except socket.error:
            continue
        break
    else:
        raise socket.error

    # Try same call with optional protocol omitted
    port2 = socket.getservbyname(service)
    self.assertEqual(port, port2)

    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except socket.error:
        udpport = None
    else:
        self.assertEqual(udpport, port)

    # Now make sure the lookup by port returns the same service name
    self.assertEqual(socket.getservbyport(port2), service)
    self.assertEqual(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        self.assertEqual(socket.getservbyport(udpport, 'udp'), service)

    # Make sure getservbyport does not accept out of range ports.
    for bad_port in (-1, 65536):
        self.assertRaises(OverflowError, socket.getservbyport, bad_port)
def testGetServBy(self):
    """
    Check that getservbyname() and getservbyport() agree with each other.

    Finds one service known to the host over tcp, then verifies the lookup
    without a protocol, the optional udp entry, the reverse lookups by
    port, and that out-of-range ports raise OverflowError.
    """
    # avoid the 'echo' service on these platforms, as there is an
    # assumption breaking non-standard port/protocol entry
    skip_echo = (sys.platform in ('linux', 'darwin')
                 or sys.platform.startswith(('freebsd', 'netbsd')))
    # Candidates are ordered by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    if skip_echo:
        candidates = ('daytime', 'qotd', 'domain')
    else:
        candidates = ('echo', 'daytime', 'domain')

    # Settle on the first candidate that resolves over tcp.
    for service in candidates:
        try:
            port = socket.getservbyname(service, 'tcp')
        except socket.error:
            continue
        break
    else:
        raise socket.error

    # Try same call with optional protocol omitted
    port2 = socket.getservbyname(service)
    self.assertEqual(port, port2)

    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except socket.error:
        udpport = None
    else:
        self.assertEqual(udpport, port)

    # Now make sure the lookup by port returns the same service name
    self.assertEqual(socket.getservbyport(port2), service)
    self.assertEqual(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        self.assertEqual(socket.getservbyport(udpport, 'udp'), service)

    # Make sure getservbyport does not accept out of range ports.
    for bad_port in (-1, 65536):
        self.assertRaises(OverflowError, socket.getservbyport, bad_port)
def testGetServBy(self):
    """
    Check that getservbyname() and getservbyport() agree with each other.

    Finds one service known to the host over tcp, then verifies the lookup
    without a protocol, the optional udp entry, the reverse lookups by
    port, and that out-of-range ports are rejected.
    """
    # Candidates are ordered by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    if (sys.platform.startswith(('linux', 'freebsd', 'netbsd'))
            or sys.platform == 'darwin'):
        # avoid the 'echo' service on this platform, as there is an
        # assumption breaking non-standard port/protocol entry
        candidates = ('daytime', 'qotd', 'domain')
    else:
        candidates = ('echo', 'daytime', 'domain')

    # Settle on the first candidate that resolves over tcp.
    for service in candidates:
        try:
            port = socket.getservbyname(service, 'tcp')
        except socket.error:
            continue
        break
    else:
        raise socket.error

    # Try same call with optional protocol omitted
    port2 = socket.getservbyname(service)
    self.assertEqual(port, port2)

    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except socket.error:
        udpport = None
    else:
        self.assertEqual(udpport, port)

    # Now make sure the lookup by port returns the same service name
    self.assertEqual(socket.getservbyport(port2), service)
    self.assertEqual(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        self.assertEqual(socket.getservbyport(udpport, 'udp'), service)

    # Make sure getservbyport does not accept out of range ports.
    for bad_port in (-1, 65536):
        self.assertRaises((OverflowError, ValueError),
                          socket.getservbyport, bad_port)
def testGetServBy(self):
    """
    Check that getservbyname() and getservbyport() agree with each other.

    Finds one service known to the host over tcp, then verifies the lookup
    without a protocol, the optional udp entry, the reverse lookups by
    port, and that out-of-range ports raise OverflowError.
    """
    # avoid the 'echo' service on these platforms, as there is an
    # assumption breaking non-standard port/protocol entry
    skip_echo = (sys.platform in ('linux', 'darwin')
                 or sys.platform.startswith(
                     ('freebsd', 'netbsd', 'gnukfreebsd')))
    # Candidates are ordered by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    if skip_echo:
        candidates = ('daytime', 'qotd', 'domain')
    else:
        candidates = ('echo', 'daytime', 'domain')

    # Settle on the first candidate that resolves over tcp.
    for service in candidates:
        try:
            port = socket.getservbyname(service, 'tcp')
        except OSError:
            continue
        break
    else:
        raise OSError

    # Try same call with optional protocol omitted
    port2 = socket.getservbyname(service)
    self.assertEqual(port, port2)

    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except OSError:
        udpport = None
    else:
        self.assertEqual(udpport, port)

    # Now make sure the lookup by port returns the same service name
    self.assertEqual(socket.getservbyport(port2), service)
    self.assertEqual(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        self.assertEqual(socket.getservbyport(udpport, 'udp'), service)

    # Make sure getservbyport does not accept out of range ports.
    for bad_port in (-1, 65536):
        self.assertRaises(OverflowError, socket.getservbyport, bad_port)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """
    Build an instance from its text form read from the tokenizer ``tok``.

    Reads an address string, a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), and then service tokens until end of line /
    end of file.  Service names are resolved with ``socket.getservbyname``
    and are only accepted for TCP or UDP.

    The bitmap is kept as a list of one-character strings; service number N
    sets bit ``0x80 >> (N % 8)`` of element ``N // 8``.

    ``origin`` and ``relativize`` are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol other
            than TCP or UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (int(protocol) if protocol.isdigit()
                else socket.getprotobyname(protocol))

    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)

        index, shift = divmod(serv, 8)
        # Pad with NUL characters so that bitmap[index] exists.
        shortfall = index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[index] = chr(ord(bitmap[index]) | (0x80 >> shift))

    bitmap = rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
    """
    Store the connection target and initialise the base connector.

    When ``port`` is an instance of ``_portNameType`` it is treated as a TCP
    service name and resolved with ``socket.getservbyname``.  If ``host`` is
    an IPv6 address (per ``abstract.isIPv6Address``) the instance's
    ``_addressType`` is set to ``address.IPv6Address``.  ``factory``,
    ``timeout`` and ``reactor`` are forwarded to
    ``base.BaseConnector.__init__``.

    Raises:
        error.ServiceNameUnknownError: the service name cannot be resolved.
    """
    if isinstance(port, _portNameType):
        serviceName = port
        try:
            port = socket.getservbyname(serviceName, 'tcp')
        except socket.error as e:
            raise error.ServiceNameUnknownError(
                string="%s (%r)" % (e, serviceName))

    self.host = host
    self.port = port
    if abstract.isIPv6Address(host):
        self._addressType = address.IPv6Address
    self.bindAddress = bindAddress
    base.BaseConnector.__init__(self, factory, timeout, reactor)
def test_connectByService(self):
    """
    L{IReactorTCP.connectTCP} accepts the name of a service instead of a
    port number and connects to the port number associated with that
    service, as defined by L{socket.getservbyname}.
    """
    # Server side: listen on an ephemeral local port.
    serverConnMade = defer.Deferred()
    serverFactory = MyServerFactory()
    serverFactory.protocolConnectionMade = serverConnMade
    port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1")
    self.addCleanup(port.stopListening)
    portNumber = port.getHost().port

    # Client side.
    clientConnMade = defer.Deferred()
    clientFactory = MyClientFactory()
    clientFactory.protocolConnectionMade = clientConnMade

    def fakeGetServicePortByName(serviceName, protocolName):
        # Only ('http', 'tcp') maps to the listening port.
        isHTTPOverTCP = serviceName == 'http' and protocolName == 'tcp'
        return portNumber if isHTTPOverTCP else 10

    self.patch(socket, 'getservbyname', fakeGetServicePortByName)

    reactor.connectTCP('127.0.0.1', 'http', clientFactory)

    def connected(protocols):
        serverProtocol, clientProtocol = protocols
        self.assertTrue(
            serverFactory.called,
            "Server factory was not called upon to build a protocol.")
        # Drop both ends of the connection.
        serverProtocol.transport.loseConnection()
        clientProtocol.transport.loseConnection()

    connMade = defer.gatherResults([serverConnMade, clientConnMade])
    connMade.addCallback(connected)
    return connMade
def get(addr, args={}, headers={}, version='HTTP/1.1', auth=()):
    """
    Perform an HTTP/HTTPS GET request.

    Parameters:
        addr: the URL to fetch; surrounding whitespace is ignored.
        args: mapping of query parameters, URL-encoded and appended to any
            query string already present in ``addr``.
        headers: extra headers; they override the built-in defaults.
        version: the HTTP version string placed on the request line.
        auth: arguments for ``build_auth``; when non-empty an
            ``authorization`` header is added.

    Returns whatever ``create_con_ssl`` (for the ``https`` scheme) or
    ``create_con`` (otherwise) returns.

    ``args`` and ``headers`` are only read, never mutated, so the shared
    default dictionaries are safe.
    """
    addr = addr.strip()
    url = urlparse(addr)

    default = {
        'user-agent': 'Websnake/1.0.0',
        'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
        'connection': 'close',
        'host': url.hostname}
    default.update(headers)
    if auth:
        default['authorization'] = build_auth(*auth)

    # Join the URL's own query and the encoded args with '&'; emitting a
    # second '?' would corrupt the last parameter of the URL's query.
    encoded_args = urlencode(args) if args else ''
    query = '&'.join(part for part in (url.query, encoded_args) if part)
    # An empty path must be sent as '/', otherwise the request line would
    # have no request target at all.
    target = (url.path or '/') + ('?' + query if query else '')

    data = 'GET %s %s\r\n' % (target, version)
    data = data + build_headers(default)
    data = data.encode('utf8')

    # Fall back to the scheme's well-known port when the URL has none.
    port = url.port if url.port else getservbyname(url.scheme)
    if url.scheme == 'https':
        return create_con_ssl(url.hostname, port, data)
    return create_con(url.hostname, port, data)
def post(addr, payload=b'', version='HTTP/1.1', headers={}, auth=()):
    """
    Perform an HTTP/HTTPS POST request.

    Parameters:
        addr: the URL to post to; surrounding whitespace is ignored.
        payload: the request body as bytes; its length is sent as
            ``content-length``.
        version: the HTTP version string placed on the request line.
        headers: extra headers; they override the built-in defaults.
        auth: arguments for ``build_auth``; when non-empty an
            ``authorization`` header is added.

    Returns whatever ``create_con_ssl`` (for the ``https`` scheme) or
    ``create_con`` (otherwise) returns.

    ``headers`` is only read, never mutated, so the shared default
    dictionary is safe.
    """
    addr = addr.strip()
    url = urlparse(addr)

    default = {
        'user-agent': "Untwisted-requests/1.0.0",
        # A text value, like every other header here: the header block is
        # concatenated with a str request line before being encoded.
        # NOTE(review): assumes build_headers formats values as text, where
        # a bytes value would render as "b'...'" on Python 3 -- confirm.
        'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
        'connection': 'close',
        'host': url.hostname,
        'content-type': 'application/x-www-form-urlencoded',
        'content-length': len(payload)}
    default.update(headers)
    if auth:
        default['authorization'] = build_auth(*auth)

    # An empty path must be sent as '/', otherwise the request line would
    # have no request target at all.
    target = (url.path or '/') + ('?' + url.query if url.query else '')
    request = 'POST %s %s\r\n' % (target, version)
    request = (request + build_headers(default)).encode('utf8') + payload

    # Fall back to the scheme's well-known port when the URL has none.
    port = url.port if url.port else getservbyname(url.scheme)
    if url.scheme == 'https':
        return create_con_ssl(url.hostname, port, request)
    return create_con(url.hostname, port, request)
def test():
    """Test program for njelib.

    Usage: python njelib.py [-d] ... [host [port]] [RHOST OHOST]

    Default host is localhost; default port is 175.

    Each leading ``-d`` raises the debug level by one and is removed from
    ``sys.argv``.  The port may be a number or a TCP service name.  RHOST
    and OHOST both default to 'FAKE'; a sixth positional argument, when
    present, is used as the password.  Prints whether the sign-on
    succeeded.
    """
    debuglevel = 0
    while sys.argv[1:] and sys.argv[1] == '-d':
        debuglevel += 1
        del sys.argv[1]

    host = 'localhost'
    if sys.argv[1:]:
        host = sys.argv[1]

    port = 175
    if sys.argv[2:]:
        portstr = sys.argv[2]
        try:
            port = int(portstr)
        except ValueError:
            port = socket.getservbyname(portstr, 'tcp')

    rhost = ohost = 'FAKE'
    if sys.argv[3:]:
        rhost = sys.argv[3]
    # OHOST is checked separately: reading sys.argv[4] after testing only
    # sys.argv[3:] raised IndexError when RHOST was given alone.
    if sys.argv[4:]:
        ohost = sys.argv[4]

    password = ''
    if sys.argv[5:]:
        password = sys.argv[5]

    nje = NJE(ohost, rhost)
    nje.set_debuglevel(debuglevel)
    t = nje.signon(host=host, port=port, timeout=2, password=password)
    # Single-argument call form prints the same text on Python 2 and 3.
    if t:
        print("[+] Connection Successful")
    else:
        print("[!] Connection Failed")