diff options
author | ComradeCrow <comradecrow@vivaldi.net> | 2025-04-10 17:14:24 -0700 |
---|---|---|
committer | ComradeCrow <comradecrow@vivaldi.net> | 2025-04-10 17:14:24 -0700 |
commit | 1ba921568426768f36820b5641a49275e3a7e31c (patch) | |
tree | 7ab32a47f7044d3eda277a4b43cac3cb5fb3c995 | |
parent | f8715aae47ac8e54f032312fe3058ca984db722e (diff) | |
download | podweb-1ba921568426768f36820b5641a49275e3a7e31c.tar.gz |
breaking changes
-rwxr-xr-x | PodWeb.py | 150 |
1 files changed, 106 insertions, 44 deletions
diff --git a/PodWeb.py b/PodWeb.py index f506802..83c4f07 100755 --- a/PodWeb.py +++ b/PodWeb.py @@ -22,13 +22,17 @@ options = { os.path.join(os.path.expanduser("~/.config/podweb"), "serverlist") ), "downloadlocation": os.path.expanduser("~/Podcasts"), - "default_download_limit": -1, - "default_download_order": "ASC", + "default_download_limit": 10, + "default_playback_order": "DSC", } if os.name == "nt": options.update( - {"serverlist": os.path.join(os.getenv("LOCALAPPDATA"), "podweb", "serverlist")} + { + "serverlist": os.path.normpath( + os.path.join(os.getenv("LOCALAPPDATA"), "podweb", "serverlist") + ) + } ) yaml = YAML() @@ -37,19 +41,20 @@ yaml.allow_duplicate_keys = True class PodWeb: def __init__( - self, + self: any, debug: bool = False, simulate: bool = False, config: None | str = None, server_list: None | str = None, download_location: None | str = None, ) -> None: + global options self.options = options self.options.update({"DEBUG": debug}) self.servers = [] self.simulate = simulate self.DEFAULT_SERVERLIST_HEADING = """## You can add podcast xml feeds here. -## You can also optionally add categories, website url, image urls, and names for the podcasts. +## You can also optionally add categories, website url, image urls, download_limit, playback_order, and names for the podcasts. ## The order of category, name, and url does not matter. ## Here are some example entries: ## - category: example category @@ -57,7 +62,8 @@ class PodWeb: ## url: https://example.com/feed.xml ## img: https://example.com/image.jpg ## site: https://example.com -## autodownload: 10 +## download_limit: 10 +## playback_order: DSC ## - name: example podcast 2 ## url: example.com/feed2.xml @@ -116,10 +122,10 @@ class PodWeb: os.makedirs(options["downloadlocation"]) self._load_serverlist() - def __del__(self): + def __del__(self: any) -> None: self._close_db() - def _open_db(self) -> None: + def _open_db(self: any) -> None: """Opens SQLite database to track podcast episodes.""" if not os.path.exists(self.db_path): os.makedirs(self.db_path) @@ -127,10 +133,10 @@ class PodWeb: self.data = self.con.cursor() self._create_tables() - def _close_db(self) -> None: + def _close_db(self: any) -> None: self.con.close() - def _create_tables(self) -> None: + def _create_tables(self: any) -> None: self.data.execute( """ CREATE TABLE @@ -146,6 +152,7 @@ class PodWeb: "season" INTEGER, "deleted" BOOL NOT NULL CHECK("deleted" in (0,1)), "number" INTEGER NOT NULL, + "pubdate" INTEGER, PRIMARY KEY("guid","podcast_url") ) """ @@ -183,28 +190,28 @@ class PodWeb: """ ) - def _win_safe_filename(self, filename: str) -> str: + def _win_safe_filename(self: any, filename: str) -> str: restricted_filenames = ["com", "prn", "aux", "nul"] restricted_filenames += [f"com{i}" for i in range(0, 10)] restricted_filenames += [f"lpt{i}" for i in range(0, 10)] - if filename.lower() not in restricted_filenames: - pass + if filename.lower() in restricted_filenames: + raise RuntimeError(f"File cannot be named {filename}!") safe_chars = [" ", ".", "_"] return "".join( char for char in filename if char.isalnum() or char in safe_chars ).rstrip() - def _unix_safe_filename(self, filename: str) -> str: + def _unix_safe_filename(self: any, filename: str) -> str: if filename.lower() == "null": raise RuntimeError("File cannot be named null!") return filename.replace("/", " ") - def safe_filename(self, filename: str) -> str: + def safe_filename(self: any, filename: str) -> str: if os.name == "nt": return self._win_safe_filename(filename) return self._unix_safe_filename(filename) - def _load_config(self) -> None: + def _load_config(self: any) -> None: """Loads current config""" if not os.path.exists(self.config_path): os.makedirs(self.config_path) @@ -219,9 +226,19 @@ class PodWeb: if data is None: yaml.dump(self.options, f) else: + if "default_download_limit" not in data or ( + type(data["default_download_limit"]) != int + and data["default_download_limit"].upper() + not in ["ALL", "NONE"] + ): + data["defualt_download_limit"] = 10 + if "default_playback_order" not in data or data[ + "default_playback_order" + ].upper() not in ["ASC", "DSC"]: + data["default_playback_order"] = "DSC" self.options.update(data) - def _update_config(self, changed_option: dict) -> None: + def _update_config(self: any, changed_option: dict) -> None: """Makes a change to the config file""" with open(self.config_filepath, "rt") as f: config_options = yaml.load(f) @@ -229,7 +246,7 @@ class PodWeb: with open(self.config_filepath, "wt") as f: yaml.dump(config_options, f) - def _load_serverlist(self, do_return: bool = False) -> None | list[dict]: + def _load_serverlist(self: any, do_return: bool = False) -> None | list[dict]: """Loads the contents of the serverlist""" self._create_serverlist() with open(self.options["serverlist"], "r") as f: @@ -241,13 +258,13 @@ class PodWeb: i["url"] = podcastparser.normalize_feed_url(i["url"]) self.servers = content - def _create_serverlist(self) -> None: + def _create_serverlist(self: any) -> None: """Checks if the serverlist does not exist and creates it if not""" if not os.path.isfile(self.options["serverlist"]): with open(self.options["serverlist"], "w") as f: f.write(self.DEFAULT_SERVERLIST_HEADING) - def _update_serverlist(self) -> None: + def _update_serverlist(self: any) -> None: """Overwrites the current serverlist with the stored serverlist""" serverlist = self._load_serverlist(True) if len(self.servers): @@ -289,12 +306,12 @@ class PodWeb: self._sync_episodes(feedurl, parsed=parsed) def _sync_episodes( - self, + self: any, feedurl: str, min_size: bool = True, parsed: dict | None = None, progressbar: bool = False, - ) -> None: + ) -> list[str]: """syncs the available episodes for download for the given feedurl""" feedurl = podcastparser.normalize_feed_url(feedurl) if parsed == None: @@ -308,12 +325,38 @@ class PodWeb: feedurl = new_feedurl break guid_list = [] - parsed["episodes"].reverse() + podcast_settings = next( + iter([i for i in self.servers if i["url"] == feedurl]), None + ) + if ( + podcast_settings == None + or ( + "playback_order" in podcast_settings + and podcast_settings["playback_order"] == "DSC" + ) + or ( + "playback_order" not in podcast_settings + and self.options["default_playback_order"] == "DSC" + ) + ): + parsed["episodes"].reverse() with click.progressbar( length=len(parsed["episodes"]), label=f"Fetching {parsed['title']} episodes" ) if progressbar else nullcontext() as bar: number = 1 + new_episodes_guids = [] for i in parsed["episodes"]: + self.data.execute( + """ + SELECT COUNT(1) + FROM "episodes" + WHERE guid = :GUID; + """, + {"GUID": i["guid"]}, + ) + isfound = self.execute.fetchone()[0] + if isfound == 0: + new_episodes_guids.append(i["guid"]) enclosure_list = sorted( i["enclosures"], key=lambda d: d["file_size"], reverse=min_size ) @@ -338,7 +381,8 @@ class PodWeb: :WEBSITE, :SEASON, :DELETED, - :NUMBER + :NUMBER, + :PUBDATE ); """, { @@ -352,6 +396,7 @@ class PodWeb: "SEASON": i.get("number", -1), "DELETED": False, "NUMBER": number, + "PUBDATE": podcastparser.parse_pubdate(i.get("pubDate", "")), }, ) self.con.commit() @@ -372,15 +417,16 @@ class PodWeb: if i not in guid_list: self.data.execute( """ - UPDATE \"episodes\" + UPDATE \"episodes\" SET \"deleted\" = 1 WHERE \"guid\" = :GUID; """, {"GUID": i}, ) + return new_episodes_guids def download_episode_index( - self, title: str, number: int, overwrite: bool = False + self: any, title: str, number: int, overwrite: bool = False ) -> None: response = self.data.execute( """ @@ -395,7 +441,7 @@ class PodWeb: if guid != None: self.download_episode(guid, overwrite) - def download_episode(self, guid: str, overwrite: bool = False) -> None: + def download_episode(self: any, guid: str, overwrite: bool = False) -> None: response = self.data.execute( """ SELECT COUNT(1) @@ -433,10 +479,12 @@ class PodWeb: ext = "mp3" if ext is None else ext podcast = next(iter([i for i in self.servers if i["url"] == feedurl])) filename = f"{number:09}.{guid}.{self.safe_filename(title)}.{ext}" - filepath = os.path.join(options["downloadlocation"], podcast["name"]) + filepath = os.path.join( + self.options["downloadlocation"], self.safe_filename(podcast["name"]) + ) if not os.path.exists(filepath): os.makedirs(filepath) - filepath = os.path.join(filepath, filename) + filepath = os.path.abspath(os.path.join(filepath, filename)) with open(filepath, "wb") as f: f.write(audio_data) self.data.execute( @@ -459,7 +507,7 @@ class PodWeb: ) self.con.commit() - def import_opml(self, opml_path: str) -> None: + def import_opml(self: any, opml_path: str) -> None: body = xmlet.parse(source=opml_path).getroot().find("body") if body is None: raise SyntaxError("OPML does not have body tag") @@ -473,11 +521,11 @@ class PodWeb: img=i.get("imageUrl"), ) - def _parse_rss(self, url: str) -> dict: + def _parse_rss(self: any, url: str) -> dict: parsed = podcastparser.parse(url, urllib.request.urlopen(url)) return parsed - def _parse_local_rss(self, file: str) -> dict: + def _parse_local_rss(self: any, file: str) -> dict: with open(file, "rb") as f: parsed = podcastparser.parse(file, f) return parsed @@ -491,13 +539,13 @@ class PodWeb: @click.option("--server-list", default=None) @click.option("--download-location", default=None) def cli( - ctx, + ctx: any, debug: bool, simulate: bool, config: None | str, server_list: None | str, download_location: None | str, -): +) -> None: """a simple podfetcher for the CLI.""" ctx.obj = PodWeb( debug=debug, @@ -518,7 +566,7 @@ def cli( ), ) @click.pass_obj -def get_setting(obj, setting): +def get_setting(obj: any, setting: str) -> None: if setting == "configlocation": click.echo(obj.config_filepath) if setting == "serverlistlocation": @@ -542,7 +590,7 @@ def get_setting(obj, setting): default="pprint", ) @click.pass_obj -def parse(obj, url, format): +def parse(obj: any, url: str, format: str) -> None: if format == "pprint": click.echo(pprint.pformat(obj._parse_rss(url))) else: @@ -558,7 +606,7 @@ def parse(obj, url, format): default="pprint", ) @click.pass_obj -def parse_file(obj, filepath, format): +def parse_file(obj: any, filepath: str, format: str) -> None: if format == "pprint": click.echo(pprint.pformat(obj._parse_local_rss(filepath))) else: @@ -575,27 +623,27 @@ def parse_file(obj, filepath, format): @click.option("-i", "--img") @click.pass_obj def add_podcast( - obj, + obj: any, url: str, name: str | None, category: str | None, site: str | None, img: str | None, -): +) -> None: obj.add_podcast(feedurl=url, name=name, category=category, site=site, img=img) @cli.command() @click.argument("opml-file", type=click.Path(exists=True)) @click.pass_obj -def import_opml(obj, opml_file: str): +def import_opml(obj: any, opml_file: str) -> None: obj.import_opml(opml_file) click.echo(f"imported {opml_file}") @cli.command() @click.pass_obj -def ls(obj): +def ls(obj: any) -> None: for i in range(1, len(obj.servers) + 1): click.echo(f"{i}: {obj.servers[i-1]['name']}") @@ -603,18 +651,32 @@ def ls(obj): @cli.command() @click.pass_obj @click.option("-i", "--index", type=click.IntRange(min=1, max=None, clamp=False)) -def fetch(obj, index: str | None): +@click.option("-q", "--quiet", is_flag=True) +def fetch(obj: any, index: int | None, quiet: bool) -> None: serverlen = len(obj.servers) if index != None and index > serverlen: click.echo(f"Podcast {index} does not exist.", err=True) elif index == None and serverlen != 1: with click.progressbar(obj.servers, label="Fetching episodes") as bar: for i in bar: - click.echo(i["name"]) - obj._sync_episodes(i["url"]) + if not quiet: + click.echo(i["name"]) + new_episodes_guids = obj._sync_episodes(i["url"]) + if not quiet: + obj.data.execute( + """ + SELECT "title" + """ + ) else: obj._sync_episodes(obj.servers[index - 1]["url"], progressbar=True) +@cli.command() +@click.pass_obj +def download(obj: any) -> None: + pass + + if __name__ == "__main__": cli() |