1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
|
# python3
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The download module implements the download action.
The download action will download a crate from crates.io and unpack it into
`third_party/rust/`."""
from __future__ import annotations
from lib import cargo
from lib import common
from lib import consts
import argparse
import certifi
from functools import partial
import io
import os
import re
import shutil
import sys
import tarfile
import tempfile
import toml
import urllib3
class UntarAbsolutePathError(Exception):
def __init__(self, path: str):
self.path = path
class DownloadError(Exception):
pass
class NeedLicenseError(Exception):
def __init__(self, crate_license: str):
self.crate_license = crate_license
def run(args: argparse.Namespace):
"""Entry point for the the 'download' action."""
if _check_if_crate_is_blocked(args.crate_name):
exit(1)
full_version = _find_crate_full_version(args.crate_name, args.crate_version,
args.verbose)
crate_tarball = _download_crate(args.crate_name, full_version)
if not crate_tarball:
exit(1)
try:
try:
_make_dirs_for_crate(args.crate_name, full_version)
except FileExistsError as e:
print("Unable to make directory {} as it already exists".format(
e.filename),
file=sys.stderr)
raise DownloadError
try:
_untar_crate(args.crate_name, full_version, crate_tarball)
except UntarAbsolutePathError as e:
print("Error: Crate has file at an absolute path!", file=sys.stderr)
print(" " + e.path, file=sys.stderr)
raise DownloadError
# This expects to find the untar'd crate with its Cargo.toml present to
# read.
try:
readme_contents = _gen_readme(args, args.crate_name, full_version)
except NeedLicenseError as e:
print("Error: --license is required to override Cargo.toml "
"value of \"{}\" (or add this to "
"lib.consts.ALLOWED_LICENSES)".format(e.crate_license),
file=sys.stderr)
raise DownloadError
readme_path = common.os_crate_version_dir(args.crate_name,
full_version,
rel_path=["README.chromium"])
with open(readme_path, "w") as readme_file:
readme_file.write(readme_contents)
print("Downloaded {} {} to {}".format(
args.crate_name, full_version,
common.os_crate_version_dir(args.crate_name, full_version)))
except DownloadError:
# Remove the crate-name/vX/crate/ dir which we have downloaded, but
# nothing else, in case there's patches/ or something there.
shutil.rmtree(common.os_crate_cargo_dir(args.crate_name, full_version))
# Try remove the crate-name/vX/ dir if it's empty, but there may be
# patches/ or other stuff there if we're updating an existing crate to a
# new vers so it may fail.
try:
shutil.rmtree(
common.os_crate_version_dir(args.crate_name, full_version))
except:
pass
# Also try remove the crate-name/ dir, but there might be other versions
# present so it can fail.
try:
os.rmdir(common.os_crate_name_dir(args.crate_name))
except:
pass
def _gen_readme(args: argparse.Namespace, crate_name: str, version: str) -> str:
"""Generate the contents of a README.chromium file for a crate."""
cargo = common.load_toml(
common.os_crate_cargo_dir(crate_name, version, rel_path=["Cargo.toml"]))
if args.license:
license = args.license
else:
crate_license = cargo["package"]["license"]
license = None
for allow in consts.ALLOWED_LICENSES:
if allow[0] == crate_license:
license = allow[1]
break
if not license:
raise NeedLicenseError(crate_license)
return consts.README_CHROMIUM.format(
crate_name=cargo["package"]["name"],
url=common.crate_view_url(cargo["package"]["name"]),
description=cargo["package"]["description"].rstrip(),
version=cargo["package"]["version"],
security=args.security_critical,
license=license,
)
def _check_if_crate_is_blocked(crate_name: str) -> bool:
"""Checks whether a crate is considered blocked (and should not be used).
Prints a message and returns True if it is.
"""
if crate_name in consts.BLOCKED_CRATES:
reason = consts.BLOCKED_CRATES[crate_name]
print("The crate \"{}\" is blocked and should not be downloaded: {}".
format(crate_name, reason),
file=sys.stderr)
return True
return False
def _find_crate_full_version(crate_name: str, partial_version: str,
verbose: bool) -> str:
"""Look up the latest matching version from crates.io.
Returns:
If `partial_version` is a full semver (1.2.3), then that is returned
immediately. Always returns a full version with 3 components, which
will be determined from crates.io."""
# Find the version we want to download from crates.io.
if common.version_is_complete(partial_version):
return partial_version
# Go to crates.io through `cargo tree`.
with tempfile.TemporaryDirectory() as workdir:
cargo_toml_path = os.path.join(workdir, "Cargo.toml")
# Generate a fake Cargo.toml which depends on the crate and version.
toml_version = {"dependencies": {crate_name: partial_version}}
cargo.write_cargo_toml_in_tempdir(
workdir,
None,
orig_toml_parsed=cargo.add_required_cargo_fields(toml_version),
verbose=verbose)
# `cargo tree` will tell us the actual version number of the dependency,
# finding the latest matching version on crates.io.
out = cargo.run_cargo_tree(cargo_toml_path,
cargo.CrateBuildOutput.NORMAL, None, 1, [])
# Depth 1 should give only two output lines.
assert len(out) == 2
m = re.search(consts.CARGO_DEPS_REGEX, out[1])
# If these fail, we have invalid output from `cargo tree`?
assert m
assert m.group("version")
return m.group("version")
def _download_crate(crate_name: str, version: str) -> bytes:
"""Downloads a crate from crates.io and returns it as `bytes`.
Returns:
The `bytes` of the downloaded crate tarball, or None if the download
fails.
"""
url = common.crate_download_url(crate_name, version)
http = urllib3.PoolManager(cert_reqs="CERT_REQUIRED",
ca_certs=certifi.where())
resp = http.request("GET", url)
if resp.status != 200:
print("Unable to download {}, status {}".format(url, resp.status),
file=sys.stderr)
return None
return resp.data
def _make_dirs_for_crate(crate_name: str, version: str) -> bool:
"""Recursively make directories to hold a downloaded crate."""
# This is the crate-name/vX/ directory, where the BUILD.gn lives and any
# patches/ directory that are locally applied to the crate.
ver_dir = common.os_crate_version_dir(crate_name, version)
# This is the dir inside the crate-name/vX/ directory where the crate's
# contents will be extracted. If it already exists, we can't download and
# extract the crate, as we'd end up with a mixture of files.
cargo_dir = common.os_crate_cargo_dir(crate_name, version)
if ver_dir != cargo_dir:
try:
os.makedirs(ver_dir)
except FileExistsError:
pass
os.mkdir(cargo_dir)
def _untar_crate(crate_name: str, version: str, crate_tarball: bytes):
"""Untar a downloaded crate tarball."""
with tarfile.open(mode="r", fileobj=io.BytesIO(crate_tarball)) as contents:
for m in contents.getmembers():
# Tar files always have "/" as a path separator.
if m.name.startswith("/") or m.name.startswith(".."):
raise UntarAbsolutePathError(m.name)
# Drop the first path component, which is the crate's name-version.
m.name = re.sub("^.+?/", "", m.name)
contents.extractall(path=common.os_crate_cargo_dir(crate_name, version))
|