#!/usr/bin/python
"""Convert FanFiction.net notification emails into per-story RSS feeds.

Reads chapter-alert mails from a Maildir ``cur`` directory, groups the
parsed chapters by story, writes one RSS XML file per story into an
output directory served over HTTP, and optionally subscribes a Tiny
Tiny RSS instance to every generated feed.
"""
import re
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from email.message import EmailMessage
from email.parser import BytesParser
from email.policy import default
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote, urljoin

import click
import requests
from dotenv import load_dotenv
from feedgen.feed import FeedGenerator
from tqdm import tqdm

DEFAULT_IN_DIR = Path("/home/fran/mail/personal/rss/cur/")
DEFAULT_OUT_DIR = Path("/home/yunohost.app/gossa/")
DEFAULT_CAT_ID = 11  # webnovels


@dataclass
class RssItem:
    """One feed entry (a story chapter) plus its parent feed's metadata."""

    title: str
    url: str
    date: datetime
    feed_title: str
    description: Optional[str] = None
    author: Optional[str] = None
    feed_description: Optional[str] = None
    feed_url: Optional[str] = None
    icon: Optional[str] = None


class ParserFFN:
    """Extract :class:`RssItem` objects from FanFiction.net alert mails."""

    SUBJECT_RE = re.compile(r"^Chapter: (.*) Ch[0-9]+ .*")
    # BUGFIX: the named-group identifiers had been stripped from this
    # pattern ("(?P.+)" is an re.error at import time). The names are
    # reconstructed from the m.group(...) calls in parse() and the known
    # body layout: "New chapter from <author>, / <feed_title> /
    # Chapter N: <title> / <url>".
    BODY_RE = re.compile(
        r"""
        New\ chapter\ from\ (?P<author>.+),\n
        \n
        \s*(?P<feed_title>.+)\n
        \s*Chapter\ \d+:\ (?P<title>.+)\n
        \n
        \s*(?P<url>.+/)[^/]*\n
        """,
        re.VERBOSE,
    )
    SUMMARY_RE = re.compile(r"^\s*Summary: (.*)", re.MULTILINE)
    # Chapter URLs end in ".../<chapter>/<slug>"; dropping the trailing
    # chapter segment yields a stable per-story URL used as the feed link.
    FEED_URL_RE = re.compile(r"(.*/)\d+/")

    @staticmethod
    def parse(email: EmailMessage) -> RssItem:
        """Parse one ffn notification mail into an :class:`RssItem`.

        Raises:
            ValueError: if the mail is not an ffn chapter alert, or if
                its body/URL does not match the expected layout.
        """
        subject = email["subject"]
        body = email.get_content()
        m = ParserFFN.SUBJECT_RE.match(subject)
        if m is None:
            raise ValueError("Not an ffn email")
        m = ParserFFN.BODY_RE.search(body)
        if m is None:
            # Dump the offending mail so the pattern can be adjusted.
            print("Malformed ffn email:")
            print(email)
            raise ValueError("Malformed ffn email")
        # assumes RFC-2822 style Date header with numeric zone — TODO
        # confirm; email.utils.parsedate_to_datetime would be laxer.
        date = datetime.strptime(email["date"], "%a, %d %b %Y %H:%M:%S %z")
        item = RssItem(
            title=m.group("title"),
            url=m.group("url"),
            author=m.group("author"),
            feed_title=m.group("feed_title"),
            date=date,
            icon="https://www.fanfiction.net/favicon.ico",
        )
        m = ParserFFN.FEED_URL_RE.search(item.url)
        if m is None:
            print(f"Malformed ffn URL: {item.url}")
            raise ValueError(f"Malformed ffn email URL: {item.url}")
        item.feed_url = m.group(1)
        # The story summary, when present, doubles as the feed description.
        m = ParserFFN.SUMMARY_RE.search(body)
        if m is not None:
            item.feed_description = m.group(1)
        return item


EXTRACTORS = [ParserFFN]


def escape_fn(fn):
    """Backslash-escape shell-unfriendly characters in a filename."""
    escapes = r" '?()"
    for c in escapes:
        fn = fn.replace(c, f"\\{c}")
    return fn


def remove_tricky_characters(title):
    """Strip characters that are unsafe in filenames from *title*."""
    chars = r"?"
    for c in chars:
        title = title.replace(c, "")
    return title


def parse_emails(path: Path) -> dict[str, list[RssItem]]:
    """Parse every mail file under *path*, grouping items by feed title.

    Mails that no extractor recognises (ValueError) are silently skipped.
    """
    item_map = defaultdict(list)
    for fn in tqdm(list(path.iterdir())):
        with open(fn, "rb") as f:
            email = BytesParser(policy=default).parse(f)
        for ext in EXTRACTORS:
            try:
                item = ext.parse(email)
                item_map[item.feed_title].append(item)
            except ValueError:
                pass
    return item_map


def generate_feed(items: list[RssItem]) -> FeedGenerator:
    """Build a FeedGenerator for one story from its (non-empty) item list.

    Feed-level metadata is taken from the newest item; entries are emitted
    newest-first.
    """
    fg = FeedGenerator()
    items = sorted(items, key=lambda i: i.date, reverse=True)
    fg.title(items[0].feed_title)
    fg.description(items[0].feed_description)
    fg.link(href=items[0].feed_url, rel="alternate")
    fg.icon(items[0].icon)
    for item in items:
        fe = fg.add_entry()
        fe.title(item.title)
        fe.link(href=item.url, rel="alternate")
        fe.author(name=item.author)
        fe.published(item.date)
    return fg


def write_feeds_from_mails(in_path: Path, out_path: Path) -> list[str]:
    """Parse mails from *in_path*, write one RSS file per feed to *out_path*.

    Returns the list of written file names (not full paths).
    """
    print("Parsing mails...")
    item_map = parse_emails(in_path)
    print("Building feeds...")
    feeds = {title: generate_feed(items) for title, items in item_map.items()}
    print("Writing feeds...")
    fns = []
    for title, fg in feeds.items():
        fn = out_path / f"{title}.xml"
        fg.rss_file(str(fn))
        fns.append(fn.name)
    return fns


class TTRSS:
    """Minimal client for the Tiny Tiny RSS JSON API."""

    def __init__(self, url: str) -> None:
        self.url = url
        self.sid = None  # session id, set by login()

    def _make_request(self, op: str, **kwargs: Any) -> Any:
        """POST one API operation, attaching the session id if logged in."""
        data = {"op": op, **kwargs}
        if self.sid is not None:
            data["sid"] = self.sid
        r = requests.post(self.url, json=data)
        r.raise_for_status()
        return r.json()

    def login(self, user: str, password: str) -> None:
        """Authenticate and store the session id for later calls."""
        body = self._make_request("login", user=user, password=password)
        if "session_id" in body["content"]:
            self.sid = body["content"]["session_id"]
        else:
            raise Exception(f"Login to TTRSS failed with error {body}")

    def _assert_logged_in(self) -> None:
        if self.sid is None:
            raise Exception("Must be logged in to use this method")

    def get_feeds(self, cat_id: int = -3) -> list[dict]:
        """Return the feeds in *cat_id* (-3 means all feeds in TTRSS)."""
        self._assert_logged_in()
        return self._make_request("getFeeds", cat_id=cat_id)["content"]

    def subscribe(self, feed_url: str, cat_id: int = 0) -> None:
        """Subscribe the server to *feed_url* under category *cat_id*."""
        self._assert_logged_in()
        body = self._make_request(
            "subscribeToFeed", feed_url=feed_url, category_id=cat_id
        )
        status = body["content"]["status"]["code"]
        if status == 0:
            raise Exception("Feed already exists")
        if status != 1:
            raise Exception(f"Failed to subscribe to {feed_url}: {body}")


def subscribe_to_feeds(
    base_url: str, feed_fns: list[str], ttrss: TTRSS, cat_id: int = 0
) -> None:
    """Subscribe TTRSS to every feed file not already known to it.

    Failures are printed and skipped (best effort) so one bad feed does
    not abort the rest.
    """
    urls = [urljoin(base_url, quote(fn)) for fn in feed_fns]
    existing_feeds = ttrss.get_feeds(cat_id)
    existing_urls = [feed["feed_url"] for feed in existing_feeds]
    new_urls = [url for url in urls if url not in existing_urls]
    for url in new_urls:
        try:
            print(f"Subscribing to {url}")
            ttrss.subscribe(url, cat_id)
        except Exception as ex:
            print(ex)


@click.command()
@click.option("--subscribe", is_flag=True)
@click.option("--ttrss-url", type=str)
@click.option("--ttrss-user", type=str)
@click.option("--ttrss-password", type=str)
@click.option("--base-url", type=str)
@click.option("--cat-id", type=int, default=DEFAULT_CAT_ID)
@click.argument(
    "in_path",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=DEFAULT_IN_DIR,
)
@click.argument(
    "out_path",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=DEFAULT_OUT_DIR,
)
def cli(
    subscribe: bool,
    ttrss_url: str,
    ttrss_user: str,
    ttrss_password: str,
    base_url: str,
    cat_id: int,
    in_path: Path,
    out_path: Path,
):
    """Generate feeds from mails and optionally register them with TTRSS."""
    feed_fns = write_feeds_from_mails(in_path, out_path)
    if subscribe:
        ttrss = TTRSS(ttrss_url)
        ttrss.login(ttrss_user, ttrss_password)
        subscribe_to_feeds(base_url, feed_fns, ttrss, cat_id)


if __name__ == "__main__":
    # Options may also come from RSS_* environment variables / .env file.
    load_dotenv()
    cli(auto_envvar_prefix="RSS")