253 lines
7.0 KiB
Python
Executable File
253 lines
7.0 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
import re
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from email.message import EmailMessage
|
|
from email.parser import BytesParser
|
|
from email.policy import default
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
from urllib.parse import quote, urljoin
|
|
|
|
import click
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
from feedgen.feed import FeedGenerator
|
|
from tqdm import tqdm
|
|
|
|
# Directory of e-mail files read when no IN_PATH argument is given.
DEFAULT_IN_DIR: Path = Path("/home/fran/mail/personal/rss/cur/")
# Directory the generated feed files are written to when no OUT_PATH is given.
DEFAULT_OUT_DIR: Path = Path("/home/yunohost.app/gossa/")
# Default for the --cat-id option (the "webnovels" category).
DEFAULT_CAT_ID: int = 11
|
|
|
|
|
|
@dataclass
class RssItem:
    """One feed entry extracted from a notification e-mail.

    The first four fields are mandatory; the remaining ones default to
    ``None`` and may be filled in after construction.
    """

    # --- required -------------------------------------------------------
    title: str  # title of the entry itself
    url: str  # link to the entry
    date: datetime  # timestamp used for ordering and as publication date
    feed_title: str  # title of the feed this entry belongs to

    # --- optional -------------------------------------------------------
    description: Optional[str] = None  # entry description
    author: Optional[str] = None  # entry author name
    feed_description: Optional[str] = None  # description of the whole feed
    feed_url: Optional[str] = None  # link to the feed's source page
    icon: Optional[str] = None  # URL of the feed icon
|
|
|
|
|
|
class ParserFFN:
    """Extractor for fanfiction.net "new chapter" notification e-mails.

    ``parse`` turns one such e-mail into an ``RssItem`` and raises
    ``ValueError`` for anything it cannot handle, so callers can treat
    ``ValueError`` as "this extractor does not apply".
    """

    # Subject line of a chapter notification: "Chapter: <story> Ch<N> ...".
    SUBJECT_RE = re.compile(r"^Chapter: (.*) Ch[0-9]+ .*")
    # Body layout: author line, blank line, story title, chapter title,
    # blank line, chapter URL.  The ``url`` group keeps everything up to and
    # including the last "/" of the URL line.
    BODY_RE = re.compile(
        r"""
        New\ chapter\ from\ (?P<author>.+),\n
        \n
        \s*(?P<feed_title>.+)\n
        \s*Chapter\ \d+:\ (?P<title>.+)\n
        \n
        \s*(?P<url>.+/)[^/]*\n
        """,
        re.VERBOSE,
    )
    # Optional "Summary: ..." line anywhere in the body.
    SUMMARY_RE = re.compile(r"^\s*Summary: (.*)", re.MULTILINE)
    # Strips the trailing "<digits>/" segment from the chapter URL to obtain
    # the story URL.
    FEED_URL_RE = re.compile(r"(.*/)\d+/")
    # Format expected for the Date header.
    DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"

    @staticmethod
    def _text_body(email: EmailMessage) -> str:
        """Return the textual body of *email*.

        Multipart messages are resolved to their ``text/plain`` part, because
        calling ``get_content()`` on a multipart container raises ``KeyError``.

        Raises:
            ValueError: if no decodable text body is available.
        """
        try:
            if email.is_multipart():
                part = email.get_body(preferencelist=("plain",))
                if part is None:
                    raise ValueError("Malformed ffn email: no text/plain part")
                body = part.get_content()
            else:
                body = email.get_content()
        except LookupError as err:
            # KeyError (no content handler) or unknown charset.
            raise ValueError("Malformed ffn email: undecodable body") from err
        if not isinstance(body, str):
            # Non-text payloads come back as bytes; the regexes need str.
            raise ValueError("Malformed ffn email: body is not text")
        return body

    @staticmethod
    def parse(email: EmailMessage) -> "RssItem":
        """Build an ``RssItem`` from a fanfiction.net chapter notification.

        Args:
            email: the parsed message.

        Returns:
            The item with title, url, author, feed title, date, icon and
            feed URL set; ``feed_description`` is set when the body has a
            "Summary:" line.

        Raises:
            ValueError: if the subject does not look like a chapter
                notification, or the body, Date header or URL cannot be
                parsed.
        """
        # Check the subject first so unrelated mails (including ones without
        # a Subject header) are rejected before their body is touched.
        subject = email["subject"]
        if subject is None or ParserFFN.SUBJECT_RE.match(str(subject)) is None:
            raise ValueError("Not an ffn email")

        body = ParserFFN._text_body(email)
        m = ParserFFN.BODY_RE.search(body)
        if m is None:
            print("Malformed ffn email:")
            print(email)
            raise ValueError("Malformed ffn email")

        raw_date = email["date"]
        if raw_date is None:
            # strptime(None) would raise TypeError, which callers do not catch.
            raise ValueError("Malformed ffn email: missing Date header")
        date = datetime.strptime(str(raw_date), ParserFFN.DATE_FORMAT)

        item = RssItem(
            title=m.group("title"),
            url=m.group("url"),
            author=m.group("author"),
            feed_title=m.group("feed_title"),
            date=date,
            icon="https://www.fanfiction.net/favicon.ico",
        )

        m = ParserFFN.FEED_URL_RE.search(item.url)
        if m is None:
            print(f"Malformed ffn URL: {item.url}")
            raise ValueError(f"Malformed ffn email URL: {item.url}")
        item.feed_url = m.group(1)

        m = ParserFFN.SUMMARY_RE.search(body)
        if m is not None:
            item.feed_description = m.group(1)

        return item
|
|
|
|
|
|
# Parser classes tried against every e-mail by parse_emails(); each exposes a
# static ``parse(email)`` whose ValueError is treated as "does not apply".
EXTRACTORS = [ParserFFN]
|
|
|
|
|
|
def escape_fn(fn):
    """Return *fn* with each space, ``'``, ``?``, ``(`` and ``)`` backslash-escaped."""
    # One translation pass: every listed character maps to "\" + itself.
    table = str.maketrans({ch: "\\" + ch for ch in " '?()"})
    return fn.translate(table)
|
|
|
|
|
|
def remove_tricky_characters(title):
    """Return *title* with every ``?`` removed."""
    unwanted = "?"
    return "".join(ch for ch in title if ch not in unwanted)
|
|
|
|
|
|
def parse_emails(path: Path) -> dict[str, list[RssItem]]:
    """Parse every e-mail file in *path* and group the items by feed title.

    Each file is offered to every extractor in ``EXTRACTORS``; an extractor
    signals "not mine / cannot parse" by raising ``ValueError``, which is
    deliberately ignored.

    Args:
        path: directory containing one e-mail per file.

    Returns:
        Mapping of feed title to the items extracted for that feed.
    """
    item_map = defaultdict(list)

    # Only regular files are parsed: open() on a sub-directory would raise
    # IsADirectoryError.  The listing is materialised so tqdm knows the total.
    mail_files = [entry for entry in path.iterdir() if entry.is_file()]

    for fn in tqdm(mail_files):
        with open(fn, "rb") as f:
            email = BytesParser(policy=default).parse(f)
        for ext in EXTRACTORS:
            try:
                item = ext.parse(email)
            except ValueError:
                # This extractor does not recognise the mail; try the next.
                continue
            item_map[item.feed_title].append(item)

    return item_map
|
|
|
|
|
|
def generate_feed(items: list[RssItem]) -> FeedGenerator:
    """Build a feed from *items*, newest entry first.

    Feed-level metadata (title, link, icon) is taken from the newest item.
    The description is the newest available ``feed_description``; if no item
    has one, the feed title is used so the feed always has a description,
    which RSS output requires.

    Args:
        items: entries belonging to one feed; must not be empty.

    Returns:
        The populated ``FeedGenerator``.

    Raises:
        ValueError: if *items* is empty.
    """
    if not items:
        raise ValueError("Cannot build a feed from an empty item list")

    ordered = sorted(items, key=lambda i: i.date, reverse=True)
    newest = ordered[0]
    # The newest mail may lack a summary; fall back to an older one, then to
    # the title, rather than leaving the description unset.
    description = next(
        (i.feed_description for i in ordered if i.feed_description),
        newest.feed_title,
    )

    fg = FeedGenerator()
    fg.title(newest.feed_title)
    fg.description(description)
    fg.link(href=newest.feed_url, rel="alternate")
    fg.icon(newest.icon)

    for item in ordered:
        fe = fg.add_entry()
        fe.title(item.title)
        fe.link(href=item.url, rel="alternate")
        fe.author(name=item.author)
        fe.published(item.date)

    return fg
|
|
|
|
|
|
def write_feeds_from_mails(in_path: Path, out_path: Path) -> list[str]:
    """Parse the mails in *in_path* and write one RSS file per feed.

    Args:
        in_path: directory of e-mail files to parse.
        out_path: existing directory the ``<feed title>.xml`` files go to.

    Returns:
        The file names (without directory) of the feeds written.
    """
    print("Parsing mails...")
    item_map = parse_emails(in_path)

    print("Building feeds...")
    feeds = {title: generate_feed(items) for title, items in item_map.items()}

    print("Writing feeds...")
    fns = []
    for title, fg in feeds.items():
        # A "/" in the title would be treated as a directory separator, so
        # the write would target a missing sub-directory and ``fn.name``
        # would only hold the part after the slash.
        safe_title = title.replace("/", "_")
        fn = out_path / f"{safe_title}.xml"
        fg.rss_file(str(fn))
        fns.append(fn.name)

    return fns
|
|
|
|
|
|
class TTRSS:
    """Small client for a TTRSS (presumably Tiny Tiny RSS) JSON API endpoint.

    All calls are POSTed as JSON to a single URL; after ``login`` the session
    id is attached to every request.
    """

    def __init__(self, url: str, timeout: float = 30.0) -> None:
        """Store the API *url*; no request is made yet.

        Args:
            url: API endpoint every operation is POSTed to.
            timeout: seconds to wait for each HTTP request, so an
                unresponsive server cannot hang the script forever.
        """
        self.url = url
        self.timeout = timeout
        self.sid = None

    def _make_request(self, op: str, **kwargs: Any) -> Any:
        """POST operation *op* with *kwargs* and return the decoded JSON body.

        Raises whatever ``requests`` raises for transport errors, timeouts
        and non-2xx responses.
        """
        data = {"op": op, **kwargs}
        if self.sid is not None:
            data["sid"] = self.sid

        r = requests.post(self.url, json=data, timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    @staticmethod
    def _content(body: Any) -> Any:
        """Return ``body["content"]``, or ``None`` if *body* has no such key."""
        if isinstance(body, dict):
            return body.get("content")
        return None

    def login(self, user: str, password: str) -> None:
        """Log in and remember the session id.

        Raises:
            RuntimeError: if the response carries no session id.
        """
        body = self._make_request("login", user=user, password=password)
        content = self._content(body)
        if isinstance(content, dict) and "session_id" in content:
            self.sid = content["session_id"]
        else:
            raise RuntimeError(f"Login to TTRSS failed with error {body}")

    def _assert_logged_in(self) -> None:
        """Raise unless ``login`` has stored a session id."""
        if self.sid is None:
            raise RuntimeError("Must be logged in to use this method")

    def get_feeds(self, cat_id: int = -3) -> list[dict]:
        """Return the feeds of category *cat_id*.

        Raises:
            RuntimeError: if not logged in, or if the response content is not
                a list (e.g. an API error object).
        """
        self._assert_logged_in()
        body = self._make_request("getFeeds", cat_id=cat_id)
        content = self._content(body)
        if not isinstance(content, list):
            raise RuntimeError(f"Failed to list feeds: {body}")
        return content

    def subscribe(self, feed_url: str, cat_id: int = 0) -> None:
        """Subscribe to *feed_url* in category *cat_id*.

        Raises:
            RuntimeError: if not logged in, if the feed already exists
                (status code 0), or on any other outcome than status code 1.
        """
        self._assert_logged_in()
        body = self._make_request(
            "subscribeToFeed", feed_url=feed_url, category_id=cat_id
        )
        # Error responses have no "status" object; treat that as a failure
        # that reports the whole body instead of dying with KeyError.
        content = self._content(body)
        status = content.get("status") if isinstance(content, dict) else None
        code = status.get("code") if isinstance(status, dict) else None
        if code == 0:
            raise RuntimeError("Feed already exists")
        if code != 1:
            raise RuntimeError(f"Failed to subscribe to {feed_url}: {body}")
|
|
|
|
|
|
def subscribe_to_feeds(
    base_url: str, feed_fns: list[str], ttrss: TTRSS, cat_id: int = 0
) -> None:
    """Subscribe *ttrss* to every feed file not yet present in *cat_id*.

    Args:
        base_url: URL of the directory the feed files are served from.
        feed_fns: feed file names, relative to *base_url*.
        ttrss: logged-in client.
        cat_id: category to look in and subscribe to.

    Failures for individual feeds are printed and do not stop the loop.
    """
    # urljoin() replaces the last path segment of a base without a trailing
    # slash ("https://h/feeds" + "a.xml" -> "https://h/a.xml"), so make sure
    # the base is treated as a directory.
    if base_url and not base_url.endswith("/"):
        base_url += "/"

    urls = [urljoin(base_url, quote(fn)) for fn in feed_fns]
    existing_urls = {feed["feed_url"] for feed in ttrss.get_feeds(cat_id)}

    for url in urls:
        if url in existing_urls:
            continue
        try:
            print(f"Subscribing to {url}")
            ttrss.subscribe(url, cat_id)
        except Exception as ex:
            # Best effort: report and carry on with the remaining feeds.
            print(ex)
|
|
|
|
|
|
@click.command()
@click.option("--subscribe", is_flag=True)
@click.option("--ttrss-url", type=str)
@click.option("--ttrss-user", type=str)
@click.option("--ttrss-password", type=str)
@click.option("--base-url", type=str)
@click.option("--cat-id", type=int, default=DEFAULT_CAT_ID)
@click.argument(
    "in_path",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=DEFAULT_IN_DIR,
)
@click.argument(
    "out_path",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=DEFAULT_OUT_DIR,
)
def cli(
    subscribe: bool,
    ttrss_url: Optional[str],
    ttrss_user: Optional[str],
    ttrss_password: Optional[str],
    base_url: Optional[str],
    cat_id: int,
    in_path: Path,
    out_path: Path,
):
    """Write RSS feeds from the mails in IN_PATH to OUT_PATH.

    With --subscribe, also subscribe the TTRSS instance to every feed that is
    not yet present in the chosen category; this needs --ttrss-url,
    --ttrss-user, --ttrss-password and --base-url.
    """
    if subscribe:
        # Fail before doing any work, with a message naming what is missing,
        # instead of failing later inside the HTTP client.
        required = {
            "--ttrss-url": ttrss_url,
            "--ttrss-user": ttrss_user,
            "--ttrss-password": ttrss_password,
            "--base-url": base_url,
        }
        missing = [option for option, value in required.items() if not value]
        if missing:
            raise click.UsageError(
                "--subscribe requires " + ", ".join(missing)
            )

    feed_fns = write_feeds_from_mails(in_path, out_path)
    if subscribe:
        ttrss = TTRSS(ttrss_url)
        ttrss.login(ttrss_user, ttrss_password)
        subscribe_to_feeds(base_url, feed_fns, ttrss, cat_id)
|
|
|
|
|
|
if __name__ == "__main__":
    # Load variables from a .env file first so they are visible to click.
    load_dotenv()
    # Options may also be supplied through RSS_* environment variables.
    cli(auto_envvar_prefix="RSS")
|