0
0
mirror of https://github.com/python/cpython.git synced 2024-11-21 21:09:37 +01:00
cpython/Lib/test/test_email/test_policy.py
Petr Viktorin 0976339818
gh-121650: Encode newlines in headers, and verify headers are sound (GH-122233)
## Encode header parts that contain newlines

Per RFC 2047:

> [...] these encoding schemes allow the
> encoding of arbitrary octet values, mail readers that implement this
> decoding should also ensure that display of the decoded data on the
> recipient's terminal will not cause unwanted side-effects

It seems that the "encoded-word" scheme (RFC 2047) is a valid way to
include a newline character in a header value, just like we already allow
undecodable bytes or control characters.
Such values do need to be properly encoded when serialized to text, though.


## Verify that email headers are well-formed

This should fail for custom fold() implementations that aren't careful
about newlines.


Co-authored-by: Bas Bloemsaat <bas@bloemsaat.org>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
2024-07-31 00:19:48 +02:00

430 lines
17 KiB
Python

import io
import types
import textwrap
import unittest
import email.errors
import email.policy
import email.parser
import email.generator
import email.message
from email import headerregistry
def make_defaults(base_defaults, differences):
    """Return a copy of *base_defaults* with *differences* applied on top.

    Neither argument is modified; keys present in *differences* override
    the corresponding keys copied from *base_defaults*.
    """
    defaults = base_defaults.copy()
    defaults.update(differences)
    return defaults
class PolicyAPITests(unittest.TestCase):
    """Tests for the public API of the email policy objects."""

    longMessage = True

    # Base default values.
    compat32_defaults = {
        'max_line_length': 78,
        'linesep': '\n',
        'cte_type': '8bit',
        'raise_on_defect': False,
        'mangle_from_': True,
        'message_factory': None,
        'verify_generated_headers': True,
    }
    # These default values are the ones set on email.policy.default.
    # If any of these defaults change, the docs must be updated.
    policy_defaults = compat32_defaults.copy()
    policy_defaults.update({
        'utf8': False,
        'raise_on_defect': False,
        'header_factory': email.policy.EmailPolicy.header_factory,
        'refold_source': 'long',
        'content_manager': email.policy.EmailPolicy.content_manager,
        'mangle_from_': False,
        'message_factory': email.message.EmailMessage,
    })

    # For each policy under test, we give here what we expect the defaults to
    # be for that policy.  The second argument to make_defaults is the
    # difference between the base defaults and that for the particular policy.
    new_policy = email.policy.EmailPolicy()
    policies = {
        email.policy.compat32: make_defaults(compat32_defaults, {}),
        email.policy.default: make_defaults(policy_defaults, {}),
        email.policy.SMTP: make_defaults(policy_defaults,
                                         {'linesep': '\r\n'}),
        email.policy.SMTPUTF8: make_defaults(policy_defaults,
                                             {'linesep': '\r\n',
                                              'utf8': True}),
        email.policy.HTTP: make_defaults(policy_defaults,
                                         {'linesep': '\r\n',
                                          'max_line_length': None}),
        email.policy.strict: make_defaults(policy_defaults,
                                           {'raise_on_defect': True}),
        new_policy: make_defaults(policy_defaults, {}),
    }
    # Creating a new policy creates a new header factory.  There is a test
    # later that proves this.
    policies[new_policy]['header_factory'] = new_policy.header_factory

    def test_defaults(self):
        # Every policy must expose exactly the default values listed above.
        for policy, expected in self.policies.items():
            for attr, value in expected.items():
                with self.subTest(policy=policy, attr=attr):
                    self.assertEqual(getattr(policy, attr), value,
                                     ("change {} docs/docstrings if defaults have "
                                      "changed").format(policy))

    def test_all_attributes_covered(self):
        # Any public, non-method attribute of a policy must appear in the
        # expected-defaults table so that new attributes cannot go untested.
        for policy, expected in self.policies.items():
            for attr in dir(policy):
                with self.subTest(policy=policy, attr=attr):
                    if (attr.startswith('_') or
                            isinstance(getattr(email.policy.EmailPolicy, attr),
                                       types.FunctionType)):
                        continue
                    else:
                        self.assertIn(attr, expected,
                                      "{} is not fully tested".format(attr))

    def test_abc(self):
        # Policy is abstract; the TypeError raised on instantiation must
        # name each of the abstract methods.
        with self.assertRaises(TypeError) as cm:
            email.policy.Policy()
        msg = str(cm.exception)
        abstract_methods = ('fold',
                            'fold_binary',
                            'header_fetch_parse',
                            'header_source_parse',
                            'header_store_parse')
        for method in abstract_methods:
            self.assertIn(method, msg)

    def test_policy_is_immutable(self):
        for policy, defaults in self.policies.items():
            # Known attributes are read-only ...
            for attr in defaults:
                with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
                    setattr(policy, attr, None)
            # ... and unknown attributes cannot be added.
            with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
                policy.foo = None

    def test_set_policy_attrs_when_cloned(self):
        # Set every attribute to None in the clone call and check that the
        # clone reports None for each of them.
        for policy, defaults in self.policies.items():
            testattrdict = {attr: None for attr in defaults}
            cloned = policy.clone(**testattrdict)
            for attr in defaults:
                self.assertIsNone(getattr(cloned, attr))

    def test_reject_non_policy_keyword_when_called(self):
        # self.policies holds policy *instances*, which are not callable, so
        # calling them directly would raise TypeError no matter which
        # keywords were passed.  Call each instance's class instead, so that
        # the constructor's keyword validation is what gets exercised.
        for policy in self.policies:
            policy_class = type(policy)
            with self.subTest(policy=policy):
                with self.assertRaises(TypeError):
                    policy_class(this_keyword_should_not_be_valid=None)
                with self.assertRaises(TypeError):
                    policy_class(newtline=None)

    def test_policy_addition(self):
        # When two policies are added, the right-hand operand's explicitly
        # set attributes win.
        expected = self.policy_defaults.copy()
        p1 = email.policy.default.clone(max_line_length=100)
        p2 = email.policy.default.clone(max_line_length=50)
        added = p1 + p2
        expected.update(max_line_length=50)
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)
        added = p2 + p1
        expected.update(max_line_length=100)
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)
        # Adding a policy with no explicit overrides changes nothing.
        added = added + email.policy.default
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)

    def test_fold_utf8(self):
        expected_ascii = 'Subject: =?utf-8?q?=C3=A1?=\n'
        expected_utf8 = 'Subject: á\n'

        msg = email.message.EmailMessage()
        s = 'á'
        msg['Subject'] = s

        p_ascii = email.policy.default.clone()
        p_utf8 = email.policy.default.clone(utf8=True)

        # Folding must give the same result for a header object ...
        self.assertEqual(p_ascii.fold('Subject', msg['Subject']), expected_ascii)
        self.assertEqual(p_utf8.fold('Subject', msg['Subject']), expected_utf8)

        # ... and for a plain string value.
        self.assertEqual(p_ascii.fold('Subject', s), expected_ascii)
        self.assertEqual(p_utf8.fold('Subject', s), expected_utf8)

    def test_fold_zero_max_line_length(self):
        # max_line_length of 0 and of None must fold identically.
        expected = 'Subject: =?utf-8?q?=C3=A1?=\n'

        msg = email.message.EmailMessage()
        msg['Subject'] = 'á'

        p1 = email.policy.default.clone(max_line_length=0)
        p2 = email.policy.default.clone(max_line_length=None)

        self.assertEqual(p1.fold('Subject', msg['Subject']), expected)
        self.assertEqual(p2.fold('Subject', msg['Subject']), expected)

    def test_register_defect(self):
        # register_defect appends to the object's ``defects`` list, in order.
        class Dummy:
            def __init__(self):
                self.defects = []
        obj = Dummy()
        defect = object()
        policy = email.policy.EmailPolicy()
        policy.register_defect(obj, defect)
        self.assertEqual(obj.defects, [defect])
        defect2 = object()
        policy.register_defect(obj, defect2)
        self.assertEqual(obj.defects, [defect, defect2])

    # Minimal stand-in for a message: just carries a ``defects`` list.
    class MyObj:
        def __init__(self):
            self.defects = []

    # Defect type used by the handle_defect tests below.
    class MyDefect(Exception):
        pass

    def test_handle_defect_raises_on_strict(self):
        foo = self.MyObj()
        defect = self.MyDefect("the telly is broken")
        with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
            email.policy.strict.handle_defect(foo, defect)

    def test_handle_defect_registers_defect(self):
        foo = self.MyObj()
        defect1 = self.MyDefect("one")
        email.policy.default.handle_defect(foo, defect1)
        self.assertEqual(foo.defects, [defect1])
        defect2 = self.MyDefect("two")
        email.policy.default.handle_defect(foo, defect2)
        self.assertEqual(foo.defects, [defect1, defect2])

    # Policy that records defects on itself instead of on the object.
    class MyPolicy(email.policy.EmailPolicy):
        defects = None

        def __init__(self, *args, **kw):
            super().__init__(*args, defects=[], **kw)

        def register_defect(self, obj, defect):
            self.defects.append(defect)

    def test_overridden_register_defect_still_raises(self):
        foo = self.MyObj()
        defect = self.MyDefect("the telly is broken")
        with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
            self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)

    def test_overridden_register_defect_works(self):
        foo = self.MyObj()
        defect1 = self.MyDefect("one")
        my_policy = self.MyPolicy()
        my_policy.handle_defect(foo, defect1)
        self.assertEqual(my_policy.defects, [defect1])
        self.assertEqual(foo.defects, [])
        defect2 = self.MyDefect("two")
        my_policy.handle_defect(foo, defect2)
        self.assertEqual(my_policy.defects, [defect1, defect2])
        self.assertEqual(foo.defects, [])

    def test_default_header_factory(self):
        h = email.policy.default.header_factory('Test', 'test')
        self.assertEqual(h.name, 'Test')
        self.assertIsInstance(h, headerregistry.UnstructuredHeader)
        self.assertIsInstance(h, headerregistry.BaseHeader)

    # Custom header type registered with a header factory in the tests below.
    class Foo:
        parse = headerregistry.UnstructuredHeader.parse

    def test_each_Policy_gets_unique_factory(self):
        policy1 = email.policy.EmailPolicy()
        policy2 = email.policy.EmailPolicy()
        policy1.header_factory.map_to_type('foo', self.Foo)
        h = policy1.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)
        self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
        # The mapping registered on policy1 must not leak into policy2.
        h = policy2.header_factory('foo', 'test')
        self.assertNotIsInstance(h, self.Foo)
        self.assertIsInstance(h, headerregistry.UnstructuredHeader)

    def test_clone_copies_factory(self):
        policy1 = email.policy.EmailPolicy()
        policy2 = policy1.clone()
        # The mapping is registered after cloning, yet the clone sees it.
        policy1.header_factory.map_to_type('foo', self.Foo)
        h = policy1.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)
        h = policy2.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)

    def test_new_factory_overrides_default(self):
        mypolicy = email.policy.EmailPolicy()
        myfactory = mypolicy.header_factory
        newpolicy = mypolicy + email.policy.strict
        self.assertEqual(newpolicy.header_factory, myfactory)
        newpolicy = email.policy.strict + mypolicy
        self.assertEqual(newpolicy.header_factory, myfactory)

    def test_adding_default_policies_preserves_default_factory(self):
        newpolicy = email.policy.default + email.policy.strict
        self.assertEqual(newpolicy.header_factory,
                         email.policy.EmailPolicy.header_factory)
        self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})

    def test_non_ascii_chars_do_not_cause_inf_loop(self):
        policy = email.policy.default.clone(max_line_length=20)
        actual = policy.fold('Subject', 'ą' * 12)
        self.assertEqual(
            actual,
            'Subject: \n' +
            12 * ' =?utf-8?q?=C4=85?=\n')

    def test_short_maxlen_error(self):
        # RFC 2047 chrome takes up 7 characters, plus the length of the charset
        # name, so folding should fail if maxlen is lower than the minimum
        # required length for a line.

        # Note: This is only triggered when there is a single word longer than
        # max_line_length, hence the 1234567890 at the end of this whimsical
        # subject. This is because when we encounter a word longer than
        # max_line_length, it is broken down into encoded words to fit
        # max_line_length. If the max_line_length isn't large enough to even
        # contain the RFC 2047 chrome (`=?<charset>?q??=`), we fail.
        subject = "Melt away the pounds with this one simple trick! 1234567890"

        for maxlen in [3, 7, 9]:
            with self.subTest(maxlen=maxlen):
                policy = email.policy.default.clone(max_line_length=maxlen)
                with self.assertRaises(email.errors.HeaderParseError):
                    policy.fold("Subject", subject)

    def test_verify_generated_headers(self):
        """Turning protection off allows header injection"""
        policy = email.policy.default.clone(verify_generated_headers=False)
        for text in (
            'Header: Value\r\nBad: Injection\r\n',
            'Header: NoNewLine'
        ):
            with self.subTest(text=text):
                message = email.message_from_string(
                    "Header: Value\r\n\r\nBody",
                    policy=policy,
                )

                # A header object whose fold() returns its text verbatim,
                # standing in for a careless custom fold() implementation.
                class LiteralHeader(str):
                    name = 'Header'

                    def fold(self, **kwargs):
                        return self

                del message['Header']
                message['Header'] = LiteralHeader(text)

                self.assertEqual(
                    message.as_string(),
                    f"{text}\nBody",
                )

    # XXX: Need subclassing tests.
    # For adding subclassed objects, make sure the usual rules apply (subclass
    # wins), but that the order still works (right overrides left).
class TestException(Exception):
    """Marker exception raised by the stub policy in TestPolicyPropagation."""
class TestPolicyPropagation(unittest.TestCase):
# The abstract methods are used by the parser but not by the wrapper
# functions that call it, so if the exception gets raised we know that the
# policy was actually propagated all the way to feedparser.
class MyPolicy(email.policy.Policy):
def badmethod(self, *args, **kw):
raise TestException("test")
fold = fold_binary = header_fetch_parser = badmethod
header_source_parse = header_store_parse = badmethod
def test_message_from_string(self):
with self.assertRaisesRegex(TestException, "^test$"):
email.message_from_string("Subject: test\n\n",
policy=self.MyPolicy)
def test_message_from_bytes(self):
with self.assertRaisesRegex(TestException, "^test$"):
email.message_from_bytes(b"Subject: test\n\n",
policy=self.MyPolicy)
def test_message_from_file(self):
f = io.StringIO('Subject: test\n\n')
with self.assertRaisesRegex(TestException, "^test$"):
email.message_from_file(f, policy=self.MyPolicy)
def test_message_from_binary_file(self):
f = io.BytesIO(b'Subject: test\n\n')
with self.assertRaisesRegex(TestException, "^test$"):
email.message_from_binary_file(f, policy=self.MyPolicy)
# These are redundant, but we need them for black-box completeness.
def test_parser(self):
p = email.parser.Parser(policy=self.MyPolicy)
with self.assertRaisesRegex(TestException, "^test$"):
p.parsestr('Subject: test\n\n')
def test_bytes_parser(self):
p = email.parser.BytesParser(policy=self.MyPolicy)
with self.assertRaisesRegex(TestException, "^test$"):
p.parsebytes(b'Subject: test\n\n')
# Now that we've established that all the parse methods get the
# policy in to feedparser, we can use message_from_string for
# the rest of the propagation tests.
def _make_msg(self, source='Subject: test\n\n', policy=None):
self.policy = email.policy.default.clone() if policy is None else policy
return email.message_from_string(source, policy=self.policy)
def test_parser_propagates_policy_to_message(self):
msg = self._make_msg()
self.assertIs(msg.policy, self.policy)
def test_parser_propagates_policy_to_sub_messages(self):
msg = self._make_msg(textwrap.dedent("""\
Subject: mime test
MIME-Version: 1.0
Content-Type: multipart/mixed, boundary="XXX"
--XXX
Content-Type: text/plain
test
--XXX
Content-Type: text/plain
test2
--XXX--
"""))
for part in msg.walk():
self.assertIs(part.policy, self.policy)
def test_message_policy_propagates_to_generator(self):
msg = self._make_msg("Subject: test\nTo: foo\n\n",
policy=email.policy.default.clone(linesep='X'))
s = io.StringIO()
g = email.generator.Generator(s)
g.flatten(msg)
self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
def test_message_policy_used_by_as_string(self):
msg = self._make_msg("Subject: test\nTo: foo\n\n",
policy=email.policy.default.clone(linesep='X'))
self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
class TestConcretePolicies(unittest.TestCase):
    """Tests for behaviour specific to the concrete policy classes."""

    def test_header_store_parse_rejects_newlines(self):
        # A header value spanning more than one line must be refused when
        # stored; LF is the original case, CR and CRLF are covered as well.
        instance = email.policy.EmailPolicy()
        for value in ('spam\negg@foo.py',
                      'spam\regg@foo.py',
                      'spam\r\negg@foo.py'):
            with self.subTest(value=value):
                self.assertRaises(ValueError,
                                  instance.header_store_parse,
                                  'From', value)
# Run the tests when this file is executed directly.
if __name__ == '__main__':
    unittest.main()