Generate RSS feeds from FFN email alerts
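Parses FanFiction.Net chapter-alert mails from a local mail directory, writes one RSS file per story into a web-served directory, and can optionally subscribe a Tiny Tiny RSS instance to the new feeds.

Rough example invocation (URLs and credentials are placeholders; every option can also be supplied via an RSS_-prefixed environment variable or a .env file):

    ./mail-rssify.py --subscribe \
        --ttrss-url https://ttrss.example.org/api/ \
        --ttrss-user admin --ttrss-password secret \
        --base-url https://files.example.org/ \
        ~/mail/personal/rss/cur/ /home/yunohost.app/gossa/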
mail-rssify.py (new executable file, 249 lines)
@@ -0,0 +1,249 @@
#!/usr/bin/python
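"""Generate RSS feeds from FanFiction.Net (FFN) chapter-alert emails.

Parses raw alert emails from a mail directory (by default a Maildir cur/
folder), writes one RSS file per story into an output directory, and can
optionally subscribe a Tiny Tiny RSS instance to the resulting feeds via
its JSON API.
"""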
import re
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from email.message import EmailMessage
from email.parser import BytesParser
from email.policy import default
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote, urljoin

import click
import requests
from dotenv import load_dotenv
from feedgen.feed import FeedGenerator
from tqdm import tqdm

DEFAULT_IN_DIR = Path("/home/fran/mail/personal/rss/cur/")
DEFAULT_OUT_DIR = Path("/home/yunohost.app/gossa/")
DEFAULT_CAT_ID = 11  # webnovels

@dataclass
class RssItem:
    title: str
    url: str
    date: datetime
    feed_title: str
    description: Optional[str] = None
    author: Optional[str] = None
    feed_description: Optional[str] = None
    feed_url: Optional[str] = None
    icon: Optional[str] = None

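# Parser for FanFiction.Net "new chapter" alert mails. The plain-text body is
# expected to look roughly like this (see BODY_RE and SUMMARY_RE below):
#
#     New chapter from <author>,
#
#     <story title>
#     Chapter <n>: <chapter title>
#
#     <chapter URL, e.g. .../s/<story id>/<n>/<slug>>
#     ...
#     Summary: <story summary>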
class ParserFFN:
    SUBJECT_RE = re.compile(r"^Chapter: (.*) Ch[0-9]+ .*")
    BODY_RE = re.compile(
        r"""
        New\ chapter\ from\ (?P<author>.+),\n
        \n
        \s*(?P<feed_title>.+)\n
        \s*Chapter\ \d+:\ (?P<title>.+)\n
        \n
        \s*(?P<url>.+/)[^/]*\n
        """,
        re.VERBOSE,
    )
    SUMMARY_RE = re.compile(r"^\s*Summary: (.*)", re.MULTILINE)
    FEED_URL_RE = re.compile(r"(.*/)\d+/")

    @staticmethod
    def parse(email: EmailMessage) -> RssItem:
        subject = email["subject"]
        body = email.get_content()
        m = ParserFFN.SUBJECT_RE.match(subject)
        if m is None:
            raise ValueError("Not an ffn email")

        m = ParserFFN.BODY_RE.search(body)
        if m is None:
            print("Malformed ffn email:")
            print(email)
            raise ValueError("Malformed ffn email")

        date = datetime.strptime(email["date"], "%a, %d %b %Y %H:%M:%S %z")
        item = RssItem(
            title=m.group("title"),
            url=m.group("url"),
            author=m.group("author"),
            feed_title=m.group("feed_title"),
            date=date,
            icon="https://www.fanfiction.net/favicon.ico",
        )

        m = ParserFFN.FEED_URL_RE.search(item.url)
        if m is None:
            print(f"Malformed ffn URL: {item.url}")
            raise ValueError(f"Malformed ffn email URL: {item.url}")
        item.feed_url = m.group(1)

        m = ParserFFN.SUMMARY_RE.search(body)
        if m is not None:
            item.feed_description = m.group(1)

        return item

EXTRACTORS = [ParserFFN]


def escape_fn(fn):
    escapes = r" '?()"
    for c in escapes:
        fn = fn.replace(c, f"\\{c}")

    return fn


def remove_tricky_characters(title):
    chars = r"?"
    for c in chars:
        title = title.replace(c, "")
    return title

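# Each mail in the input directory is offered to every extractor; extractors
# signal "not mine" or "unparseable" by raising ValueError, so unrelated or
# malformed mails are simply skipped.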
def parse_emails(path: Path) -> dict[str, list[RssItem]]:
    item_map = defaultdict(list)

    for fn in tqdm(list(path.iterdir())):
        with open(fn, "rb") as f:
            email = BytesParser(policy=default).parse(f)
        for ext in EXTRACTORS:
            try:
                item = ext.parse(email)
                item_map[item.feed_title].append(item)
            except ValueError:
                pass

    return item_map

def generate_feed(items: list[RssItem]) -> FeedGenerator:
    fg = FeedGenerator()
    items = sorted(items, key=lambda i: i.date, reverse=True)
    fg.title(items[0].feed_title)
    fg.description(items[0].feed_description)
    fg.link(href=items[0].feed_url, rel="alternate")
    fg.icon(items[0].icon)

    for item in items:
        fe = fg.add_entry()
        fe.title(item.title)
        fe.link(href=item.url, rel="alternate")
        fe.author(name=item.author)
        fe.published(item.date)

    return fg

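# Feeds are written as "<story title>.xml" in the output directory; the
# returned file names are later URL-quoted and joined onto --base-url when
# subscribing TT-RSS to them.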
def write_feeds_from_mails(in_path: Path, out_path: Path) -> list[str]:
    print("Parsing mails...")
    item_map = parse_emails(in_path)

    print("Building feeds...")
    feeds = {title: generate_feed(items) for title, items in item_map.items()}

    print("Writing feeds...")
    fns = []
    for title, fg in feeds.items():
        fn = out_path / f"{title}.xml"
        fg.rss_file(str(fn))
        fns.append(fn.name)

    return fns

class TTRSS:
    def __init__(self, url: str) -> None:
        self.url = url
        self.sid = None

    def _make_request(self, op: str, **kwargs: Any) -> Any:
        data = {"op": op, **kwargs}
        if self.sid is not None:
            data["sid"] = self.sid

        r = requests.post(self.url, json=data)
        r.raise_for_status()
        return r.json()

    def login(self, user: str, password: str) -> None:
        body = self._make_request("login", user=user, password=password)
        if "session_id" in body["content"]:
            self.sid = body["content"]["session_id"]
        else:
            raise Exception(f"Login to TTRSS failed with error {body}")

    def _assert_logged_in(self) -> None:
        if self.sid is None:
            raise Exception("Must be logged in to use this method")

    def get_feeds(self, cat_id: int = -3) -> list[dict]:
        self._assert_logged_in()
        return self._make_request("getFeeds", cat_id=cat_id)["content"]

    def subscribe(self, feed_url: str, cat_id: int = 0) -> None:
        self._assert_logged_in()
        body = self._make_request(
            "subscribeToFeed", feed_url=feed_url, category_id=cat_id
        )
        if body["content"]["status"]["code"] != 1:
            raise Exception(f"Failed to subscribe to {feed_url}: {body}")

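# The TTRSS client above can also be used on its own; a minimal sketch
# (API URL and credentials are placeholders):
#
#     ttrss = TTRSS("https://ttrss.example.org/api/")
#     ttrss.login("admin", "secret")
#     for feed in ttrss.get_feeds(cat_id=DEFAULT_CAT_ID):
#         print(feed["feed_url"])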
def subscribe_to_feeds(
    base_url: str, feed_fns: list[str], ttrss: TTRSS, cat_id: int = 0
) -> None:
    urls = [urljoin(base_url, quote(fn)) for fn in feed_fns]
    existing_feeds = ttrss.get_feeds(cat_id)
    existing_urls = [feed["feed_url"] for feed in existing_feeds]
    new_urls = [url for url in urls if url not in existing_urls]
    for url in new_urls:
        try:
            print(f"Subscribing to {url}")
            ttrss.subscribe(url, cat_id)
        except Exception as ex:
            print(ex)

@click.command()
@click.option("--subscribe", is_flag=True)
@click.option("--ttrss-url", type=str)
@click.option("--ttrss-user", type=str)
@click.option("--ttrss-password", type=str)
@click.option("--base-url", type=str)
@click.option("--cat-id", type=int, default=DEFAULT_CAT_ID)
@click.argument(
    "in_path",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=DEFAULT_IN_DIR,
)
@click.argument(
    "out_path",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=DEFAULT_OUT_DIR,
)
def cli(
    subscribe: bool,
    ttrss_url: str,
    ttrss_user: str,
    ttrss_password: str,
    base_url: str,
    cat_id: int,
    in_path: Path,
    out_path: Path,
):
    feed_fns = write_feeds_from_mails(in_path, out_path)
    if subscribe:
        ttrss = TTRSS(ttrss_url)
        ttrss.login(ttrss_user, ttrss_password)
        subscribe_to_feeds(base_url, feed_fns, ttrss, cat_id)

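# With auto_envvar_prefix="RSS", every option above can also be set through an
# environment variable such as RSS_TTRSS_URL, RSS_TTRSS_USER,
# RSS_TTRSS_PASSWORD or RSS_BASE_URL; load_dotenv() lets those live in a
# local .env file.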
if __name__ == "__main__":
    load_dotenv()
    cli(auto_envvar_prefix="RSS")