我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用poplib.POP3。
def checkMailAccount(server, user, password, ssl=False, port=None):
    """Check whether a POP3 account accepts the given credentials.

    Args:
        server: Host name or address of the POP3 server.
        user: Account name sent with the USER command.
        password: Password sent with the PASS command.
        ssl: When true, connect with ``poplib.POP3_SSL`` instead of
            ``poplib.POP3``.
        port: TCP port; when falsy it defaults to 995 for SSL and 110
            otherwise.

    Returns:
        True when the reply to PASS contains ``+OK``; False on any
        connection, protocol or authentication failure.
    """
    if not port:
        port = 995 if ssl else 110

    pop3 = None
    try:
        pop3 = poplib.POP3_SSL(server, port) if ssl else poplib.POP3(server, port)
        pop3.user(user)
        auth = pop3.pass_(password)
        pop3.quit()
    except Exception:
        # Best effort: do not leave a session open when the failure happened
        # after the connection had already been established.
        if pop3 is not None:
            try:
                pop3.quit()
            except Exception:
                pass
        return False

    # poplib hands back bytes on Python 3 and str on Python 2; normalise so
    # the substring test cannot raise TypeError.
    if not isinstance(auth, str):
        auth = auth.decode("ascii", "replace")
    return "+OK" in auth
def popPeek(server, user, port=110):
    """Show the headers of each message and optionally delete it.

    The password is read with ``getpass``. For every message the headers are
    printed and the user is prompted; answering ``D`` marks the message for
    deletion. On any error or interrupt during the review, pending deletions
    are rolled back with RSET before the session is closed.

    Args:
        server: POP3 host to connect to.
        user: Account name.
        port: TCP port, 110 by default.

    Exits the process with status 1 when connecting or logging in fails.
    """
    try:
        P = poplib.POP3(server, port)
        P.user(user)
        P.pass_(getpass.getpass())
    except Exception:
        print("Failed to connect to server.")
        sys.exit(1)

    # raw_input only exists on Python 2; fall back to input elsewhere.
    try:
        ask = raw_input
    except NameError:
        ask = input

    deleted = 0
    msgcount = 0  # bound up front so the error handler can always report it
    try:
        msgcount = len(P.list()[1])
        for msg in range(1, msgcount + 1):
            # TOP <msg> 0 retrieves the header without any body lines.
            for line in P.top(msg, 0)[1]:
                print(line)
            answer = ask("D to delete, any other key to leave message on server: ")
            if answer == "D":
                P.dele(msg)
                deleted += 1
        P.quit()
        print("%d messages deleted. %d messages left on server" % (deleted, msgcount - deleted))
    except (Exception, KeyboardInterrupt):
        # Undo the deletion marks; the connection may already be gone, so the
        # rollback itself must not crash the report below.
        try:
            P.rset()
            P.quit()
        except Exception:
            pass
        deleted = 0
        print("\n%d messages deleted. %d messages left on server" % (deleted, msgcount - deleted))
def popauth(popHost, user, passwd):
    """Log in and log out, only to verify user identity.

    Args:
        popHost: POP3 host to connect to (default POP3 port).
        user: Account name.
        passwd: Password for the account.

    Raises:
        StandardError: when the connection cannot be established, or when
            the login or the STAT sanity check fails. (``StandardError`` is
            a Python 2 builtin, matching the rest of this snippet.)
    """
    import poplib

    try:
        pop = poplib.POP3(popHost)
    except Exception:
        raise StandardError("Could not establish connection " +
                            "to %s for password check" % popHost)

    try:
        # Log in and perform a small sanity check.
        pop.user(user)
        pop.pass_(passwd)
        length, size = pop.stat()
        # Explicit check instead of assert, which is stripped under -O.
        if not (type(length) == type(size) == type(0)):
            raise ValueError("unexpected STAT reply")
    except Exception:
        raise StandardError("Could not verify identity. \n" +
                            "User name %s or password incorrect." % user)
    finally:
        # Close the session exactly once, on success and on failure alike.
        # The original called quit() a second time after a successful check,
        # which fails on the already-closed connection.
        try:
            pop.quit()
        except Exception:
            pass
def pillage(self, username, password, server, port, domain, outputdir="."):
    """Pick a mail client class by port, connect, and start a worker thread.

    Ports 993, 143, 995 and 110 select IMAPS, IMAP, POP3S and POP3
    respectively; any other port prints an error and returns without doing
    anything. The worker thread runs ``self.tworker`` with the connected
    client and the given credentials.
    """
    print("%s, %s, %s, %s" % (username, password, server, domain))

    port_table = ((993, IMAPS), (143, IMAP), (995, POP3S), (110, POP3))
    for known_port, client_class in port_table:
        if port == known_port:
            mail = client_class(outputdir=outputdir)
            break
    else:
        print("ERROR, unknown port provided")
        return

    mail.connect(server)

    worker = Thread(target=self.tworker,
                    args=(mail, username, password, domain, server, port,))
    worker.start()

#-----------------------------------------------------------------------------
# main test code
#-----------------------------------------------------------------------------
def run(self):
    """Try one password/user pair from ``getword()`` against the POP3 host.

    The host is taken from ``sys.argv[1]``. On a successful login the mailbox
    statistics are printed, ``work.join()`` is called and the thread exits via
    ``sys.exit(2)``. POP3 protocol errors are ignored silently.
    """
    value, user = getword()
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user, value))
        session = poplib.POP3(sys.argv[1])
        session.user(user)
        session.pass_(value)
        print("\t\nLogin successful: %s %s" % (value, user))
        print(session.stat())
        session.quit()
        work.join()
        sys.exit(2)
    except poplib.error_proto:
        # A protocol error (e.g. rejected login) is deliberately not reported.
        pass
def run(self):
    """Try one password from ``getword()`` for the module-level ``user``.

    The last character of ``user`` is dropped before use and the host is
    ``ipaddr[0]``. On a successful login the mailbox statistics are printed,
    ``work.join()`` is called and the thread exits via ``sys.exit(2)``.
    POP3 protocol errors and socket errors are ignored silently.
    """
    value = getword()
    try:
        print("-" * 12)
        account = user[:-1]
        print("User: %s Password: %s" % (account, value))
        session = poplib.POP3(ipaddr[0])
        session.user(account)
        session.pass_(value)
        # The success line prints the untrimmed ``user`` value.
        print("\t\nLogin successful: %s %s" % (value, user))
        print(session.stat())
        session.quit()
        work.join()
        sys.exit(2)
    except (poplib.error_proto, socket.gaierror, socket.error, socket.herror):
        # Failures are deliberately not reported.
        pass
def run(self):
    """Try one password from ``getword()`` for the module-level ``user``.

    The last character of ``user`` is dropped before use and the host is the
    module-level ``ip``. On a successful login the mailbox statistics are
    printed, ``work.join()`` is called and the thread exits via
    ``sys.exit(2)``. POP3 protocol errors are ignored silently.
    """
    value = getword()
    try:
        print("-" * 12)
        account = user[:-1]
        print("User: %s Password: %s" % (account, value))
        session = poplib.POP3(ip)
        session.user(account)
        session.pass_(value)
        # The success line prints the untrimmed ``user`` value.
        print("\t\nLogin successful: %s %s" % (value, user))
        print(session.stat())
        session.quit()
        work.join()
        sys.exit(2)
    except poplib.error_proto:
        # A protocol error (e.g. rejected login) is deliberately not reported.
        pass
def connect(self):
    """Connects to and authenticates with a POP3 mail server.

    An SSL connection is used when both ``self.keyfile`` and
    ``self.certfile`` are set, or when ``self.ssl`` is true; otherwise a
    plain connection is opened.

    Returns:
        The logged-in ``poplib`` connection object.

    Raises:
        Whatever ``poplib`` raises (socket errors, ``poplib.error_proto``);
        errors are propagated unchanged.
    """
    import poplib

    if (self.keyfile and self.certfile) or self.ssl:
        M = poplib.POP3_SSL(self.host, self.port, self.keyfile, self.certfile)
    else:
        M = poplib.POP3(self.host, self.port)

    logged_in = False
    try:
        M.user(self.username)
        M.pass_(self.password)
        logged_in = True
    finally:
        if not logged_in:
            # Login failed: close the session instead of leaking it. The
            # original error keeps propagating once this block finishes.
            try:
                M.quit()
            except Exception:
                pass
    return M
def pop3_Connection(ip, username, password, port):
    """Attempt a POP3 login and record the credentials when it succeeds.

    Args:
        ip: Host to connect to.
        username: Account name.
        password: Password to try.
        port: TCP port of the POP3 service; the default POP3 port is used
            when it is falsy.

    A success is only recorded (printed via ``printGreen`` and appended to
    the module-level ``result`` list) when STAT reports at least one message,
    as in the original. Failures are printed and otherwise ignored.
    """
    try:
        # Connect to the port that is reported below; the original ignored
        # ``port`` and always used the default.
        pp = poplib.POP3(ip, port or poplib.POP3_PORT)
        try:
            pp.user(username)
            pp.pass_(password)
            (mailCount, size) = pp.stat()
        finally:
            # Close the session on failure as well as on success.
            pp.quit()
        if mailCount:
            message = "%s pop3 at %s has weaken password!!-------%s:%s\r\n" % (ip, port, username, password)
            lock.acquire()
            try:
                printGreen(message)
                result.append(message)
            finally:
                # Always release, even if reporting raises.
                lock.release()
    except Exception as e:
        print(e)
        lock.acquire()
        try:
            print("%s pop3 service 's %s:%s login fail " % (ip, username, password))
        finally:
            lock.release()
def poll():
    """
    Iterates over all the active services in the database and attempt to
    execute that service's functionality. The success or failure of the
    service and any error messages are stored in the database.

    For each active service the service type name is looked up and the
    matching ``poll_*`` task is queued with ``.delay``. Services whose type
    lookup returns no row, or whose type is unknown, are skipped. There is a
    two-second pause before each service is processed.
    """
    for service in execute_db_query('select * from service where service_active = 1'):
        sleep(2)

        # Grab the service type from the database.
        rows = execute_db_query('select * from service_type join service ON (service_type.service_type_id = service.service_type_id) where service.service_type_id = ?', [service['service_type_id']])
        try:
            row = rows[0]
        except IndexError:
            # No matching row: skip instead of crashing the whole poll run.
            row = None
        if not row:
            continue

        service_type = row['service_type_name']

        # Perform DNS Request
        if service_type == 'dns':
            poll_dns.delay(timeout, service['service_id'], service['service_connection'], service['service_request'], service['service_expected_result'])
        # Perform HTTP(S) Request
        elif service_type in ('http', 'https'):
            poll_web.delay(timeout, service['service_id'], row['service_type_name'], service['service_connection'], service['service_request'], service['service_expected_result'])
        # Perform FTP Request
        elif service_type == 'ftp':
            poll_ftp.delay(timeout, service['service_id'], service['service_connection'], service['service_request'], service['service_expected_result'])
        # Perform SMTP request to send mail, POP3 to retrieve it back
        elif service_type == 'mail':
            poll_mail.delay(timeout, service['service_id'], service['service_connection'], service['service_request'], service['service_expected_result'])
def qyt_rec_mail(mailserver, mailuser, mailpasswd, mailprefix):
    """Download every message of a POP3 mailbox into numbered text files.

    Args:
        mailserver: POP3 host (default port).
        mailuser: Account name.
        mailpasswd: Account password.
        mailprefix: File name prefix; message ``n`` is written to
            ``<mailprefix>_<n>.txt`` in binary mode.

    The session is always closed with QUIT once the connection exists, even
    when the login or a download fails.
    """
    print('Connecting...')
    server = poplib.POP3(mailserver)
    try:
        server.user(mailuser)
        server.pass_(mailpasswd)
        print(server.getwelcome())
        msgCount, msgBytes = server.stat()
        print('There are', msgCount, 'mail message in', msgBytes, 'bytes')
        print(server.list())
        for number in range(1, msgCount + 1):
            hdr, message, octets = server.retr(number)
            mail_file_name = mailprefix + '_' + str(number) + '.txt'
            with open(mail_file_name, 'wb') as mail_file:
                # poplib returns the lines without their terminators, so put
                # one back after each line to keep the message readable.
                mail_file.writelines(line + b'\n' for line in message)
            print(mail_file_name + ' Recieved!!!')
    finally:
        server.quit()
    print('Bye.')
def connect(self):
    """Open and authenticate a mail connection described by ``source_id``.

    Without a usable ``source_id`` the call is delegated to the parent
    class. Otherwise an IMAP or POP connection (SSL when
    ``source_id.is_ssl``) is opened to ``source_id.server`` /
    ``source_id.port``, logged in with ``self.user`` / ``self.password``,
    given a socket timeout of ``MAIL_TIMEOUT`` and returned.
    """
    source = self.source_id
    if not (source and source.id):
        return super(vieterp_fetchmail_server, self).connect()

    self.ensure_one()
    if source.type == 'imap':
        imap_class = IMAP4_SSL if source.is_ssl else IMAP4
        connection = imap_class(source.server, int(source.port))
        connection.login(self.user, self.password)
    # NOTE(review): this branch tests self.type while the IMAP branch tests
    # source_id.type -- confirm that is intended. If neither branch matches,
    # ``connection`` is unbound below.
    elif self.type == 'pop':
        pop_class = POP3_SSL if source.is_ssl else POP3
        connection = pop_class(source.server, int(source.port))
        # TODO: use this to remove only unread messages
        # connection.user("recent:"+server.user)
        connection.user(self.user)
        connection.pass_(self.password)

    # Add timeout on socket
    connection.sock.settimeout(MAIL_TIMEOUT)
    return connection
def _pop3_reply_ok(reply):
    """Return True when a poplib reply (bytes or str) contains ``+OK``."""
    if not isinstance(reply, str):
        reply = reply.decode("ascii", "replace")
    return "+OK" in reply.upper()


def LoginServer(login_server, login_port, login_user, login_pass):
    """Log in to a POP3 server and report how far the login got.

    Port 995 selects ``poplib.POP3_SSL``; any other port uses a plain
    ``poplib.POP3`` connection.

    Returns:
        A three-item list ``[user_ok, password_ok, message]``:
        ``user_ok`` is True once USER was accepted, ``password_ok`` is True
        once PASS was accepted, and ``message`` is a string describing the
        outcome (empty when nothing was recorded).
    """
    login_server_return = [False, False, '']
    is_auth_user = False
    pop_login = None
    try:
        if login_port == 995:
            pop_login = poplib.POP3_SSL(login_server, login_port)
        else:
            pop_login = poplib.POP3(login_server, login_port)
        user_auth = pop_login.user(login_user)
        if _pop3_reply_ok(user_auth):
            login_server_return[0] = True
            is_auth_user = True
        pass_auth = pop_login.pass_(login_pass)
        if _pop3_reply_ok(pass_auth):
            login_server_return[1] = True
            login_server_return[2] = "[-]Login is successful!"
        pop_login.quit()
    except Exception as e:
        if not is_auth_user:
            login_server_return[2] = "Login user is not correct"
        else:
            # Store text so the message slot is always a string.
            login_server_return[2] = str(e)
        # Best effort: close a session that was opened before the failure.
        if pop_login is not None:
            try:
                pop_login.quit()
            except Exception:
                pass
    # A plain return (not inside ``finally``) lets KeyboardInterrupt and
    # SystemExit propagate instead of being silently discarded.
    return login_server_return
def qyt_rec_mail(mailserver, mailuser, mailpasswd, mailprefix):
    """Download every message of a POP3 mailbox into numbered text files.

    Args:
        mailserver: POP3 host (default port).
        mailuser: Account name.
        mailpasswd: Account password.
        mailprefix: File name prefix; message ``n`` is written to
            ``<mailprefix>_<n>.txt`` in binary mode.

    The session is always closed with QUIT once the connection exists, even
    when the login or a download fails.
    """
    print('Connecting...')
    server = poplib.POP3(mailserver)  # connect to the server
    try:
        server.user(mailuser)  # send the account name
        server.pass_(mailpasswd)  # send the password
        print(server.getwelcome())  # server greeting
        msgCount, msgBytes = server.stat()  # message count and mailbox size
        print('There are', msgCount, 'mail message in', msgBytes, 'bytes')
        print(server.list())  # per-message listing
        for number in range(1, msgCount + 1):
            hdr, message, octets = server.retr(number)  # fetch one message
            mail_file_name = mailprefix + '_' + str(number) + '.txt'
            with open(mail_file_name, 'wb') as mail_file:
                # poplib returns the lines without their terminators, so put
                # one back after each line to keep the message readable.
                mail_file.writelines(line + b'\n' for line in message)
            print(mail_file_name + ' Recieved!!!')
    finally:
        server.quit()  # always end the session
    print('Bye.')
def connect(self, mailserver, port="993"):
    """Open an IMAP-over-SSL connection and store it in ``self.srv``.

    Args:
        mailserver: Host to connect to; stored in ``self.mailserver``.
        port: Port to connect to; stored in ``self.port``.

    ``self.srv`` is set to ``None`` when the connection cannot be
    established; no exception is raised for ordinary errors.
    """
    self.mailserver = mailserver
    self.port = port
    try:
        self.srv = imaplib.IMAP4_SSL(self.mailserver, self.port)
    except Exception:
        # Deliberate best effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed.
        self.srv = None

#-----------------------------------------------------------------------------
# POP3 subclass of Pillager Class
#-----------------------------------------------------------------------------
def connect(self, mailserver, port="110"):
    """Open a plain POP3 connection and store it in ``self.srv``.

    Args:
        mailserver: Host to connect to; stored in ``self.mailserver``.
        port: Port to connect to; stored in ``self.port``.

    ``self.srv`` is set to ``None`` when the connection cannot be
    established; no exception is raised for ordinary errors.
    """
    self.mailserver = mailserver
    self.port = port
    try:
        self.srv = poplib.POP3(self.mailserver, self.port)
    except Exception:
        # Deliberate best effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed.
        self.srv = None
def getMessages(self):
    """Fill ``self.msg_list`` with every message on the server, once.

    Nothing happens when there is no connection (``self.srv`` is falsy) or
    when ``self.msg_list`` already holds messages. Otherwise each message is
    retrieved in order and the raw ``retr`` result is stored.
    """
    if not self.srv or self.msg_list:
        return

    message_total, _mailbox_size = self.srv.stat()
    self.msg_list = []
    # POP3 message numbers start at 1.
    for number in range(1, message_total + 1):
        self.msg_list.append(self.srv.retr(number))

#-----------------------------------------------------------------------------
# POP3S subclass of POP3 Class
#-----------------------------------------------------------------------------
def __init__(self, outputdir="."):
    """Initialise the instance by delegating to ``POP3.__init__``.

    ``outputdir`` is passed through unchanged.
    NOTE(review): ``POP3`` is presumably this project's own class rather
    than ``poplib.POP3`` -- confirm against the class definitions.
    """
    POP3.__init__(self, outputdir)
def fetch(self):
    """Fetches email messages from a POP3 server.

    Returns:
        A dict mapping each 1-based POP3 message number to the result of
        ``self.parse_email`` for that message's text (the retrieved lines
        joined with newlines).
    """
    total = len(self.handle.list()[1])
    messages = {}
    for number in range(1, total + 1):
        lines = self.handle.retr(number)[1]
        # Key by the message's own number. The original keyed every entry by
        # the total count, so each message overwrote the previous one.
        messages[number] = self.parse_email('\n'.join(lines))
    return messages
def disconnect(self):
    """End the POP3 session by calling ``quit()`` on the stored handle."""
    handle = self.handle
    handle.quit()
def found_terminator(self):
    """Dispatch one complete command line collected in ``self.in_buffer``.

    The buffered byte chunks are joined, decoded as ISO-8859-1 and the
    buffer is reset. The text before the first space (lower-cased) names
    the command; everything after that space is its argument (empty when
    there is no space). A ``cmd_<name>`` method is called with the argument
    when it exists; otherwise an ``-ERR`` reply is pushed.
    """
    text = str(b''.join(self.in_buffer), 'ISO-8859-1')
    self.in_buffer = []

    verb, _space, arg = text.partition(' ')
    cmd = verb.lower()
    handler_name = 'cmd_' + cmd

    if not hasattr(self, handler_name):
        self.push('-ERR unrecognized POP3 command "%s".' % cmd)
        return
    getattr(self, handler_name)(arg)
def setUp(self):
    """Start a dummy POP3 server on (HOST, PORT) and connect a client to it."""
    dummy = DummyPOP3Server((HOST, PORT))
    self.server = dummy
    dummy.start()
    # Connect to the address the server reports, not to the constants above.
    self.client = poplib.POP3(dummy.host, dummy.port)
def testTimeoutDefault(self):
    """A connection made without ``timeout`` uses the global socket default."""
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(30)
    try:
        conn = poplib.POP3(HOST, self.port)
    finally:
        # Restore the process-wide default whether or not connecting worked.
        socket.setdefaulttimeout(None)
    sock = conn.sock
    self.assertEqual(sock.gettimeout(), 30)
    sock.close()
def testTimeoutNone(self):
    """An explicit ``timeout=None`` overrides the global socket default."""
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(30)
    try:
        conn = poplib.POP3(HOST, self.port, timeout=None)
    finally:
        # Restore the process-wide default whether or not connecting worked.
        socket.setdefaulttimeout(None)
    sock = conn.sock
    self.assertTrue(sock.gettimeout() is None)
    sock.close()
def testTimeoutValue(self):
    """An explicit ``timeout`` value is applied to the connection's socket."""
    conn = poplib.POP3(HOST, self.port, timeout=30)
    sock = conn.sock
    self.assertEqual(sock.gettimeout(), 30)
    sock.close()
def found_terminator(self):
    """Dispatch one complete command line collected in ``self.in_buffer``.

    The buffered chunks are joined and the buffer is reset. The text before
    the first space (lower-cased) names the command; everything after that
    space is its argument (empty when there is no space). A ``cmd_<name>``
    method is called with the argument when it exists; otherwise an ``-ERR``
    reply is pushed.
    """
    text = ''.join(self.in_buffer)
    self.in_buffer = []

    verb, _space, arg = text.partition(' ')
    cmd = verb.lower()
    handler_name = 'cmd_' + cmd

    if not hasattr(self, handler_name):
        self.push('-ERR unrecognized POP3 command "%s".' % cmd)
        return
    getattr(self, handler_name)(arg)
def setUp(self):
    """Start a dummy POP3 server on (HOST, 0) and connect a client to it."""
    dummy = DummyPOP3Server((HOST, 0))
    self.server = dummy
    dummy.start()
    # Connect to the address the server reports after binding.
    self.client = poplib.POP3(dummy.host, dummy.port)
def testTimeoutDefault(self):
    """A connection made without ``timeout`` uses the global socket default."""
    self.assertIsNone(socket.getdefaulttimeout())
    socket.setdefaulttimeout(30)
    try:
        conn = poplib.POP3(HOST, self.port)
    finally:
        # Restore the process-wide default whether or not connecting worked.
        socket.setdefaulttimeout(None)
    sock = conn.sock
    self.assertEqual(sock.gettimeout(), 30)
    sock.close()
def testTimeoutNone(self):
    """An explicit ``timeout=None`` overrides the global socket default."""
    self.assertIsNone(socket.getdefaulttimeout())
    socket.setdefaulttimeout(30)
    try:
        conn = poplib.POP3(HOST, self.port, timeout=None)
    finally:
        # Restore the process-wide default whether or not connecting worked.
        socket.setdefaulttimeout(None)
    sock = conn.sock
    self.assertIsNone(sock.gettimeout())
    sock.close()
def downloadEmail(self):
    """Return the text of the highest-numbered message in the mailbox.

    Connects to ``self.pop3_server`` with ``self.email`` / ``self.pwd``,
    retrieves the last message listed by the server and returns its lines
    joined with CRLF and decoded as UTF-8.

    Raises:
        Whatever ``poplib`` raises for connection, login or retrieval
        failures (retrieval fails when the mailbox is empty).
    """
    print("Reading control command...")
    server = poplib.POP3(self.pop3_server)
    try:
        server.user(self.email)
        server.pass_(self.pwd)
        resp, mails, octets = server.list()
        # Message numbers are 1-based, so the count is also the last number.
        last_number = len(mails)
        resp, lines, octets = server.retr(last_number)
    finally:
        # The original never closed the session. Best effort: a failing QUIT
        # must not hide the result or the original error.
        try:
            server.quit()
        except Exception:
            pass
    msg_content = b'\r\n'.join(lines).decode('utf-8')
    return msg_content
def setUp(self):
    """Start a dummy POP3 server and connect a client with a 3 s timeout."""
    dummy = DummyPOP3Server((HOST, PORT))
    self.server = dummy
    dummy.start()
    # Connect to the address the server reports, not to the constants above.
    self.client = poplib.POP3(dummy.host, dummy.port, timeout=3)
def test_stls_context(self):
    """STLS with a verifying context fails first, then works via "localhost".

    The context requires a certificate and checks the host name. With the
    existing ``self.client`` the upgrade must raise ``ssl.CertificateError``;
    with a fresh client connected to "localhost" it must succeed.
    NOTE(review): presumably the first client's host does not match the test
    certificate -- confirm against setUp and CAFILE.
    """
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    tls_context.load_verify_locations(CAFILE)
    # verify_mode is set before check_hostname, as in the original.
    tls_context.verify_mode = ssl.CERT_REQUIRED
    tls_context.check_hostname = True

    with self.assertRaises(ssl.CertificateError):
        self.client.stls(context=tls_context)

    self.client = poplib.POP3("localhost", self.server.port, timeout=3)
    reply = self.client.stls(context=tls_context)
    self.assertEqual(reply, b'+OK Begin TLS negotiation')