Description
Bug report
Bug description:
I ran into a data corruption bug that seems to be triggered by interleaving reads/seeks from different files inside of an uncompressed zip file. As far as I can tell from the docs, this is allowed by zipfile. It works correctly in Python 3.7 and 3.9, but fails in 3.12.
I'm attaching a somewhat convoluted testcase (still working on a simpler one). It parses a dBase IV database by reading records from a .dbf file, and for each record, reading a corresponding record from a .dbt file.
When run using Python 3.9, you will see a bunch of data printed out. When run using Python 3.12, you will get an exception: ValueError: Invalid dBase IV block: b'PK\x03\x04\n\x00\x00\x00'. That block does not appear in the input file at all — note that b'PK\x03\x04' is the ZIP local-file-header signature, so the read evidently returned bytes from the wrong offset inside the archive. (When tested with a larger input, I got a block of bytes that appeared in the wrong file.)
For some context, here is a workaround I used in my project: I changed it to read the .dbf file first, then the .dbt.
Testcase:
#!/usr/bin/env python3
import datetime
import pathlib
import struct
import zipfile
from dataclasses import dataclass
from typing import Any, BinaryIO, List, Tuple
ZIP_PATH = pathlib.Path(__file__).parent / 'notams.zip'
@dataclass
class DbfHeader:
    """Parsed fixed-size header of a dBase .dbf table file."""

    SIZE = 32     # the header occupies the first 32 bytes of the file
    VERSION = 3   # only this version value (low bits of the info byte) is accepted

    info: int
    last_update: datetime.date
    num_records: int
    header_bytes: int
    record_bytes: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a 32-byte header; raise ValueError on an unsupported version."""
        unpacked = struct.unpack('<4BIHH20x', data)
        info, yy, mm, dd = unpacked[:4]
        num_records, header_bytes, record_bytes = unpacked[4:]
        ver = info & 0x3
        if ver != cls.VERSION:
            raise ValueError(f"Unsupported DBF version: {ver}")
        # dBase stores the year as an offset from 1900.
        last_update = datetime.date(yy + 1900, mm, dd)
        return cls(info, last_update, num_records, header_bytes, record_bytes)
@dataclass
class DbfField:
    """Descriptor for a single column of a .dbf table."""

    SIZE = 32  # each field descriptor is 32 bytes on disk

    name: str
    type: str
    length: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode a 32-byte field descriptor (NUL-padded name, type code, width)."""
        raw_name, raw_type, width = struct.unpack('<11sc4xB15x', data)
        return cls(raw_name.rstrip(b'\x00').decode(), raw_type.decode(), width)
class DbfFile:
    """Reader for the table portion (.dbf) of a dBase database."""

    @classmethod
    def read_header(cls, fd: BinaryIO) -> Tuple[DbfHeader, List[DbfField]]:
        """Read the file header and all field descriptors from *fd*.

        Layout: 32-byte header, N 32-byte field descriptors, then a single
        0x0D terminator byte (hence the ``- 33`` below).
        """
        header = DbfHeader.from_bytes(fd.read(DbfHeader.SIZE))
        field_count = (header.header_bytes - 33) // 32
        fields = []
        for _ in range(field_count):
            fields.append(DbfField.from_bytes(fd.read(DbfField.SIZE)))
        if fd.read(1) != b'\x0D':
            raise ValueError("Missing array terminator")
        return header, fields

    @classmethod
    def read_record(cls, fd: BinaryIO, fields: List[DbfField]) -> List[Any]:
        """Read one data record, converting each field to a Python value."""
        fd.read(1)  # skip the per-record deletion flag byte
        return [
            cls._convert(field, fd.read(field.length).decode('latin-1').strip(' '))
            for field in fields
        ]

    @staticmethod
    def _convert(field: DbfField, text: str) -> Any:
        """Convert one raw (space-stripped) field string per its dBase type code."""
        if field.type == 'C':
            return text
        if field.type == 'D':
            if not text.strip(' '):
                return None
            return datetime.datetime.strptime(text, '%Y%m%d').date()
        if field.type == 'L':
            if len(text) != 1:
                raise ValueError(f"Incorrect length: {text!r}")
            if text in 'YyTt':
                return True
            if text in 'NnFf':
                return False
            if text == '?':
                return None
            raise ValueError(f"Incorrect boolean: {text!r}")
        if field.type in ('M', 'N'):
            return int(text) if text else None
        raise ValueError(f"Unsupported field: {field.type}")
@dataclass
class DbtHeader:
    """Parsed 512-byte header of a dBase IV memo (.dbt) file."""

    SIZE = 512  # the header occupies the first 512 bytes of the file

    next_free_block: int
    dbf_filename: str
    reserved: int
    block_length: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Unpack the fixed 512-byte header layout."""
        nxt, fname, rsvd, blk = struct.unpack('<I4x8sIH490x', data)
        return cls(nxt, fname.decode('latin-1'), rsvd, blk)
class DbtFile:
    """Reader for dBase memo (.dbt) files."""

    DBT3_BLOCK_SIZE = 512
    DBT4_BLOCK_START = b'\xFF\xFF\x08\x00'  # dBase IV memo-block signature

    @classmethod
    def read_header(cls, fd: BinaryIO) -> DbtHeader:
        """Rewind *fd* and parse the 512-byte file header."""
        fd.seek(0)
        return DbtHeader.from_bytes(fd.read(DbtHeader.SIZE))

    @classmethod
    def read_record(cls, fd: BinaryIO, header: DbtHeader, idx: int) -> str:
        """Return the memo text stored in block *idx*.

        The stored length field counts the 8-byte block prefix too, so the
        payload is ``length - 8`` bytes.
        """
        fd.seek(header.block_length * idx)
        prefix = fd.read(8)
        if prefix[:4] != cls.DBT4_BLOCK_START:
            raise ValueError(f"Invalid dBase IV block: {prefix}")
        total = int.from_bytes(prefix[4:8], 'little')
        return fd.read(total - len(prefix)).decode('latin-1')
def main():
    """Print every .dbf record followed by its associated memo text.

    Reads the two member files of the zip concurrently: for each table
    record, the memo block index is taken from the record's fourth field.
    """
    with zipfile.ZipFile(ZIP_PATH) as z:
        with z.open('notams.dbf') as dbf_in, z.open('notams.dbt') as dbt_in:
            header, fields = DbfFile.read_header(dbf_in)
            memo_header = DbtFile.read_header(dbt_in)
            for _ in range(header.num_records):
                record = DbfFile.read_record(dbf_in, fields)
                print(record)
                memo = DbtFile.read_record(dbt_in, memo_header, record[3])
                print(memo)
if __name__ == '__main__':
    main()
Input file:
notams.zip
CPython versions tested on:
3.9, 3.12
Operating systems tested on:
Linux
Linked PRs
Metadata
Metadata
Assignees
Labels
Projects
Status