Files
hxbooks/src/hxbooks/main.py

734 lines
26 KiB
Python

"""
Main application routes for HXBooks frontend.
Provides clean URL structure and integrates with library.py business logic.
"""
import logging
import os
import tempfile
from datetime import date
from pathlib import Path
from typing import Annotated, Any, Literal
from flask import (
Blueprint,
Response,
current_app,
flash,
g,
redirect,
render_template,
request,
send_from_directory,
session,
url_for,
)
from flask.typing import ResponseReturnValue
from pydantic import (
AnyHttpUrl,
BaseModel,
BeforeValidator,
Field,
StringConstraints,
ValidationError,
)
from pydantic_extra_types.isbn import ISBN
from hxbooks.models import Reading, User
from hxbooks.search import Field as SearchField
from hxbooks.search import IsOperatorValue, SortDirection
from . import library
from .db import db
bp = Blueprint("main", __name__)
# Get logger for this module
logger = logging.getLogger(__name__)
# Pydantic validation models
StrOrNone = Annotated[str | None, BeforeValidator(lambda v: v.strip() or None)]
StripStr = Annotated[str, StringConstraints(strip_whitespace=True)]
ISBNOrEmpty = Annotated[ISBN | Literal[""], BeforeValidator(lambda v: v.strip() or "")]
TextareaList = Annotated[
list[str],
BeforeValidator(
lambda v: (
[line.strip() for line in v.split("\n") if line.strip()]
if isinstance(v, str)
else v or []
)
),
]
DateOrNone = Annotated[date | None, BeforeValidator(lambda v: v.strip() or None)]
IntOrNone = Annotated[int | None, BeforeValidator(lambda v: v.strip() or None)]
UrlOrNone = Annotated[AnyHttpUrl | None, BeforeValidator(lambda v: v.strip() or None)]
class BookFormData(BaseModel):
title: StripStr = Field(min_length=1)
owner: StrOrNone = None
isbn: ISBNOrEmpty = ""
authors: TextareaList = Field(default_factory=list)
genres: TextareaList = Field(default_factory=list)
first_published: IntOrNone = Field(default=None, le=2030)
publisher: StripStr = Field(default="")
edition: StripStr = Field(default="")
description: StripStr = Field(default="")
notes: StripStr = Field(default="")
location_place: StripStr = Field(default="")
location_bookshelf: StripStr = Field(default="")
location_shelf: IntOrNone = Field(default=None, ge=1)
loaned_to: StripStr = Field(default="")
loaned_date: DateOrNone = None
cover_image_url: UrlOrNone = None
delete_cover: bool = Field(default=False)
class ReadingFormData(BaseModel):
start_date: date
end_date: DateOrNone = None
dropped: bool = Field(default=False)
rating: IntOrNone = Field(default=None, ge=1, le=5)
comments: StripStr = Field(default="")
def _flash_validation_errors(e: ValidationError) -> None:
"""Helper to flash validation errors."""
error = e.errors()[0]
loc = " -> ".join(str(v) for v in error.get("loc", []))
msg = error.get("msg", "Invalid input")
logger.warning(f"Validation error in '{loc}': {msg} | Full errors: {e.errors()}")
flash(f"Validation error in '{loc}': {msg}", "error")
@bp.before_app_request
def load_users() -> None:
"""Load all users and current viewing context."""
# Get all users for the user selector
all_users = library.list_users()
# Set template context
g.users = all_users
g.viewing_user = next(
(user for user in all_users if user.username == session.get("viewing_as_user")),
None,
)
g.saved_searches = {}
# Load saved searches if viewing as a specific user
if g.viewing_user:
g.saved_searches = g.viewing_user.saved_searches or {}
@bp.after_request
def add_header(response: ResponseReturnValue) -> ResponseReturnValue:
# response.cache_control.no_store = True
if isinstance(response, Response) and "Cache-Control" not in response.headers:
response.headers["Cache-Control"] = "no-store"
return response
# Template context processor to make users and searches available in templates
@bp.app_context_processor
def inject_template_vars() -> dict[str, Any]:
return {
"users": getattr(g, "users", []),
"saved_searches": getattr(g, "saved_searches", {}),
}
RESULTS_PER_PAGE = 10
def _search_suggestions() -> dict[str, list[str]]:
"""Get suggestions for search autocomplete."""
s: dict[SearchField, list[str]] = {f: [] for f in SearchField}
s[SearchField.IS] = [
str(v) for v in IsOperatorValue if v != IsOperatorValue.UNKNOWN
]
s[SearchField.SORT] = [
f"{f}-{d}"
for f in SearchField
if f not in {SearchField.IS, SearchField.SORT}
for d in SortDirection
]
s[SearchField.GENRE] = [g.name for g in library.list_genres()]
s[SearchField.AUTHOR] = [a.name for a in library.list_authors()]
s[SearchField.PLACE] = [p for p in library.list_locations().keys()]
s[SearchField.BOOKSHELF] = list([
bs for shelves in library.list_locations().values() for bs in shelves
])
s[SearchField.OWNER] = [u.username for u in library.list_users()]
return {
str(k): v for k, v in s.items()
} # Convert Enum keys to strings for template use
@bp.route("/")
def index() -> ResponseReturnValue:
"""Book list view - main application page."""
# Get search parameters
query = request.args.get("q", "")
page = request.args.get("page", 1, type=int)
# Ensure valid pagination values
page = max(1, page)
offset = (page - 1) * RESULTS_PER_PAGE
# Get current viewing user
viewing_user = session.get("viewing_as_user")
try:
books, total_count = library.search_books_advanced(
query, limit=RESULTS_PER_PAGE, offset=offset, username=viewing_user
)
except Exception as e:
logger.error(f"Search error for query '{query}': {e}", exc_info=True)
flash(f"Search error: {e}", "error")
books, total_count = [], 0
# Calculate pagination info
total_pages = (total_count + RESULTS_PER_PAGE - 1) // RESULTS_PER_PAGE
has_prev = page > 1
has_next = page < total_pages
prev_num = page - 1 if has_prev else None
next_num = page + 1 if has_next else None
return render_template(
"book/list.html.j2",
books=books,
query=query,
pagination={
"page": page,
"per_page": RESULTS_PER_PAGE,
"total": total_count,
"pages": total_pages,
"has_prev": has_prev,
"has_next": has_next,
"prev_num": prev_num,
"next_num": next_num,
},
search_suggestions=_search_suggestions(),
)
@bp.route("/book/<int:book_id>")
def book_detail(book_id: int) -> ResponseReturnValue:
"""Book details and edit page."""
book = library.get_book(book_id)
if not book:
flash("Book not found", "error")
return redirect(url_for("main.index"))
return render_template(
"book/detail.html.j2",
book=book,
genres=library.list_genres(),
authors=library.list_authors(),
locations=library.list_locations(),
)
def _get_or_create_user(username: str) -> int:
"""Helper to get or create a user by username."""
owner = library.get_user_by_username(username)
if not owner:
# Create new user if username doesn't exist
owner = library.create_user(username)
return owner.id
def _handle_cover_image(form_data: BookFormData) -> str | None:
uploaded_file = request.files.get("cover_image_file")
cover_image_url = None
if uploaded_file and uploaded_file.filename and uploaded_file.filename.strip():
# User uploaded a file - replace cover
with tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(uploaded_file.filename)[1]
) as tmp_file:
uploaded_file.save(tmp_file.name)
cover_image_url = f"file://{tmp_file.name}"
elif form_data.cover_image_url:
# User entered URL - replace cover
cover_image_url = str(form_data.cover_image_url)
elif form_data.delete_cover:
# User wants to delete cover
cover_image_url = ""
return cover_image_url
@bp.route("/book/new", methods=["GET", "POST"])
def create_book() -> ResponseReturnValue:
"""Create a new book."""
if request.method == "POST":
try:
# Validate form data with Pydantic
form_data = BookFormData.model_validate(dict(request.form))
# Handle cover image from file upload or URL
cover_image_url = _handle_cover_image(form_data)
# Get owner ID if provided
owner_id = _get_or_create_user(form_data.owner) if form_data.owner else None
# Create book with validated data
book = library.create_book(
title=form_data.title,
owner_id=owner_id,
authors=form_data.authors,
genres=form_data.genres,
isbn=str(form_data.isbn),
publisher=form_data.publisher,
edition=form_data.edition,
description=form_data.description,
notes=form_data.notes,
location_place=form_data.location_place,
location_bookshelf=form_data.location_bookshelf,
location_shelf=form_data.location_shelf,
first_published=form_data.first_published,
loaned_to=form_data.loaned_to,
loaned_date=form_data.loaned_date,
cover_image_url=cover_image_url,
)
flash(f"Book '{form_data.title}' created successfully!", "success")
return redirect(url_for("main.book_detail", book_id=book.id))
except ValidationError as e:
_flash_validation_errors(e)
return render_template("book/create.html.j2", form_data=request.form)
except Exception as e:
logger.error(f"Error creating book '{form_data.title}': {e}", exc_info=True)
flash(f"Error creating book: {e}", "error")
return render_template("book/create.html.j2", form_data=request.form)
return render_template("book/create.html.j2")
@bp.route("/book/<int:book_id>/edit", methods=["POST"])
def update_book(book_id: int) -> ResponseReturnValue:
"""Update an existing book."""
book = library.get_book(book_id)
if not book:
flash("Book not found", "error")
return redirect(url_for("main.index"))
try:
# Validate form data with Pydantic
form_data = BookFormData.model_validate(dict(request.form))
# Handle cover image from file upload or URL
cover_image_url = _handle_cover_image(form_data)
if cover_image_url is None:
cover_image_url = book.cover_image_path # Keep existing if no new input
# Get owner ID if provided
owner_id = _get_or_create_user(form_data.owner) if form_data.owner else None
# Update book with validated data
library.update_book(
book_id=book_id,
set_all_fields=True, # Ensure all fields are updated, even if None/empty
title=form_data.title,
owner_id=owner_id,
authors=form_data.authors,
genres=form_data.genres,
isbn=form_data.isbn,
publisher=form_data.publisher,
edition=form_data.edition,
description=form_data.description,
notes=form_data.notes,
location_place=form_data.location_place,
location_bookshelf=form_data.location_bookshelf,
location_shelf=form_data.location_shelf,
first_published=form_data.first_published,
loaned_to=form_data.loaned_to,
loaned_date=form_data.loaned_date,
cover_image_url=cover_image_url,
)
flash("Book updated successfully!", "success")
except ValidationError as e:
# Format validation errors for display
_flash_validation_errors(e)
except Exception as e:
logger.error(f"Error updating book '{book_id}': {e}", exc_info=True)
flash(f"Error updating book: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/delete", methods=["GET", "POST"])
def delete_book(book_id: int) -> ResponseReturnValue:
"""Delete a book (GET shows confirmation, POST performs deletion)."""
book = library.get_book(book_id)
if not book:
flash("Book not found", "error")
return redirect(url_for("main.index"))
if request.method == "POST":
# Perform the actual deletion
try:
title = book.title
library.delete_book(book_id)
flash(f"Book '{title}' deleted successfully!", "success")
except Exception as e:
logger.error(f"Error deleting book '{book_id}': {e}", exc_info=True)
flash(f"Error deleting book: {e}", "error")
return redirect(url_for("main.index"))
# Show confirmation page
return render_template("book/delete_confirm.html.j2", book=book)
@bp.route("/import", methods=["GET", "POST"])
def import_book() -> ResponseReturnValue:
"""Import a book from ISBN."""
if request.method == "POST":
isbn = request.form.get("isbn", "").strip()
if not isbn:
flash("ISBN is required", "error")
return redirect(url_for("main.index"))
try:
# Get current viewing user as owner
viewing_user = g.get("viewing_user")
# Import book from ISBN
book = library.import_book_from_isbn(
isbn=isbn, owner_id=viewing_user.id if viewing_user else None
)
flash(f"Book '{book.title}' imported successfully!", "success")
return redirect(url_for("main.book_detail", book_id=book.id))
except Exception as e:
logger.error(f"Error importing book with ISBN '{isbn}': {e}", exc_info=True)
flash(f"Import error: {e}", "error")
return redirect(url_for("main.index"))
@bp.route("/set-user/<username>")
@bp.route("/set-user/")
def set_viewing_user(username: str = "") -> ResponseReturnValue:
"""Set the current viewing user context."""
if username:
user = library.get_user_by_username(username)
if user:
session["viewing_as_user"] = user.username
flash(f"Now viewing as {username.title()}", "info")
else:
flash(f"User '{username}' not found", "error")
else:
session.pop("viewing_as_user", None)
flash("Now viewing all users", "info")
return redirect(request.referrer or url_for("main.index"))
def _save_search(user: User, search_name: str, query_params: str) -> bool:
"""Save a search for a user. Mock implementation."""
# Initialize saved_searches if None
user.saved_searches = user.saved_searches | {search_name: query_params} # noqa: PLR6104
db.session.commit()
return True
def _delete_saved_search(user: User, search_name: str) -> bool:
"""Delete a saved search for a user. Mock implementation."""
if search_name in user.saved_searches:
user.saved_searches = {
k: v for k, v in user.saved_searches.items() if k != search_name
} # needs to be a new object to trigger SQLAlchemy change detection
db.session.commit()
return True
return False
@bp.route("/saved-search", methods=["POST"])
def save_search_route() -> ResponseReturnValue:
"""Save a search for the current user."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to save searches", "error")
return redirect(url_for("main.index"))
search_name = request.form.get("name", "").strip()
query_params = request.form.get("query_params", "")
if not search_name:
flash("Search name is required", "error")
return redirect(url_for("main.index", q=query_params))
try:
success = _save_search(viewing_user, search_name, query_params)
if success:
flash(f"Search '{search_name}' saved successfully!", "success")
else:
flash("Error saving search", "error")
except Exception as e:
logger.error(
f"Error saving search '{search_name}' for user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error saving search: {e}", "error")
return redirect(url_for("main.index", q=query_params))
@bp.route("/book/<int:book_id>/reading/start", methods=["POST"])
def start_reading_route(book_id: int) -> ResponseReturnValue:
"""Start reading a book."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to start reading", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
library.start_reading(book_id=book_id, user_id=viewing_user.id)
flash("Started reading!", "success")
except Exception as e:
logger.error(
f"Error starting reading for book '{book_id}' and user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error starting reading: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/reading/finish", methods=["POST"])
def finish_reading_route(book_id: int) -> ResponseReturnValue:
"""Finish reading a book."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to finish reading", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
# Find current reading for this user and book
current_readings = library.get_current_readings(user_id=viewing_user.id)
current_reading = next(
(r for r in current_readings if r.book_id == book_id), None
)
if not current_reading:
flash("No active reading session found", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
library.finish_reading(reading_id=current_reading.id)
flash("Finished reading!", "success")
except Exception as e:
logger.error(
f"Error finishing reading for book '{book_id}' and user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error finishing reading: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/reading/drop", methods=["POST"])
def drop_reading_route(book_id: int) -> ResponseReturnValue:
"""Drop reading a book."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to drop reading", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
# Find current reading for this user and book
current_readings = library.get_current_readings(user_id=viewing_user.id)
current_reading = next(
(r for r in current_readings if r.book_id == book_id), None
)
if not current_reading:
flash("No active reading session found", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
library.drop_reading(reading_id=current_reading.id)
flash("Dropped reading", "info")
except Exception as e:
logger.error(
f"Error dropping reading for book '{book_id}' and user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error dropping reading: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/wishlist/add", methods=["POST"])
def add_to_wishlist_route(book_id: int) -> ResponseReturnValue:
"""Add book to wishlist."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to add to wishlist", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
library.add_to_wishlist(book_id=book_id, user_id=viewing_user.id)
flash("Added to wishlist!", "success")
except Exception as e:
logger.error(
f"Error adding book '{book_id}' to wishlist for user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error adding to wishlist: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/wishlist/remove", methods=["POST"])
def remove_from_wishlist_route(book_id: int) -> ResponseReturnValue:
"""Remove book from wishlist."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to remove from wishlist", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
removed = library.remove_from_wishlist(book_id=book_id, user_id=viewing_user.id)
if removed:
flash("Removed from wishlist", "info")
else:
flash("Book was not in wishlist", "warning")
except Exception as e:
logger.error(
f"Error removing book '{book_id}' from wishlist for user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error removing from wishlist: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/reading/<int:reading_id>/update", methods=["POST"])
def update_reading_route(book_id: int, reading_id: int) -> ResponseReturnValue:
"""Update a single reading session."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to update readings", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
# Get and verify the reading belongs to current user
reading = db.session.get(Reading, reading_id)
if not reading or reading.user_id != viewing_user.id:
flash("Reading not found or not yours to modify", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
# Validate the form data
form_data = ReadingFormData.model_validate(dict(request.form))
# Update the reading with validated data
reading.start_date = form_data.start_date
reading.end_date = form_data.end_date
reading.dropped = form_data.dropped
reading.rating = form_data.rating
reading.comments = form_data.comments
db.session.commit()
flash("Reading updated successfully!", "success")
except ValidationError as e:
db.session.rollback() # Rollback any partial changes on validation error
_flash_validation_errors(e)
except Exception as e:
db.session.rollback() # Rollback any partial changes on general error
logger.error(
f"Error updating reading for book '{book_id}' and user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error updating reading: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/book/<int:book_id>/reading/<int:reading_id>/delete", methods=["POST"])
def delete_reading_route(book_id: int, reading_id: int) -> ResponseReturnValue:
"""Delete a reading session."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to delete readings", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
try:
# Verify the reading belongs to the current user
reading = db.session.get(Reading, reading_id)
if reading and reading.user_id == viewing_user.id:
deleted = library.delete_reading(reading_id)
if deleted:
flash("Reading session deleted", "info")
else:
flash("Reading session not found", "warning")
else:
flash("Reading session not found or not yours to delete", "error")
except Exception as e:
logger.error(
f"Error deleting reading for book '{book_id}' and user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error deleting reading: {e}", "error")
return redirect(url_for("main.book_detail", book_id=book_id))
@bp.route("/saved-search/<search_name>/delete", methods=["GET", "POST"])
def delete_saved_search_route(search_name: str) -> ResponseReturnValue:
"""Delete a saved search (GET shows confirmation, POST performs deletion)."""
viewing_user = g.get("viewing_user")
if not viewing_user:
flash("You must select a user to manage saved searches", "error")
return redirect(url_for("main.index"))
# Check if search exists
saved_searches = viewing_user.saved_searches or {}
if search_name not in saved_searches:
flash(f"Saved search '{search_name}' not found", "error")
return redirect(url_for("main.index"))
if request.method == "POST":
# Perform the actual deletion
try:
success = _delete_saved_search(viewing_user, search_name)
if success:
flash(f"Saved search '{search_name}' deleted successfully!", "success")
else:
flash("Error deleting saved search", "error")
except Exception as e:
logger.error(
f"Error deleting saved search '{search_name}' for user '{viewing_user.username}': {e}",
exc_info=True,
)
flash(f"Error deleting saved search: {e}", "error")
return redirect(url_for("main.index"))
# Show confirmation page
return render_template(
"components/delete_search_confirm.html.j2",
search_name=search_name,
search_params=saved_searches[search_name],
)
@bp.route("/media/covers/<filename>")
def serve_cover(filename: str) -> ResponseReturnValue:
"""Serve book cover images from media directory."""
media_path = Path(current_app.instance_path).parent / "media" / "covers"
return send_from_directory(media_path, filename)