mach-nix/mach_nix/data/providers.py
a-kenji 3faddeaaa9 fix: typos
Fixes some trivial typos, mostly in comments,
except for in `mach_nix/resolver/__init__.py`,
the scope is a context manager, so there shouldn't be arbitrary
side effects by the rename.
2022-09-08 09:07:24 +02:00

650 lines
26 KiB
Python

import fnmatch
from os import environ
import json
import platform
import re
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
from operator import itemgetter
from typing import Any, Iterable, List, Optional, Tuple
import distlib.markers
from pkg_resources import RequirementParseError
from mach_nix.requirements import filter_reqs_by_eval_marker, Requirement, parse_reqs, context, filter_versions
from mach_nix.versions import PyVer, parse_ver, Version
from .bucket_dict import LazyBucketDict
from .nixpkgs import NixpkgsIndex
from ..cache import cached
@dataclass
class Candidate:
    """One concrete version of a package as offered by a single provider."""

    # Package name as reported by the provider.
    name: str
    # Parsed version; providers sort and filter candidates by this value.
    ver: Version
    # The version string the candidate was created from.
    raw_version: str
    # Extras requested for this package.
    selected_extras: tuple
    # The providing provider together with its provider specific details.
    provider_info: 'ProviderInfo'
    # Build string; within this module only the conda provider fills it in.
    build: Optional[str] = None
@dataclass
class ProviderInfo:
    """Tells which provider offers a candidate and how it can be obtained."""

    provider: 'DependencyProviderBase'
    wheel_fname: Optional[str] = None  # only required for wheel
    url: Optional[str] = None
    hash: Optional[str] = None
    data: Any = None  # Provider specific data

    def toDict(self):
        """
        Return the describing fields as a plain dict.
        The provider is represented by its name; `data` is not included.
        """
        return {
            'provider': self.provider.name,
            'wheel_fname': self.wheel_fname,
            'url': self.url,
            'hash': self.hash,
        }
def normalize_name(key: str) -> str:
    """Return `key` in lower case with every underscore replaced by a dash."""
    lowered = key.lower()
    return lowered.replace('_', '-')
class ProviderSettings:
    """
    Provider preferences loaded from a JSON file.

    The file contains either one provider list (a JSON list or a comma
    separated string) that applies to every package, or an object mapping
    package names to provider lists. In the object form the key '_default'
    is mandatory and supplies the list for packages without an own entry.

    Attributes:
        default_providers: tuple of provider names used as fallback
        pkg_providers: dict mapping normalized package names to their
            tuple of provider names ('_default' is not part of it)
    """

    def __init__(self, providers_json):
        """
        :param providers_json: path of the JSON file to read
        :raises Exception: if the JSON has an unsupported structure
        """
        with open(providers_json) as f:
            data = json.load(f)
        if isinstance(data, (list, str)):
            self.default_providers = self._parse_provider_list(data)
            self.pkg_providers = {}
        elif isinstance(data, dict):
            if '_default' not in data:
                raise Exception("Providers must contain '_default' key")
            self.default_providers = self._parse_provider_list(data['_default'])
            # provider_names_for_pkg() looks packages up by their normalized
            # name, therefore the keys must be stored normalized as well.
            self.pkg_providers = {
                normalize_name(pkg): self._parse_provider_list(providers)
                for pkg, providers in data.items()
                if pkg != '_default'
            }
        else:
            raise Exception('Wrong format for provider settings')

    def _parse_provider_list(self, str_or_list) -> Tuple[str, ...]:
        """
        Turn a list or a comma separated string into a tuple of normalized provider names.

        :raises Exception: if `str_or_list` is neither a str nor a list
        """
        if isinstance(str_or_list, str):
            return tuple(normalize_name(p.strip()) for p in str_or_list.strip().split(','))
        elif isinstance(str_or_list, list):
            return tuple(normalize_name(k) for k in str_or_list)
        else:
            raise Exception("Provider specifiers must be lists or comma separated strings")

    def provider_names_for_pkg(self, pkg_name):
        """Return the providers configured for `pkg_name`, or the defaults if it has no own entry."""
        name = normalize_name(pkg_name)
        if name in self.pkg_providers:
            return self.pkg_providers[name]
        return self.default_providers
class PackageNotFound(Exception):
    """Signals that a provider cannot deliver the requested version of a package."""

    def __init__(self, pkg_name, pkg_ver, provider_name, *args, **kwargs):
        # additional positional and keyword arguments are accepted but not used
        message = f"Provider '{provider_name}' cannot provide {pkg_name}:{pkg_ver}"
        super().__init__(message)
class DependencyProviderBase(ABC):
    """
    Base class of all dependency providers.

    Subclasses supply `name`, `all_candidates` and `get_pkg_reqs`;
    sorting and requirement matching of candidates is implemented here.
    """

    def __init__(self, py_ver: PyVer, platform, system, *args, **kwargs):
        self.context = context(py_ver, platform, system)
        # separate copy of the context in which 'extra' is preset to None
        wheel_context = self.context.copy()
        wheel_context['extra'] = None
        self.context_wheel = wheel_context
        self.py_ver = py_ver
        self.py_ver_digits = py_ver.digits()
        self.platform = platform
        self.system = system

    @cached(keyfunc=lambda args: (args[0], tuple(args[1])))
    def find_matches(self, reqs) -> List[Candidate]:
        """
        Return the candidates, in sorted order, whose version satisfies every requirement in `reqs`.
        The package is identified by the key of the first requirement.
        """
        extras = tuple({extra for req in reqs for extra in req.extras})
        builds = tuple({req.build for req in reqs if req.build is not None})
        available = list(self.all_candidates_sorted(reqs[0].key, extras, builds))
        versions = [cand.ver for cand in available]
        # narrow the versions down by one requirement after the other
        for req in reqs:
            versions = filter_versions(versions, req)
        accepted = set(versions)
        return [cand for cand in available if cand.ver in accepted]

    def all_candidates_sorted(self, name, extras, builds) -> Iterable[Candidate]:
        """Return all candidates ordered by version, highest first."""
        return sorted(
            self.all_candidates(name, extras, builds),
            key=lambda cand: cand.ver,
            reverse=True,
        )

    @property
    @abstractmethod
    def name(self):
        """Identifier of the provider."""
        pass

    def unify_key(self, key: str) -> str:
        """Return `key` in lower case with underscores replaced by dashes."""
        return key.lower().replace('_', '-')

    @abstractmethod
    @cached()
    def get_pkg_reqs(self, candidate: Candidate) -> Tuple[Optional[List[Requirement]], Optional[List[Requirement]]]:
        """
        Get all requirements of a candidate for the current platform and the specified extras
        returns two lists: install_requires, setup_requires
        """
        pass

    @abstractmethod
    def all_candidates(self, name, extras, builds) -> Iterable[Candidate]:
        """Return every candidate this provider knows for the package `name`."""
        pass
class CombinedDependencyProvider(DependencyProviderBase):
    """
    Front end which bundles the wheel, sdist, nixpkgs and conda providers.

    Which of them may serve a given package, and in which order of
    preference, is decided by the ProviderSettings passed to the constructor.
    """
    name = 'combined'

    def __init__(
            self,
            conda_channels_json,
            nixpkgs: NixpkgsIndex,
            provider_settings: ProviderSettings,
            pypi_deps_db_src: str,
            *args,
            **kwargs):
        """
        :param conda_channels_json: path of a JSON file mapping conda channel names to lists of index files
        :param nixpkgs: index handed to the nixpkgs provider
        :param provider_settings: selects the providers allowed per package
        :param pypi_deps_db_src: directory containing the 'wheel' and 'sdist' data
        :raises Exception: if the settings mention a provider which does not exist
        """
        super(CombinedDependencyProvider, self).__init__(*args, **kwargs)
        self.provider_settings = provider_settings
        wheel = WheelDependencyProvider(f"{pypi_deps_db_src}/wheel", *args, **kwargs)
        sdist = SdistDependencyProvider(f"{pypi_deps_db_src}/sdist", *args, **kwargs)
        nixpkgs = NixpkgsDependencyProvider(nixpkgs, wheel, sdist, *args, **kwargs)
        self._all_providers = {
            f"{wheel.name}": wheel,
            f"{sdist.name}": sdist,
            f"{nixpkgs.name}": nixpkgs,
        }
        # one conda provider per configured channel
        with open(conda_channels_json) as f:
            self._all_providers.update({
                f"conda/{channel_name}": CondaDependencyProvider(channel_name, files, *args, **kwargs)
                for channel_name, files in json.load(f).items()
            })
        # refuse settings that reference providers which do not exist
        providers_used = set(provider_settings.default_providers)
        for p_list in provider_settings.pkg_providers.values():
            providers_used.update(p_list)
        unknown_providers = providers_used - set(self._all_providers.keys())
        if unknown_providers:
            raise Exception(f"Error: Unknown providers '{unknown_providers}'. Please remove from 'providers=...'")

    def allowed_providers_for_pkg(self, pkg_name):
        """Return a dict name -> provider of the providers allowed for `pkg_name`, in order of preference."""
        provider_keys = self.provider_settings.provider_names_for_pkg(pkg_name)
        # iterating the configured names keeps their order; a dict drops repeated names
        return {
            key: self._all_providers[key]
            for key in provider_keys
            if key in self._all_providers
        }

    def get_pkg_reqs(self, c: Candidate) -> Tuple[Optional[List[Requirement]], Optional[List[Requirement]]]:
        """Delegate to the provider which produced the candidate."""
        return c.provider_info.provider.get_pkg_reqs(c)

    def list_all_providers_for_pkg(self, pkg_name, extras, build):
        """Return the names of all providers, allowed or not, which have candidates for the package."""
        return [
            p_name
            for p_name, provider in self._all_providers.items()
            if provider.all_candidates(pkg_name, extras, build)
        ]

    def print_error_no_versions_available(self, pkg_name, extras, build):
        """
        Print an explanation to stderr why no candidate was found and terminate
        the process with exit code 1. This method does not return.
        """
        provider_names = set(self.allowed_providers_for_pkg(pkg_name).keys())
        error_text = \
            f"\nThe Package '{pkg_name}' (build: {build}) is not available from any of the " \
            f"selected providers {sorted(provider_names)}\n for the selected python version"
        if provider_names != set(self._all_providers.keys()):
            # some providers are not allowed for this package; check if they could help
            alternative_providers = self.list_all_providers_for_pkg(pkg_name, extras, build)
            if alternative_providers:
                error_text += \
                    f'\nThe package is available from providers {alternative_providers}\n' \
                    f"Consider adding them via 'providers='."
        else:
            error_text += \
                "\nThe required package might just not (yet) be part of the dependency DB currently used.\n" \
                "The DB can be updated by specifying 'pypiDataRev' when importing mach-nix.\n" \
                "For examples see: https://github.com/DavHau/mach-nix/blob/master/examples.md\n" \
                "If it still doesn't work, there might have been an error while building the DB.\n" \
                "Please open an issue at: https://github.com/DavHau/mach-nix/issues/new\n"
        print(error_text, file=sys.stderr)
        # sys.exit instead of the exit() helper, which only exists if the site module was loaded
        sys.exit(1)

    @cached()
    def all_candidates_sorted(self, pkg_name, extras, builds) -> Iterable[Candidate]:
        """
        Return the candidates of all allowed providers as one tuple.
        The providers are visited in the order given by allowed_providers_for_pkg();
        each contributes its own candidates in its own sorted order.
        If nothing is found, an error is printed and the process exits.
        """
        candidates = []
        for provider in self.allowed_providers_for_pkg(pkg_name).values():
            candidates.extend(provider.all_candidates_sorted(pkg_name, extras, builds))
        if not candidates:
            self.print_error_no_versions_available(pkg_name, extras, builds)
        return tuple(candidates)

    def all_candidates(self, name, extras, builds) -> Iterable[Candidate]:
        """Same as all_candidates_sorted()."""
        return self.all_candidates_sorted(name, extras, builds)
class NixpkgsDependencyProvider(DependencyProviderBase):
    """
    Offers the python packages contained in the nixpkgs index.
    Requirements which the index does not know are looked up via the
    sdist provider and then the wheel provider.
    """
    name = 'nixpkgs'

    # TODO: implement extras by looking them up via the equivalent wheel
    def __init__(
            self,
            nixpkgs: NixpkgsIndex,
            wheel_provider: 'WheelDependencyProvider',
            sdist_provider: 'SdistDependencyProvider',
            *args, **kwargs):
        super(NixpkgsDependencyProvider, self).__init__(*args, **kwargs)
        self.nixpkgs = nixpkgs
        self.wheel_provider = wheel_provider
        self.sdist_provider = sdist_provider

    def get_pkg_reqs(self, c: Candidate) -> Tuple[Optional[List[Requirement]], Optional[List[Requirement]]]:
        """
        Return (install_requires, setup_requires) of the candidate.
        Requirements known to the nixpkgs index win (setup_requires is None then).
        Otherwise the answer of the first fallback provider which has the very
        same version is returned, or (None, None) if there is none.
        """
        known_reqs = self.nixpkgs.get_requirements(c.name, c.ver)
        if known_reqs is not None:
            return list(parse_reqs(known_reqs)), None
        # sdist is asked before wheel
        for fallback in (self.sdist_provider, self.wheel_provider):
            for other in fallback.all_candidates(c.name, None, None):
                if other.ver == c.ver:
                    return fallback.get_pkg_reqs(other)
        return None, None

    def all_candidates(self, pkg_name, extras, builds) -> Iterable[Candidate]:
        """Return one candidate per version found in the nixpkgs index; none if builds are requested."""
        if builds or not self.nixpkgs.exists(self.unify_key(pkg_name)):
            return []
        key = self.unify_key(pkg_name)
        result = []
        for pkg in self.nixpkgs.get_all_candidates(key):
            result.append(Candidate(
                pkg_name,
                pkg.ver,
                str(pkg.ver),
                extras,
                provider_info=ProviderInfo(self),
            ))
        return result
@dataclass
class WheelRelease:
    """Metadata of a single wheel file. Instances are hashed by their file name."""

    # the python version indicated by the filename
    fn_pyver: str
    name: str
    ver: str
    # file name of the wheel
    fn: str
    requires_dist: list
    provided_extras: list
    # the python version of the wheel metadata
    requires_python: str

    def __hash__(self):
        # only the file name enters the hash
        file_name = self.fn
        return hash(file_name)
class WheelDependencyProvider(DependencyProviderBase):
    """
    Provider for wheel releases read from a LazyBucketDict.

    Only wheels whose file name matches one of the patterns in
    `preferred_wheels` and whose `requires_python` accepts the target
    python version are offered.
    """
    name = 'wheel'

    def __init__(self, data_dir: str, *args, **kwargs):
        """
        :param data_dir: directory handed to LazyBucketDict
        :raises Exception: if the target system is neither 'linux' nor 'darwin'
        """
        super(WheelDependencyProvider, self).__init__(*args, **kwargs)
        self.data = LazyBucketDict(data_dir)
        release = self.py_ver.version.release
        major = release[0]
        minor = release[1]
        cp_abi = f"cp{major}{minor}mu" if int(major) == 2 else f"cp{major}{minor}m?"
        # all patterns share this head and differ in the platform tag only
        tag_prefix = rf".*(py{major}|cp{major})({minor})?[\.-].*({cp_abi}|abi3|none)-"
        # the order of the tags is the order of preference used by _select_preferred_wheel()
        if self.system == "linux":
            platform_tags = (
                f"manylinux2014_{self.platform}",
                f"manylinux2010_{self.platform}",
                f"manylinux1_{self.platform}",
                f"manylinux_2_5_{self.platform}",
                f"manylinux_2_12_{self.platform}",
                f"manylinux_2_17_{self.platform}",
                f"linux_{self.platform}",
                "any",
            )
        elif self.system == "darwin":
            # macOS wheels use 'arm64' where this provider is given 'aarch64'
            arch = "arm64" if self.platform == "aarch64" else self.platform
            platform_tags = (
                "any",
                r"macosx_\d*_\d*_universal",
                rf"macosx_\d*_\d*_{arch}",
            )
        else:
            raise Exception(f"Unsupported Platform {self.system}")
        self.preferred_wheels = tuple(re.compile(tag_prefix + tag) for tag in platform_tags)

    def all_candidates(self, pkg_name, extras, builds) -> List[Candidate]:
        """Return one candidate per suitable wheel file; none if builds are requested."""
        if builds:
            return []
        return [Candidate(
            w.name,
            parse_ver(w.ver),
            w.ver,
            extras,
            provider_info=ProviderInfo(provider=self, wheel_fname=w.fn, data=w)
        ) for w in self._suitable_wheels(pkg_name)]

    def get_pkg_reqs(self, c: Candidate) -> Tuple[List[Requirement], List[Requirement]]:
        """
        Get requirements for package
        returns (install_requires, setup_requires); setup_requires is always empty
        """
        reqs_raw = c.provider_info.data.requires_dist
        if reqs_raw is None:
            reqs_raw = []
        # handle extras by evaluating markers
        install_reqs = list(filter_reqs_by_eval_marker(parse_reqs(reqs_raw), self.context_wheel, c.selected_extras))
        return install_reqs, []

    def _all_releases(self, pkg_name):
        """Yield a WheelRelease for every wheel file stored for the package."""
        name = self.unify_key(pkg_name)
        if name not in self.data:
            return
        for fn_pyver, vers in self.data[name].items():
            for ver, fnames in vers.items():
                for fn, deps in fnames.items():
                    # a string of the form '<version>@<filename>' refers to the
                    # entry of another file of the same python version
                    if isinstance(deps, str):
                        key_ver, key_fn = deps.split('@')
                        versions = self.data[name][fn_pyver]
                        deps = versions[key_ver][key_fn]
                    assert isinstance(deps, dict)
                    yield WheelRelease(
                        fn_pyver,
                        name,
                        ver,
                        fn,
                        deps['requires_dist'] if 'requires_dist' in deps else None,
                        deps['requires_extras'] if 'requires_extras' in deps else None,
                        deps['requires_python'].strip(',') if 'requires_python' in deps else None,
                    )

    def _apply_filters(self, filters: List[callable], objects: Iterable):
        """
        Applies multiple filters to objects. First filter in the list is applied first
        """
        assert len(filters) > 0
        for predicate in filters:
            objects = filter(predicate, objects)
        return objects

    @cached()
    def _choose_wheel(self, pkg_name, pkg_version: Version) -> WheelRelease:
        """
        Return the most preferred suitable wheel of the given version.

        :raises PackageNotFound: if no wheel of that version is suitable
        """
        suitable = list(self._suitable_wheels(pkg_name, pkg_version))
        if not suitable:
            raise PackageNotFound(pkg_name, pkg_version, self.name)
        return self._select_preferred_wheel(suitable)

    def _suitable_wheels(self, pkg_name: str, ver: Version = None) -> Iterable[WheelRelease]:
        """Return a lazy iterable of the wheels usable on the target system, optionally limited to `ver`."""
        wheels = self._all_releases(pkg_name)
        if ver is not None:
            wheels = filter(lambda w: parse_ver(w.ver) == ver, wheels)
        return self._apply_filters(
            [
                self._wheel_type_ok,
                self._python_requires_ok,
            ],
            wheels)

    def _select_preferred_wheel(self, wheels: Iterable[WheelRelease]):
        """
        Return the first wheel matching the earliest possible pattern of `preferred_wheels`.

        :raises Exception: if no wheel matches any pattern
        """
        wheels = list(wheels)
        for pattern in self.preferred_wheels:
            for wheel in wheels:
                if pattern.search(wheel.fn):
                    return wheel
        raise Exception("No wheel type found that is compatible to the current system")

    def _wheel_type_ok(self, wheel: WheelRelease):
        """Tell whether the file name of the wheel matches any of the preferred patterns."""
        return any(pattern.search(wheel.fn) for pattern in self.preferred_wheels)

    def _python_requires_ok(self, wheel: WheelRelease):
        """
        Tell whether the `requires_python` of the wheel accepts the target python version.
        Wheels without the attribute are accepted; an unparsable attribute rejects the wheel.
        """
        if not wheel.requires_python:
            return True
        ver = self.py_ver.version
        try:
            parsed_py_requires = list(parse_reqs(f"python{wheel.requires_python}"))
            return bool(filter_versions([ver], parsed_py_requires[0]))
        except RequirementParseError:
            print(f"WARNING: `requires_python` attribute of wheel {wheel.name}:{wheel.ver} could not be parsed")
            return False
class SdistDependencyProvider(DependencyProviderBase):
    """Provider for source distributions read from a LazyBucketDict."""
    name = 'sdist'

    def __init__(self, data_dir: str, *args, **kwargs):
        """
        :param data_dir: directory handed to LazyBucketDict
        """
        self.data = LazyBucketDict(data_dir)
        super(SdistDependencyProvider, self).__init__(*args, **kwargs)

    @cached()
    def _get_candidates(self, name) -> dict:
        """
        returns all candidates for the given name which are available for the current python version
        The result maps the version string to the package data.
        """
        key = self.unify_key(name)
        try:
            versions = self.data[key]
        except KeyError:
            return {}
        candidates = {}
        for ver, pyvers in versions.items():
            # in case pyvers is a string, it is a reference to another ver which we need to resolve
            if isinstance(pyvers, str):
                pyvers = versions[pyvers]
            if self.py_ver_digits not in pyvers:
                continue
            pkg_data = pyvers[self.py_ver_digits]
            # in case pkg_data is a string, it is a reference to another pyver which we need to resolve
            if isinstance(pkg_data, str):
                pkg_data = pyvers[pkg_data]
            if 'python_requires' in pkg_data:
                specs = ",".join(pkg_data['python_requires'])
                parsed_py_requires = list(parse_reqs(f"python{specs}"))
                if not filter_versions([self.py_ver.version], parsed_py_requires[0]):
                    continue
            candidates[ver] = pkg_data
        return candidates

    def _get_reqs_for_extras(self, pkg, extras):
        """
        Collect the requirements contributed by the 'extras_require' section of `pkg`.

        A key of that section is either '<extra>' or '<extra>:<marker>'. An entry
        is used if its marker (if any) evaluates to true in the current context
        and its extra is either empty or contained in `extras`.

        :param extras: iterable of selected extras; None means no extras selected
        """
        # entries with an empty extra name apply even if nothing is selected,
        # hence None must not bypass the evaluation
        selected = set(extras) if extras is not None else set()
        requirements = []
        if 'extras_require' not in pkg:
            return requirements
        for key, reqs_str in pkg['extras_require'].items():
            # handle extras with marker in key; split at the first colon only
            # because everything after it belongs to the marker
            name, has_marker, marker = key.partition(':')
            if has_marker and not distlib.markers.interpret(marker, self.context):
                continue
            if name == '' or name in selected:
                requirements += list(filter_reqs_by_eval_marker(parse_reqs(reqs_str), self.context))
        return requirements

    def get_pkg_reqs(self, c: Candidate) -> Tuple[List[Requirement], List[Requirement]]:
        """
        Get requirements for package
        returns (install_requires, setup_requires)
        """
        pkg = c.provider_info.data
        requirements = {}
        for kind in ("setup_requires", "install_requires"):
            if kind in pkg:
                requirements[kind] = list(filter_reqs_by_eval_marker(parse_reqs(pkg[kind]), self.context))
            else:
                requirements[kind] = []
        # even if no extras are selected we need to collect reqs for extras,
        # because some extras consist of only a marker which needs to be evaluated
        requirements['install_requires'] += self._get_reqs_for_extras(pkg, c.selected_extras)
        return requirements['install_requires'], requirements['setup_requires']

    def all_candidates(self, pkg_name, extras, builds) -> Iterable[Candidate]:
        """Return one candidate per available version; none if builds are requested."""
        if builds:
            return []
        return [Candidate(
            pkg_name,
            parse_ver(ver),
            ver,
            extras,
            provider_info=ProviderInfo(self, data=pkg)
        ) for ver, pkg in self._get_candidates(pkg_name).items()]
def conda_virtual_packages():
    """
    Build the dict of conda virtual packages (name -> version) for the current machine.
    Each detected version can be overridden by its MACHNIX_*_VERSION environment variable.
    """
    packages = {'__unix': 0}

    # On macOS platform.libc_ver() returns ('', ''), so no __glibc entry is
    # created there, no matter if MACHNIX_GLIBC_VERSION is set.
    libc_compiler, libc_version = platform.libc_ver()
    if libc_compiler == 'glibc':
        packages['__glibc'] = environ.get("MACHNIX_GLIBC_VERSION", libc_version)

    # Maximum version of CUDA supported by the display driver.
    cuda_version = environ.get("MACHNIX_CUDA_VERSION")
    if cuda_version is not None:
        packages['__cuda'] = cuda_version

    # the kernel release is the default for the operating system package
    os_packages = (
        ('linux', '__linux', "MACHNIX_LINUX_VERSION"),
        ('darwin', '__osx', "MACHNIX_OSX_VERSION"),
    )
    for os_name, pkg_key, env_var in os_packages:
        if sys.platform == os_name:
            packages[pkg_key] = environ.get(env_var, platform.uname().release)
    return packages
class CondaDependencyProvider(DependencyProviderBase):
    """
    Provider for the packages of one conda channel.

    The packages are read from the index files of the channel and kept in
    `self.pkgs` as name -> version -> build -> package data.
    """
    # Names of dependencies which are removed from the requirements of a package.
    # This must be a tuple: with a plain string the `in` test would match any
    # substring of it (for example 'py').
    ignored_pkgs = (
        "python",
    )
    virtual_packages = conda_virtual_packages()

    def __init__(self, channel, files, py_ver: PyVer, platform, system, *args, **kwargs):
        """
        :param channel: name of the conda channel
        :param files: paths of JSON index files, each containing a 'packages' object
        """
        self.channel = channel
        self.pkgs = {}
        for file in files:
            with open(file) as f:
                content = json.load(f)
            for fname, p in content['packages'].items():
                name = normalize_name(p['name'])
                builds = self.pkgs.setdefault(name, {}).setdefault(p['version'], {})
                build = p['build']
                if build in builds:
                    # the first package seen for a name/version/build wins,
                    # later ones are only recorded for the warning in all_candidates()
                    builds[build].setdefault('collisions', []).append((p['name'], p['subdir']))
                    continue
                p['fname'] = fname
                builds[build] = p

        # generate packages for virtual packages
        for pname, ver in self.virtual_packages.items():
            pname_norm = normalize_name(pname)
            self.pkgs[pname_norm] = {ver: {0: {
                'build': 0,
                'build_number': 0,
                'depends': [],
                'fname': None,
                'name': pname_norm,
                'sha256': None,
                'subdir': None,
                'version': ver,
            }}}
        super().__init__(py_ver, platform, system, *args, **kwargs)

    @property
    def name(self):
        return f"conda/{self.channel}"

    def get_pkg_reqs(self, c: Candidate) -> Tuple[List[Requirement], List[Requirement]]:
        """
        Return (install_requires, setup_requires) of the candidate; setup_requires is always empty.
        Dependencies on packages listed in `ignored_pkgs` are left out.
        """
        candidate = c.provider_info.data
        # always add optional dependencies to ensure constraints are applied
        specs = candidate['depends'] + (candidate['constrains'] if 'constrains' in candidate else [])
        depends = [spec for spec in specs if spec.split()[0] not in self.ignored_pkgs]
        return list(parse_reqs(depends)), []

    @cached()
    def all_candidates_sorted(self, name, extras, build) -> Iterable[Candidate]:
        """Return all candidates ordered by version and then build number, highest first."""
        candidates = self.all_candidates(name, extras, build)
        candidates.sort(
            key=lambda c: (c.ver, c.provider_info.data["build_number"]),
            reverse=True,
        )
        return candidates

    def all_candidates(self, pkg_name, extras, builds) -> Iterable[Candidate]:
        """
        Return one candidate per compatible build of the package.
        Builds without a sha256 sum are skipped with a message.
        The given `extras` are not used; candidates carry no extras.
        """
        pkg_name = normalize_name(pkg_name)
        if pkg_name not in self.pkgs:
            return []
        candidates = []
        for p in self.compatible_builds(pkg_name, builds):
            if 'sha256' not in p:
                print(
                    f"Ignoring conda package {p['name']}:{p['version']} from provider {self.channel} \n"
                    "since it doesn't provide a sha256 sum.\n")
                continue
            # these channels are served from repo.anaconda.com, all others from anaconda.org
            if self.channel in ('free', 'intel', 'main', 'r'):
                url = f"https://repo.anaconda.com/pkgs/{self.channel}/{p['subdir']}/{p['fname']}"
            else:
                url = f"https://anaconda.org/{self.channel}/{p['name']}/" \
                      f"{p['version']}/download/{p['subdir']}/{p['fname']}"
            candidates.append(Candidate(
                p['name'],
                parse_ver(p['version']),
                p['version'],
                selected_extras=tuple(),
                build=p['build'],
                provider_info=ProviderInfo(
                    self,
                    url=url,
                    hash=p['sha256'],
                    data=p,
                )
            ))
            if 'collisions' in p:
                print(
                    f"WARNING: Colliding conda package in channel '{self.channel}' "
                    f"Ignoring {list(map(itemgetter(0), p['collisions']))} "
                    f"from {list(map(itemgetter(1), p['collisions']))} "
                    f"in favor of {p['name']} from '{p['subdir']}'")
        return candidates

    def python_ok(self, build):
        """
        Tell whether the build can be used with the target python version.
        Builds depending on pypy, or on a python version other than the target one, are rejected.
        """
        for dep in build['depends']:
            if dep == "pypy" or dep.startswith("pypy "):
                return False
            if dep.startswith("python "):
                req = next(iter(parse_reqs([dep])))
                if not filter_versions([self.py_ver.version], req):
                    return False
        return True

    @cached()
    def compatible_builds(self, pkg_name, build_patterns) -> list:
        """
        Return the data of all builds of `pkg_name` which pass python_ok() and
        whose build string matches every pattern in `build_patterns`.
        `pkg_name` must be a key of `self.pkgs`.
        """
        if build_patterns:
            # fnmatch caches the regexes it builds.
            def build_matches(build):
                return all(fnmatch.fnmatch(build, build_pattern) for build_pattern in build_patterns)
        else:
            def build_matches(build):
                return True

        return [
            pkg_data
            for ver, pkg_builds in self.pkgs[pkg_name].items()
            for build, pkg_data in pkg_builds.items()
            if self.python_ok(pkg_data)
            and build_matches(build)
        ]