import json
import time
from operator import itemgetter
import requests
from packaging.version import Version, InvalidVersion
from .exceptions import AnalyzerError, AnalyzerAPIError
from .parser import RequirementParser
from .utils.lists import split_to_chunks
from .utils.logger import NoOperationLogger
from .utils.dates import safe_isoformat_parse
from . import __pkgname__, __version__
[docs]
class DependenciesAnalyzer(RequirementParser):
"""
Analyzer to get and compute package informations from Pypi for requirements.
This will need to make 2 requests to get needed informations for each package since
the package detail endpoint from new "JSON API" has releases informations but it
is deprecated in profit of the Legacy API.
Legacy API (also known as the "Simple API") return either a HTML or JSON response,
depending value of request header "Accept:".
Attributes:
PACKAGE_DETAIL_ENDPOINT (string): Template string to build URL to the Pypi
JSON API to get package details.
PACKAGE_RELEASES_ENDPOINT (string): Template string to build URL to the Pypi
Legacy API to get package releases.
"""
PACKAGE_DETAIL_ENDPOINT = "https://pypi.org/pypi/{name}/json"
PACKAGE_RELEASES_ENDPOINT = "https://pypi.org/simple/{name}/"
def __init__(self, cachedir=None, api_pause=1, api_timeout=None, api_chunk=None,
logger=None, ignores=None):
self.cachedir = cachedir
self.logger = logger or NoOperationLogger()
# Amount of requirements to analyze by chunk
self.api_chunk = api_chunk or 10
# Time in seconds to pause before an API request
# Pause time between chunks
self.api_pause = api_pause
# Time in seconds for timeout limit on API request
self.api_timeout = api_timeout
# TODO: Currently not implemented, it should be a list of package names to
# ignore from analyze, dont know the state it will end in. It could be helpful
# for bypassing some erroneous requirements without breaking the whole analyze.
self.ignores = ignores or []
[docs]
def endpoint_package_detail(self, name):
"""
Request package detail API endpoint for given package name.
Arguments:
name (string): The package name to search for.
Returns:
requests.Response: Response object from request.
"""
endpoint_url = self.PACKAGE_DETAIL_ENDPOINT.format(name=name)
response = requests.get(
endpoint_url,
headers=self.request_headers(),
timeout=self.api_timeout,
)
if response.status_code == 404:
raise AnalyzerAPIError(
(
"API responded a 404 error, package name '{}' is probably "
"invalid or not available on Pypi."
).format(name),
http_status=404
)
# In case we have an error status that is not taken in charge before
response.raise_for_status()
return response
[docs]
def endpoint_releases_detail(self, name):
"""
Request package releases API endpoint for given package name.
Arguments:
name (string): The package name to search for.
Returns:
requests.Response: Response object from request.
"""
endpoint_url = self.PACKAGE_RELEASES_ENDPOINT.format(name=name)
response = requests.get(
endpoint_url,
headers=self.request_headers(),
timeout=self.api_timeout,
)
if response.status_code == 404:
raise AnalyzerAPIError(
(
"API responded a 404 error, package name '{}' is probably "
"invalid or not available on Pypi."
).format(name),
http_status=404
)
# In case we have an error status that is not taken in charge before
response.raise_for_status()
return response
[docs]
def get_cache_or_request(self, name, filename, method, label):
"""
Helper to search for a cache before making request if there is none.
Arguments:
name (string): The package name to search for.
filename (string): Filename to write cache. It should include the label to
ensure they won't overwrite each other.
method (callable): Callable that will perform a request to get JSON
payload. The callable is expected to accept a single argument which is
a package name to request.
label (string): Label of informations kind. Commonly it is ``detail`` or
``releases``.
Returns:
dict: Returned payload from API or from stored cache.
"""
self.logger.debug("Get package {label} for '{name}'".format(
label=label,
name=name or "Unknow"
))
# Mostly impossible to be there but just in case there is an unexpected issue
if not name:
raise AnalyzerError("Package without name can not be requested.")
# Build expected cache file name if cache is enabled
cache_file = None
if self.cachedir:
cache_file = self.cachedir / filename
# Return cache if it exists
if cache_file and cache_file.exists():
self.logger.debug("Loading data from cache")
return json.loads(cache_file.read_text())
# Use given method name to request payload from API
response = method(name)
self.logger.debug("[{status}] API response from {url}".format(
status=response.status_code,
url=response.url.split("?")[0],
))
output = response.json()
# Build cache file if cache is enabled
if self.cachedir:
self.logger.debug("Writing cache: {}".format(cache_file))
cache_file.write_text(json.dumps(output, indent=4))
return output
[docs]
def get_package_data(self, name):
"""
Get package informations (detail and releases)
Arguments:
name (string): The package name to search for.
Returns:
dict: A dictionnary that contain all useful package informations (detail
and releases).
"""
self.logger.info("Processing package: {name}".format(
name=name or "Unknow"
))
if not name:
raise AnalyzerError("Package without name can not be requested.")
# Patch detail to inject released versions
output = self.get_cache_or_request(
name,
"{}.detail.json".format(name),
self.endpoint_package_detail,
"detail",
)
output["versions"] = self.format_releases_payload(
self.get_cache_or_request(
name,
"{}.releases.json".format(name),
self.endpoint_releases_detail,
"releases",
)
)
return output
[docs]
def compute_package_releases(self, name, data):
"""
Build a list of released versions from API patched with some values in useful
types.
Arguments:
name (string): Parsed package name.
data (dict): Dictionnary of package data as retrieved from API.
Returns:
list: List of dictionnary for computed releases.
"""
versions = []
# Rebuild the version list to patch some values in useful types
for item in data["versions"]:
# Enforce real datetime
item["published_at"] = safe_isoformat_parse(item["published_at"])
# Coerce original number to a Version object if possible
try:
number = Version(item["number"])
except InvalidVersion:
msg = (
"Ignored invalid version number '{version}' for package '{name}'"
)
self.logger.warning(msg.format(name=name, version=item["number"]))
continue
else:
item["number"] = number
versions.append(item)
return sorted(versions, key=itemgetter("number"))
[docs]
def get_latest_specified_release(self, specifiers, releases):
"""
Get the latest release that match given specifiers on given release list.
Pre releases are always ignored.
Arguments:
specifiers (packaging.SpecifierSet): Version specifiers to match against
releases.
releases (list): List of dict for releases as built from
``DependenciesAnalyzer.compute_package_releases()``.
Returns:
dict: Dictionnary of release data taken from given releases if it matched
specifier. Else returns a null value.
"""
indexed = {
str(item["number"]): item
for item in releases
}
matched = sorted(
specifiers.filter(
[str(item["number"]) for item in releases],
prereleases=False
),
)
if not matched:
return None
return indexed[matched[-1]]
[docs]
def compute_lateness(self, target, versions):
"""
Compute version lateness for a given version target.
Lateness is only about version higher than targeted version and that are not
build releases or pre releases
Arguments:
target (string or packaging.version.Version): The targeted version
to check against package released versions. If a string it will be
coerced to a ``Version`` object.
versions (list): List of dictionnaries (as computed from
``build_package_informations()``) for all existing release versions.
Returns:
list: A list of tuples for all existing version higher
than given target release version. Tuple first item is the version
number (as a ``Version`` object and second item is its
release publishing datetime.
"""
if not isinstance(target, Version):
target = Version(target)
return [
(str(item["number"]), item["published_at"])
for item in versions
if (
item["number"] > target and
item["number"].is_prerelease is False and
item["number"].is_postrelease is False and
item["number"].is_devrelease is False
)
]
[docs]
def get_package_urls(self, data):
"""
This should try to get the relevant URLs from package metadatas.
However the ``project_urls`` item from package metadatas is not normalized
enough to quickly get relevant infos so here we should try to get them.
Arguments:
data (dict): Dictionnary of package informations as returned from
``Analyzer.get_package_data()``.
Returns:
dict: A dictionnary that contains useful URLs.
"""
informations = data["info"]
urls = informations.get("project_urls", {})
repository_url = None
elligible_repo_url_names = ["repository", "source", "source code"]
for name, value in urls.items():
if name.lower() in elligible_repo_url_names:
repository_url = value
break
return {
"package": informations["package_url"],
"repository": repository_url,
}
[docs]
def inspect(self, requirements, environment=None, strict=False, basepath=None):
"""
Inspect given requirement to get their informations.
Arguments:
requirements (string or Path): Either a Path object for a file to open or
directly requirements content as a string.
Keyword Arguments:
environment (dict): Optionnal dictionnary of environment variables to use
with possible specifier marker resolution.
strict (boolean): If True only the valid requirements (see
``dependency_comb.package.PackageRequirement.is_valid``) are returned.
Default is False, all requirements are returned and you need to check
their status yourself if needed.
basepath (Path): A directory path where to search for requirement
inclusions (directive ``-r foo.txt``) from requirements file.
Returns:
iterator: Iterator of PackageRequirement objects for given requirements.
"""
parsed_requirements = self.parse_requirements(
requirements,
environment=environment,
basepath=basepath,
)
#
if self.api_chunk:
chunks = list(split_to_chunks(parsed_requirements, self.api_chunk))
else:
chunks = [parsed_requirements]
for i, chunk in enumerate(chunks, start=1):
for item in chunk:
pkginfos = self.build_package_informations(item)
if not strict or (strict and pkginfos.is_valid):
yield pkginfos
if self.api_pause and i < len(chunks):
self.logger.debug("Making pause of {} second(s)".format(self.api_pause))
time.sleep(self.api_pause)