-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathpytest_virtualenv.py
337 lines (291 loc) · 13.4 KB
/
pytest_virtualenv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
""" Python virtual environment fixtures
"""
import os
import pathlib
import re
import shutil
import sys
from enum import Enum
from typing import Optional, Tuple

from importlib_metadata import distribution, distributions, PackageNotFoundError
from pytest import fixture, yield_fixture
from pytest_shutil.workspace import Workspace
from pytest_shutil import run, cmdline
from pytest_fixture_config import Config, yield_requires_config
class PackageVersion(Enum):
    """Symbolic version selectors accepted by ``VirtualEnv.install_package``."""

    # Install whatever the installer resolves for the bare package name.
    LATEST = 1
    # Mirror the version installed in the environment that is running the tests.
    CURRENT = 2
class FixtureConfig(Config):
    """Configuration holder for the virtualenv fixtures.

    The single option, ``virtualenv_executable``, is the command used to create
    a virtualenv. It may be a string or a list of strings.
    """
    # The trailing comma matters: without it the parentheses are plain grouping
    # and ``__slots__`` is a bare string, so iterating it yields characters and
    # ``name in __slots__`` becomes a substring test.
    __slots__ = ('virtualenv_executable',)
# Default values for system resource locations - patch this to change defaults.
# Can be a single string (one executable) or a list of strings (a command prefix).
DEFAULT_VIRTUALENV_FIXTURE_EXECUTABLE = [sys.executable, '-m', 'virtualenv']
# The VIRTUALENV_FIXTURE_EXECUTABLE environment variable, when set, takes
# precedence over the default above; in that case the value is a single string.
CONFIG = FixtureConfig(
    virtualenv_executable=os.getenv('VIRTUALENV_FIXTURE_EXECUTABLE', DEFAULT_VIRTUALENV_FIXTURE_EXECUTABLE),
)
# ``pytest.yield_fixture`` is a deprecated alias; ``pytest.fixture`` supports
# ``yield``-style fixtures directly and behaves identically.
@fixture(scope='function')
@yield_requires_config(CONFIG, ['virtualenv_executable'])
def virtualenv():
    """ Function-scoped virtualenv in a temporary workspace.

        Yields a freshly created `VirtualEnv` and calls its ``teardown()`` once
        the test has finished with it.

        Methods
        -------
        run()                : run a command using this virtualenv's shell environment
        run_with_coverage()  : run a command in this virtualenv, collecting coverage
        install_package()    : install a package in this virtualenv
        installed_packages() : return a dict mapping package name to its `PackageEntry`

        Attributes
        ----------
        virtualenv (`path.path`) : Path to this virtualenv's base directory
        python (`path.path`)     : Path to this virtualenv's Python executable
        pip (`path.path`)        : Path to this virtualenv's pip executable

        .. also inherits all attributes from the `workspace` fixture
    """
    venv = VirtualEnv()
    yield venv
    venv.teardown()
class PackageEntry(object):
    """Record describing one installed package.

    Attributes
    ----------
    name : `str`
        package name
    version : `str`
        package version string
    source_path : `str` or None
        location the package was loaded from, if known
    """
    # TODO: base this off of setuptools Distribution class or something not home-grown
    PACKAGE_TYPES = (ANY, DEV, SRC, REL) = ('ANY', 'DEV', 'SRC', 'REL')

    def __init__(self, name, version, source_path=None):
        self.name = name
        self.version = version
        self.source_path = source_path

    @property
    def issrc(self):
        """True for a 'dev' version whose source path is set and is not an egg."""
        return ("dev" in self.version and
                self.source_path is not None and
                not self.source_path.endswith(".egg"))

    @property
    def isrel(self):
        """True for anything that is not a dev package (see `isdev`)."""
        return not self.isdev

    @property
    def isdev(self):
        """True for a 'dev' version with no source path or one that points at an egg."""
        return ('dev' in self.version and
                (not self.source_path or self.source_path.endswith(".egg")))

    def match(self, package_type):
        """Return True when this entry belongs to ``package_type``.

        ``package_type`` is one of `PACKAGE_TYPES`; any other value yields False.
        """
        # Compare by value: identity (`is`) only happens to work for interned
        # string literals and silently fails for equal strings built at runtime.
        if package_type == self.ANY:
            return True
        if package_type == self.REL:
            return self.isrel
        if package_type == self.DEV:
            return self.isdev
        if package_type == self.SRC:
            return self.issrc
        return False
class VirtualEnv(Workspace):
    """
    Creates a virtualenv in a temporary workspace, cleans up on exit.

    Attributes
    ----------
    python : `str`
        path to the python exe
    pip : `str`
        path to the pip exe
    coverage : `str`
        path to the coverage exe
    virtualenv : `str`
        path to the virtualenv base dir
    env : `dict`
        environment variables used in creation of virtualenv
    pip_version : `tuple` of `int`
        numeric version of the pip found in the virtualenv
    delete_workspace: `None or bool`
        If True then the workspace will be deleted
        If False then the workspace will be kept
        If None (default) then the workspace will be deleted if workspace is also None, but it will be kept otherwise
    """
    # TODO: update to use pip, remove distribute
    def __init__(self, env=None, workspace=None, name='.env', python=None, args=None, delete_workspace=None):
        if delete_workspace is None:
            delete_workspace = workspace is None
        Workspace.__init__(self, workspace, delete_workspace)
        self.virtualenv = self.workspace / name
        self.args = args or []
        if sys.platform == 'win32':
            # In virtualenv on windows "Scripts" folder is used instead of "bin".
            self.python = self.virtualenv / 'Scripts' / 'python.exe'
            self.pip = self.virtualenv / 'Scripts' / 'pip.exe'
            self.coverage = self.virtualenv / 'Scripts' / 'coverage.exe'
        else:
            self.python = self.virtualenv / 'bin' / 'python'
            self.pip = self.virtualenv / "bin" / "pip"
            self.coverage = self.virtualenv / 'bin' / 'coverage'

        if env is None:
            self.env = dict(os.environ)
        else:
            self.env = dict(env)  # ensure we take a copy just in case there's some modification

        self.env['VIRTUAL_ENV'] = str(self.virtualenv)
        # Put the virtualenv's executables first on PATH.
        self.env['PATH'] = str(os.path.dirname(self.python)) + ((os.path.pathsep + self.env["PATH"])
                                                                if "PATH" in self.env else "")
        # Drop any inherited PYTHONPATH from the subprocess environment.
        self.env.pop('PYTHONPATH', None)

        self.virtualenv_cmd = CONFIG.virtualenv_executable
        if isinstance(self.virtualenv_cmd, str):
            cmd = [self.virtualenv_cmd]
        else:
            cmd = list(self.virtualenv_cmd)
        cmd.extend(['-p', python or cmdline.get_real_python_executable()])
        cmd.extend(self.args)
        cmd.append(str(self.virtualenv))
        self.run(cmd)
        self._importlib_metadata_installed = False
        self.pip_version = self._get_pip_version()

    def _get_pip_version(self) -> Tuple[int, ...]:
        """Return the numeric release of the virtualenv's pip, e.g. ``(23, 1, 2)``.

        Only the leading dotted-number part of the version is used, so
        pre-release suffixes such as ``20.3b1`` or ``21.0.dev0`` parse as
        ``(20, 3)`` and ``(21, 0)`` instead of failing.

        Raises ``ValueError`` when the version does not start with a number.
        """
        output = self.run([self.python, "-m", "pip", "--version"], capture=True)
        # The second space-separated field of the output is the version string.
        version_str = output.split(" ")[1]
        release = re.match(r"\d+(?:\.\d+)*", version_str)
        if release is None:
            raise ValueError("cannot parse pip version from %r" % output)
        return tuple(int(part) for part in release.group(0).split("."))

    def run(self, args, **kwargs):
        """
        Add our cleaned shell environment into any subprocess execution
        """
        if 'env' not in kwargs:
            kwargs['env'] = self.env
        return super(VirtualEnv, self).run(args, **kwargs)

    def run_with_coverage(self, *args, **kwargs):
        """
        Run a python script using coverage, run within this virtualenv.
        Assumes the coverage module is already installed.

        Parameters
        ----------
        args:
            Args passed into `pytest_shutil.run.run_with_coverage`
        kwargs:
            Keyword arguments to pass to `pytest_shutil.run.run_with_coverage`
        """
        if 'env' not in kwargs:
            kwargs['env'] = self.env
        coverage = [str(self.python), str(self.coverage)]
        return run.run_with_coverage(*args, coverage=coverage, **kwargs)

    def _run_installer(self, installer, installer_command, spec):
        """Run ``<python> <installer> <installer_command> <spec>`` in this virtualenv."""
        self.run(
            "{python} {installer} {installer_command} {spec}".format(
                python=self.python, installer=installer, installer_command=installer_command, spec=spec
            )
        )

    def install_package(self, pkg_name, version=PackageVersion.LATEST, installer="pip", installer_command="install"):
        """
        Install a given package name. If it's already setup in the
        test runtime environment, it will use that.

        :param pkg_name: `str`
            Name of the package to be installed
        :param version: `str` or `PackageVersion`
            If PackageVersion.LATEST then installs the latest version of the package from upstream
            If PackageVersion.CURRENT then installs the same version that's installed in the current virtual
            environment that's running the tests. If the package is an editable install or an egg-link, then
            it is installed from the same location. If the package is not in the parent, then installs the
            latest version
            If the value is a string, then it will be used as the version to install
        :param installer: `str`
            The installer used to install packages, `pip` by default
        :param installer_command: `str`
            The command passed to the installer, `install` by default. The installer script is looked up in
            `<venv>/Scripts` (with an `.exe` suffix) on windows and in `<venv>/bin` elsewhere, and is run
            through the virtualenv's python
        """
        if sys.platform == 'win32':
            # In virtualenv on windows "Scripts" folder is used instead of "bin".
            installer = str(self.virtualenv / 'Scripts' / installer) + '.exe'
        else:
            installer = str(self.virtualenv / 'bin' / installer)
        if not self.debug:
            installer += ' -q'

        if version == PackageVersion.LATEST:
            spec = pkg_name
        elif version == PackageVersion.CURRENT:
            wanted = _normalize(pkg_name)
            dist = next((d for d in distributions() if _normalize(d.name) == wanted), None)
            if not dist:
                # Not installed in the environment running the tests: fall back to latest.
                spec = pkg_name
            else:
                pkg_location = (
                    _get_editable_package_location_from_direct_url(dist.name) if self.pip_version >= (19, 3) else None
                )
                egg_link = _get_egg_link(dist.name)
                if pkg_location:
                    spec = "-e {}".format(pkg_location)
                elif egg_link:
                    # Egg-link installs are replicated by hand rather than through the installer.
                    self._install_package_from_editable_egg_link(egg_link, dist)
                    return
                else:
                    spec = "{pkg_name}=={version}".format(pkg_name=pkg_name, version=dist.version)
        else:
            spec = "{pkg_name}=={version}".format(pkg_name=pkg_name, version=version)
        self._run_installer(installer, installer_command, spec)

    def installed_packages(self, package_type=None):
        """
        Return a package dict with
            key = package name, value = `PackageEntry` (name, version, location)

        :param package_type: one of `PackageEntry.PACKAGE_TYPES` or None (meaning ANY).
            Any other value raises `ValueError`.
        """
        # Lazily install importlib_metadata in the underlying virtual environment
        self._install_importlib_metadata()
        if package_type is None:
            package_type = PackageEntry.ANY
        elif package_type not in PackageEntry.PACKAGE_TYPES:
            raise ValueError('invalid package_type parameter (%s)' % str(package_type))
        # NOTE(review): package_type is validated but the result is not filtered
        # by it; every installed package is returned. Confirm whether filtering
        # through PackageEntry.match() is intended before changing this.
        res = {}
        code = "import importlib_metadata as metadata\n"\
               "for i in metadata.distributions(): print(i.name + ' ' + i.version + ' ' + str(i.locate_file('')))"
        lines = self.run([self.python, "-c", code], capture=True).split('\n')
        for line in [i.strip() for i in lines if i.strip()]:
            # The location may contain spaces, so split at most twice.
            name, version, location = line.split(" ", 2)
            res[name] = PackageEntry(name, version, location)
        return res

    def _install_importlib_metadata(self):
        """Install importlib_metadata into the virtualenv once per instance."""
        if not self._importlib_metadata_installed:
            self.install_package("importlib_metadata", version=PackageVersion.CURRENT)
            self._importlib_metadata_installed = True

    def _install_package_from_editable_egg_link(self, egg_link, package):
        """Replicate an egg-link (develop) install of ``package`` inside this virtualenv.

        Copies the egg-link file into the virtualenv's site-packages, appends its
        contents to ``easy-install.pth`` there, then installs each non-extra
        requirement of ``package`` whose marker (if any) evaluates true.
        """
        import pkg_resources
        if sys.platform == "win32":
            site_packages = self.virtualenv / "Lib" / "site-packages"
        else:
            # Uses the version of the interpreter running this code.
            python_dir = "python{}.{}".format(sys.version_info.major, sys.version_info.minor)
            site_packages = self.virtualenv / "lib" / python_dir / "site-packages"
        shutil.copy(egg_link, site_packages / egg_link.name)
        with open(site_packages / "easy-install.pth", "a") as pth, open(egg_link) as link_file:
            pth.write(link_file.read())
            pth.write("\n")
        # ``requires`` is None (not an empty list) for a distribution that
        # declares no requirements.
        for spec in package.requires or []:
            if not _is_extra_requirement(spec):
                dependency = next(pkg_resources.parse_requirements(spec), None)
                if dependency and (not dependency.marker or dependency.marker.evaluate()):
                    self.install_package(dependency.name, version=PackageVersion.CURRENT)
def _normalize(name):
    """Return ``name`` lower-cased, with each run of ``-``, ``_`` or ``.`` replaced by a single ``-``."""
    collapsed = re.sub(r"[-_.]+", "-", name)
    return collapsed.lower()
def _get_egg_link(package_name):
    """Return the first ``<package_name>.egg-link`` file found on ``sys.path``, or None.

    Entries of ``sys.path`` are checked in order; the result is a `pathlib.Path`.
    """
    link_name = package_name + ".egg-link"
    candidates = (pathlib.Path(entry) / link_name for entry in sys.path)
    return next((candidate for candidate in candidates if candidate.is_file()), None)
def _get_editable_package_location_from_direct_url(package_name: str) -> Optional[str]:
    """
    Uses the PEP610 direct_url.json to get the installed location of a given
    editable package.

    Parameters
    ----------
    package_name: The name of the package, for example "pytest_virtualenv".

    Returns
    -------
    The URL of the installed package, e.g. "file:///users/<username>/workspace/pytest-plugins/pytest-virtualenv/",
    or None when the package is not installed, has no direct_url.json, or is
    not an editable install.
    """
    import json

    try:
        raw = distribution(package_name).read_text('direct_url.json')
    except (PackageNotFoundError, FileNotFoundError):
        return None
    if not raw:
        return None
    direct_url = json.loads(raw)
    # "dir_info" is only present for local-directory installs (VCS and archive
    # installs carry other keys), and within it "editable" is optional with
    # absent meaning false, so neither may be assumed to exist.
    dir_info = direct_url.get("dir_info") if isinstance(direct_url, dict) else None
    if isinstance(dir_info, dict) and dir_info.get("editable"):
        return direct_url.get("url")
    return None
def _is_extra_requirement(spec):
    """Return True when requirement string ``spec`` is conditional on an extra.

    Only the environment-marker part (everything after the first ``;``) is
    inspected, so a requirement on a package that is itself named ``extra``
    is not misclassified. An ``extra == ...`` clause is recognised anywhere in
    the marker, e.g. ``python_version < "3.8" and extra == "test"``.
    """
    _requirement, separator, marker = spec.partition(";")
    if not separator:
        # No marker at all, so the requirement cannot depend on an extra.
        return False
    return re.search(r"\bextra\s*==", marker) is not None