Compare commits

..

2 Commits

5 changed files with 204 additions and 42 deletions

View File

@@ -9,10 +9,10 @@ from collections.abc import Sequence
from datetime import date, datetime from datetime import date, datetime
from typing import assert_never from typing import assert_never
from sqlalchemy import ColumnElement, and_, or_ from sqlalchemy import ColumnElement, Select, and_, func, or_, select
from sqlalchemy.orm import InstrumentedAttribute, joinedload from sqlalchemy.orm import InstrumentedAttribute, joinedload
from hxbooks.search import IsOperatorValue, QueryParser, ValueT from hxbooks.search import IsOperatorValue, QueryParser, SortDirection, ValueT
from .db import db from .db import db
from .gbooks import fetch_google_book_data from .gbooks import fetch_google_book_data
@@ -78,8 +78,7 @@ def get_book(book_id: int) -> Book | None:
return ( return (
db.session db.session
.execute( .execute(
db select(Book)
.select(Book)
.options( .options(
joinedload(Book.authors), joinedload(Book.authors),
joinedload(Book.genres), joinedload(Book.genres),
@@ -216,7 +215,7 @@ def search_books(
For now implements basic filtering - advanced query parsing will be added later. For now implements basic filtering - advanced query parsing will be added later.
""" """
query = db.select(Book).options( query = select(Book).options(
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner) joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
) )
@@ -282,9 +281,13 @@ def search_books_advanced(
"""Advanced search with field filters supporting comparison operators.""" """Advanced search with field filters supporting comparison operators."""
parsed_query = query_parser.parse(query_string) parsed_query = query_parser.parse(query_string)
query = db.select(Book).options( query = (
select(Book)
.options(
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner) joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
) )
.outerjoin(User)
)
conditions = [] conditions = []
@@ -307,8 +310,13 @@ def search_books_advanced(
conditions.append(or_(*text_conditions)) conditions.append(or_(*text_conditions))
# Advanced field filters # Advanced field filters
if parsed_query.field_filters: sort_columns = []
for field_filter in parsed_query.field_filters: for field_filter in parsed_query.field_filters:
if field_filter.field == Field.SORT:
query, sort_column = _build_sort_column(query, field_filter.value, username)
if sort_column is not None:
sort_columns.append(sort_column)
else:
condition = _build_field_condition(field_filter, username) condition = _build_field_condition(field_filter, username)
if condition is not None: if condition is not None:
@@ -320,6 +328,10 @@ def search_books_advanced(
if conditions: if conditions:
query = query.filter(and_(*conditions)) query = query.filter(and_(*conditions))
# Ensure deterministic order by adding added date as tiebreaker
sort_columns.append(Book.added_date.desc())
query = query.order_by(*sort_columns)
query = query.distinct().limit(limit) query = query.distinct().limit(limit)
result = db.session.execute(query) result = db.session.execute(query)
@@ -359,7 +371,7 @@ def _build_field_condition(
case Field.LOANED_DATE: case Field.LOANED_DATE:
field_attr = Book.loaned_date field_attr = Book.loaned_date
case Field.OWNER: case Field.OWNER:
return Book.owner.has(_apply_operator(User.username, operator, value)) field_attr = User.username
case Field.YEAR: case Field.YEAR:
field_attr = Book.first_published field_attr = Book.first_published
case Field.RATING: case Field.RATING:
@@ -406,6 +418,8 @@ def _build_field_condition(
return None return None
case _: case _:
assert_never(value) assert_never(value)
case Field.SORT:
return None
case _: case _:
assert_never(field) assert_never(field)
@@ -413,6 +427,91 @@ def _build_field_condition(
return condition return condition
def _build_sort_column(
query: Select, value: ValueT, username: str | None = None
) -> tuple[Select, ColumnElement | None]:
"""Build a sort column for the 'sort' field."""
assert isinstance(value, tuple) and len(value) == 2
field, direction = value
assert isinstance(field, Field) and isinstance(direction, SortDirection)
match field:
case Field.TITLE:
column = Book.title
case Field.YEAR:
column = Book.first_published
case Field.ADDED_DATE:
column = Book.added_date
case Field.BOUGHT_DATE:
column = Book.bought_date
case Field.LOANED_DATE:
column = Book.loaned_date
case Field.ISBN:
column = Book.isbn
case Field.PLACE:
column = Book.location_place
case Field.BOOKSHELF:
column = Book.location_bookshelf
case Field.SHELF:
column = Book.location_shelf
case Field.OWNER:
column = User.username
case Field.READ_DATE:
# Special handling for sorting by read date - sort by latest reading end
# date
subq = (
select(
Reading.book_id,
func.max(Reading.end_date).label("latest_read_date"),
)
.where(
Reading.user.has(User.username == username),
Reading.end_date.isnot(None),
)
.group_by(Reading.book_id)
.subquery("latest_readings")
)
query = query.outerjoin(subq, Book.id == subq.c.book_id)
column = subq.c.latest_read_date
case Field.RATING:
# Special handling for sorting by rating - sort by latest reading rating
subq = (
select(
Reading.book_id,
Reading.rating.label("latest_rating"),
func
.row_number()
.over(
partition_by=Reading.book_id,
order_by=Reading.end_date.desc(),
)
.label("rn"),
)
.where(
Reading.user.has(User.username == username),
Reading.rating.isnot(None),
Reading.end_date.isnot(None),
)
.subquery("latest_ratings")
)
query = query.outerjoin(
subq, (Book.id == subq.c.book_id) & (subq.c.rn == 1)
)
column = subq.c.latest_rating
# These fields don't make sense to sort by
case Field.AUTHOR | Field.GENRE | Field.IS | Field.SORT:
return query, None
case _:
assert_never(field)
if direction == SortDirection.ASC:
return query, column.asc().nullslast()
elif direction == SortDirection.DESC:
return query, column.desc().nullslast()
else:
return query, None
def _apply_operator( def _apply_operator(
field_attr: InstrumentedAttribute, operator: ComparisonOperator, value: ValueT field_attr: InstrumentedAttribute, operator: ComparisonOperator, value: ValueT
) -> ColumnElement: ) -> ColumnElement:
@@ -498,7 +597,7 @@ def get_books_by_location(
def _get_or_create_author(name: str) -> Author: def _get_or_create_author(name: str) -> Author:
"""Get existing author or create a new one.""" """Get existing author or create a new one."""
author = db.session.execute( author = db.session.execute(
db.select(Author).filter(Author.name == name) select(Author).filter(Author.name == name)
).scalar_one_or_none() ).scalar_one_or_none()
if author is None: if author is None:
@@ -512,7 +611,7 @@ def _get_or_create_author(name: str) -> Author:
def _get_or_create_genre(name: str) -> Genre: def _get_or_create_genre(name: str) -> Genre:
"""Get existing genre or create a new one.""" """Get existing genre or create a new one."""
genre = db.session.execute( genre = db.session.execute(
db.select(Genre).filter(Genre.name == name) select(Genre).filter(Genre.name == name)
).scalar_one_or_none() ).scalar_one_or_none()
if genre is None: if genre is None:
@@ -539,7 +638,7 @@ def start_reading(
# Check if already reading this book # Check if already reading this book
existing_reading = db.session.execute( existing_reading = db.session.execute(
db.select(Reading).filter( select(Reading).filter(
and_( and_(
Reading.book_id == book_id, Reading.book_id == book_id,
Reading.user_id == user_id, Reading.user_id == user_id,
@@ -572,8 +671,7 @@ def finish_reading(
) -> Reading: ) -> Reading:
"""Finish a reading session.""" """Finish a reading session."""
reading = db.session.execute( reading = db.session.execute(
db select(Reading)
.select(Reading)
.options(joinedload(Reading.book)) .options(joinedload(Reading.book))
.filter(Reading.id == reading_id) .filter(Reading.id == reading_id)
).scalar_one_or_none() ).scalar_one_or_none()
@@ -607,8 +705,7 @@ def drop_reading(
) -> Reading: ) -> Reading:
"""Mark a reading session as dropped.""" """Mark a reading session as dropped."""
reading = db.session.execute( reading = db.session.execute(
db select(Reading)
.select(Reading)
.options(joinedload(Reading.book)) .options(joinedload(Reading.book))
.filter(Reading.id == reading_id) .filter(Reading.id == reading_id)
).scalar_one_or_none() ).scalar_one_or_none()
@@ -635,8 +732,7 @@ def get_current_readings(user_id: int) -> Sequence[Reading]:
return ( return (
db.session db.session
.execute( .execute(
db select(Reading)
.select(Reading)
.options(joinedload(Reading.book).joinedload(Book.authors)) .options(joinedload(Reading.book).joinedload(Book.authors))
.filter( .filter(
and_( and_(
@@ -657,8 +753,7 @@ def get_reading_history(user_id: int, limit: int = 50) -> Sequence[Reading]:
return ( return (
db.session db.session
.execute( .execute(
db select(Reading)
.select(Reading)
.options(joinedload(Reading.book).joinedload(Book.authors)) .options(joinedload(Reading.book).joinedload(Book.authors))
.filter(Reading.user_id == user_id) .filter(Reading.user_id == user_id)
.order_by(Reading.start_date.desc()) .order_by(Reading.start_date.desc())
@@ -695,7 +790,7 @@ def add_to_wishlist(book_id: int, user_id: int) -> Wishlist:
# Check if already in wishlist # Check if already in wishlist
existing = db.session.execute( existing = db.session.execute(
db.select(Wishlist).filter( select(Wishlist).filter(
and_( and_(
Wishlist.book_id == book_id, Wishlist.book_id == book_id,
Wishlist.user_id == user_id, Wishlist.user_id == user_id,
@@ -719,7 +814,7 @@ def add_to_wishlist(book_id: int, user_id: int) -> Wishlist:
def remove_from_wishlist(book_id: int, user_id: int) -> bool: def remove_from_wishlist(book_id: int, user_id: int) -> bool:
"""Remove a book from user's wishlist.""" """Remove a book from user's wishlist."""
wishlist_item = db.session.execute( wishlist_item = db.session.execute(
db.select(Wishlist).filter( select(Wishlist).filter(
and_( and_(
Wishlist.book_id == book_id, Wishlist.book_id == book_id,
Wishlist.user_id == user_id, Wishlist.user_id == user_id,
@@ -740,8 +835,7 @@ def get_wishlist(user_id: int) -> Sequence[Wishlist]:
return ( return (
db.session db.session
.execute( .execute(
db select(Wishlist)
.select(Wishlist)
.options(joinedload(Wishlist.book).joinedload(Book.authors)) .options(joinedload(Wishlist.book).joinedload(Book.authors))
.filter(Wishlist.user_id == user_id) .filter(Wishlist.user_id == user_id)
.order_by(Wishlist.wishlisted_date.desc()) .order_by(Wishlist.wishlisted_date.desc())
@@ -756,7 +850,7 @@ def create_user(username: str) -> User:
"""Create a new user.""" """Create a new user."""
# Check if username already exists # Check if username already exists
existing = db.session.execute( existing = db.session.execute(
db.select(User).filter(User.username == username) select(User).filter(User.username == username)
).scalar_one_or_none() ).scalar_one_or_none()
if existing: if existing:
@@ -771,10 +865,10 @@ def create_user(username: str) -> User:
def get_user_by_username(username: str) -> User | None: def get_user_by_username(username: str) -> User | None:
"""Get a user by username.""" """Get a user by username."""
return db.session.execute( return db.session.execute(
db.select(User).filter(User.username == username) select(User).filter(User.username == username)
).scalar_one_or_none() ).scalar_one_or_none()
def list_users() -> Sequence[User]: def list_users() -> Sequence[User]:
"""List all users.""" """List all users."""
return db.session.execute(db.select(User).order_by(User.username)).scalars().all() return db.session.execute(select(User).order_by(User.username)).scalars().all()

View File

@@ -4,6 +4,7 @@ Main application routes for HXBooks frontend.
Provides clean URL structure and integrates with library.py business logic. Provides clean URL structure and integrates with library.py business logic.
""" """
import traceback
from datetime import date from datetime import date
from typing import Annotated, Any from typing import Annotated, Any
@@ -128,6 +129,10 @@ def index() -> ResponseReturnValue:
books = library.search_books_advanced(query, limit=100, username=viewing_user) books = library.search_books_advanced(query, limit=100, username=viewing_user)
except Exception as e: except Exception as e:
flash(f"Search error: {e}", "error") flash(f"Search error: {e}", "error")
# print traceback for debugging
traceback.print_exc()
books = [] books = []
return render_template("book/list.html.j2", books=books, query=query) return render_template("book/list.html.j2", books=books, query=query)

View File

@@ -42,6 +42,14 @@ class Field(StrEnum):
LOANED_DATE = "loaned" LOANED_DATE = "loaned"
OWNER = "owner" OWNER = "owner"
IS = "is" IS = "is"
SORT = "sort"
class SortDirection(StrEnum):
"""Supported sort directions for 'sort' field."""
ASC = "asc"
DESC = "desc"
class IsOperatorValue(StrEnum): class IsOperatorValue(StrEnum):
@@ -55,7 +63,7 @@ class IsOperatorValue(StrEnum):
UNKNOWN = "_unknown_" UNKNOWN = "_unknown_"
ValueT = str | int | float | date | IsOperatorValue ValueT = str | int | float | date | IsOperatorValue | tuple[Field, SortDirection]
@dataclass @dataclass
@@ -221,5 +229,18 @@ def _convert_value(field: Field, value_str: str) -> ValueT:
return IsOperatorValue(value_str) return IsOperatorValue(value_str)
else: else:
return IsOperatorValue.UNKNOWN return IsOperatorValue.UNKNOWN
case Field.SORT:
parts = value_str.split("-")
if (
len(parts) == 2
and parts[0] in Field
and parts[0] not in {Field.IS, Field.SORT}
and parts[1] in SortDirection
):
return (Field(parts[0]), SortDirection(parts[1]))
elif len(parts) == 1 and parts[0] in Field:
return (Field(parts[0]), SortDirection.ASC)
else:
return (Field.SORT, SortDirection.ASC) # Default sort if invalid
case _: case _:
assert_never(field) assert_never(field)

View File

@@ -329,33 +329,59 @@ class TestBookSearchCommand:
[ [
# String field filters # String field filters
("title:Hobbit", "", ["The Hobbit"]), ("title:Hobbit", "", ["The Hobbit"]),
("author:Tolkien", "", ["The Hobbit", "The Fellowship"]), ("author:Tolkien", "", ["The Fellowship", "The Hobbit"]),
("genre:Fantasy", "", ["The Hobbit", "The Fellowship"]), ("genre:Fantasy", "", ["The Fellowship", "The Hobbit"]),
("owner:alice", "", ["The Hobbit", "The Fellowship", "Dune"]), ("owner:alice", "", ["Dune", "The Fellowship", "The Hobbit"]),
("place:home", "", ["The Hobbit", "Programming Book"]), ("place:home", "", ["Programming Book", "The Hobbit"]),
("bookshelf:fantasy", "", ["The Hobbit", "The Fellowship"]), ("bookshelf:fantasy", "", ["The Fellowship", "The Hobbit"]),
# Numeric field filters # Numeric field filters
("rating>=4", "", ["The Hobbit", "Programming Book"]), ("rating>=4", "", ["Programming Book", "The Hobbit"]),
("rating=3", "", ["Dune"]), ("rating=3", "", ["Dune"]),
("shelf>1", "", ["The Fellowship", "Programming Book"]), ("shelf>1", "", ["Programming Book", "The Fellowship"]),
("year>=1954", "", ["The Fellowship", "Dune", "Programming Book"]), ("year>=1954", "", ["Programming Book", "Dune", "The Fellowship"]),
# Date field filters # Date field filters
( (
"added>=2026-03-15", "added>=2026-03-15",
"", "",
["The Hobbit", "The Fellowship", "Dune", "Programming Book"], ["Programming Book", "Dune", "The Fellowship", "The Hobbit"],
), ),
("bought<2026-01-01", "", ["Programming Book"]), ("bought<2026-01-01", "", ["Programming Book"]),
# Negation # Negation
("-genre:Fantasy", "", ["Dune", "Programming Book"]), ("-genre:Fantasy", "", ["Programming Book", "Dune"]),
("-owner:bob", "", ["The Hobbit", "The Fellowship", "Dune"]), ("-owner:bob", "", ["Dune", "The Fellowship", "The Hobbit"]),
# Complex query with multiple filters # Complex query with multiple filters
("-genre:Fantasy owner:alice", "", ["Dune"]), ("-genre:Fantasy owner:alice", "", ["Dune"]),
# User-specific queries # User-specific queries
("rating>=4", "alice", ["The Hobbit"]), ("rating>=4", "alice", ["The Hobbit"]),
("is:reading", "alice", ["The Fellowship"]), ("is:reading", "alice", ["The Fellowship"]),
("is:read", "alice", ["The Hobbit", "Dune"]), ("is:read", "alice", ["Dune", "The Hobbit"]),
("is:wished", "alice", ["Programming Book"]), ("is:wished", "alice", ["Programming Book"]),
# Sorting
(
"sort:added-desc",
"",
["Programming Book", "Dune", "The Fellowship", "The Hobbit"],
),
(
"sort:added-asc",
"",
["The Hobbit", "The Fellowship", "Dune", "Programming Book"],
),
(
"sort:read-desc",
"alice",
["Dune", "The Hobbit", "Programming Book", "The Fellowship"],
),
(
"sort:owner-asc",
"",
["Dune", "The Fellowship", "The Hobbit", "Programming Book"],
),
(
"sort:rating-desc",
"alice",
["The Hobbit", "Dune", "Programming Book", "The Fellowship"],
),
], ],
) )
def test_book_search_advanced_queries( def test_book_search_advanced_queries(
@@ -383,7 +409,7 @@ class TestBookSearchCommand:
actual_titles = [book["title"] for book in search_results] actual_titles = [book["title"] for book in search_results]
# Verify expected titles are present (order doesn't matter) # Verify expected titles are present (order doesn't matter)
assert set(expected_titles) == set(actual_titles), ( assert expected_titles == actual_titles, (
f"Query '{query}' expected {expected_titles}, got {actual_titles}" f"Query '{query}' expected {expected_titles}, got {actual_titles}"
) )

View File

@@ -15,6 +15,7 @@ from hxbooks.search import (
IsOperatorValue, IsOperatorValue,
QueryParser, QueryParser,
SearchQuery, SearchQuery,
SortDirection,
_convert_value, # noqa: PLC2701 _convert_value, # noqa: PLC2701
) )
@@ -267,6 +268,21 @@ class TestTypeConversion:
result = _convert_value(Field.IS, "invalid-status") result = _convert_value(Field.IS, "invalid-status")
assert result == IsOperatorValue.UNKNOWN assert result == IsOperatorValue.UNKNOWN
def test_convert_sort_field(self, parser: QueryParser) -> None:
"""Test converting values for 'sort' field."""
result = _convert_value(Field.SORT, "added")
assert result == (Field.ADDED_DATE, SortDirection.ASC)
result = _convert_value(Field.SORT, "added-desc")
assert result == (Field.ADDED_DATE, SortDirection.DESC)
# Invalid field or direction should fallback to a default value
result = _convert_value(Field.SORT, "added-invalid")
assert result == (Field.SORT, SortDirection.ASC)
result = _convert_value(Field.SORT, "invalid-asc")
assert result == (Field.SORT, SortDirection.ASC)
class TestParsingEdgeCases: class TestParsingEdgeCases:
"""Test edge cases and error handling in query parsing.""" """Test edge cases and error handling in query parsing."""