#!/usr/bin/env python3 import os import click import json import xml.etree.ElementTree as xmlet import podcastparser import urllib import urllib.request import urllib.parse import pprint import logging import sqlite3 from ruamel.yaml import YAML global options options = { "DEBUG": False, "serverlist": os.path.normpath( os.path.join(os.path.expanduser("~/.config/podweb"), "serverlist") ), "downloadlocation": os.path.expanduser("~/Podcasts"), } yaml = YAML() yaml.allow_duplicate_keys = True class PodWeb: def __init__( self, debug: bool = False, simulate: bool = False, config: None | str = None, server_list: None | str = None, download_location: None | str = None, ) -> None: self.options = options self.options.update({"DEBUG": debug}) self.servers = [] self.simulate = simulate self.DEFAULT_SERVERLIST_HEADING = """## You can add podcast xml feeds here. ## You can also optionally add categories, website url, image urls, and names for the podcasts. ## The order of category, name, and url does not matter. ## Here are some example entries: ## - category: example category ## name: example podcast 1 ## url: https://example.com/feed.xml ## img: https://example.com/image.jpg ## site: https://example.com ## - name: example podcast 2 ## url: example.com/feed2.xml """ if options["DEBUG"]: log_level = logging.DEBUG else: log_level = logging.ERROR self.log = logging.getLogger("PodWeb") self.log.setLevel(log_level) if not self.log.handlers: ch = logging.StreamHandler() ch.setLevel(log_level) formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s") ch.setFormatter(formatter) self.log.addHandler(ch) if self.options["DEBUG"]: self.config_path = os.path.abspath(os.path.curdir) self.config_filepath = "debug_config.yaml" self.db_path = self.config_path self.db_filepath = "debug_podweb.db" self.options["serverlist"] = os.path.join( self.config_path, "debug_serverlist" ) self.options["downloadlocation"] = os.path.join( self.config_path, "podcasts" ) else: self.config_path = os.path.normpath(os.path.expanduser("~/.config/podweb")) self.config_filepath = os.path.join(self.config_path, "config.yaml") self.db_path = os.path.expanduser("~/.local/share/podweb") self.db_filepath = os.path.join(self.db_path, "podweb.db") if config: self.config_filepath = os.path.normpath(os.path.expanduser(config)) self._open_db() self._load_config() self._update_config(self.options) if server_list: self.options["serverlist"] = os.path.normpath( os.path.expanduser(server_list) ) if download_location: self.options["downloadlocation"] = os.path.normpath( os.path.expanduser(download_location) ) if not os.path.exists(self.options["downloadlocation"]): os.makedirs(options["downloadlocation"]) self._load_serverlist() def __del__(self): self._close_db() def _open_db(self) -> None: """Opens SQLite database to track podcast episodes.""" if not os.path.exists(self.db_path): os.makedirs(self.db_path) self.con = sqlite3.connect(self.db_filepath) self.data = self.con.cursor() self._create_tables() def _close_db(self) -> None: self.con.close() def _create_tables(self) -> None: self.data.execute("""CREATE TABLE IF NOT EXISTS "episodes" ( "guid" TEXT NOT NULL UNIQUE, "title" TEXT, "description" TEXT, "img" TEXT, "url" TEXT, "season" TEXT, PRIMARY KEY("guid") )""") self.data.execute("""CREATE TABLE IF NOT EXISTS "downloads" ( "guid" TEXT NOT NULL UNIQUE, "filepath" TEXT NOT NULL UNIQUE, PRIMARY KEY("guid"), FOREIGN KEY("guid") REFERENCES "episodes"("guid") )""") def _load_config(self) -> None: """Loads current config""" if not os.path.exists(self.config_path): os.makedirs(self.config_path) if not os.path.isfile(self.config_filepath): with open(self.config_filepath, "w") as f: yaml.dump(self.options, f) else: with open(self.config_filepath, "r+t") as f: data = yaml.load(f) if data is None: yaml.dump(self.options, f) else: self.options.update(data) def _update_config(self, changed_option: dict) -> None: """Makes a change to the config file""" with open(self.config_filepath, "rt") as f: config_options = yaml.load(f) config_options.update(changed_option) with open(self.config_filepath, "wt") as f: yaml.dump(config_options, f) def _load_serverlist(self, do_return : bool = False): """Loads the contents of the serverlist""" self._create_serverlist() with open(self.options["serverlist"], "r") as f: content = yaml.load(f) if do_return: return content if content: for i in content: content["url"] = podcastparser.normalize_feed_url(content["url"]) self.servers = content def _create_serverlist(self) -> None: """Checks if the serverlist does not exist and creates it if not""" if not os.path.isfile(self.options["serverlist"]): with open(self.options["serverlist"], "w") as f: f.write(self.DEFAULT_SERVERLIST_HEADING) def _update_serverlist(self) -> None: """Overwrites the current serverlist with the stored serverlist""" serverlist = self._load_serverlist(True) if len(self.servers): with open(self.options["serverlist"], "w") as f: if serverlist is None: f.write(self.DEFAULT_SERVERLIST_HEADING) yaml.dump(self.servers, f) def add_podcast( self, feedurl: str, name=None, category=None, site=None, img=None ) -> None: feedurl = podcastparser.normalize_feed_url(feedurl) feedparse = urllib.parse.urlparse(feedurl) for i in self.servers: iparse = urllib.parse.urlparse(i["url"]) if iparse.hostname == feedparse.hostname and iparse.path == feedparse.path: return None new_feed = {"url": feedurl} if name is None or img is None or site is None: parsed = podcastparser.parse(feedurl, urllib.request.urlopen(feedurl)) if name is None: name = parsed.get("title") if img is None: img = parsed.get("cover_url") if site is None: site = parsed.get("link") if name: new_feed.update({"name": name}) if site: new_feed.update({"site": site}) if img: new_feed.update({"img": img}) if category: new_feed.update({"category": category}) self.servers.append(new_feed) self._update_serverlist() def import_opml(self, opml_path: str) -> None: body = xmlet.parse(source=opml_path).getroot().find("body") if body is None: raise SyntaxError("OPML does not have body tag") for child in body: i = child.attrib if i["type"] == "rss": self.add_podcast( feedurl=i["xmlUrl"], name=i.get("text"), site=i.get("htmlUrl"), img=i.get("imageUrl"), ) def _parse_rss(self, url: str) -> dict: parsed = podcastparser.parse(url, urllib.request.urlopen(url)) return parsed def _parse_local_rss(self, file: str) -> dict: with open(file, "rb") as f: parsed = podcastparser.parse(file, f) return parsed @click.group() @click.pass_context @click.option("-d", "--debug", is_flag=True) @click.option("--simulate", is_flag=True) @click.option("--config", default=None) @click.option("--server-list", default=None) @click.option("--download-location", default=None) def cli( ctx, debug: bool, simulate: bool, config: None | str, server_list: None | str, download_location: None | str, ): """a simple podfetcher for the CLI.""" ctx.obj = PodWeb( debug=debug, simulate=simulate, config=config, server_list=server_list, download_location=download_location, ) ctx.show_default = True @cli.command() @click.argument( "setting", type=click.Choice( ["configlocation", "serverlistlocation", "downloadlocation", "servers"], case_sensitive=False, ), ) @click.pass_obj def get_setting(obj, setting): if setting == "configlocation": click.echo(obj.config_filepath) if setting == "serverlistlocation": click.echo(obj.options["serverlist"]) if setting == "downloadlocation": click.echo(obj.options["podcastpath"]) if setting == "servers": for i in obj.servers: name = "" if i.get("name"): name = f"{i['name']} - " click.echo(f"{name}{i['url']}") @cli.command() @click.argument("url") @click.option( "-F", "--format", type=click.Choice(["pprint", "json"], case_sensitive=False), default="pprint", ) @click.pass_obj def parse(obj, url, format): if format == "pprint": click.echo(pprint.pformat(obj._parse_rss(url))) else: click.echo(json.dumps(obj._parse_rss(url), indent=4, separators=(",", ": "))) @cli.command() @click.argument("filepath", type=click.Path(exists=True)) @click.option( "-F", "--format", type=click.Choice(["pprint", "json"], case_sensitive=False), default="pprint", ) @click.pass_obj def parse_file(obj, filepath, format): if format == "pprint": click.echo(pprint.pformat(obj._parse_local_rss(filepath))) else: click.echo( json.dumps(obj._parse_local_rss(filepath), indent=4, separators=(",", ": ")) ) @cli.command() @click.argument("url") @click.option("-n", "--name") @click.option("-c", "--category") @click.option("-s", "--site") @click.option("-i", "--img") @click.pass_obj def add_podcast( obj, url: str, name: str | None, category: str | None, site: str | None, img: str | None, ): obj.add_podcast(feedurl=url, name=name, category=category, site=site, img=img) @cli.command() @click.argument("opml-file", type=click.Path(exists=True)) @click.pass_obj def import_opml(obj, opml_file : str): obj.import_opml(opml_file) click.echo(F"imported {opml_file}") if __name__ == "__main__": cli()