1 # Copyright (C) 2002-2007 Python Software Foundation
2 # Author: Ben Gertzfield, Barry Warsaw
3 # Contact: email-sig@python.org
4 
5 """Header encoding and decoding functionality."""
6 
7 __all__ = [
8     'Header',
9     'decode_header',
10     'make_header',
11     ]
12 
13 import re
14 import binascii
15 
16 import email.quoprimime
17 import email.base64mime
18 
19 from email.errors import HeaderParseError
20 from email import charset as _charset
Charset = _charset.Charset  # local alias for the Charset class

# String constants used when building and joining header text.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when no maxlinelen is given.
MAXLINELEN = 78
# Characters treated as folding whitespace when splitting unencoded text.
FWS = ' \t'

# Shared Charset instances: USASCII is Header's default charset and UTF8 is
# the fallback Header.append() switches to when text cannot be encoded as
# us-ascii.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The three named groups (charset, encoding, encoded) are what decode_header()
# picks apart after calling ecre.split() on each line of the header.
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qQbB])  # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.MULTILINE)
44 
45 # Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark to tilde.
47 # For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack.  The pattern matches a newline that is followed
# directly by a run of characters other than space or tab and then a colon,
# i.e. a line that is not a folded continuation and looks like "Name:".
_embedded_header = re.compile(r'\n[^ \t]+:')



# Helpers
# Module-level alias for email.quoprimime._max_append.
_max_append = email.quoprimime._max_append
58 
59 
60 
def decode_header(header):
    """Split a header value into its decoded parts, leaving charsets alone.

    header is either a str, which may contain RFC 2047 encoded words, or a
    Header instance.

    The result is a list of (decoded_part, charset) pairs.  charset is None
    for text that was not an encoded word; for encoded words it is the
    lower-cased charset name taken from the word.  When the value contains
    at least one encoded word every decoded_part is a bytes object; a str
    with no encoded words comes back unchanged as [(header, None)].

    Raises email.errors.HeaderParseError when a base64 encoded word cannot
    be decoded.
    """
    # A Header already stores (text, Charset) chunks: just encode each one.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(text, str(cs)), str(cs))
                for text, cs in header._chunks]
    # Nothing encoded at all: hand the value back untouched.
    if ecre.search(header) is None:
        return [(header, None)]

    # Pass 1: collect (text, encoding, charset) triples.  Plain text has
    # None for both encoding and charset.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so the split result repeats the pattern
        # plain, charset, encoding, encoded, plain, ... and always ends on
        # a plain piece.
        pieces = ecre.split(line)
        for start in range(0, len(pieces), 4):
            plain = pieces[start]
            if start == 0:
                # Only the leading text of each line loses its indentation.
                plain = plain.lstrip()
            if plain:
                words.append((plain, None, None))
            if start + 1 < len(pieces):
                cs, enc, payload = pieces[start + 1:start + 4]
                words.append((payload, enc.lower(), cs.lower()))

    # Pass 2: whitespace sitting between two encoded words is not part of
    # the value, so discard it.
    gap_indexes = {
        i - 1 for i in range(2, len(words))
        if words[i][1] and words[i - 2][1] and words[i - 1][0].isspace()
    }
    words = [w for i, w in enumerate(words) if i not in gap_indexes]

    # Pass 3: undo the quoted-printable / base64 transformation.
    decoded = []
    for payload, enc, cs in words:
        if enc is None:
            decoded.append((payload, cs))
        elif enc == 'q':
            decoded.append((email.quoprimime.header_decode(payload), cs))
        elif enc == 'b':
            # Postel's law: supply any padding the sender left off.
            missing = len(payload) % 4
            if missing:
                payload += '==='[:4 - missing]
            try:
                raw = email.base64mime.decode(payload)
            except binascii.Error:
                raise HeaderParseError('Base64 decoding error')
            decoded.append((raw, cs))
        else:
            raise AssertionError('Unexpected encoding: ' + enc)

    # Pass 4: convert everything to bytes and merge neighbours that share a
    # charset.  Unencoded neighbours are rejoined with a single space.
    merged = []
    pending = pending_cs = None
    for chunk, cs in decoded:
        if isinstance(chunk, str):
            chunk = chunk.encode('raw-unicode-escape')
        if pending is None:
            pending, pending_cs = chunk, cs
        elif cs != pending_cs:
            merged.append((pending, pending_cs))
            pending, pending_cs = chunk, cs
        elif pending_cs is None:
            pending += BSPACE + chunk
        else:
            pending += chunk
    merged.append((pending, pending_cs))
    return merged
153 
154 
155 
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (decoded_string, charset) pairs.

    decoded_seq is a sequence of pairs in the shape decode_header() returns:
    each charset is either None, a charset name, or a Charset instance.

    maxlinelen, header_name and continuation_ws are handed to the Header
    constructor unchanged.  The populated Header instance is returned.
    """
    result = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for text, cs in decoded_seq:
        # None is passed straight through so that append() applies the
        # Header's own default; names are promoted to Charset instances.
        if not (cs is None or isinstance(cs, Charset)):
            cs = Charset(cs)
        result.append(text, cs)
    return result
176 
177 
178 
class Header:
    """A header value held as a list of (text, Charset) chunks.

    Chunks are added with append(); str() yields the unencoded text and
    encode() yields the RFC 2047 encoded, folded representation.
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Set up a header, optionally seeding it with an initial value.

        s, when not None, is appended right away through append(), so it may
        be a str or a byte string with the semantics described there.

        charset is used for that first append and is also remembered as the
        default for later append() calls that pass no charset.  It may be a
        Charset instance or a charset name; None selects us-ascii.

        maxlinelen is the line length encode() folds at unless it is
        overridden there; None selects the module default MAXLINELEN (78).

        header_name, when given, is used only for its length: the first
        line is assumed to already carry the name plus ": ", leaving that
        much less room on it.

        continuation_ws is the whitespace used to start continuation lines
        of encoded text.

        errors is forwarded to the initial append() call.
        """
        if not isinstance(charset, Charset):
            charset = USASCII if charset is None else Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        self._maxlinelen = MAXLINELEN if maxlinelen is None else maxlinelen
        # The +2 accounts for the colon and space that follow the name.
        self._headerlen = 0 if header_name is None else len(header_name) + 2

    def __str__(self):
        """Return the unencoded text of the header."""
        self._normalize()
        ascii_like = (None, 'us-ascii')
        pieces = []
        prev_cs = None
        prev_ends_nonctext = None
        for text, cs in self._chunks:
            # A separating space is needed wherever the value switches
            # between an ascii chunk and a chunk in some other charset,
            # except before the very first chunk, and except when the ascii
            # side already has whitespace (or a paren/backslash) at the
            # boundary.
            effective_cs = cs
            if effective_cs == _charset.UNKNOWN8BIT:
                # Surrogate-escaped bytes are shown as replacement chars.
                raw = text.encode('ascii', 'surrogateescape')
                text = raw.decode('ascii', 'replace')
            if pieces:
                starts_nonctext = text and self._nonctext(text[0])
                if prev_cs not in ascii_like:
                    if effective_cs in ascii_like and not starts_nonctext:
                        pieces.append(SPACE)
                        effective_cs = None
                elif effective_cs not in ascii_like and not prev_ends_nonctext:
                    pieces.append(SPACE)
            prev_ends_nonctext = text and self._nonctext(text[-1])
            prev_cs = effective_cs
            pieces.append(text)
        return EMPTYSTRING.join(pieces)

    # Only equality is provided; ordering comparisons are not defined.
    def __eq__(self, other):
        # Compare against our unencoded text with the operands swapped, so
        # that a str is compared directly and another Header gets to apply
        # its own __eq__ in turn.
        own_text = str(self)
        return other == own_text

    def append(self, s, charset=None, errors='strict'):
        """Add a chunk of text to the header.

        charset may be a Charset instance, a charset name (converted to a
        Charset), or None to use the charset given to the constructor.

        If s is not a str it is treated as a byte string and decoded with
        the charset's input codec (us-ascii when it has none), using errors
        as the error handler; a UnicodeError results if that fails.  If s
        is a str, charset only says which character set the text is meant
        for.

        Either way the text must be encodable with the charset's output
        codec, otherwise a UnicodeError is raised -- except that text which
        merely fails to fit in us-ascii is silently stored as utf-8
        instead.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            in_codec = charset.input_codec or 'us-ascii'
            if in_codec == _charset.UNKNOWN8BIT:
                decode_args = ('us-ascii', 'surrogateescape')
            else:
                decode_args = (in_codec, errors)
            s = s.decode(*decode_args)
        # Fail early if the text cannot be encoded with the output codec.
        out_codec = charset.output_codec or 'us-ascii'
        if out_codec != _charset.UNKNOWN8BIT:
            try:
                s.encode(out_codec, errors)
            except UnicodeEncodeError:
                if out_codec != 'us-ascii':
                    raise
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """Return True if s is whitespace, a parenthesis or a backslash."""
        if s.isspace():
            return True
        return s in ('(', ')', '\\')

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Return the header value encoded and folded for use in a message.

        Chunks whose charset has a header encoding are emitted as RFC 2047
        encoded words; all other chunks are emitted as they are and folded
        at whitespace.

        splitchars lists, in priority order, the characters that mark a
        preferred fold position: a fold is preferred right after such a
        character, or, for space and tab, at whitespace that starts with
        it.  It has no effect on encoded-word chunks.

        maxlinelen overrides the length chosen at construction time; 0
        means "do not wrap", which is implemented by substituting a very
        large limit.  A line may still exceed the limit when no split point
        can be found.

        linesep is the string placed between the resulting lines; pass
        '\r\n' to get RFC-compliant separators.

        Raises HeaderParseError if the result contains a line break that is
        followed by something that looks like a new header field, since
        that would permit header injection.
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # Using a huge limit for "no wrapping" keeps _ValueFormatter simple.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        ascii_like = (None, 'us-ascii')
        prev_cs = None
        prev_ends_nonctext = None
        for index, (text, cs) in enumerate(self._chunks):
            if index:
                # Decide whether a space must separate this chunk from the
                # previous one.
                starts_nonctext = text and self._nonctext(text[0])
                if prev_cs not in ascii_like:
                    if not starts_nonctext or cs not in ascii_like:
                        formatter.add_transition()
                elif cs not in ascii_like and not prev_ends_nonctext:
                    formatter.add_transition()
            prev_ends_nonctext = text and self._nonctext(text[-1])
            prev_cs = cs
            lines = text.splitlines()
            formatter.feed('', lines[0] if lines else '', cs)
            for line in lines[1:]:
                formatter.newline()
                if cs.header_encoding is not None:
                    formatter.feed(self._continuation_ws,
                                   ' ' + line.lstrip(), cs)
                else:
                    # Keep the line's own leading whitespace as its fold.
                    stripped = line.lstrip()
                    indent = line[:len(line) - len(stripped)]
                    formatter.feed(indent, stripped, cs)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embedded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        # Merge each run of consecutive chunks that share a charset into a
        # single chunk, joining their texts with a space.
        merged = []
        run = []
        run_cs = None
        for text, cs in self._chunks:
            if cs == run_cs:
                run.append(text)
                continue
            if run_cs is not None:
                merged.append((SPACE.join(run), run_cs))
            run = [text]
            run_cs = cs
        if run:
            merged.append((SPACE.join(run), run_cs))
        self._chunks = merged
410 
411 
412 
class _ValueFormatter:
    """Accumulate header text into lines no longer than a maximum length."""

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        self._lines = []
        # headerlen is counted against the length of the first line only.
        self._current_line = _Accumulator(headerlen)
        self._maxlen = maxlen
        self._splitchars = splitchars
        self._continuation_ws = continuation_ws
        self._continuation_ws_len = len(continuation_ws)

    def _str(self, linesep):
        # Flush whatever is pending, then join the finished lines.
        self.newline()
        return linesep.join(self._lines)

    def __str__(self):
        return self._str(NL)

    def newline(self):
        """Finish the current line and start an empty one."""
        current = self._current_line
        # A trailing transition marker is dropped; anything else goes back.
        tail = current.pop()
        if tail != (' ', ''):
            current.push(*tail)
        if len(current) > 0:
            rendered = str(current)
            if current.is_onlyws() and self._lines:
                # Whitespace alone does not get a line of its own.
                self._lines[-1] += rendered
            else:
                self._lines.append(rendered)
        current.reset()

    def add_transition(self):
        """Record a single separating space between two chunks."""
        self._current_line.push(' ', '')

    def feed(self, fws, string, charset):
        """Add string, preceded by the whitespace fws, to the output.

        Text in a charset without a header encoding is split at whitespace
        and folded using the split characters.  Any other text is turned
        into encoded words by the charset and spread over as many lines as
        the charset returns.
        """
        if charset.header_encoding is None:
            self._ascii_split(fws, string, self._splitchars)
            return
        # Encoded words need no syntactic split points; the charset fits as
        # much as it can into each of the offered line lengths.
        encoded = charset.header_encode_lines(string, self._maxlengths())
        if not encoded:
            # Nothing was produced, so there is nothing to add.
            return
        # The first entry continues the current line; None means that
        # nothing more fitted on it.
        head = encoded.pop(0)
        if head is not None:
            self._append_chunk(fws, head)
        if not encoded:
            # Everything fitted on one line.
            return
        tail = encoded.pop()
        self.newline()
        self._current_line.push(self._continuation_ws, tail)
        # Whatever remains are complete lines in their own right.
        self._lines.extend(self._continuation_ws + middle
                           for middle in encoded)

    def _maxlengths(self):
        # Room left on the line in progress comes first ...
        yield self._maxlen - len(self._current_line)
        # ... and every later line has to make room for continuation_ws.
        while True:
            yield self._maxlen - self._continuation_ws_len

    def _ascii_split(self, fws, string, splitchars):
        # Header structure is unknown here, so any run of space/tab is
        # treated as a place where a fold could go.  The text is cut into
        # (whitespace, word) pairs and each pair is appended in turn;
        # _append_chunk decides where lines actually break.
        tokens = re.split("(["+FWS+"]+)", fws+string)
        if tokens[0]:
            # Text starts with a word: pair it with empty whitespace.
            tokens.insert(0, '')
        else:
            del tokens[0]
        for lead, word in zip(tokens[0::2], tokens[1::2]):
            self._append_chunk(lead, word)

    def _append_chunk(self, fws, string):
        line = self._current_line
        line.push(fws, string)
        if len(line) <= self._maxlen:
            return
        split_at = self._find_split(line)
        if split_at is None:
            # No usable split point, which can happen on a long first line.
            lead, text = line.pop()
            if line._initial_size > 0:
                # There will be a header, so leave it on a line by itself.
                self.newline()
                if not lead:
                    # continuation_ws is not used here: the whitespace
                    # after a header should always be a space.
                    lead = ' '
            line.push(lead, text)
            return
        overflow = line.pop_from(split_at)
        self._lines.append(str(line))
        line.reset(overflow)

    def _find_split(self, line):
        # Return the index of the part that should start the next line, or
        # None.  Split characters are tried in order of preference and, for
        # each one, parts are scanned from the end of the line backwards.
        for ch in self._splitchars:
            ch_is_ws = ch.isspace()
            for i in range(line.part_count() - 1, 0, -1):
                if ch_is_ws:
                    lead = line[i][0]
                    if lead and lead[0] == ch:
                        return i
                before = line[i - 1][1]
                if before and before[-1] == ch:
                    return i
        return None
539 
540 
class _Accumulator(list):
    """List of (whitespace, text) parts whose len() is the text length.

    len() reports the number of characters held plus an initial size; the
    number of parts is available from part_count().
    """

    def __init__(self, initial_size=0):
        self._initial_size = initial_size
        super().__init__()

    def push(self, fws, string):
        """Add one (whitespace, text) part at the end."""
        self.append((fws, string))

    def pop_from(self, i=0):
        """Remove the parts from index i onwards and return them."""
        removed = self[i:]
        del self[i:]
        return removed

    def pop(self):
        """Remove and return the last part, or ('', '') when empty."""
        # part_count() is used because len() counts characters, not parts.
        return super().pop() if self.part_count() else ('', '')

    def __len__(self):
        total = self._initial_size
        for fws, part in self:
            total += len(fws) + len(part)
        return total

    def __str__(self):
        return EMPTYSTRING.join(fws + part for fws, part in self)

    def reset(self, startval=None):
        """Replace the contents with startval (default: nothing).

        The initial size is cleared as well.
        """
        self[:] = [] if startval is None else startval
        self._initial_size = 0

    def is_onlyws(self):
        """Return True if there is no initial size and no visible text."""
        if self._initial_size != 0:
            return False
        # "not self" consults __len__, so it is true when no characters are
        # held even if empty parts are present.
        return not self or str(self).isspace()

    def part_count(self):
        """Return the number of parts held."""
        return super().__len__()
579