speculum - A simple, straightforward Arch Linux mirror list optimizer












7












$begingroup$


After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.



The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.



Any feedback is welcome.



#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""

from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult


MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)


def strings(string: str) -> filter:
"""Splits strings by comma."""

return filter(None, map(lambda s: s.strip().lower(), string.split(',')))


def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""

return frozenset(strings(string))


def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""

return timedelta(hours=int(string))


def regex(string: str) -> Pattern:
"""Returns a regular expression."""

try:
return compile(string)
except error:
raise ValueError(str(error))


def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""

return tuple(Sorting.from_string(string))


def posint(string: str) -> int:
"""Returns a positive integer."""

integer = int(string)

if integer > 0:
return integer

raise ValueError('Integer must be greater than zero.')


def get_json() -> dict:
"""Returns the mirrors from the respective URL."""

with urlopen(MIRRORS_URL) as response:
return load(response)


def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""

for json in get_json()['urls']:
yield Mirror.from_json(json)


def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""

now = datetime.now()

def key(mirror):
return mirror.get_sorting_key(order, now)

return key


def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""

for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break

yield mirror


def get_args() -> Namespace:
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()


def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""

mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)

try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1

return 0


def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""

for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0

return 0


def main() -> int:
"""Filters and sorts the mirrors."""

basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)

if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1

if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')

if args.output:
return dump_mirrors(mirrors, args.output)

return print_mirrors(mirrors)


class Sorting(Enum):
"""Sorting options."""

AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'

@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)


class Duration(NamedTuple):
"""Represents the duration data on a mirror."""

average: float
stddev: float

@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)


class Country(NamedTuple):
"""Represents country information."""

name: str
code: str

def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}

@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)


class Mirror(NamedTuple):
"""Represents information about a mirror."""

url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult

@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']

if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)

duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)

@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url

if not path.endswith('/'):
path += '/'

return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)

@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'

def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()

key =

for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')

return tuple(key)


class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""

countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern

def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False

if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False

if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False

if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False

if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False

return True


if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)



Python version: 3.7










share|improve this question











$endgroup$








  • 2




    $begingroup$
    Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing from __future__ import annotations and from re import Pattern .
    $endgroup$
    – Graipher
    Feb 28 at 11:20






  • 4




    $begingroup$
    "Any feedback is welcome." I can't say I'm overly fond of the name.
    $endgroup$
    – Tom Chadwin
    Feb 28 at 14:25








  • 3




    $begingroup$
    Naming is hard, but knowing that "speculum" is a bad option isn't.
    $endgroup$
    – Danikov
    Feb 28 at 14:25






  • 3




    $begingroup$
    Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
    $endgroup$
    – Richard Neumann
    Feb 28 at 14:40








  • 3




    $begingroup$
    If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
    $endgroup$
    – Eric Lippert
    Feb 28 at 18:54


















7












$begingroup$


After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.



The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.



Any feedback is welcome.



#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""

from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult


MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)


def strings(string: str) -> filter:
"""Splits strings by comma."""

return filter(None, map(lambda s: s.strip().lower(), string.split(',')))


def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""

return frozenset(strings(string))


def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""

return timedelta(hours=int(string))


def regex(string: str) -> Pattern:
"""Returns a regular expression."""

try:
return compile(string)
except error:
raise ValueError(str(error))


def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""

return tuple(Sorting.from_string(string))


def posint(string: str) -> int:
"""Returns a positive integer."""

integer = int(string)

if integer > 0:
return integer

raise ValueError('Integer must be greater than zero.')


def get_json() -> dict:
"""Returns the mirrors from the respective URL."""

with urlopen(MIRRORS_URL) as response:
return load(response)


def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""

for json in get_json()['urls']:
yield Mirror.from_json(json)


def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""

now = datetime.now()

def key(mirror):
return mirror.get_sorting_key(order, now)

return key


def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""

for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break

yield mirror


def get_args() -> Namespace:
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()


def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""

mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)

try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1

return 0


def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""

for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0

return 0


def main() -> int:
"""Filters and sorts the mirrors."""

basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)

if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1

if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')

if args.output:
return dump_mirrors(mirrors, args.output)

return print_mirrors(mirrors)


class Sorting(Enum):
"""Sorting options."""

AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'

@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)


class Duration(NamedTuple):
"""Represents the duration data on a mirror."""

average: float
stddev: float

@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)


class Country(NamedTuple):
"""Represents country information."""

name: str
code: str

def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}

@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)


class Mirror(NamedTuple):
"""Represents information about a mirror."""

url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult

@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']

if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)

duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)

@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url

if not path.endswith('/'):
path += '/'

return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)

@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'

def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()

key =

for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')

return tuple(key)


class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""

countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern

def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False

if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False

if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False

if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False

if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False

return True


if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)



Python version: 3.7










share|improve this question











$endgroup$








  • 2




    $begingroup$
    Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing from __future__ import annotations and from re import Pattern .
    $endgroup$
    – Graipher
    Feb 28 at 11:20






  • 4




    $begingroup$
    "Any feedback is welcome." I can't say I'm overly fond of the name.
    $endgroup$
    – Tom Chadwin
    Feb 28 at 14:25








  • 3




    $begingroup$
    Naming is hard, but knowing that "speculum" is a bad option isn't.
    $endgroup$
    – Danikov
    Feb 28 at 14:25






  • 3




    $begingroup$
    Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
    $endgroup$
    – Richard Neumann
    Feb 28 at 14:40








  • 3




    $begingroup$
    If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
    $endgroup$
    – Eric Lippert
    Feb 28 at 18:54
















7












7








7


1



$begingroup$


After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.



The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.



Any feedback is welcome.



#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""

from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult


MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)


def strings(string: str) -> filter:
"""Splits strings by comma."""

return filter(None, map(lambda s: s.strip().lower(), string.split(',')))


def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""

return frozenset(strings(string))


def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""

return timedelta(hours=int(string))


def regex(string: str) -> Pattern:
"""Returns a regular expression."""

try:
return compile(string)
except error:
raise ValueError(str(error))


def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""

return tuple(Sorting.from_string(string))


def posint(string: str) -> int:
"""Returns a positive integer."""

integer = int(string)

if integer > 0:
return integer

raise ValueError('Integer must be greater than zero.')


def get_json() -> dict:
"""Returns the mirrors from the respective URL."""

with urlopen(MIRRORS_URL) as response:
return load(response)


def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""

for json in get_json()['urls']:
yield Mirror.from_json(json)


def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""

now = datetime.now()

def key(mirror):
return mirror.get_sorting_key(order, now)

return key


def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""

for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break

yield mirror


def get_args() -> Namespace:
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()


def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""

mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)

try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1

return 0


def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""

for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0

return 0


def main() -> int:
"""Filters and sorts the mirrors."""

basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)

if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1

if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')

if args.output:
return dump_mirrors(mirrors, args.output)

return print_mirrors(mirrors)


class Sorting(Enum):
"""Sorting options."""

AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'

@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)


class Duration(NamedTuple):
"""Represents the duration data on a mirror."""

average: float
stddev: float

@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)


class Country(NamedTuple):
"""Represents country information."""

name: str
code: str

def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}

@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)


class Mirror(NamedTuple):
"""Represents information about a mirror."""

url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult

@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']

if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)

duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)

@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url

if not path.endswith('/'):
path += '/'

return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)

@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'

def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()

key =

for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')

return tuple(key)


class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""

countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern

def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False

if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False

if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False

if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False

if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False

return True


if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)



Python version: 3.7










share|improve this question











$endgroup$




After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.



The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.



Any feedback is welcome.



#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""

from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult


MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)


def strings(string: str) -> filter:
"""Splits strings by comma."""

return filter(None, map(lambda s: s.strip().lower(), string.split(',')))


def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""

return frozenset(strings(string))


def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""

return timedelta(hours=int(string))


def regex(string: str) -> Pattern:
"""Returns a regular expression."""

try:
return compile(string)
except error:
raise ValueError(str(error))


def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""

return tuple(Sorting.from_string(string))


def posint(string: str) -> int:
"""Returns a positive integer."""

integer = int(string)

if integer > 0:
return integer

raise ValueError('Integer must be greater than zero.')


def get_json() -> dict:
"""Returns the mirrors from the respective URL."""

with urlopen(MIRRORS_URL) as response:
return load(response)


def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""

for json in get_json()['urls']:
yield Mirror.from_json(json)


def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""

now = datetime.now()

def key(mirror):
return mirror.get_sorting_key(order, now)

return key


def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""

for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break

yield mirror


def get_args() -> Namespace:
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()


def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""

mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)

try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1

return 0


def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""

for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0

return 0


def main() -> int:
"""Filters and sorts the mirrors."""

basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)

if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1

if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')

if args.output:
return dump_mirrors(mirrors, args.output)

return print_mirrors(mirrors)


class Sorting(Enum):
"""Sorting options."""

AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'

@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)


class Duration(NamedTuple):
"""Represents the duration data on a mirror."""

average: float
stddev: float

@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)


class Country(NamedTuple):
"""Represents country information."""

name: str
code: str

def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}

@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)


class Mirror(NamedTuple):
"""Represents information about a mirror."""

url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult

@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']

if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)

duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)

@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url

if not path.endswith('/'):
path += '/'

return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)

@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'

def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()

key =

for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')

return tuple(key)


class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""

countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern

def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False

if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False

if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False

if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False

if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False

return True


if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)



Python version: 3.7







python python-3.x






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Feb 28 at 11:28







Richard Neumann

















asked Feb 28 at 11:04









Richard NeumannRichard Neumann

1,948724




1,948724








  • 2




    $begingroup$
    Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing from __future__ import annotations and from re import Pattern .
    $endgroup$
    – Graipher
    Feb 28 at 11:20






  • 4




    $begingroup$
    "Any feedback is welcome." I can't say I'm overly fond of the name.
    $endgroup$
    – Tom Chadwin
    Feb 28 at 14:25








  • 3




    $begingroup$
    Naming is hard, but knowing that "speculum" is a bad option isn't.
    $endgroup$
    – Danikov
    Feb 28 at 14:25






  • 3




    $begingroup$
    Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
    $endgroup$
    – Richard Neumann
    Feb 28 at 14:40








  • 3




    $begingroup$
    If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
    $endgroup$
    – Eric Lippert
    Feb 28 at 18:54
















  • 2




    $begingroup$
    Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing from __future__ import annotations and from re import Pattern .
    $endgroup$
    – Graipher
    Feb 28 at 11:20






  • 4




    $begingroup$
    "Any feedback is welcome." I can't say I'm overly fond of the name.
    $endgroup$
    – Tom Chadwin
    Feb 28 at 14:25








  • 3




    $begingroup$
    Naming is hard, but knowing that "speculum" is a bad option isn't.
    $endgroup$
    – Danikov
    Feb 28 at 14:25






  • 3




    $begingroup$
    Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
    $endgroup$
    – Richard Neumann
    Feb 28 at 14:40








  • 3




    $begingroup$
    If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
    $endgroup$
    – Eric Lippert
    Feb 28 at 18:54










2




2




$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing from __future__ import annotations and from re import Pattern .
$endgroup$
– Graipher
Feb 28 at 11:20




$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing from __future__ import annotations and from re import Pattern .
$endgroup$
– Graipher
Feb 28 at 11:20




4




4




$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25






$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25






3




3




$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25




$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25




3




3




$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40






$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40






3




3




$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54






$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54












1 Answer
1






active

oldest

votes


















6












$begingroup$

The classic Python file structure is this:



import this

CONSTANT = None

class Foo:
def methods(self):
pass

def function():
pass

def main():
pass

if __name__ == "__main__":
main()


While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.





In your regex function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'> instead of a helpful description. Just use as:



def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))




For the command line interface I would use the functionalities argparse supplies for multiple arguments, instead of parsing it yourself:



def get_args(args=None):
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)


Also the default value of an unspecified optional argument already is None, so you don't need to specify it every time and an option like --regex-incl will automatically be stored in regex_incl, so no need for that either.



And finally, if you give your function an argument which you pass on to the parsing and default it to None, you can test this function by passing a list of strings.





I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame, which can easily be filtered and sorted.



import pandas as pd
from datetime import datetime

args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])


mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]


For the filtering you can either hardcode it similar to how you are currently doing:



if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]


Or you could accept a query string which you pass along to the dataframe directly:



args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)


Or even support both.



Regex patterns are also supported:



if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]


Sorting by arbitrary column names is also quite easy:



df = df.sort_values(args.sort, ascending=not args.reverse)


And so is limiting:



df = df.head(args.limit)


Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas is not in the Python Standard Library.





Of course you could just implement it using just standard library tools:



import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests

DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

def get_args(args=None):
"""Returns the parsed arguments."""
...

def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()

def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True

def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]


if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])

print(match_mirrors(mirrors, args))


And this would still be vastly shorter and more readable than your code...






share|improve this answer











$endgroup$









  • 2




    $begingroup$
    @RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
    $endgroup$
    – Graipher
    Feb 28 at 11:51










  • $begingroup$
    @RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
    $endgroup$
    – Graipher
    Feb 28 at 13:30












Your Answer





StackExchange.ifUsing("editor", function () {
return StackExchange.using("mathjaxEditing", function () {
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
});
});
}, "mathjax-editing");

StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "196"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f214459%2fspeculum-a-simple-straightforward-arch-linux-mirror-list-optimizer%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes









6












$begingroup$

The classic Python file structure is this:



import this

CONSTANT = None

class Foo:
def methods(self):
pass

def function():
pass

def main():
pass

if __name__ == "__main__":
main()


While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.





In your regex function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'> instead of a helpful description. Just use as:



def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))




For the command line interface I would use the functionalities argparse supplies for multiple arguments, instead of parsing it yourself:



def get_args(args=None):
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)


Also the default value of an unspecified optional argument already is None, so you don't need to specify it every time and an option like --regex-incl will automatically be stored in regex_incl, so no need for that either.



And finally, if you give your function an argument which you pass on to the parsing and default it to None, you can test this function by passing a list of strings.





I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame, which can easily be filtered and sorted.



import pandas as pd
from datetime import datetime

args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])


mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]


For the filtering you can either hardcode it similar to how you are currently doing:



if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]


Or you could accept a query string which you pass along to the dataframe directly:



args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)


Or even support both.



Regex patterns are also supported:



if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]


Sorting by arbitrary column names is also quite easy:



df = df.sort_values(args.sort, ascending=not args.reverse)


And so is limiting:



df = df.head(args.limit)


Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas is not in the Python Standard Library.





Of course you could just implement it using just standard library tools:



import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests

DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

def get_args(args=None):
"""Returns the parsed arguments."""
...

def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()

def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True

def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]


if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])

print(match_mirrors(mirrors, args))


And this would still be vastly shorter and more readable than your code...






share|improve this answer











$endgroup$









  • 2




    $begingroup$
    @RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
    $endgroup$
    – Graipher
    Feb 28 at 11:51










  • $begingroup$
    @RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
    $endgroup$
    – Graipher
    Feb 28 at 13:30
















6












$begingroup$

The classic Python file structure is this:



import this

CONSTANT = None

class Foo:
def methods(self):
pass

def function():
pass

def main():
pass

if __name__ == "__main__":
main()


While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.





In your regex function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'> instead of a helpful description. Just use as:



def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))




For the command line interface I would use the functionalities argparse supplies for multiple arguments, instead of parsing it yourself:



def get_args(args=None):
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)


Also the default value of an unspecified optional argument already is None, so you don't need to specify it every time and an option like --regex-incl will automatically be stored in regex_incl, so no need for that either.



And finally, if you give your function an argument which you pass on to the parsing and default it to None, you can test this function by passing a list of strings.





I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame, which can easily be filtered and sorted.



import pandas as pd
from datetime import datetime

args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])


mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]


For the filtering you can either hardcode it similar to how you are currently doing:



if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]


Or you could accept a query string which you pass along to the dataframe directly:



args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)


Or even support both.



Regex patterns are also supported:



if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]


Sorting by arbitrary column names is also quite easy:



df = df.sort_values(args.sort, ascending=not args.reverse)


And so is limiting:



df = df.head(args.limit)


Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas is not in the Python Standard Library.





Of course you could just implement it using just standard library tools:



import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests

DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

def get_args(args=None):
"""Returns the parsed arguments."""
...

def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()

def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True

def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]


if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])

print(match_mirrors(mirrors, args))


And this would still be vastly shorter and more readable than your code...






share|improve this answer











$endgroup$









  • 2




    $begingroup$
    @RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
    $endgroup$
    – Graipher
    Feb 28 at 11:51










  • $begingroup$
    @RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
    $endgroup$
    – Graipher
    Feb 28 at 13:30














6












6








6





$begingroup$

The classic Python file structure is this:



import this

CONSTANT = None

class Foo:
def methods(self):
pass

def function():
pass

def main():
pass

if __name__ == "__main__":
main()


While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.





In your regex function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'> instead of a helpful description. Just use as:



def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))




For the command line interface I would use the functionalities argparse supplies for multiple arguments, instead of parsing it yourself:



def get_args(args=None):
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)


Also the default value of an unspecified optional argument already is None, so you don't need to specify it every time and an option like --regex-incl will automatically be stored in regex_incl, so no need for that either.



And finally, if you give your function an argument which you pass on to the parsing and default it to None, you can test this function by passing a list of strings.





I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame, which can easily be filtered and sorted.



import pandas as pd
from datetime import datetime

args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])


mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]


For the filtering you can either hardcode it similar to how you are currently doing:



if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]


Or you could accept a query string which you pass along to the dataframe directly:



args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)


Or even support both.



Regex patterns are also supported:



if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]


Sorting by arbitrary column names is also quite easy:



df = df.sort_values(args.sort, ascending=not args.reverse)


And so is limiting:



df = df.head(args.limit)


Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas is not in the Python Standard Library.





Of course you could just implement it using just standard library tools:



import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests

DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

def get_args(args=None):
"""Returns the parsed arguments."""
...

def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()

def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True

def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]


if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])

print(match_mirrors(mirrors, args))


And this would still be vastly shorter and more readable than your code...






share|improve this answer











$endgroup$



The classic Python file structure is this:



import this

CONSTANT = None

class Foo:
def methods(self):
pass

def function():
pass

def main():
pass

if __name__ == "__main__":
main()


While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.





In your regex function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'> instead of a helpful description. Just use as:



def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))




For the command line interface I would use the functionalities argparse supplies for multiple arguments, instead of parsing it yourself:



def get_args(args=None):
"""Returns the parsed arguments."""

parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)


Also the default value of an unspecified optional argument already is None, so you don't need to specify it every time and an option like --regex-incl will automatically be stored in regex_incl, so no need for that either.



And finally, if you give your function an argument which you pass on to the parsing and default it to None, you can test this function by passing a list of strings.





I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame, which can easily be filtered and sorted.



import pandas as pd
from datetime import datetime

args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])


mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]


For the filtering you can either hardcode it similar to how you are currently doing:



if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]


Or you could accept a query string which you pass along to the dataframe directly:



args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)


Or even support both.



Regex patterns are also supported:



if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]


Sorting by arbitrary column names is also quite easy:



df = df.sort_values(args.sort, ascending=not args.reverse)


And so is limiting:



df = df.head(args.limit)


Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas is not in the Python Standard Library.





Of course you could just implement it using just standard library tools:



import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests

DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

def get_args(args=None):
"""Returns the parsed arguments."""
...

def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()

def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True

def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]


if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])

print(match_mirrors(mirrors, args))


And this would still be vastly shorter and more readable than your code...







share|improve this answer














share|improve this answer



share|improve this answer








edited Feb 28 at 15:55

























answered Feb 28 at 11:34









GraipherGraipher

26.6k54092




26.6k54092








  • 2




    $begingroup$
    @RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
    $endgroup$
    – Graipher
    Feb 28 at 11:51










  • $begingroup$
    @RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
    $endgroup$
    – Graipher
    Feb 28 at 13:30














  • 2




    $begingroup$
    @RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
    $endgroup$
    – Graipher
    Feb 28 at 11:51










  • $begingroup$
    @RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
    $endgroup$
    – Graipher
    Feb 28 at 13:30








2




2




$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51




$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51












$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30




$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using pandas with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30


















draft saved

draft discarded




















































Thanks for contributing an answer to Code Review Stack Exchange!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


Use MathJax to format equations. MathJax reference.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f214459%2fspeculum-a-simple-straightforward-arch-linux-mirror-list-optimizer%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

Probability when a professor distributes a quiz and homework assignment to a class of n students.

Aardman Animations

Are they similar matrix