commit 2adb6a5243adfa3352cfa9f0a8d4a34ef998602d
Author: Francisco Penedo
Date:   Fri Oct 27 12:55:35 2023 +0200

    Generate RSS feeds from FFN email alerts

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2eea525
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..dd8ef5b
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+    "mypy.enabled": false,
+    "python.analysis.typeCheckingMode": "basic"
+}
\ No newline at end of file
diff --git a/mail-rssify.py b/mail-rssify.py
new file mode 100755
index 0000000..e11c977
--- /dev/null
+++ b/mail-rssify.py
@@ -0,0 +1,362 @@
+#!/usr/bin/python
+"""Generate RSS feeds from FanFiction.net (FFN) chapter-alert emails.
+
+Every email in a mail directory is offered to the extractors in ``EXTRACTORS``;
+the items they produce are grouped by feed title and each group is written out
+as one RSS file.  Optionally, the generated feeds are then subscribed to in a
+TTRSS instance through its JSON API.
+"""
+
+import re
+from collections import defaultdict
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from email.message import EmailMessage
+from email.parser import BytesParser
+from email.policy import default
+from email.utils import parsedate_to_datetime
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import quote, urljoin
+
+import click
+import requests
+from dotenv import load_dotenv
+from feedgen.feed import FeedGenerator
+from tqdm import tqdm
+
+DEFAULT_IN_DIR = Path("/home/fran/mail/personal/rss/cur/")
+DEFAULT_OUT_DIR = Path("/home/yunohost.app/gossa/")
+DEFAULT_CAT_ID = 11  # webnovels
+
+
+@dataclass
+class RssItem:
+    """One feed entry together with the metadata of the feed it belongs to."""
+
+    title: str
+    url: str
+    date: datetime
+    feed_title: str  # also the key used to group items into feeds
+    description: Optional[str] = None
+    author: Optional[str] = None
+    feed_description: Optional[str] = None
+    feed_url: Optional[str] = None
+    icon: Optional[str] = None
+
+
+class ParserFFN:
+    """Extractor for FFN "new chapter" alert emails."""
+
+    SUBJECT_RE = re.compile(r"^Chapter: (.*) Ch[0-9]+ .*")
+    # The group names must match the ``m.group(...)`` lookups in ``parse``.
+    BODY_RE = re.compile(
+        r"""
+        New\ chapter\ from\ (?P<author>.+),\n
+        \n
+        \s*(?P<feed_title>.+)\n
+        \s*Chapter\ \d+:\ (?P<title>.+)\n
+        \n
+        \s*(?P<url>.+/)[^/]*\n
+        """,
+        re.VERBOSE,
+    )
+    SUMMARY_RE = re.compile(r"^\s*Summary: (.*)", re.MULTILINE)
+    # Everything up to, and excluding, the trailing "<digits>/" path segment.
+    FEED_URL_RE = re.compile(r"(.*/)\d+/")
+
+    @staticmethod
+    def parse(email: EmailMessage) -> RssItem:
+        """Build an ``RssItem`` from an FFN chapter alert.
+
+        Raises:
+            ValueError: if ``email`` is not an FFN alert or cannot be parsed.
+                Callers use this to skip the message.
+        """
+        # A missing Subject header yields None, which ``match`` cannot take.
+        subject = email["subject"] or ""
+        if ParserFFN.SUBJECT_RE.match(subject) is None:
+            raise ValueError("Not an ffn email")
+
+        # Read the body only after the subject check, so unrelated mails are
+        # rejected cheaply.  ``get_content`` raises KeyError (a LookupError) when
+        # no content handler matches, e.g. for multipart messages.
+        try:
+            body = email.get_content()
+        except LookupError as err:
+            raise ValueError("Unreadable ffn email body") from err
+        if not isinstance(body, str):
+            raise ValueError("Unreadable ffn email body")
+
+        m = ParserFFN.BODY_RE.search(body)
+        if m is None:
+            print("Malformed ffn email:")
+            print(email)
+            raise ValueError("Malformed ffn email")
+
+        item = RssItem(
+            title=m.group("title"),
+            url=m.group("url"),
+            author=m.group("author"),
+            feed_title=m.group("feed_title"),
+            date=ParserFFN._parse_date(email["date"]),
+            icon="https://www.fanfiction.net/favicon.ico",
+        )
+
+        m = ParserFFN.FEED_URL_RE.search(item.url)
+        if m is None:
+            print(f"Malformed ffn URL: {item.url}")
+            raise ValueError(f"Malformed ffn email URL: {item.url}")
+        item.feed_url = m.group(1)
+
+        m = ParserFFN.SUMMARY_RE.search(body)
+        if m is not None:
+            item.feed_description = m.group(1)
+
+        return item
+
+    @staticmethod
+    def _parse_date(raw: Optional[str]) -> datetime:
+        """Parse a Date header value into a timezone-aware datetime.
+
+        Raises:
+            ValueError: if the header is missing or not a valid date.
+        """
+        if raw is None:
+            raise ValueError("ffn email without a date")
+        try:
+            date = parsedate_to_datetime(str(raw))
+        except (TypeError, ValueError) as err:
+            raise ValueError(f"Malformed ffn email date: {raw}") from err
+        if date.tzinfo is None:
+            # A "-0000" offset parses as naive; treat it as UTC so that all
+            # dates stay comparable when the items are sorted.
+            date = date.replace(tzinfo=timezone.utc)
+        return date
+
+
+EXTRACTORS = [ParserFFN]
+
+
+def escape_fn(fn):
+    """Backslash-escape spaces and the characters ', ?, ( and ) in ``fn``."""
+    escapes = r" '?()"
+    for c in escapes:
+        fn = fn.replace(c, f"\\{c}")
+
+    return fn
+
+
+def remove_tricky_characters(title):
+    """Return ``title`` with every "?" removed."""
+    chars = r"?"
+    for c in chars:
+        title = title.replace(c, "")
+    return title
+
+
+def parse_emails(path: Path) -> dict[str, list[RssItem]]:
+    """Parse every mail file in ``path`` and group the items by feed title.
+
+    A mail that an extractor rejects with ``ValueError`` is skipped.
+    """
+    item_map = defaultdict(list)
+
+    # Materialise the listing so the progress bar can show a total.
+    for fn in tqdm(list(path.iterdir())):
+        if not fn.is_file():
+            continue
+        with open(fn, "rb") as f:
+            email = BytesParser(policy=default).parse(f)
+        for ext in EXTRACTORS:
+            try:
+                item = ext.parse(email)
+            except ValueError:
+                continue
+            item_map[item.feed_title].append(item)
+
+    return item_map
+
+
+def generate_feed(items: list[RssItem]) -> FeedGenerator:
+    """Build the feed for ``items``, which must all belong to the same feed.
+
+    Feed-level metadata is taken from the newest item; ``items`` must not be
+    empty.
+    """
+    fg = FeedGenerator()
+    items = sorted(items, key=lambda i: i.date, reverse=True)
+    newest = items[0]
+    # Only mails with a "Summary:" line carry a description, so use the most
+    # recent one available and fall back to the title: feedgen cannot write an
+    # RSS file for a feed without a description.
+    description = next(
+        (i.feed_description for i in items if i.feed_description),
+        newest.feed_title,
+    )
+    fg.title(newest.feed_title)
+    fg.description(description)
+    fg.link(href=newest.feed_url, rel="alternate")
+    fg.icon(newest.icon)
+
+    for item in items:
+        fe = fg.add_entry()
+        fe.title(item.title)
+        fe.link(href=item.url, rel="alternate")
+        fe.author(name=item.author)
+        fe.published(item.date)
+
+    return fg
+
+
+def _feed_filename(title: str) -> str:
+    """Return the name of the RSS file for the feed called ``title``."""
+    # A "/" in the title would otherwise be read as a directory separator.
+    return title.replace("/", "_") + ".xml"
+
+
+def write_feeds_from_mails(in_path: Path, out_path: Path) -> list[str]:
+    """Write one RSS file per feed found in ``in_path`` into ``out_path``.
+
+    Returns:
+        The names (not the full paths) of the files written.
+    """
+    print("Parsing mails...")
+    item_map = parse_emails(in_path)
+
+    print("Building feeds...")
+    feeds = {title: generate_feed(items) for title, items in item_map.items()}
+
+    print("Writing feeds...")
+    fns = []
+    for title, fg in feeds.items():
+        fn = out_path / _feed_filename(title)
+        fg.rss_file(str(fn))
+        fns.append(fn.name)
+
+    return fns
+
+
+class TTRSS:
+    """Minimal client for the JSON API of a TTRSS instance."""
+
+    def __init__(self, url: str, timeout: float = 30.0) -> None:
+        """Store the API endpoint ``url`` and the per-request timeout (seconds)."""
+        self.url = url
+        self.timeout = timeout
+        self.sid: Optional[str] = None  # session id, set by ``login``
+
+    def _make_request(self, op: str, **kwargs: Any) -> Any:
+        """POST operation ``op`` with ``kwargs`` and return the decoded JSON."""
+        data = {"op": op, **kwargs}
+        if self.sid is not None:
+            data["sid"] = self.sid
+
+        # Without a timeout, requests waits indefinitely on a stalled server.
+        r = requests.post(self.url, json=data, timeout=self.timeout)
+        r.raise_for_status()
+        return r.json()
+
+    def login(self, user: str, password: str) -> None:
+        """Log in and remember the session id for later requests."""
+        body = self._make_request("login", user=user, password=password)
+        if "session_id" in body["content"]:
+            self.sid = body["content"]["session_id"]
+        else:
+            raise Exception(f"Login to TTRSS failed with error {body}")
+
+    def _assert_logged_in(self) -> None:
+        """Raise unless ``login`` has stored a session id."""
+        if self.sid is None:
+            raise Exception("Must be logged in to use this method")
+
+    def get_feeds(self, cat_id: int = -3) -> list[dict]:
+        """Return the feeds of category ``cat_id`` as reported by the server."""
+        self._assert_logged_in()
+        return self._make_request("getFeeds", cat_id=cat_id)["content"]
+
+    def subscribe(self, feed_url: str, cat_id: int = 0) -> None:
+        """Subscribe to ``feed_url`` in category ``cat_id``.
+
+        Raises:
+            Exception: if the server answers with a status code other than 1.
+        """
+        self._assert_logged_in()
+        body = self._make_request(
+            "subscribeToFeed", feed_url=feed_url, category_id=cat_id
+        )
+        if body["content"]["status"]["code"] != 1:
+            raise Exception(f"Failed to subscribe to {feed_url}: {body}")
+
+
+def subscribe_to_feeds(
+    base_url: str, feed_fns: list[str], ttrss: TTRSS, cat_id: int = 0
+) -> None:
+    """Subscribe ``ttrss`` to every feed file not yet present in ``cat_id``.
+
+    Feed URLs are built by joining ``base_url`` with the URL-quoted file names.
+    A failed subscription is reported and does not stop the remaining ones.
+    """
+    urls = [urljoin(base_url, quote(fn)) for fn in feed_fns]
+    # A set keeps the membership test below constant-time.
+    existing_urls = {feed["feed_url"] for feed in ttrss.get_feeds(cat_id)}
+    for url in urls:
+        if url in existing_urls:
+            continue
+        try:
+            print(f"Subscribing to {url}")
+            ttrss.subscribe(url, cat_id)
+        except Exception as ex:
+            print(ex)
+
+
+@click.command()
+@click.option("--subscribe", is_flag=True)
+@click.option("--ttrss-url", type=str)
+@click.option("--ttrss-user", type=str)
+@click.option("--ttrss-password", type=str)
+@click.option("--base-url", type=str)
+@click.option("--cat-id", type=int, default=DEFAULT_CAT_ID)
+@click.argument(
+    "in_path",
+    type=click.Path(exists=True, file_okay=False, path_type=Path),
+    default=DEFAULT_IN_DIR,
+)
+@click.argument(
+    "out_path",
+    type=click.Path(exists=True, file_okay=False, path_type=Path),
+    default=DEFAULT_OUT_DIR,
+)
+def cli(
+    subscribe: bool,
+    ttrss_url: Optional[str],
+    ttrss_user: Optional[str],
+    ttrss_password: Optional[str],
+    base_url: Optional[str],
+    cat_id: int,
+    in_path: Path,
+    out_path: Path,
+):
+    """Turn the mails in IN_PATH into RSS files in OUT_PATH."""
+    if subscribe:
+        # Fail before doing any work rather than half-way through the run.
+        required = {
+            "--ttrss-url": ttrss_url,
+            "--ttrss-user": ttrss_user,
+            "--ttrss-password": ttrss_password,
+            "--base-url": base_url,
+        }
+        missing = [name for name, value in required.items() if not value]
+        if missing:
+            raise click.UsageError("--subscribe requires " + ", ".join(missing))
+
+    feed_fns = write_feeds_from_mails(in_path, out_path)
+    if subscribe:
+        ttrss = TTRSS(ttrss_url)
+        ttrss.login(ttrss_user, ttrss_password)
+        subscribe_to_feeds(base_url, feed_fns, ttrss, cat_id)
+
+
+if __name__ == "__main__":
+    load_dotenv()
+    cli(auto_envvar_prefix="RSS")