我们从 Python 开源项目中提取了以下 44 个代码示例,用于说明如何使用 email.message_from_bytes()。
def test_8bit_in_quopri_body(self):
    """Check payload access for raw utf-8 bytes in a quoted-printable body.

    The input is not RFC compliant. Without ``decode`` the library decodes
    the body using the charset from the headers, and because the source
    bytes really are utf-8 this works. Against real dirty data this is
    likely to produce mojibake, but the data is invalid anyway, so this
    half of the test only pins the current behavior. With ``decode=True``
    the raw bytes come back, which should be the correct behavior (or at
    least the same result email4 produced).
    """
    raw = self.bodytest_msg.format(
        charset='utf-8',
        cte='quoted-printable',
        bodyline='p=C3=B6stál',
    ).encode('utf-8')
    parsed = email.message_from_bytes(raw)

    undecoded = parsed.get_payload()
    self.assertEqual(undecoded, 'p=C3=B6stál\n')

    decoded = parsed.get_payload(decode=True)
    self.assertEqual(decoded, 'pöstál\n'.encode('utf-8'))
def __init__(self, message=None):
    """Initialize a Message instance.

    ``message`` may be None (empty message), an email.message.Message
    (deep-copied), bytes, str, a text file wrapper, or any object with a
    ``read`` attribute (treated as a binary file). Any other type raises
    TypeError.
    """
    # Nothing to copy or parse: start out as an empty message.
    if message is None:
        email.message.Message.__init__(self)
        return

    # An existing message object is copied rather than parsed.
    if isinstance(message, email.message.Message):
        self._become_message(copy.deepcopy(message))
        if isinstance(message, Message):
            message._explain_to(self)
        return

    # Everything else has to be parsed; pick the parser by input type.
    if isinstance(message, bytes):
        parsed = email.message_from_bytes(message)
    elif isinstance(message, str):
        parsed = email.message_from_string(message)
    elif isinstance(message, io.TextIOWrapper):
        parsed = email.message_from_file(message)
    elif hasattr(message, "read"):
        parsed = email.message_from_binary_file(message)
    else:
        raise TypeError('Invalid message type: %s' % type(message))
    self._become_message(parsed)
def getSubject(self, i):
    """Return the decoded Subject of the i-th unseen message.

    Args:
        i: Index into the list of message ids returned by the
            ``(UNSEEN)`` search on ``self.conn``.

    Returns:
        None when there is no i-th unseen message, an empty string when
        the fetched message carries no Subject header, otherwise the
        subject text. Under Python 2 the text is a utf-8 encoded byte
        string; under Python 3 it is a ``str``.
    """
    conn = self.conn
    id_list = conn.search(None, '(UNSEEN)')[1][0].split()
    try:
        email_id = id_list[i]
    except IndexError:
        return None

    # BODY.PEEK fetches the header without marking the message as seen.
    data = conn.fetch(email_id, '(BODY.PEEK[HEADER.FIELDS (SUBJECT)])')[1]
    raw_header = data[0][1]
    if not PY3:
        msg = message_from_string(raw_header)
    else:
        msg = message_from_bytes(raw_header)

    raw_subject = msg['Subject']
    if raw_subject is None:
        # decode_header() cannot handle None; a message without a Subject
        # header simply has an empty subject.
        return ''

    # NOTE: only the first decoded fragment of the header is used.
    s, encoding = decode_header(raw_subject)[0]
    if not PY3:
        return s.decode(encoding or 'utf-8').encode('utf-8')
    if isinstance(s, str):
        return s
    return s.decode(encoding or 'utf-8')
def test_binary_body_with_encode_7or8bit(self):
    """Binary payload with encode_7or8bit survives a bytes round trip.

    Issue 17171. The message is checked both before flattening and after
    being re-parsed from its wire form.
    """
    bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
    msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit)
    # Treated as a string, this will be invalid code points.
    self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
    self.assertEqual(msg.get_payload(decode=True), bytesdata)
    self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')

    s = BytesIO()
    g = BytesGenerator(s)
    g.flatten(msg)
    wireform = s.getvalue()

    msg2 = email.message_from_bytes(wireform)
    # Check the re-parsed message here; asserting on ``msg`` again would
    # only repeat the pre-flatten check.
    self.assertEqual(msg2.get_payload(), '\uFFFD' * len(bytesdata))
    self.assertEqual(msg2.get_payload(decode=True), bytesdata)
    self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
def test_binary_body_with_encode_noop(self):
    """Binary payload with encode_noop survives a bytes round trip.

    Issue 16564: This does not produce an RFC valid message, since to be
    valid it should have a CTE of binary. But the below works in
    Python2, and is documented as working this way.
    """
    bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
    msg = MIMEApplication(bytesdata, _encoder=encoders.encode_noop)
    # Treated as a string, this will be invalid code points.
    self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
    self.assertEqual(msg.get_payload(decode=True), bytesdata)

    s = BytesIO()
    g = BytesGenerator(s)
    g.flatten(msg)
    wireform = s.getvalue()

    msg2 = email.message_from_bytes(wireform)
    # Check the re-parsed message here; asserting on ``msg`` again would
    # only repeat the pre-flatten check.
    self.assertEqual(msg2.get_payload(), '\uFFFD' * len(bytesdata))
    self.assertEqual(msg2.get_payload(decode=True), bytesdata)
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
    """Undecodable 8bit bytes become the unicode 'unknown' character.

    This is the quoted-printable case with an ``ascii`` charset: the 8bit
    bytes cannot be decoded in the declared charset, so each is replaced.
    This may or may not be the ideal behavior. With decode=False none of
    the decoders get involved, so this is the only test needed for it.
    """
    raw = self.bodytest_msg.format(
        charset='ascii',
        cte='quoted-printable',
        bodyline='p=C3=B6stál',
    ).encode('utf-8')
    parsed = email.message_from_bytes(raw)

    as_text = parsed.get_payload()
    self.assertEqual(as_text, 'p=C3=B6st\uFFFD\uFFFDl\n')

    as_bytes = parsed.get_payload(decode=True)
    self.assertEqual(as_bytes, 'pöstál\n'.encode('utf-8'))

# test_defect_handling:test_invalid_chars_in_base64_payload
def append_literal(self, data):
    """Handle one chunk of data for the pending APPEND literal.

    ``self.append_literal_command`` holds ``(tag, mailbox_name, size)``.
    A bare CRLF completes the command with a tagged OK reply; a literal
    whose length differs from ``size`` gets a tagged BAD reply; a literal
    of the right length is parsed and stored in the mailbox.
    """
    tag, mailbox_name, size = self.append_literal_command

    if data == CRLF:
        if 'UIDPLUS' in self.capabilities:
            newest_uid = self.server_state.max_uid(self.user_login, mailbox_name)
            reply = 'OK [APPENDUID %s %s] APPEND completed.' % (
                self.uidvalidity, newest_uid)
        else:
            reply = 'OK APPEND completed.'
        self.send_tagged_line(tag, reply)
        self.append_literal_command = None
        return

    received = len(data)
    if received == size:
        # The pending command is kept: it is cleared by the CRLF branch.
        parsed = email.message_from_bytes(data)
        self.server_state.add_mail(self.user_login, Mail(parsed), mailbox_name)
        return

    self.send_tagged_line(
        tag,
        'BAD literal length : expected %s but was %s' % (size, received))
    self.append_literal_command = None
def updateTopic(self, topic):
    """Fetch and render the articles newer than the last one seen in a group.

    Args:
        topic: Newsgroup name; also the key into ``self.groups``, which
            stores the number of the last article already processed.

    Returns:
        A list of ``[is_plus_one, html]`` pairs, one per article that was
        fetched and parsed. The list is empty when the instance is not
        initialized or when selecting the group failed.
    """
    if not self.initialized:
        return []
    # Reconnect when more than 60 seconds have passed since self.time.
    if time.time() - self.time > 60.:
        self.connect()
    try:
        (_, _, first, last, _) = self.conn.group(topic)
    except Exception as e:
        print(e, datetime.datetime.now())
        traceback.print_exc()
        self.connect()
        return []

    start = max(self.groups[topic] + 1, first)
    res = []
    headers = ("From", "Newsgroups", "Subject", "Date")
    # Parse "From" as an unstructured header instead of an address list.
    registry = HeaderRegistry()
    registry.map_to_type('From', UnstructuredHeader)
    policy = EmailPolicy(header_factory=registry)

    while start <= last:
        try:
            artic = self.conn.article(start)[1]
            raw_msg = b'\r\n'.join(artic.lines)
            mime_msg = email.message_from_bytes(raw_msg, policy=policy)
            hdr_parts = ["<code>"]
            for h in headers:
                hdr_parts.append("%s: %s\r\n" % (h, html.escape(mime_msg[h])))
            content = "".join(
                html.escape(part.get_content())
                for part in mime_msg.walk()
                if part.get_content_type() == 'text/plain'
            )
            is_plus_one = isPlusOne(content)
            hdr_parts.append("%s: %s\r\n" % ("is_plus_one", is_plus_one))
            hdr_parts.append("</code>\r\n")
            res.append([is_plus_one, "".join(hdr_parts) + content])
        except Exception as e:
            # Best effort: a broken article is logged and skipped.
            print(e, datetime.datetime.now())
            traceback.print_exc()
        start += 1

    self.groups[topic] = last
    return res
def parse_headers(headers):
    """Parse a raw header block and return it as a plain dict.

    Under Python 3 the input is parsed as bytes, otherwise as a string.
    """
    parser = email.message_from_bytes if PY3 else email.message_from_string
    parsed = parser(headers)
    return dict(parsed.items())
def test__all__(self):
    """The email package exports exactly the expected public names."""
    module = __import__('email')
    # sorted() returns a new list, so module.__all__ itself is not mutated
    # and the builtin all() is not shadowed by a local name.
    exported = sorted(module.__all__)
    self.assertEqual(exported, [
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_binary_file',
        'message_from_bytes', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quoprimime', 'utils',
        ])
def test_known_8bit_CTE(self):
    """An 8bit body in a known charset is readable as text and as bytes."""
    raw = self.bodytest_msg.format(
        charset='utf-8',
        cte='8bit',
        bodyline='pöstal',
    ).encode('utf-8')
    parsed = email.message_from_bytes(raw)

    expected_text = "pöstal\n"
    self.assertEqual(parsed.get_payload(), expected_text)
    self.assertEqual(parsed.get_payload(decode=True),
                     expected_text.encode('utf-8'))
def test_unknown_8bit_CTE(self):
    """With an unknown charset the text form uses replacement characters."""
    raw = self.bodytest_msg.format(
        charset='notavalidcharset',
        cte='8bit',
        bodyline='pöstal',
    ).encode('utf-8')
    parsed = email.message_from_bytes(raw)

    # Text access cannot decode the two utf-8 bytes of "ö".
    self.assertEqual(parsed.get_payload(), "p\uFFFD\uFFFDstal\n")
    # Byte access still returns the original bytes.
    self.assertEqual(parsed.get_payload(decode=True),
                     "pöstal\n".encode('utf-8'))
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
    """Undecodable 8bit bytes become the unicode 'unknown' character.

    This is the quoted-printable case with an ``ascii`` charset: the 8bit
    bytes cannot be decoded in the declared charset, so each is replaced.
    This may or may not be the ideal behavior. With decode=False none of
    the decoders get involved, so this is the only test needed for it.
    """
    template_args = dict(
        charset='ascii',
        cte='quoted-printable',
        bodyline='p=C3=B6stál',
    )
    raw = self.bodytest_msg.format(**template_args).encode('utf-8')
    parsed = email.message_from_bytes(raw)

    self.assertEqual(parsed.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
    self.assertEqual(parsed.get_payload(decode=True),
                     'pöstál\n'.encode('utf-8'))
def test_8bit_in_uuencode_body(self):
    """An 8bit byte inside a uuencode block yields the undecoded bytes.

    Sticking an 8bit byte in a uuencode block makes it undecodable by
    normal means, so the block is returned undecoded, but as bytes.
    """
    raw = self.bodytest_msg.format(
        charset='utf-8',
        cte='uuencode',
        bodyline='<,.V<W1A; á ',
    ).encode('utf-8')
    parsed = email.message_from_bytes(raw)

    payload = parsed.get_payload(decode=True)
    self.assertEqual(payload, '<,.V<W1A; á \n'.encode('utf-8'))
def test_get_8bit_header(self):
    """Both get() and item access render 8bit header bytes as U+FFFD."""
    parsed = email.message_from_bytes(self.headertest_msg)
    expected = 'b\uFFFD\uFFFDz'

    via_get = parsed.get('to')
    self.assertEqual(str(via_get), expected)

    via_item = parsed['to']
    self.assertEqual(str(via_item), expected)
def test_print_8bit_headers(self):
    """str() of the parsed message shows the expected header renderings."""
    parsed = email.message_from_bytes(self.headertest_msg)

    # Second element of each (name, expected) pair's ``expected`` value.
    rendered_headers = [expected[1] for (_, expected) in self.headertest_headers]
    template = textwrap.dedent("""\
        From: {}
        To: {}
        Subject: {}
        From: {}

        Yes, they are flying.
        """)

    self.assertEqual(str(parsed), template.format(*rendered_headers))
def test_get_content_type_with_8bit(self):
    """An 8bit byte inside the content type is reported as U+FFFD."""
    header_text = textwrap.dedent("""\
        Content-Type: text/pl\xA7in; charset=utf-8
        """)
    parsed = email.message_from_bytes(header_text.encode('latin-1'))

    self.assertEqual(parsed.get_content_type(), "text/pl\uFFFDin")
    self.assertEqual(parsed.get_content_maintype(), "text")
    self.assertEqual(parsed.get_content_subtype(), "pl\uFFFDin")
def test_get_params_with_8bit(self):
    """Header parameters containing 8bit bytes come back with U+FFFD."""
    raw = 'X-Header: foo=\xa7ne; b\xa7r=two; baz=three\n'.encode('latin-1')
    parsed = email.message_from_bytes(raw)

    all_params = parsed.get_params(header='x-header')
    self.assertEqual(
        all_params,
        [('foo', '\uFFFDne'), ('b\uFFFDr', 'two'), ('baz', 'three')])

    self.assertEqual(parsed.get_param('Foo', header='x-header'), '\uFFFdne')
    # XXX: someday you might be able to get 'b\xa7r', for now you can't.
    self.assertEqual(parsed.get_param('b\xa7r', header='x-header'), None)
def test_get_rfc2231_params_with_8bit(self):
    """An RFC 2231 parameter with an 8bit byte decodes with U+FFFD."""
    header_text = textwrap.dedent("""\
        Content-Type: text/plain; charset=us-ascii;
         title*=us-ascii'en'This%20is%20not%20f\xa7n""")
    parsed = email.message_from_bytes(header_text.encode('latin-1'))

    title = parsed.get_param('title')
    self.assertEqual(title, ('us-ascii', 'en', 'This is not f\uFFFDn'))
def test_set_rfc2231_params_with_8bit(self):
    """set_param() replaces an RFC 2231 parameter that held an 8bit byte."""
    header_text = textwrap.dedent("""\
        Content-Type: text/plain; charset=us-ascii;
         title*=us-ascii'en'This%20is%20not%20f\xa7n""")
    parsed = email.message_from_bytes(header_text.encode('latin-1'))

    parsed.set_param('title', 'test')

    self.assertEqual(parsed.get_param('title'), 'test')
def test_del_rfc2231_params_with_8bit(self):
    """del_param() removes an RFC 2231 parameter that held an 8bit byte."""
    header_text = textwrap.dedent("""\
        Content-Type: text/plain; charset=us-ascii;
         title*=us-ascii'en'This%20is%20not%20f\xa7n""")
    parsed = email.message_from_bytes(header_text.encode('latin-1'))

    parsed.del_param('title')

    self.assertEqual(parsed.get_param('title'), None)
    # The rest of the Content-Type header must still be intact.
    self.assertEqual(parsed.get_content_maintype(), 'text')
def test_bytes_generator(self):
    """BytesGenerator reproduces the parsed message byte for byte."""
    source = self.non_latin_bin_msg
    parsed = email.message_from_bytes(source)

    buffer = BytesIO()
    generator = email.generator.BytesGenerator(buffer)
    generator.flatten(parsed)

    self.assertEqual(buffer.getvalue(), source)
def test_generator_handles_8bit(self):
    """The string Generator emits the wrapped 7bit form of an 8bit message."""
    parsed = email.message_from_bytes(self.non_latin_bin_msg)

    buffer = StringIO()
    generator = email.generator.Generator(buffer)
    generator.flatten(parsed)

    self.assertEqual(buffer.getvalue(), self.non_latin_bin_msg_as7bit_wrapped)
def test_bytes_generator_with_unix_from(self):
    """BytesGenerator with unixfrom=True prepends a 'From' line.

    The unixfrom contains a current date, so it cannot be checked
    literally. Only the first word is checked to be 'From', and the
    remainder of the output must match the input message.
    """
    parsed = email.message_from_bytes(self.non_latin_bin_msg)

    buffer = BytesIO()
    email.generator.BytesGenerator(buffer).flatten(parsed, unixfrom=True)

    # Split off the first line; everything after it is the message proper.
    first_line, _, remainder = buffer.getvalue().partition(b'\n')
    self.assertEqual(first_line.split()[0], b'From')
    self.assertEqual(remainder, self.non_latin_bin_msg)
def test_string_generator_reencodes_to_quopri_when_appropriate(self):
    """str() of a latin 8bit message yields its expected 7bit rendering."""
    parsed = email.message_from_bytes(self.latin_bin_msg)
    rendered = str(parsed)
    self.assertEqual(rendered, self.latin_bin_msg_as7bit)
def test_decoded_generator_emits_unicode_body(self):
    """DecodedGenerator writes the latin-1 decoded message as text."""
    parsed = email.message_from_bytes(self.latin_bin_msg)

    buffer = StringIO()
    generator = email.generator.DecodedGenerator(buffer)
    generator.flatten(parsed)

    # DecodedHeader output contains an extra blank line compared
    # to the input message.  RDM: not sure if this is a bug or not,
    # but it is not specific to the 8bit->7bit conversion.
    expected = self.latin_bin_msg.decode('latin-1') + '\n'
    self.assertEqual(buffer.getvalue(), expected)
def _msgobj(self, filename):
    """Load a message file and return ``(message, normalized_bytes)``.

    The file is read in binary mode, every match of
    ``self.normalize_linesep_regex`` is replaced by ``self.blinesep``,
    and the result is parsed with email.message_from_bytes().
    """
    with openfile(filename, 'rb') as fp:
        raw = fp.read()
    normalized = self.normalize_linesep_regex.sub(self.blinesep, raw)
    return email.message_from_bytes(normalized), normalized
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The bytes stored under ``key`` are parsed into an
    email.message.Message, which is then rendered as a 7bit clean
    string.
    """
    raw = self.get_bytes(key)
    parsed = email.message_from_bytes(raw)
    return parsed.as_string()
def get_string(self, key, from_=False):
    """Return a string representation or raise a KeyError.

    ``from_`` is forwarded as the ``unixfrom`` flag of as_string().
    """
    raw = self.get_bytes(key)
    parsed = email.message_from_bytes(raw)
    return parsed.as_string(unixfrom=from_)
def __init__(self, message_id: str, fetch_data):
    """Build the wrapper from a message id and its raw fetch response.

    ``fetch_data`` is split by ``self._clean_message_data`` into the raw
    message bytes, the uid data and the flag data; the raw bytes are
    parsed into an email message stored as ``self.obj``.
    """
    cleaned = self._clean_message_data(fetch_data)
    raw_message, uid_part, flag_part = cleaned

    self.id = message_id
    self.obj = email.message_from_bytes(raw_message)
    self._uid_data = uid_part
    self._flag_data = flag_part
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
    """Attach an incoming reply mail to the thread of its original message.

    Returns '250 Ok' on success. An SMTPError is logged and returned as
    its string form. Any other exception is logged, the raw data is
    optionally stored in ``self.undeliverable_maildir``, and the
    exception is re-raised.
    """
    try:
        if len(rcpttos) > 1:
            raise SMTPError(554, 'Too many recipients')

        msg = email.message_from_bytes(data)
        text = self._get_text(msg)
        orig_msg = self._find_msg(rcpttos[0])
        thread = orig_msg.thread
        receiver = orig_msg.receiver
        message_id = msg['Message-ID']

        # Everything addressed to this receiver in the thread is now read.
        thread.messages.filter(receiver=receiver).update(unread=False)

        # XXX: rawmsg can include multiple content-charsets and should be
        # a binaryfield; as a workaround we convert to base64
        thread_msg = thread.add_message(
            receiver,
            text,
            rawmsg=base64.b64encode(data),
            rawmsg_msgid=message_id,
        )
        log_line = (
            'Accepted email from {0} via {1} to {2} id {3} thread {4} orig_msg {5} message {6}'
        ).format(
            mailfrom,
            orig_msg.receiver.email,
            orig_msg.sender.email,
            msg['Message-ID'],
            thread.pk,
            orig_msg.pk,
            thread_msg.pk,
        )
        logger.info(log_line)
    except SMTPError as e:
        logger.info('Rejected email: {0}'.format(e))
        return str(e)
    except Exception as e:
        logger.error('email raised exception: {0}'.format(e))
        if self.store_exceptions:
            self.undeliverable_maildir.add(data)
        raise
    return '250 Ok'
def apply(self, envelope):
    """Dispatch a return-path envelope to the first handler that accepts it.

    Args:
        envelope: Envelope whose first recipient selects the processing;
            its ``flatten()`` result supplies the message bytes.

    Raises:
        QueueError: If there is no recipient, the recipient domain is not
            ``settings.RETURNPATH_DOMAIN``, the recipient does not start
            with an allowed prefix, or no handler can handle the message.
    """
    if not envelope.recipients:
        raise QueueError('Missing recipient')
    recipient = envelope.recipients[0]
    if extract_domain(recipient) != settings.RETURNPATH_DOMAIN:
        raise QueueError('Domain not valid')

    # Flatten once and reuse both parts instead of flattening twice.
    flattened = envelope.flatten()
    message = email.message_from_bytes(flattened[0] + flattened[1])

    # Check recipient prefix
    if not recipient.startswith(('return-', 'unsubscribe-', 'abuse')):
        raise QueueError('Prefix not allowed')

    # Find handler: the first class whose can_handle() accepts the message.
    for handler_cls in (DSNHandler, ARFHandler, UnsubscribeHandler):
        if handler_cls.can_handle(message):
            log.info('Handler {} can handle'.format(handler_cls.__name__))
            handler = handler_cls(envelope, message)
            break
    else:
        raise QueueError('No handler found')

    handler.apply()
def check(self, msg):
    """Submit a message to spamd and parse the processed result.

    @param msg : a string containing the message to check
    @return status, score, message

    The message is sent with a ``PROCESS SPAMC/1.4`` request to
    ``self.host``/``self.port``. The socket and its file object are
    closed on every path, including errors.

    Raises:
        SpamCheckerError: On any socket error, an empty response, a
            malformed status line, or a status other than ``EX_OK``.
    """
    msg_bytes = msg.encode()
    request_head = (
        b'PROCESS SPAMC/1.4'
        + b"\r\n"
        + ("Content-length: %s" % len(msg_bytes)).encode()
        + b"\r\n"
        + b"\r\n"
    )
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(2)
            s.connect((self.host, self.port))
            s.sendall(request_head)
            s.sendall(msg_bytes)
            # Half-close: signal end of request, keep reading the reply.
            s.shutdown(socket.SHUT_WR)
            with s.makefile("rb") as socketfile:
                line1_info = socketfile.readline()
                # Skip the three lines that follow the status line.
                for _ in range(3):
                    socketfile.readline()
                content = socketfile.read()
    except socket.error as e:
        raise SpamCheckerError(e) from None

    if len(line1_info) == 0:
        raise SpamCheckerError("Spamd response is empty")
    answer = line1_info.strip().split()
    if len(answer) != 3:
        raise SpamCheckerError("Invalid Spamd answer: {}".format(
            answer))
    (version, number, status) = answer
    if status.decode() != 'EX_OK':
        raise SpamCheckerError('Invalid Spamd status: {}'.format(
            status))

    # parse mail
    return SpamResult.from_headers(email.message_from_bytes(content))
def test_cte_type_7bit_handles_unknown_8bit(self):
    """With cte_type='7bit' an 8bit header is emitted as unknown-8bit."""
    source = ("Subject: Maintenant je vous présente mon "
              "collègue\n\n").encode('utf-8')
    expected = ('Subject: Maintenant je vous =?unknown-8bit?q?'
                'pr=C3=A9sente_mon_coll=C3=A8gue?=\n\n').encode('ascii')

    parsed = message_from_bytes(source)
    seven_bit_policy = self.policy.clone(cte_type='7bit')

    buffer = io.BytesIO()
    BytesGenerator(buffer, policy=seven_bit_policy).flatten(parsed)

    self.assertEqual(buffer.getvalue(), expected)
def test_binary_body_with_encode_base64(self):
    """Binary payload with encode_base64 survives a bytes round trip.

    The message is checked both before flattening and after being
    re-parsed from its wire form.
    """
    bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
    msg = MIMEApplication(bytesdata, _encoder=encoders.encode_base64)
    self.assertEqual(msg.get_payload(), '+vv8/f7/\n')
    self.assertEqual(msg.get_payload(decode=True), bytesdata)

    s = BytesIO()
    g = BytesGenerator(s)
    g.flatten(msg)
    wireform = s.getvalue()

    msg2 = email.message_from_bytes(wireform)
    # Check the re-parsed message here; asserting on ``msg`` again would
    # only repeat the pre-flatten check.
    self.assertEqual(msg2.get_payload(), '+vv8/f7/\n')
    self.assertEqual(msg2.get_payload(decode=True), bytesdata)

# Test the basic MIMEText class
def test__all__(self):
    """The email package exports exactly the expected public names."""
    expected_names = [
        'base64mime',
        'charset',
        'encoders',
        'errors',
        'feedparser',
        'generator',
        'header',
        'iterators',
        'message',
        'message_from_binary_file',
        'message_from_bytes',
        'message_from_file',
        'message_from_string',
        'mime',
        'parser',
        'quoprimime',
        'utils',
    ]
    module = __import__('email')
    exported = sorted(module.__all__)
    self.assertEqual(exported, expected_names)