speculum - A simple, straightforward Arch Linux mirror list optimizer
$begingroup$
After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.
The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.
Any feedback is welcome.
#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""
from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult
MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)
def strings(string: str) -> filter:
"""Splits strings by comma."""
return filter(None, map(lambda s: s.strip().lower(), string.split(',')))
def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""
return frozenset(strings(string))
def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""
return timedelta(hours=int(string))
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error:
raise ValueError(str(error))
def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""
return tuple(Sorting.from_string(string))
def posint(string: str) -> int:
"""Returns a positive integer."""
integer = int(string)
if integer > 0:
return integer
raise ValueError('Integer must be greater than zero.')
def get_json() -> dict:
"""Returns the mirrors from the respective URL."""
with urlopen(MIRRORS_URL) as response:
return load(response)
def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""
for json in get_json()['urls']:
yield Mirror.from_json(json)
def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""
now = datetime.now()
def key(mirror):
return mirror.get_sorting_key(order, now)
return key
def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""
for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break
yield mirror
def get_args() -> Namespace:
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()
def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""
mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)
try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1
return 0
def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""
for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0
return 0
def main() -> int:
"""Filters and sorts the mirrors."""
basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)
if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1
if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')
if args.output:
return dump_mirrors(mirrors, args.output)
return print_mirrors(mirrors)
class Sorting(Enum):
"""Sorting options."""
AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'
@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)
class Duration(NamedTuple):
"""Represents the duration data on a mirror."""
average: float
stddev: float
@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)
class Country(NamedTuple):
"""Represents country information."""
name: str
code: str
def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}
@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)
class Mirror(NamedTuple):
"""Represents information about a mirror."""
url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult
@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']
if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)
duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)
@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url
if not path.endswith('/'):
path += '/'
return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)
@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'
def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()
key =
for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')
return tuple(key)
class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""
countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern
def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False
if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False
if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False
if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False
if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False
return True
if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)
Python version: 3.7
python python-3.x
$endgroup$
|
show 1 more comment
$begingroup$
After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.
The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.
Any feedback is welcome.
#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""
from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult
MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)
def strings(string: str) -> filter:
"""Splits strings by comma."""
return filter(None, map(lambda s: s.strip().lower(), string.split(',')))
def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""
return frozenset(strings(string))
def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""
return timedelta(hours=int(string))
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error:
raise ValueError(str(error))
def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""
return tuple(Sorting.from_string(string))
def posint(string: str) -> int:
"""Returns a positive integer."""
integer = int(string)
if integer > 0:
return integer
raise ValueError('Integer must be greater than zero.')
def get_json() -> dict:
"""Returns the mirrors from the respective URL."""
with urlopen(MIRRORS_URL) as response:
return load(response)
def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""
for json in get_json()['urls']:
yield Mirror.from_json(json)
def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""
now = datetime.now()
def key(mirror):
return mirror.get_sorting_key(order, now)
return key
def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""
for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break
yield mirror
def get_args() -> Namespace:
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()
def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""
mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)
try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1
return 0
def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""
for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0
return 0
def main() -> int:
"""Filters and sorts the mirrors."""
basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)
if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1
if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')
if args.output:
return dump_mirrors(mirrors, args.output)
return print_mirrors(mirrors)
class Sorting(Enum):
"""Sorting options."""
AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'
@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)
class Duration(NamedTuple):
"""Represents the duration data on a mirror."""
average: float
stddev: float
@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)
class Country(NamedTuple):
"""Represents country information."""
name: str
code: str
def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}
@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)
class Mirror(NamedTuple):
"""Represents information about a mirror."""
url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult
@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']
if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)
duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)
@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url
if not path.endswith('/'):
path += '/'
return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)
@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'
def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()
key =
for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')
return tuple(key)
class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""
countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern
def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False
if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False
if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False
if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False
if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False
return True
if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)
Python version: 3.7
python python-3.x
$endgroup$
2
$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doingfrom __future__ import annotations
andfrom re import Pattern
.
$endgroup$
– Graipher
Feb 28 at 11:20
4
$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25
3
$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25
3
$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40
3
$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54
|
show 1 more comment
$begingroup$
After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.
The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.
Any feedback is welcome.
#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""
from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult
MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)
def strings(string: str) -> filter:
"""Splits strings by comma."""
return filter(None, map(lambda s: s.strip().lower(), string.split(',')))
def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""
return frozenset(strings(string))
def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""
return timedelta(hours=int(string))
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error:
raise ValueError(str(error))
def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""
return tuple(Sorting.from_string(string))
def posint(string: str) -> int:
"""Returns a positive integer."""
integer = int(string)
if integer > 0:
return integer
raise ValueError('Integer must be greater than zero.')
def get_json() -> dict:
"""Returns the mirrors from the respective URL."""
with urlopen(MIRRORS_URL) as response:
return load(response)
def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""
for json in get_json()['urls']:
yield Mirror.from_json(json)
def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""
now = datetime.now()
def key(mirror):
return mirror.get_sorting_key(order, now)
return key
def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""
for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break
yield mirror
def get_args() -> Namespace:
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()
def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""
mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)
try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1
return 0
def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""
for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0
return 0
def main() -> int:
"""Filters and sorts the mirrors."""
basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)
if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1
if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')
if args.output:
return dump_mirrors(mirrors, args.output)
return print_mirrors(mirrors)
class Sorting(Enum):
"""Sorting options."""
AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'
@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)
class Duration(NamedTuple):
"""Represents the duration data on a mirror."""
average: float
stddev: float
@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)
class Country(NamedTuple):
"""Represents country information."""
name: str
code: str
def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}
@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)
class Mirror(NamedTuple):
"""Represents information about a mirror."""
url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult
@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']
if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)
duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)
@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url
if not path.endswith('/'):
path += '/'
return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)
@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'
def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()
key =
for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')
return tuple(key)
class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""
countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern
def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False
if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False
if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False
if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False
if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False
return True
if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)
Python version: 3.7
python python-3.x
$endgroup$
After having had a look at reflector's code base I decided to write a new, more lightweight mirror list optimizer from scratch: speculum.
The script queries the Arch Linux mirror list JSON endpoint and performs filtering, sorting and limiting of mirrors according to the user's input.
Any feedback is welcome.
#! /usr/bin/env python3
#
# speculum - An Arch Linux mirror list updater.
#
# Copyright (C) 2019 Richard Neumann <mail at richard dash neumann period de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""Yet another Arch Linux mirrorlist optimizer."""
from __future__ import annotations
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from enum import Enum
from json import load
from logging import INFO, basicConfig, getLogger
from os import linesep
from pathlib import Path
from re import error, compile, Pattern # pylint: disable=W0622
from sys import exit, stderr # pylint: disable=W0622
from typing import Callable, FrozenSet, Generator, Iterable, NamedTuple, Tuple
from urllib.request import urlopen
from urllib.parse import urlparse, ParseResult
MIRRORS_URL = 'https://www.archlinux.org/mirrors/status/json/'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
REPO_PATH = '$repo/os/$arch'
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger(__file__)
def strings(string: str) -> filter:
"""Splits strings by comma."""
return filter(None, map(lambda s: s.strip().lower(), string.split(',')))
def stringset(string: str) -> FrozenSet[str]:
"""Returns a tuple of strings form a comma separated list."""
return frozenset(strings(string))
def hours(string: str) -> timedelta:
"""Returns a timedelta of the respective
amount of hours from a string.
"""
return timedelta(hours=int(string))
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error:
raise ValueError(str(error))
def sorting(string: str) -> Tuple[Sorting]:
"""Returns a tuple of sorting options
from comma-separated string values.
"""
return tuple(Sorting.from_string(string))
def posint(string: str) -> int:
"""Returns a positive integer."""
integer = int(string)
if integer > 0:
return integer
raise ValueError('Integer must be greater than zero.')
def get_json() -> dict:
"""Returns the mirrors from the respective URL."""
with urlopen(MIRRORS_URL) as response:
return load(response)
def get_mirrors() -> Generator[Mirror]:
"""Yields the respective mirrors."""
for json in get_json()['urls']:
yield Mirror.from_json(json)
def get_sorting_key(order: Tuple[Sorting]) -> Callable:
"""Returns a key function to sort mirrors."""
now = datetime.now()
def key(mirror):
return mirror.get_sorting_key(order, now)
return key
def limit(mirrors: Iterable[Mirror], maximum: int) -> Generator[Mirror]:
"""Limit the amount of mirrors."""
for count, mirror in enumerate(mirrors, start=1):
if maximum is not None and count > maximum:
break
yield mirror
def get_args() -> Namespace:
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', type=sorting, default=None, metavar='sorting',
help='sort by the respective properties')
parser.add_argument(
'--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', type=stringset, default=None, metavar='countries',
help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', type=stringset, default=None, metavar='protocols',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=hours, default=None, metavar='max_age',
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--regex-incl', '-i', type=regex, default=None, metavar='regex_incl',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x', type=regex, default=None, metavar='regex_excl',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=posint, default=None, metavar='file',
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args()
def dump_mirrors(mirrors: Iterable[Mirror], path: Path) -> int:
"""Dumps the mirrors to the given path."""
mirrorlist = linesep.join(mirror.mirrorlist_record for mirror in mirrors)
try:
with path.open('w') as file:
file.write(mirrorlist)
except PermissionError as permission_error:
LOGGER.error(permission_error)
return 1
return 0
def print_mirrors(mirrors: Iterable[Mirror]) -> int:
"""Prints the mirrors to STDOUT."""
for mirror in mirrors:
try:
print(mirror.mirrorlist_record, flush=True)
except BrokenPipeError:
stderr.close()
return 0
return 0
def main() -> int:
"""Filters and sorts the mirrors."""
basicConfig(level=INFO, format=LOG_FORMAT)
args = get_args()
mirrors = get_mirrors()
filters = Filter(
args.countries, args.protocols, args.max_age, args.regex_incl,
args.regex_excl)
mirrors = filter(filters.match, mirrors)
key = get_sorting_key(args.sort)
mirrors = sorted(mirrors, key=key, reverse=args.reverse)
mirrors = limit(mirrors, args.limit)
mirrors = tuple(mirrors)
if not mirrors and args.limit != 0:
LOGGER.error('No mirrors found.')
return 1
if args.limit is not None and len(mirrors) < args.limit:
LOGGER.warning('Filter yielded less mirrors than specified limit.')
if args.output:
return dump_mirrors(mirrors, args.output)
return print_mirrors(mirrors)
class Sorting(Enum):
"""Sorting options."""
AGE = 'age'
RATE = 'rate'
COUNTRY = 'country'
SCORE = 'score'
DELAY = 'delay'
@classmethod
def from_string(cls, string: str) -> Generator[Sorting]:
"""Returns a tuple of sortings from the respective string."""
for option in strings(string):
yield cls(option)
class Duration(NamedTuple):
"""Represents the duration data on a mirror."""
average: float
stddev: float
@property
def sorting_key(self) -> Tuple[float]:
"""Returns a sorting key."""
average = float('inf') if self.average is None else self.average
stddev = float('inf') if self.stddev is None else self.stddev
return (average, stddev)
class Country(NamedTuple):
"""Represents country information."""
name: str
code: str
def match(self, string: str) -> bool:
"""Matches a country description."""
return string.lower() in {self.name.lower(), self.code.lower()}
@property
def sorting_key(self) -> Tuple[str]:
"""Returns a sorting key."""
name = '~' if self.name is None else self.name
code = '~' if self.code is None else self.code
return (name, code)
class Mirror(NamedTuple):
"""Represents information about a mirror."""
url: ParseResult
last_sync: datetime
completion: float
delay: int
duration: Duration
score: float
active: bool
country: Country
isos: bool
ipv4: bool
ipv6: bool
details: ParseResult
@classmethod
def from_json(cls, json: dict) -> Mirror:
"""Returns a new mirror from a JSON-ish dict."""
url = urlparse(json['url'])
last_sync = json['last_sync']
if last_sync is not None:
last_sync = datetime.strptime(last_sync, DATE_FORMAT).replace(
tzinfo=None)
duration_avg = json['duration_avg']
duration_stddev = json['duration_stddev']
duration = Duration(duration_avg, duration_stddev)
country = json['country']
country_code = json['country_code']
country = Country(country, country_code)
details = urlparse(json['details'])
return cls(
url, last_sync, json['completion_pct'], json['delay'], duration,
json['score'], json['active'], country, json['isos'], json['ipv4'],
json['ipv6'], details)
@property
def mirrorlist_url(self) -> ParseResult:
"""Returns a mirror list URL."""
scheme, netloc, path, params, query, fragment = self.url
if not path.endswith('/'):
path += '/'
return ParseResult(
scheme, netloc, path + REPO_PATH, params, query, fragment)
@property
def mirrorlist_record(self) -> str:
"""Returns a mirror list record."""
return f'Server = {self.mirrorlist_url.geturl()}'
def get_sorting_key(self, order: Tuple[Sorting], now: datetime) -> Tuple:
"""Returns a tuple of the soring keys in the desired order."""
if not order:
return ()
key =
for option in order:
if option == Sorting.AGE:
if self.last_sync is None:
key.append(now - datetime.fromtimestamp(0))
else:
key.append(now - self.last_sync)
elif option == Sorting.RATE:
key.append(self.duration.sorting_key)
elif option == Sorting.COUNTRY:
key.append(self.country.sorting_key)
elif option == Sorting.SCORE:
key.append(float('inf') if self.score is None else self.score)
elif option == Sorting.DELAY:
key.append(float('inf') if self.delay is None else self.delay)
else:
raise ValueError(f'Invalid sorting option: {option}.')
return tuple(key)
class Filter(NamedTuple):
"""Represents a set of mirror filtering options."""
countries: FrozenSet[str]
protocols: FrozenSet[str]
max_age: timedelta
regex_incl: Pattern
regex_excl: Pattern
def match(self, mirror: Mirror) -> bool:
"""Matches the mirror."""
if self.countries is not None:
if not any(mirror.country.match(c) for c in self.countries):
return False
if self.protocols is not None:
if mirror.url.scheme.lower() not in self.protocols:
return False
if self.max_age is not None:
if mirror.last_sync + self.max_age < datetime.now():
return False
if self.regex_incl is not None:
if not self.regex_incl.fullmatch(mirror.url.geturl()):
return False
if self.regex_excl is not None:
if self.regex_excl.fullmatch(mirror.url.geturl()):
return False
return True
if __name__ == '__main__':
try:
exit(main())
except KeyboardInterrupt:
LOGGER.error('Aborted by user.')
exit(1)
Python version: 3.7
python python-3.x
python python-3.x
edited Feb 28 at 11:28
Richard Neumann
asked Feb 28 at 11:04
Richard NeumannRichard Neumann
1,948724
1,948724
2
$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doingfrom __future__ import annotations
andfrom re import Pattern
.
$endgroup$
– Graipher
Feb 28 at 11:20
4
$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25
3
$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25
3
$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40
3
$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54
|
show 1 more comment
2
$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doingfrom __future__ import annotations
andfrom re import Pattern
.
$endgroup$
– Graipher
Feb 28 at 11:20
4
$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25
3
$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25
3
$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40
3
$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54
2
2
$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing
from __future__ import annotations
and from re import Pattern
.$endgroup$
– Graipher
Feb 28 at 11:20
$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing
from __future__ import annotations
and from re import Pattern
.$endgroup$
– Graipher
Feb 28 at 11:20
4
4
$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25
$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25
3
3
$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25
$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25
3
3
$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40
$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40
3
3
$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54
$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54
|
show 1 more comment
1 Answer
1
active
oldest
votes
$begingroup$
The classic Python file structure is this:
import this
CONSTANT = None
class Foo:
def methods(self):
pass
def function():
pass
def main():
pass
if __name__ == "__main__":
main()
While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.
In your regex
function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'>
instead of a helpful description. Just use as
:
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))
For the command line interface I would use the functionalities argparse
supplies for multiple arguments, instead of parsing it yourself:
def get_args(args=None):
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)
Also the default value of an unspecified optional argument already is None
, so you don't need to specify it every time and an option like --regex-incl
will automatically be stored in regex_incl
, so no need for that either.
And finally, if you give your function an argument which you pass on to the parsing and default it to None
, you can test this function by passing a list of strings.
I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame
, which can easily be filtered and sorted.
import pandas as pd
from datetime import datetime
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]
For the filtering you can either hardcode it similar to how you are currently doing:
if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]
Or you could accept a query string which you pass along to the dataframe directly:
args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)
Or even support both.
Regex patterns are also supported:
if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]
Sorting by arbitrary column names is also quite easy:
df = df.sort_values(args.sort, ascending=not args.reverse)
And so is limiting:
df = df.head(args.limit)
Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas
is not in the Python Standard Library.
Of course you could just implement it using just standard library tools:
import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests
DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def get_args(args=None):
"""Returns the parsed arguments."""
...
def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()
def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True
def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]
if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
print(match_mirrors(mirrors, args))
And this would still be vastly shorter and more readable than your code...
$endgroup$
2
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation usingpandas
with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
return StackExchange.using("mathjaxEditing", function () {
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
});
});
}, "mathjax-editing");
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "196"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f214459%2fspeculum-a-simple-straightforward-arch-linux-mirror-list-optimizer%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
$begingroup$
The classic Python file structure is this:
import this
CONSTANT = None
class Foo:
def methods(self):
pass
def function():
pass
def main():
pass
if __name__ == "__main__":
main()
While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.
In your regex
function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'>
instead of a helpful description. Just use as
:
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))
For the command line interface I would use the functionalities argparse
supplies for multiple arguments, instead of parsing it yourself:
def get_args(args=None):
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)
Also the default value of an unspecified optional argument already is None
, so you don't need to specify it every time and an option like --regex-incl
will automatically be stored in regex_incl
, so no need for that either.
And finally, if you give your function an argument which you pass on to the parsing and default it to None
, you can test this function by passing a list of strings.
I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame
, which can easily be filtered and sorted.
import pandas as pd
from datetime import datetime
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]
For the filtering you can either hardcode it similar to how you are currently doing:
if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]
Or you could accept a query string which you pass along to the dataframe directly:
args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)
Or even support both.
Regex patterns are also supported:
if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]
Sorting by arbitrary column names is also quite easy:
df = df.sort_values(args.sort, ascending=not args.reverse)
And so is limiting:
df = df.head(args.limit)
Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas
is not in the Python Standard Library.
Of course you could just implement it using just standard library tools:
import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests
DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def get_args(args=None):
"""Returns the parsed arguments."""
...
def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()
def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True
def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]
if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
print(match_mirrors(mirrors, args))
And this would still be vastly shorter and more readable than your code...
$endgroup$
2
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation usingpandas
with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30
add a comment |
$begingroup$
The classic Python file structure is this:
import this
CONSTANT = None
class Foo:
def methods(self):
pass
def function():
pass
def main():
pass
if __name__ == "__main__":
main()
While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.
In your regex
function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'>
instead of a helpful description. Just use as
:
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))
For the command line interface I would use the functionalities argparse
supplies for multiple arguments, instead of parsing it yourself:
def get_args(args=None):
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)
Also the default value of an unspecified optional argument already is None
, so you don't need to specify it every time and an option like --regex-incl
will automatically be stored in regex_incl
, so no need for that either.
And finally, if you give your function an argument which you pass on to the parsing and default it to None
, you can test this function by passing a list of strings.
I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame
, which can easily be filtered and sorted.
import pandas as pd
from datetime import datetime
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]
For the filtering you can either hardcode it similar to how you are currently doing:
if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]
Or you could accept a query string which you pass along to the dataframe directly:
args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)
Or even support both.
Regex patterns are also supported:
if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]
Sorting by arbitrary column names is also quite easy:
df = df.sort_values(args.sort, ascending=not args.reverse)
And so is limiting:
df = df.head(args.limit)
Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas
is not in the Python Standard Library.
Of course you could just implement it using just standard library tools:
import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests
DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def get_args(args=None):
"""Returns the parsed arguments."""
...
def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()
def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True
def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]
if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
print(match_mirrors(mirrors, args))
And this would still be vastly shorter and more readable than your code...
$endgroup$
2
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation usingpandas
with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30
add a comment |
$begingroup$
The classic Python file structure is this:
import this
CONSTANT = None
class Foo:
def methods(self):
pass
def function():
pass
def main():
pass
if __name__ == "__main__":
main()
While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.
In your regex
function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'>
instead of a helpful description. Just use as
:
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))
For the command line interface I would use the functionalities argparse
supplies for multiple arguments, instead of parsing it yourself:
def get_args(args=None):
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)
Also the default value of an unspecified optional argument already is None
, so you don't need to specify it every time and an option like --regex-incl
will automatically be stored in regex_incl
, so no need for that either.
And finally, if you give your function an argument which you pass on to the parsing and default it to None
, you can test this function by passing a list of strings.
I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame
, which can easily be filtered and sorted.
import pandas as pd
from datetime import datetime
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]
For the filtering you can either hardcode it similar to how you are currently doing:
if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]
Or you could accept a query string which you pass along to the dataframe directly:
args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)
Or even support both.
Regex patterns are also supported:
if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]
Sorting by arbitrary column names is also quite easy:
df = df.sort_values(args.sort, ascending=not args.reverse)
And so is limiting:
df = df.head(args.limit)
Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas
is not in the Python Standard Library.
Of course you could just implement it using just standard library tools:
import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests
DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def get_args(args=None):
"""Returns the parsed arguments."""
...
def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()
def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True
def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]
if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
print(match_mirrors(mirrors, args))
And this would still be vastly shorter and more readable than your code...
$endgroup$
The classic Python file structure is this:
import this
CONSTANT = None
class Foo:
def methods(self):
pass
def function():
pass
def main():
pass
if __name__ == "__main__":
main()
While you do have all of those elements, by putting the classes all the way at the end you had me quite confused.
In your regex
function you are just printing the name of the exception, not the exception text. So you will always just get back ValueError: <class 'sre_constants.error'>
instead of a helpful description. Just use as
:
def regex(string: str) -> Pattern:
"""Returns a regular expression."""
try:
return compile(string)
except error as e:
raise ValueError(str(e))
For the command line interface I would use the functionalities argparse
supplies for multiple arguments, instead of parsing it yourself:
def get_args(args=None):
"""Returns the parsed arguments."""
parser = ArgumentParser(description=__doc__)
parser.add_argument(
'--sort', '-s', nargs="+", help='sort by the respective properties')
parser.add_argument('--reverse', '-r', action='store_true', help='sort in reversed order')
parser.add_argument(
'--countries', '-c', nargs="+", help='match mirrors of these countries')
parser.add_argument(
'--protocols', '-p', nargs="+",
help='match mirrors that use one of the specified protocols')
parser.add_argument(
'--max-age', '-a', type=int, default=None,
help='match mirrors updated more recently than this')
parser.add_argument(
'--regex-incl', '-i',
help='match mirrors that match the regular expression')
parser.add_argument(
'--regex-excl', '-x',
help='exclude mirrors that match the regular expression')
parser.add_argument(
'--limit', '-l', type=int,
help='limit output to this amount of results')
parser.add_argument(
'--output', '-o', type=Path, default=None, metavar='file',
help='write the output to the specified file instead of stdout')
return parser.parse_args(args)
Also the default value of an unspecified optional argument already is None
, so you don't need to specify it every time and an option like --regex-incl
will automatically be stored in regex_incl
, so no need for that either.
And finally, if you give your function an argument which you pass on to the parsing and default it to None
, you can test this function by passing a list of strings.
I think you have slightly over-engineered this. Instead I would use a simple pandas.DataFrame
, which can easily be filtered and sorted.
import pandas as pd
from datetime import datetime
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
mirrors = get_json()
df = pd.DataFrame(mirrors['urls'])
df['age'] = (datetime.now() - pd.to_datetime(df.last_sync)).dt.total_seconds() / 3600
df = df[df.active]
For the filtering you can either hardcode it similar to how you are currently doing:
if args.max_age is not None:
df = df[df.age <= args.max_age]
if args.countries is not None:
df = df[df.country.isin(args.countries) | df.country_code.isin(args.countries)]
if args.protocols is not None:
df = df[df.protocol.isin(args.protocols)]
Or you could accept a query string which you pass along to the dataframe directly:
args.query = "age < 24 and country_code == 'US'"
df = df.query(args.query)
Or even support both.
Regex patterns are also supported:
if args.regex_incl is not None:
df = df[df.url.str.match(args.regex_incl)]
if args.regex_excl is not None:
df = df[~df.url.str.match(args.regex_excl)]
Sorting by arbitrary column names is also quite easy:
df = df.sort_values(args.sort, ascending=not args.reverse)
And so is limiting:
df = df.head(args.limit)
Even with the explanations in between, this code is way less to read than your module. And a lot more readable IMO. It does however add an additional dependency, since pandas
is not in the Python Standard Library.
Of course you could just implement it using just standard library tools:
import argparse
from datetime import datetime
from functools import partial
from operator import itemgetter
from pathlib import Path
import re
import requests
DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def get_args(args=None):
"""Returns the parsed arguments."""
...
def get_mirrors(url):
res = requests.get(url)
res.raise_for_status()
return res.json()
def filter_mirrors(args, mirror):
if not mirror['active']:
return False
try:
mirror['age'] = (datetime.now() - datetime.strptime(mirror['last_sync'], DATE_FORMAT)).total_seconds() / 3600
except TypeError:
return False
if args.max_age is not None and mirror['age'] > args.max_age:
return False
if args.countries is not None:
if mirror['country'] not in args.countries and mirror['country_code'] not in args.countries:
return False
if args.protocols is not None:
if mirror['protocol'] not in args.protocols:
return False
if args.regex_incl is not None:
match = re.match(args.regex_incl, mirror['url'])
if match is None:
return False
if args.regex_excl is not None:
match = re.match(args.regex_excl, mirror['url'])
if match is not None:
return False
return True
def match_mirrors(mirrors, args):
mirrors = mirrors['urls']
mirrors = filter(partial(filter_mirrors, args), mirrors)
mirrors = sorted(mirrors, key=itemgetter(*args.sort), reverse=args.reverse)
return [mirror['url'] for mirror in mirrors[:args.limit]]
if __name__ == "__main__":
mirrors = get_mirrors('https://www.archlinux.org/mirrors/status/json/')
args = get_args(["--countries", "US", "Germany",
"--sort", "age", "country",
"--max-age", "24",
"--limit", "10",
"--regex-incl", "https://"])
print(match_mirrors(mirrors, args))
And this would still be vastly shorter and more readable than your code...
edited Feb 28 at 15:55
answered Feb 28 at 11:34
GraipherGraipher
26.6k54092
26.6k54092
2
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation usingpandas
with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30
add a comment |
2
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation usingpandas
with all of your functionalities (and a lot less code).
$endgroup$
– Graipher
Feb 28 at 13:30
2
2
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: I'll try to find a reference and finish the alternative solution after lunch...
$endgroup$
– Graipher
Feb 28 at 11:51
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using
pandas
with all of your functionalities (and a lot less code).$endgroup$
– Graipher
Feb 28 at 13:30
$begingroup$
@RichardNeumann: Updated the answer with some comments about argument parsing and an alternate implementation using
pandas
with all of your functionalities (and a lot less code).$endgroup$
– Graipher
Feb 28 at 13:30
add a comment |
Thanks for contributing an answer to Code Review Stack Exchange!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
Use MathJax to format equations. MathJax reference.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f214459%2fspeculum-a-simple-straightforward-arch-linux-mirror-list-optimizer%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
2
$begingroup$
Which Python 3 version is this supposed to run on? With my Python 3.6 I get an error when doing
from __future__ import annotations
andfrom re import Pattern
.$endgroup$
– Graipher
Feb 28 at 11:20
4
$begingroup$
"Any feedback is welcome." I can't say I'm overly fond of the name.
$endgroup$
– Tom Chadwin
Feb 28 at 14:25
3
$begingroup$
Naming is hard, but knowing that "speculum" is a bad option isn't.
$endgroup$
– Danikov
Feb 28 at 14:25
3
$begingroup$
Yes. I know it also describes a certain medical device. It is latin for mirror and describes a special kind of mirror used in telescopes. And not every user here needs to tell me that they don't like the name. Just someone write it in an answer that the others can upvote.
$endgroup$
– Richard Neumann
Feb 28 at 14:40
3
$begingroup$
If you're going to name a tool "speculum" because it is Latin for "mirror", you should expect more than a few jokes about that choice. I encourage this behavior and I hope we can expect to see libraries "candida" (which implements a white-list security system, "candida" being Latin for "white"), "fistula" (which implements data pipeline management, "fistula" being Latin for "pipe") and "chlamydia" (which implements online privacy controls, "χλαμύδιον" being Greek for "cloak").
$endgroup$
– Eric Lippert
Feb 28 at 18:54