Skip to content

Commit c9a5861

Browse files
gh-123803: Support arbitrary code page encodings on Windows
If the cpXXX encoding is not directly implemented in Python, fall back to using the Windows-specific APIs codecs.code_page_encode() and codecs.code_page_decode().
1 parent 033510e commit c9a5861

File tree

2 files changed

+143
-36
lines changed

2 files changed

+143
-36
lines changed

Lib/encodings/__init__.py

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,50 @@ def search_function(encoding):
156156
codecs.register(search_function)
157157

158158
if sys.platform == 'win32':
159-
# bpo-671666, bpo-46668: If Python does not implement a codec for current
160-
# Windows ANSI code page, use the "mbcs" codec instead:
161-
# WideCharToMultiByte() and MultiByteToWideChar() functions with CP_ACP.
162-
# Python does not support custom code pages.
163-
def _alias_mbcs(encoding):
159+
def _code_page_search_function(encoding):
160+
encoding = encoding.lower()
161+
if not encoding.startswith('cp'):
162+
return None
164163
try:
165-
import _winapi
166-
ansi_code_page = "cp%s" % _winapi.GetACP()
167-
if encoding == ansi_code_page:
168-
import encodings.mbcs
169-
return encodings.mbcs.getregentry()
170-
except ImportError:
171-
# Imports may fail while we are shutting down
172-
pass
173-
174-
codecs.register(_alias_mbcs)
164+
cp = int(encoding[2:])
165+
except ValueError:
166+
return None
167+
# Test if the code page is supported
168+
try:
169+
codecs.code_page_encode(cp, 'x')
170+
except (OverflowError, OSError):
171+
return None
172+
173+
def encode(input, errors='strict'):
174+
return codecs.code_page_encode(cp, input, errors)
175+
176+
def decode(input, errors='strict'):
177+
return codecs.code_page_decode(cp, input, errors, True)
178+
179+
class IncrementalEncoder(codecs.IncrementalEncoder):
180+
def encode(self, input, final=False):
181+
return codecs.code_page_encode(cp, input, self.errors)[0]
182+
183+
class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
184+
def _buffer_decode(self, input, errors, final):
185+
return codecs.code_page_decode(cp, input, errors, final)
186+
187+
class StreamWriter(codecs.StreamWriter):
188+
def encode(self, input, errors='strict'):
189+
return codecs.code_page_encode(cp, input, errors)
190+
191+
class StreamReader(codecs.StreamReader):
    """Stream reader that decodes bytes with the Windows code page ``cp``.

    ``cp`` is the integer code page number parsed from the encoding name
    by the enclosing search function.
    """

    def decode(self, input, errors='strict', final=False):
        """Decode *input* and return ``(text, bytes_consumed)``.

        ``errors`` and ``final`` must be optional: the base class
        ``codecs.StreamReader.read()`` invokes ``self.decode(data,
        self.errors)`` with only two arguments, so a required ``final``
        parameter raises TypeError on the first read.  Callers that pass
        ``final`` explicitly keep working.
        """
        return codecs.code_page_decode(cp, input, errors, final)
194+
195+
return codecs.CodecInfo(
196+
name=f'cp{cp}',
197+
encode=encode,
198+
decode=decode,
199+
incrementalencoder=IncrementalEncoder,
200+
incrementaldecoder=IncrementalDecoder,
201+
streamreader=StreamReader,
202+
streamwriter=StreamWriter,
203+
)
204+
205+
codecs.register(_code_page_search_function)

Lib/test/test_codecs.py

Lines changed: 97 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3263,7 +3263,11 @@ def test_code_page_name(self):
32633263
codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)
32643264

32653265
def check_decode(self, cp, tests):
3266-
for raw, errors, expected in tests:
3266+
for raw, errors, expected, *rest in tests:
3267+
if rest:
3268+
altexpected, = rest
3269+
else:
3270+
altexpected = expected
32673271
if expected is not None:
32683272
try:
32693273
decoded = codecs.code_page_decode(cp, raw, errors, True)
@@ -3280,8 +3284,21 @@ def check_decode(self, cp, tests):
32803284
self.assertRaises(UnicodeDecodeError,
32813285
codecs.code_page_decode, cp, raw, errors, True)
32823286

3287+
if altexpected is not None:
3288+
decoded = raw.decode(f'cp{cp}', errors)
3289+
self.assertEqual(decoded, altexpected,
3290+
'%a.decode("cp%s", %r)=%a != %a'
3291+
% (raw, cp, errors, decoded, altexpected))
3292+
else:
3293+
self.assertRaises(UnicodeDecodeError,
3294+
raw.decode, f'cp{cp}', errors)
3295+
32833296
def check_encode(self, cp, tests):
3284-
for text, errors, expected in tests:
3297+
for text, errors, expected, *rest in tests:
3298+
if rest:
3299+
altexpected, = rest
3300+
else:
3301+
altexpected = expected
32853302
if expected is not None:
32863303
try:
32873304
encoded = codecs.code_page_encode(cp, text, errors)
@@ -3292,18 +3309,26 @@ def check_encode(self, cp, tests):
32923309
'%a.encode("cp%s", %r)=%a != %a'
32933310
% (text, cp, errors, encoded[0], expected))
32943311
self.assertEqual(encoded[1], len(text))
3312+
3313+
encoded = text.encode(f'cp{cp}', errors)
3314+
self.assertEqual(encoded, altexpected,
3315+
'%a.encode("cp%s", %r)=%a != %a'
3316+
% (text, cp, errors, encoded, altexpected))
32953317
else:
32963318
self.assertRaises(UnicodeEncodeError,
32973319
codecs.code_page_encode, cp, text, errors)
3320+
self.assertRaises(UnicodeEncodeError,
3321+
text.encode, f'cp{cp}', errors)
32983322

32993323
def test_cp932(self):
33003324
self.check_encode(932, (
33013325
('abc', 'strict', b'abc'),
33023326
('\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
3327+
('\uf8f3', 'strict', b'\xff'),
33033328
# test error handlers
33043329
('\xff', 'strict', None),
33053330
('[\xff]', 'ignore', b'[]'),
3306-
('[\xff]', 'replace', b'[y]'),
3331+
('[\xff]', 'replace', b'[y]', b'[?]'),
33073332
('[\u20ac]', 'replace', b'[?]'),
33083333
('[\xff]', 'backslashreplace', b'[\\xff]'),
33093334
('[\xff]', 'namereplace',
@@ -3317,12 +3342,12 @@ def test_cp932(self):
33173342
(b'abc', 'strict', 'abc'),
33183343
(b'\x82\x84\xe9\x80', 'strict', '\uff44\u9a3e'),
33193344
# invalid bytes
3320-
(b'[\xff]', 'strict', None),
3321-
(b'[\xff]', 'ignore', '[]'),
3322-
(b'[\xff]', 'replace', '[\ufffd]'),
3323-
(b'[\xff]', 'backslashreplace', '[\\xff]'),
3324-
(b'[\xff]', 'surrogateescape', '[\udcff]'),
3325-
(b'[\xff]', 'surrogatepass', None),
3345+
(b'[\xff]', 'strict', None, '[\uf8f3]'),
3346+
(b'[\xff]', 'ignore', '[]', '[\uf8f3]'),
3347+
(b'[\xff]', 'replace', '[\ufffd]', '[\uf8f3]'),
3348+
(b'[\xff]', 'backslashreplace', '[\\xff]', '[\uf8f3]'),
3349+
(b'[\xff]', 'surrogateescape', '[\udcff]', '[\uf8f3]'),
3350+
(b'[\xff]', 'surrogatepass', None, '[\uf8f3]'),
33263351
(b'\x81\x00abc', 'strict', None),
33273352
(b'\x81\x00abc', 'ignore', '\x00abc'),
33283353
(b'\x81\x00abc', 'replace', '\ufffd\x00abc'),
@@ -3337,7 +3362,7 @@ def test_cp1252(self):
33373362
# test error handlers
33383363
('\u0141', 'strict', None),
33393364
('\u0141', 'ignore', b''),
3340-
('\u0141', 'replace', b'L'),
3365+
('\u0141', 'replace', b'L', b'?'),
33413366
('\udc98', 'surrogateescape', b'\x98'),
33423367
('\udc98', 'surrogatepass', None),
33433368
))
@@ -3347,6 +3372,59 @@ def test_cp1252(self):
33473372
(b'\xff', 'strict', '\xff'),
33483373
))
33493374

3375+
def test_cp708(self):
3376+
self.check_encode(708, (
3377+
('abc2%', 'strict', b'abc2%'),
3378+
('\u060c\u0621\u064a', 'strict', b'\xac\xc1\xea'),
3379+
('\u2562\xe7\xa0', 'strict', b'\x86\x87\xff'),
3380+
('\x9a\x9f', 'strict', b'\x9a\x9f'),
3381+
('\u256b', 'strict', b'\xc0'),
3382+
# test error handlers
3383+
('[\u0662]', 'strict', None),
3384+
('[\u0662]', 'ignore', b'[]'),
3385+
('[\u0662]', 'replace', b'[?]'),
3386+
('\udca0', 'surrogateescape', b'\xa0'),
3387+
('\udca0', 'surrogatepass', None),
3388+
))
3389+
self.check_decode(708, (
3390+
(b'abc2%', 'strict', 'abc2%'),
3391+
(b'\xac\xc1\xea', 'strict', '\u060c\u0621\u064a'),
3392+
(b'\x86\x87\xff', 'strict', '\u2562\xe7\xa0'),
3393+
(b'\x9a\x9f', 'strict', '\x9a\x9f'),
3394+
(b'\xc0', 'strict', '\u256b'),
3395+
# test error handlers
3396+
(b'\xa0', 'strict', None),
3397+
(b'[\xa0]', 'ignore', '[]'),
3398+
(b'[\xa0]', 'replace', '[\ufffd]'),
3399+
(b'[\xa0]', 'backslashreplace', '[\\xa0]'),
3400+
(b'[\xa0]', 'surrogateescape', '[\udca0]'),
3401+
(b'[\xa0]', 'surrogatepass', None),
3402+
))
3403+
3404+
def test_cp20106(self):
3405+
self.check_encode(20106, (
3406+
('abc', 'strict', b'abc'),
3407+
('\xa7\xc4\xdf', 'strict', b'@[~'),
3408+
# test error handlers
3409+
('@', 'strict', None),
3410+
('@', 'ignore', b''),
3411+
('@', 'replace', b'?'),
3412+
('\udcbf', 'surrogateescape', b'\xbf'),
3413+
('\udcbf', 'surrogatepass', None),
3414+
))
3415+
self.check_decode(20106, (
3416+
(b'abc', 'strict', 'abc'),
3417+
(b'@[~', 'strict', '\xa7\xc4\xdf'),
3418+
(b'\xe1\xfe', 'strict', 'a\xdf'),
3419+
# test error handlers
3420+
(b'(\xbf)', 'strict', None),
3421+
(b'(\xbf)', 'ignore', '()'),
3422+
(b'(\xbf)', 'replace', '(\ufffd)'),
3423+
(b'(\xbf)', 'backslashreplace', '(\\xbf)'),
3424+
(b'(\xbf)', 'surrogateescape', '(\udcbf)'),
3425+
(b'(\xbf)', 'surrogatepass', None),
3426+
))
3427+
33503428
def test_cp_utf7(self):
33513429
cp = 65000
33523430
self.check_encode(cp, (
@@ -3419,17 +3497,15 @@ def test_incremental(self):
34193497
False)
34203498
self.assertEqual(decoded, ('abc', 3))
34213499

3422-
def test_mbcs_alias(self):
3423-
# Check that looking up our 'default' codepage will return
3424-
# mbcs when we don't have a more specific one available
3425-
code_page = 99_999
3426-
name = f'cp{code_page}'
3427-
with mock.patch('_winapi.GetACP', return_value=code_page):
3428-
try:
3429-
codec = codecs.lookup(name)
3430-
self.assertEqual(codec.name, 'mbcs')
3431-
finally:
3432-
codecs.unregister(name)
3500+
def test_mbcs_code_page(self):
3501+
# Check that codec for the current Windows (ANSI) code page is
3502+
# always available.
3503+
try:
3504+
from _winapi import GetACP
3505+
except ImportError:
3506+
self.skipTest('requires _winapi.GetACP')
3507+
cp = GetACP()
3508+
codecs.lookup(f'cp{cp}')
34333509

34343510
@support.bigmemtest(size=2**31, memuse=7, dry_run=False)
34353511
def test_large_input(self, size):

0 commit comments

Comments
 (0)