#!/usr/bin/env python3
"""PodWeb: a simple podcast fetcher for the command line.

Feeds are listed in a YAML "serverlist" file, episode metadata and download
records are stored in a SQLite database, and the ``click`` commands at the
bottom of the module expose the functionality on the CLI.
"""
import contextlib
import json
import logging
import mimetypes
import os
import pprint
import sqlite3
import urllib
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as xmlet

import click
import fake_useragent
import podcastparser
from ruamel.yaml import YAML

# Number of entries in the most recently loaded/written serverlist.  It is
# only updated once a PodWeb instance exists, so it must not be used while
# the click decorators are being evaluated at import time.
serverlen = 0

# Default options; PodWeb shares (and mutates) this dict as ``self.options``.
options = {
    "DEBUG": False,
    "serverlist": os.path.normpath(
        os.path.join(os.path.expanduser("~/.config/podweb"), "serverlist")
    ),
    "downloadlocation": os.path.expanduser("~/Podcasts"),
}
if os.name == "nt":
    options.update(
        {"serverlist": os.path.join(os.getenv("LOCALAPPDATA"), "podweb", "serverlist")}
    )

yaml = YAML()
yaml.allow_duplicate_keys = True


class PodWeb:
    """Manage a list of podcast feeds, their episodes and downloaded files."""

    # Comment block written at the top of a freshly created serverlist file.
    DEFAULT_SERVERLIST_HEADING = """## You can add podcast xml feeds here.
## You can also optionally add categories, website url, image urls, and names for the podcasts.
## The order of category, name, and url does not matter.
## Here are some example entries:
## - category: example category
##   name: example podcast 1
##   url: https://example.com/feed.xml
##   img: https://example.com/image.jpg
##   site: https://example.com
## - name: example podcast 2
##   url: example.com/feed2.xml
"""

    def __init__(
        self,
        debug: bool = False,
        simulate: bool = False,
        config: None | str = None,
        server_list: None | str = None,
        download_location: None | str = None,
    ) -> None:
        """Set up logging, paths, the database, the config and the serverlist.

        Args:
            debug: Use debug-level logging and keep all files in the current
                working directory.
            simulate: Stored as ``self.simulate``; not consulted elsewhere in
                this module.
            config: Optional path of the config file to use.
            server_list: Optional serverlist path; overrides the configured
                one for this run only (it is applied after the config file
                has been written).
            download_location: Optional download directory; overrides the
                configured one for this run only.
        """
        self.options = options
        self.options.update({"DEBUG": debug})
        self.servers = []
        self.simulate = simulate

        log_level = logging.DEBUG if self.options["DEBUG"] else logging.ERROR
        self.log = logging.getLogger("PodWeb")
        self.log.setLevel(log_level)
        # Only attach a handler once, even if several instances are created.
        if not self.log.handlers:
            ch = logging.StreamHandler()
            ch.setLevel(log_level)
            formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s")
            ch.setFormatter(formatter)
            self.log.addHandler(ch)

        if self.options["DEBUG"]:
            self.config_path = os.path.abspath(os.path.curdir)
            self.config_filepath = "debug_config.yaml"
            self.db_path = self.config_path
            self.db_filepath = "debug_podweb.db"
            self.options["serverlist"] = os.path.join(
                self.config_path, "debug_serverlist"
            )
            self.options["downloadlocation"] = os.path.join(
                self.config_path, "podcasts"
            )
        else:
            if os.name == "nt":
                self.config_path = os.path.join(os.getenv("LOCALAPPDATA"), "podweb")
                self.db_path = os.path.join(self.config_path, "data")
            else:
                self.config_path = os.path.normpath(
                    os.path.expanduser("~/.config/podweb")
                )
                self.db_path = os.path.expanduser("~/.local/share/podweb")
            self.config_filepath = os.path.join(self.config_path, "config.yaml")
            self.db_filepath = os.path.join(self.db_path, "podweb.db")

        if config:
            self.config_filepath = os.path.normpath(os.path.expanduser(config))

        self._open_db()
        self._load_config()
        self._update_config(self.options)

        if server_list:
            self.options["serverlist"] = os.path.normpath(
                os.path.expanduser(server_list)
            )
        if download_location:
            self.options["downloadlocation"] = os.path.normpath(
                os.path.expanduser(download_location)
            )
        os.makedirs(self.options["downloadlocation"], exist_ok=True)
        self._load_serverlist()

    def __del__(self):
        """Close the database connection, if one was ever opened."""
        # __init__ may have failed before _open_db() ran.
        if getattr(self, "con", None) is not None:
            self._close_db()

    def _open_db(self) -> None:
        """Opens SQLite database to track podcast episodes."""
        os.makedirs(self.db_path, exist_ok=True)
        self.con = sqlite3.connect(self.db_filepath)
        self.data = self.con.cursor()
        self._create_tables()

    def _close_db(self) -> None:
        """Close the SQLite connection."""
        self.con.close()

    def _create_tables(self) -> None:
        """Create the ``episodes`` and ``downloads`` tables and their indexes."""
        self.data.execute(
            """
            CREATE TABLE IF NOT EXISTS "episodes" (
                "guid" TEXT NOT NULL UNIQUE,
                "podcast_url" TEXT NOT NULL,
                "title" TEXT,
                "description" TEXT,
                "img" TEXT,
                "url" TEXT,
                "website" TEXT,
                "season" INTEGER,
                "deleted" BOOL NOT NULL CHECK("deleted" in (0,1)),
                "number" INTEGER NOT NULL,
                PRIMARY KEY("guid","podcast_url")
            )
            """
        )
        self.data.execute(
            """
            CREATE TABLE IF NOT EXISTS "downloads" (
                "guid" TEXT NOT NULL UNIQUE,
                "podcast_url" TEXT NOT NULL,
                "mime_type" TEXT NOT NULL,
                "filepath" TEXT NOT NULL UNIQUE,
                PRIMARY KEY("guid","podcast_url"),
                FOREIGN KEY("guid") REFERENCES "episodes"("guid")
            )
            """
        )
        self.data.execute(
            """
            CREATE UNIQUE INDEX IF NOT EXISTS "downloadGuid" ON "downloads" (
                "guid"
            );
            """
        )
        self.data.execute(
            """
            CREATE UNIQUE INDEX IF NOT EXISTS "episodeGuid" ON "episodes" (
                "guid"
            );
            """
        )

    def _win_safe_filename(self, filename: str) -> str:
        """Keep only alphanumerics, spaces, dots and underscores."""
        safe_chars = [" ", ".", "_"]
        return "".join(
            char for char in filename if char.isalnum() or char in safe_chars
        ).rstrip()

    def _unix_safe_filename(self, filename: str) -> str:
        """Replace path separators with spaces.

        Raises:
            RuntimeError: If ``filename`` is ``"null"`` (case-insensitive).
        """
        if filename.lower() == "null":
            raise RuntimeError("File cannot be named null!")
        return filename.replace("/", " ")

    def safe_filename(self, filename: str) -> str:
        """Return ``filename`` sanitised for the current operating system."""
        if os.name == "nt":
            return self._win_safe_filename(filename)
        return self._unix_safe_filename(filename)

    def _load_config(self) -> None:
        """Loads current config, creating the config file if it is missing."""
        os.makedirs(self.config_path, exist_ok=True)
        # A custom --config path may live outside self.config_path.
        config_dir = os.path.dirname(self.config_filepath)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        if not os.path.isfile(self.config_filepath):
            with open(self.config_filepath, "w") as f:
                yaml.dump(self.options, f)
            return

        with open(self.config_filepath, "r+t") as f:
            data = yaml.load(f)
            if data is None:
                # The file holds no YAML data: write the current options.
                yaml.dump(self.options, f)
            else:
                self.options.update(data)

    def _update_config(self, changed_option: dict) -> None:
        """Makes a change to the config file"""
        with open(self.config_filepath, "rt") as f:
            # A file without YAML data loads as None.
            config_options = yaml.load(f) or {}
        config_options.update(changed_option)
        with open(self.config_filepath, "wt") as f:
            yaml.dump(config_options, f)

    def _load_serverlist(self, do_return: bool = False) -> None | list[dict]:
        """Loads the contents of the serverlist

        Args:
            do_return: When true, return the parsed file content without
                touching ``self.servers``.

        Returns:
            The parsed content when ``do_return`` is true, otherwise ``None``.
        """
        global serverlen
        self._create_serverlist()
        with open(self.options["serverlist"], "r") as f:
            content = yaml.load(f)
        if do_return:
            return content
        if content:
            for entry in content:
                entry["url"] = podcastparser.normalize_feed_url(entry["url"])
            self.servers = content
            serverlen = len(self.servers)
        return None

    def _create_serverlist(self) -> None:
        """Checks if the serverlist does not exist and creates it if not"""
        serverlist_path = self.options["serverlist"]
        if os.path.isfile(serverlist_path):
            return
        serverlist_dir = os.path.dirname(serverlist_path)
        if serverlist_dir:
            os.makedirs(serverlist_dir, exist_ok=True)
        with open(serverlist_path, "w") as f:
            f.write(self.DEFAULT_SERVERLIST_HEADING)

    def _update_serverlist(self) -> None:
        """Overwrites the current serverlist with the stored serverlist"""
        global serverlen
        serverlist = self._load_serverlist(True)
        if not self.servers:
            return
        with open(self.options["serverlist"], "w") as f:
            # The file held no entries yet, so keep the explanatory heading.
            if serverlist is None:
                f.write(self.DEFAULT_SERVERLIST_HEADING)
            yaml.dump(self.servers, f)
        serverlen = len(self.servers)

    def add_podcast(
        self, feedurl: str, name=None, category=None, site=None, img=None
    ) -> None:
        """Adds a new podcast to the serverlist

        The feed is downloaded and parsed; ``name``, ``img`` and ``site``
        default to the values found in the feed.  Nothing happens when a feed
        with the same host and path is already listed.
        """
        feedurl = podcastparser.normalize_feed_url(feedurl)
        with urllib.request.urlopen(feedurl) as response:
            parsed = podcastparser.parse(feedurl, response)

        new_location = parsed.get("newLocation")
        if new_location is not None:
            feedurl = podcastparser.normalize_feed_url(new_location)

        feedparse = urllib.parse.urlparse(feedurl)
        for server in self.servers:
            known = urllib.parse.urlparse(server["url"])
            if known.hostname == feedparse.hostname and known.path == feedparse.path:
                return None

        if name is None:
            name = parsed.get("title")
        if img is None:
            img = parsed.get("cover_url")
        if site is None:
            site = parsed.get("link")

        new_feed = {"url": feedurl}
        optional = (("name", name), ("site", site), ("img", img), ("category", category))
        for key, value in optional:
            if value:
                new_feed[key] = value

        self.servers.append(new_feed)
        self._update_serverlist()
        self._sync_episodes(feedurl, parsed=parsed)
        return None

    @staticmethod
    def _pick_enclosure_url(enclosures: list, min_size: bool = True) -> str | None:
        """Return the URL of the first ``audio/mpeg`` enclosure.

        Enclosures are ordered by ``file_size`` (descending when ``min_size``
        is true).  When none is ``audio/mpeg`` the last one in that order is
        used; ``None`` is returned when there are no enclosures at all.
        """
        ordered = sorted(enclosures, key=lambda d: d["file_size"], reverse=min_size)
        if not ordered:
            return None
        for enclosure in ordered:
            if enclosure["mime_type"] == "audio/mpeg":
                return enclosure["url"]
        return ordered[-1]["url"]

    def _sync_episodes(
        self,
        feedurl: str,
        min_size: bool = True,
        parsed: dict | None = None,
        progressbar: bool = False,
    ) -> None:
        """syncs the available episodes for download for the given feedurl

        Args:
            feedurl: URL of the feed to sync.
            min_size: Passed to the enclosure ordering (see
                ``_pick_enclosure_url``).
            parsed: Already parsed feed; downloaded and parsed when ``None``.
            progressbar: Show a click progress bar while storing episodes.
        """
        feedurl = podcastparser.normalize_feed_url(feedurl)
        if parsed is None:
            with urllib.request.urlopen(feedurl) as response:
                parsed = podcastparser.parse(feedurl, response)

        new_location = parsed.get("newLocation")
        if new_location is not None:
            new_feedurl = podcastparser.normalize_feed_url(new_location)
            for server in self.servers:
                if server["url"] == feedurl:
                    server["url"] = new_feedurl
                    self._update_serverlist()
                    feedurl = new_feedurl
                    break

        # Number episodes in reverse feed order without mutating the caller's dict.
        episodes = list(reversed(parsed["episodes"]))
        seen_guids = set()

        with contextlib.ExitStack() as stack:
            bar = None
            if progressbar:
                bar = stack.enter_context(
                    click.progressbar(
                        length=len(episodes),
                        label=f"Fetching {parsed.get('title', feedurl)} episodes",
                    )
                )
            for number, episode in enumerate(episodes, start=1):
                self.data.execute(
                    """
                    INSERT OR REPLACE INTO "episodes" VALUES (
                        :GUID, :PODCAST_URL, :TITLE, :DESCRIPTION, :IMG,
                        :URL, :WEBSITE, :SEASON, :DELETED, :NUMBER
                    );
                    """,
                    {
                        "GUID": episode["guid"],
                        "PODCAST_URL": feedurl,
                        "TITLE": episode.get("title"),
                        "DESCRIPTION": episode.get("description"),
                        "IMG": episode.get("episode_art_url"),
                        "URL": self._pick_enclosure_url(
                            episode.get("enclosures", []), min_size
                        ),
                        "WEBSITE": episode.get("link"),
                        # NOTE(review): the season column is filled from the
                        # episode's "number" key - confirm this is intended.
                        "SEASON": episode.get("number", -1),
                        "DELETED": False,
                        "NUMBER": number,
                    },
                )
                seen_guids.add(episode["guid"])
                if bar is not None:
                    bar.update(1)

        # Flag episodes that are stored for this feed but no longer listed.
        self.data.execute(
            """
            SELECT "guid" FROM "episodes" WHERE podcast_url = :PODCAST_URL;
            """,
            {"PODCAST_URL": feedurl},
        )
        stored_guids = [row[0] for row in self.data.fetchall()]
        for guid in stored_guids:
            if guid not in seen_guids:
                self.data.execute(
                    """
                    UPDATE "episodes" SET "deleted" = 1 WHERE "guid" = :GUID;
                    """,
                    {"GUID": guid},
                )
        self.con.commit()

    def _episode_row(self, guid: str) -> tuple | None:
        """Return ``(url, podcast_url, title, number)`` for ``guid`` or ``None``."""
        response = self.data.execute(
            """
            SELECT "url","podcast_url","title","number" FROM "episodes" WHERE guid = :GUID;
            """,
            {"GUID": guid},
        )
        return response.fetchone()

    @staticmethod
    def _fetch_audio(url: str, user_agent: str) -> tuple[bytes, str]:
        """Download ``url`` and return ``(payload, content type)``."""
        request_obj = urllib.request.Request(
            url, data=None, headers={"User-Agent": user_agent}
        )
        with urllib.request.urlopen(request_obj) as audio_response:
            return audio_response.read(), audio_response.info().get_content_type()

    def download_episode(self, guid: str, overwrite: bool = False) -> None:
        """Download the episode identified by ``guid`` and record the file.

        Args:
            guid: GUID of an episode stored in the ``episodes`` table.
            overwrite: Download again even when a download is already
                recorded for this GUID.

        An error is printed to stderr and nothing is downloaded when the
        episode is unknown or has no URL.  On an HTTP error the feed is synced
        again and the download is retried once with the refreshed URL.
        """
        response = self.data.execute(
            """
            SELECT COUNT(1) FROM "downloads" WHERE guid = :GUID;
            """,
            {"GUID": guid},
        )
        if response.fetchone()[0] == 1 and not overwrite:
            return None

        row = self._episode_row(guid)
        if row is None:
            click.echo(f"No episode found for guid {guid}!", err=True)
            return None
        url, feedurl, title, number = row
        if url is None:
            click.echo("No URL found!", err=True)
            return None

        user_agent = fake_useragent.UserAgent().ff
        try:
            audio_data, mime_type = self._fetch_audio(url, user_agent)
        except urllib.error.HTTPError:
            # The stored URL may be stale: refresh the feed and re-read the row.
            self._sync_episodes(feedurl)
            row = self._episode_row(guid)
            if row is None or row[0] is None:
                click.echo("No URL found!", err=True)
                return None
            url, feedurl, title, number = row
            audio_data, mime_type = self._fetch_audio(url, user_agent)

        # guess_extension() already includes the leading dot.
        ext = mimetypes.guess_extension(mime_type) or ".mp3"

        podcast = next((s for s in self.servers if s["url"] == feedurl), None)
        podcast_name = (podcast or {}).get("name") or feedurl
        directory = os.path.join(
            self.options["downloadlocation"], self.safe_filename(podcast_name)
        )
        os.makedirs(directory, exist_ok=True)

        # GUIDs are frequently URLs, so they need sanitising just like titles.
        filename = (
            f"{number:09}.{self.safe_filename(guid)}."
            f"{self.safe_filename(title or 'untitled')}{ext}"
        )
        filepath = os.path.join(directory, filename)
        with open(filepath, "wb") as f:
            f.write(audio_data)

        self.data.execute(
            """
            INSERT OR REPLACE INTO "downloads" VALUES (
                :GUID, :PODCAST_URL, :MIME_TYPE, :FILEPATH
            );
            """,
            {
                "GUID": guid,
                "PODCAST_URL": feedurl,
                "MIME_TYPE": mime_type,
                "FILEPATH": filepath,
            },
        )
        self.con.commit()
        return None

    def _import_outlines(self, parent, category: str | None = None) -> None:
        """Add every rss outline below ``parent``; descend into other outlines."""
        for outline in parent:
            attrs = outline.attrib
            if attrs.get("type") == "rss" and attrs.get("xmlUrl"):
                self.add_podcast(
                    feedurl=attrs["xmlUrl"],
                    name=attrs.get("text"),
                    category=category,
                    site=attrs.get("htmlUrl"),
                    img=attrs.get("imageUrl"),
                )
            else:
                # A non-feed outline is treated as a folder naming a category.
                self._import_outlines(outline, attrs.get("text") or category)

    def import_opml(self, opml_path: str) -> None:
        """Add every rss feed listed in the OPML file at ``opml_path``.

        Raises:
            SyntaxError: If the document has no ``body`` element.
        """
        body = xmlet.parse(source=opml_path).getroot().find("body")
        if body is None:
            raise SyntaxError("OPML does not have body tag")
        self._import_outlines(body)

    def _parse_rss(self, url: str) -> dict:
        """Download and parse the feed at ``url``."""
        with urllib.request.urlopen(url) as response:
            return podcastparser.parse(url, response)

    def _parse_local_rss(self, file: str) -> dict:
        """Parse the feed stored in the local file ``file``."""
        with open(file, "rb") as f:
            return podcastparser.parse(file, f)


@click.group()
@click.pass_context
@click.option("-d", "--debug", is_flag=True)
@click.option("--simulate", is_flag=True)
@click.option("--config", default=None)
@click.option("--server-list", default=None)
@click.option("--download-location", default=None)
def cli(
    ctx,
    debug: bool,
    simulate: bool,
    config: None | str,
    server_list: None | str,
    download_location: None | str,
):
    """a simple podfetcher for the CLI."""
    ctx.obj = PodWeb(
        debug=debug,
        simulate=simulate,
        config=config,
        server_list=server_list,
        download_location=download_location,
    )
    ctx.show_default = True


@cli.command()
@click.argument(
    "setting",
    type=click.Choice(
        ["configlocation", "serverlistlocation", "downloadlocation", "servers"],
        case_sensitive=False,
    ),
)
@click.pass_obj
def get_setting(obj, setting):
    """Print the value of SETTING."""
    if setting == "configlocation":
        click.echo(obj.config_filepath)
    elif setting == "serverlistlocation":
        click.echo(obj.options["serverlist"])
    elif setting == "downloadlocation":
        click.echo(obj.options["downloadlocation"])
    elif setting == "servers":
        for server in obj.servers:
            prefix = f"{server['name']} - " if server.get("name") else ""
            click.echo(f"{prefix}{server['url']}")


@cli.command()
@click.argument("url")
@click.option(
    "-F",
    "--format",
    "output_format",
    type=click.Choice(["pprint", "json"], case_sensitive=False),
    default="pprint",
)
@click.pass_obj
def parse(obj, url, output_format):
    """Download the feed at URL and print its parsed content."""
    parsed = obj._parse_rss(url)
    if output_format == "pprint":
        click.echo(pprint.pformat(parsed))
    else:
        click.echo(json.dumps(parsed, indent=4, separators=(",", ": ")))


@cli.command()
@click.argument("filepath", type=click.Path(exists=True))
@click.option(
    "-F",
    "--format",
    "output_format",
    type=click.Choice(["pprint", "json"], case_sensitive=False),
    default="pprint",
)
@click.pass_obj
def parse_file(obj, filepath, output_format):
    """Parse the local feed file FILEPATH and print its content."""
    parsed = obj._parse_local_rss(filepath)
    if output_format == "pprint":
        click.echo(pprint.pformat(parsed))
    else:
        click.echo(json.dumps(parsed, indent=4, separators=(",", ": ")))


@cli.command()
@click.argument("url")
@click.option("-n", "--name")
@click.option("-c", "--category")
@click.option("-s", "--site")
@click.option("-i", "--img")
@click.pass_obj
def add_podcast(
    obj,
    url: str,
    name: str | None,
    category: str | None,
    site: str | None,
    img: str | None,
):
    """Add the podcast feed at URL to the serverlist."""
    obj.add_podcast(feedurl=url, name=name, category=category, site=site, img=img)


@cli.command()
@click.argument("opml-file", type=click.Path(exists=True))
@click.pass_obj
def import_opml(obj, opml_file: str):
    """Add every feed listed in OPML_FILE to the serverlist."""
    obj.import_opml(opml_file)
    click.echo(f"imported {opml_file}")


@cli.command()
@click.pass_obj
@click.option("-i", "--index", type=click.IntRange(min=1))
def fetch(obj, index: int | None):
    """Sync episodes for every podcast, or only for the one at --index."""
    servers = obj.servers
    if not servers:
        click.echo("No podcasts in the serverlist.")
        return

    # The upper bound is only known once the serverlist has been loaded, so
    # it is validated here instead of in the option type.
    if index is not None and index > len(servers):
        raise click.BadParameter(
            f"{index} is not in the range 1<=x<={len(servers)}.",
            param_hint="--index",
        )

    if index is None and len(servers) == 1:
        index = 1

    if index is None:
        with click.progressbar(servers, label="fetching episodes") as bar:
            for server in bar:
                click.echo(server.get("name", server["url"]))
                obj._sync_episodes(server["url"])
    else:
        obj._sync_episodes(servers[index - 1]["url"], progressbar=True)


@cli.command(name="list")
@click.pass_obj
def list_podcasts(obj):
    """Print the numbered list of podcasts in the serverlist."""
    for position, server in enumerate(obj.servers, start=1):
        click.echo(f"{position}: {server.get('name', server['url'])}")


if __name__ == "__main__":
    cli()