Compare commits
2 Commits
6b65d9cd15
...
95e434a750
| Author | SHA1 | Date | |
|---|---|---|---|
| 95e434a750 | |||
| 167b37f471 |
@@ -9,10 +9,10 @@ from collections.abc import Sequence
|
||||
from datetime import date, datetime
|
||||
from typing import assert_never
|
||||
|
||||
from sqlalchemy import ColumnElement, and_, or_
|
||||
from sqlalchemy import ColumnElement, Select, and_, func, or_, select
|
||||
from sqlalchemy.orm import InstrumentedAttribute, joinedload
|
||||
|
||||
from hxbooks.search import IsOperatorValue, QueryParser, ValueT
|
||||
from hxbooks.search import IsOperatorValue, QueryParser, SortDirection, ValueT
|
||||
|
||||
from .db import db
|
||||
from .gbooks import fetch_google_book_data
|
||||
@@ -78,8 +78,7 @@ def get_book(book_id: int) -> Book | None:
|
||||
return (
|
||||
db.session
|
||||
.execute(
|
||||
db
|
||||
.select(Book)
|
||||
select(Book)
|
||||
.options(
|
||||
joinedload(Book.authors),
|
||||
joinedload(Book.genres),
|
||||
@@ -216,7 +215,7 @@ def search_books(
|
||||
|
||||
For now implements basic filtering - advanced query parsing will be added later.
|
||||
"""
|
||||
query = db.select(Book).options(
|
||||
query = select(Book).options(
|
||||
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
|
||||
)
|
||||
|
||||
@@ -282,8 +281,12 @@ def search_books_advanced(
|
||||
"""Advanced search with field filters supporting comparison operators."""
|
||||
parsed_query = query_parser.parse(query_string)
|
||||
|
||||
query = db.select(Book).options(
|
||||
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
|
||||
query = (
|
||||
select(Book)
|
||||
.options(
|
||||
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
|
||||
)
|
||||
.outerjoin(User)
|
||||
)
|
||||
|
||||
conditions = []
|
||||
@@ -307,8 +310,13 @@ def search_books_advanced(
|
||||
conditions.append(or_(*text_conditions))
|
||||
|
||||
# Advanced field filters
|
||||
if parsed_query.field_filters:
|
||||
for field_filter in parsed_query.field_filters:
|
||||
sort_columns = []
|
||||
for field_filter in parsed_query.field_filters:
|
||||
if field_filter.field == Field.SORT:
|
||||
query, sort_column = _build_sort_column(query, field_filter.value, username)
|
||||
if sort_column is not None:
|
||||
sort_columns.append(sort_column)
|
||||
else:
|
||||
condition = _build_field_condition(field_filter, username)
|
||||
|
||||
if condition is not None:
|
||||
@@ -320,6 +328,10 @@ def search_books_advanced(
|
||||
if conditions:
|
||||
query = query.filter(and_(*conditions))
|
||||
|
||||
# Ensure deterministic order by adding added date as tiebreaker
|
||||
sort_columns.append(Book.added_date.desc())
|
||||
query = query.order_by(*sort_columns)
|
||||
|
||||
query = query.distinct().limit(limit)
|
||||
|
||||
result = db.session.execute(query)
|
||||
@@ -359,7 +371,7 @@ def _build_field_condition(
|
||||
case Field.LOANED_DATE:
|
||||
field_attr = Book.loaned_date
|
||||
case Field.OWNER:
|
||||
return Book.owner.has(_apply_operator(User.username, operator, value))
|
||||
field_attr = User.username
|
||||
case Field.YEAR:
|
||||
field_attr = Book.first_published
|
||||
case Field.RATING:
|
||||
@@ -406,6 +418,8 @@ def _build_field_condition(
|
||||
return None
|
||||
case _:
|
||||
assert_never(value)
|
||||
case Field.SORT:
|
||||
return None
|
||||
case _:
|
||||
assert_never(field)
|
||||
|
||||
@@ -413,6 +427,91 @@ def _build_field_condition(
|
||||
return condition
|
||||
|
||||
|
||||
def _build_sort_column(
|
||||
query: Select, value: ValueT, username: str | None = None
|
||||
) -> tuple[Select, ColumnElement | None]:
|
||||
"""Build a sort column for the 'sort' field."""
|
||||
assert isinstance(value, tuple) and len(value) == 2
|
||||
field, direction = value
|
||||
assert isinstance(field, Field) and isinstance(direction, SortDirection)
|
||||
|
||||
match field:
|
||||
case Field.TITLE:
|
||||
column = Book.title
|
||||
case Field.YEAR:
|
||||
column = Book.first_published
|
||||
case Field.ADDED_DATE:
|
||||
column = Book.added_date
|
||||
case Field.BOUGHT_DATE:
|
||||
column = Book.bought_date
|
||||
case Field.LOANED_DATE:
|
||||
column = Book.loaned_date
|
||||
case Field.ISBN:
|
||||
column = Book.isbn
|
||||
case Field.PLACE:
|
||||
column = Book.location_place
|
||||
case Field.BOOKSHELF:
|
||||
column = Book.location_bookshelf
|
||||
case Field.SHELF:
|
||||
column = Book.location_shelf
|
||||
case Field.OWNER:
|
||||
column = User.username
|
||||
case Field.READ_DATE:
|
||||
# Special handling for sorting by read date - sort by latest reading end
|
||||
# date
|
||||
subq = (
|
||||
select(
|
||||
Reading.book_id,
|
||||
func.max(Reading.end_date).label("latest_read_date"),
|
||||
)
|
||||
.where(
|
||||
Reading.user.has(User.username == username),
|
||||
Reading.end_date.isnot(None),
|
||||
)
|
||||
.group_by(Reading.book_id)
|
||||
.subquery("latest_readings")
|
||||
)
|
||||
query = query.outerjoin(subq, Book.id == subq.c.book_id)
|
||||
column = subq.c.latest_read_date
|
||||
case Field.RATING:
|
||||
# Special handling for sorting by rating - sort by latest reading rating
|
||||
subq = (
|
||||
select(
|
||||
Reading.book_id,
|
||||
Reading.rating.label("latest_rating"),
|
||||
func
|
||||
.row_number()
|
||||
.over(
|
||||
partition_by=Reading.book_id,
|
||||
order_by=Reading.end_date.desc(),
|
||||
)
|
||||
.label("rn"),
|
||||
)
|
||||
.where(
|
||||
Reading.user.has(User.username == username),
|
||||
Reading.rating.isnot(None),
|
||||
Reading.end_date.isnot(None),
|
||||
)
|
||||
.subquery("latest_ratings")
|
||||
)
|
||||
query = query.outerjoin(
|
||||
subq, (Book.id == subq.c.book_id) & (subq.c.rn == 1)
|
||||
)
|
||||
column = subq.c.latest_rating
|
||||
# These fields don't make sense to sort by
|
||||
case Field.AUTHOR | Field.GENRE | Field.IS | Field.SORT:
|
||||
return query, None
|
||||
case _:
|
||||
assert_never(field)
|
||||
|
||||
if direction == SortDirection.ASC:
|
||||
return query, column.asc().nullslast()
|
||||
elif direction == SortDirection.DESC:
|
||||
return query, column.desc().nullslast()
|
||||
else:
|
||||
return query, None
|
||||
|
||||
|
||||
def _apply_operator(
|
||||
field_attr: InstrumentedAttribute, operator: ComparisonOperator, value: ValueT
|
||||
) -> ColumnElement:
|
||||
@@ -498,7 +597,7 @@ def get_books_by_location(
|
||||
def _get_or_create_author(name: str) -> Author:
|
||||
"""Get existing author or create a new one."""
|
||||
author = db.session.execute(
|
||||
db.select(Author).filter(Author.name == name)
|
||||
select(Author).filter(Author.name == name)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if author is None:
|
||||
@@ -512,7 +611,7 @@ def _get_or_create_author(name: str) -> Author:
|
||||
def _get_or_create_genre(name: str) -> Genre:
|
||||
"""Get existing genre or create a new one."""
|
||||
genre = db.session.execute(
|
||||
db.select(Genre).filter(Genre.name == name)
|
||||
select(Genre).filter(Genre.name == name)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if genre is None:
|
||||
@@ -539,7 +638,7 @@ def start_reading(
|
||||
|
||||
# Check if already reading this book
|
||||
existing_reading = db.session.execute(
|
||||
db.select(Reading).filter(
|
||||
select(Reading).filter(
|
||||
and_(
|
||||
Reading.book_id == book_id,
|
||||
Reading.user_id == user_id,
|
||||
@@ -572,8 +671,7 @@ def finish_reading(
|
||||
) -> Reading:
|
||||
"""Finish a reading session."""
|
||||
reading = db.session.execute(
|
||||
db
|
||||
.select(Reading)
|
||||
select(Reading)
|
||||
.options(joinedload(Reading.book))
|
||||
.filter(Reading.id == reading_id)
|
||||
).scalar_one_or_none()
|
||||
@@ -607,8 +705,7 @@ def drop_reading(
|
||||
) -> Reading:
|
||||
"""Mark a reading session as dropped."""
|
||||
reading = db.session.execute(
|
||||
db
|
||||
.select(Reading)
|
||||
select(Reading)
|
||||
.options(joinedload(Reading.book))
|
||||
.filter(Reading.id == reading_id)
|
||||
).scalar_one_or_none()
|
||||
@@ -635,8 +732,7 @@ def get_current_readings(user_id: int) -> Sequence[Reading]:
|
||||
return (
|
||||
db.session
|
||||
.execute(
|
||||
db
|
||||
.select(Reading)
|
||||
select(Reading)
|
||||
.options(joinedload(Reading.book).joinedload(Book.authors))
|
||||
.filter(
|
||||
and_(
|
||||
@@ -657,8 +753,7 @@ def get_reading_history(user_id: int, limit: int = 50) -> Sequence[Reading]:
|
||||
return (
|
||||
db.session
|
||||
.execute(
|
||||
db
|
||||
.select(Reading)
|
||||
select(Reading)
|
||||
.options(joinedload(Reading.book).joinedload(Book.authors))
|
||||
.filter(Reading.user_id == user_id)
|
||||
.order_by(Reading.start_date.desc())
|
||||
@@ -695,7 +790,7 @@ def add_to_wishlist(book_id: int, user_id: int) -> Wishlist:
|
||||
|
||||
# Check if already in wishlist
|
||||
existing = db.session.execute(
|
||||
db.select(Wishlist).filter(
|
||||
select(Wishlist).filter(
|
||||
and_(
|
||||
Wishlist.book_id == book_id,
|
||||
Wishlist.user_id == user_id,
|
||||
@@ -719,7 +814,7 @@ def add_to_wishlist(book_id: int, user_id: int) -> Wishlist:
|
||||
def remove_from_wishlist(book_id: int, user_id: int) -> bool:
|
||||
"""Remove a book from user's wishlist."""
|
||||
wishlist_item = db.session.execute(
|
||||
db.select(Wishlist).filter(
|
||||
select(Wishlist).filter(
|
||||
and_(
|
||||
Wishlist.book_id == book_id,
|
||||
Wishlist.user_id == user_id,
|
||||
@@ -740,8 +835,7 @@ def get_wishlist(user_id: int) -> Sequence[Wishlist]:
|
||||
return (
|
||||
db.session
|
||||
.execute(
|
||||
db
|
||||
.select(Wishlist)
|
||||
select(Wishlist)
|
||||
.options(joinedload(Wishlist.book).joinedload(Book.authors))
|
||||
.filter(Wishlist.user_id == user_id)
|
||||
.order_by(Wishlist.wishlisted_date.desc())
|
||||
@@ -756,7 +850,7 @@ def create_user(username: str) -> User:
|
||||
"""Create a new user."""
|
||||
# Check if username already exists
|
||||
existing = db.session.execute(
|
||||
db.select(User).filter(User.username == username)
|
||||
select(User).filter(User.username == username)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
@@ -771,10 +865,10 @@ def create_user(username: str) -> User:
|
||||
def get_user_by_username(username: str) -> User | None:
|
||||
"""Get a user by username."""
|
||||
return db.session.execute(
|
||||
db.select(User).filter(User.username == username)
|
||||
select(User).filter(User.username == username)
|
||||
).scalar_one_or_none()
|
||||
|
||||
|
||||
def list_users() -> Sequence[User]:
|
||||
"""List all users."""
|
||||
return db.session.execute(db.select(User).order_by(User.username)).scalars().all()
|
||||
return db.session.execute(select(User).order_by(User.username)).scalars().all()
|
||||
|
||||
@@ -4,6 +4,7 @@ Main application routes for HXBooks frontend.
|
||||
Provides clean URL structure and integrates with library.py business logic.
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from datetime import date
|
||||
from typing import Annotated, Any
|
||||
|
||||
@@ -128,6 +129,10 @@ def index() -> ResponseReturnValue:
|
||||
books = library.search_books_advanced(query, limit=100, username=viewing_user)
|
||||
except Exception as e:
|
||||
flash(f"Search error: {e}", "error")
|
||||
# print traceback for debugging
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
books = []
|
||||
|
||||
return render_template("book/list.html.j2", books=books, query=query)
|
||||
|
||||
@@ -42,6 +42,14 @@ class Field(StrEnum):
|
||||
LOANED_DATE = "loaned"
|
||||
OWNER = "owner"
|
||||
IS = "is"
|
||||
SORT = "sort"
|
||||
|
||||
|
||||
class SortDirection(StrEnum):
|
||||
"""Supported sort directions for 'sort' field."""
|
||||
|
||||
ASC = "asc"
|
||||
DESC = "desc"
|
||||
|
||||
|
||||
class IsOperatorValue(StrEnum):
|
||||
@@ -55,7 +63,7 @@ class IsOperatorValue(StrEnum):
|
||||
UNKNOWN = "_unknown_"
|
||||
|
||||
|
||||
ValueT = str | int | float | date | IsOperatorValue
|
||||
ValueT = str | int | float | date | IsOperatorValue | tuple[Field, SortDirection]
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -221,5 +229,18 @@ def _convert_value(field: Field, value_str: str) -> ValueT:
|
||||
return IsOperatorValue(value_str)
|
||||
else:
|
||||
return IsOperatorValue.UNKNOWN
|
||||
case Field.SORT:
|
||||
parts = value_str.split("-")
|
||||
if (
|
||||
len(parts) == 2
|
||||
and parts[0] in Field
|
||||
and parts[0] not in {Field.IS, Field.SORT}
|
||||
and parts[1] in SortDirection
|
||||
):
|
||||
return (Field(parts[0]), SortDirection(parts[1]))
|
||||
elif len(parts) == 1 and parts[0] in Field:
|
||||
return (Field(parts[0]), SortDirection.ASC)
|
||||
else:
|
||||
return (Field.SORT, SortDirection.ASC) # Default sort if invalid
|
||||
case _:
|
||||
assert_never(field)
|
||||
|
||||
@@ -329,33 +329,59 @@ class TestBookSearchCommand:
|
||||
[
|
||||
# String field filters
|
||||
("title:Hobbit", "", ["The Hobbit"]),
|
||||
("author:Tolkien", "", ["The Hobbit", "The Fellowship"]),
|
||||
("genre:Fantasy", "", ["The Hobbit", "The Fellowship"]),
|
||||
("owner:alice", "", ["The Hobbit", "The Fellowship", "Dune"]),
|
||||
("place:home", "", ["The Hobbit", "Programming Book"]),
|
||||
("bookshelf:fantasy", "", ["The Hobbit", "The Fellowship"]),
|
||||
("author:Tolkien", "", ["The Fellowship", "The Hobbit"]),
|
||||
("genre:Fantasy", "", ["The Fellowship", "The Hobbit"]),
|
||||
("owner:alice", "", ["Dune", "The Fellowship", "The Hobbit"]),
|
||||
("place:home", "", ["Programming Book", "The Hobbit"]),
|
||||
("bookshelf:fantasy", "", ["The Fellowship", "The Hobbit"]),
|
||||
# Numeric field filters
|
||||
("rating>=4", "", ["The Hobbit", "Programming Book"]),
|
||||
("rating>=4", "", ["Programming Book", "The Hobbit"]),
|
||||
("rating=3", "", ["Dune"]),
|
||||
("shelf>1", "", ["The Fellowship", "Programming Book"]),
|
||||
("year>=1954", "", ["The Fellowship", "Dune", "Programming Book"]),
|
||||
("shelf>1", "", ["Programming Book", "The Fellowship"]),
|
||||
("year>=1954", "", ["Programming Book", "Dune", "The Fellowship"]),
|
||||
# Date field filters
|
||||
(
|
||||
"added>=2026-03-15",
|
||||
"",
|
||||
["The Hobbit", "The Fellowship", "Dune", "Programming Book"],
|
||||
["Programming Book", "Dune", "The Fellowship", "The Hobbit"],
|
||||
),
|
||||
("bought<2026-01-01", "", ["Programming Book"]),
|
||||
# Negation
|
||||
("-genre:Fantasy", "", ["Dune", "Programming Book"]),
|
||||
("-owner:bob", "", ["The Hobbit", "The Fellowship", "Dune"]),
|
||||
("-genre:Fantasy", "", ["Programming Book", "Dune"]),
|
||||
("-owner:bob", "", ["Dune", "The Fellowship", "The Hobbit"]),
|
||||
# Complex query with multiple filters
|
||||
("-genre:Fantasy owner:alice", "", ["Dune"]),
|
||||
# User-specific queries
|
||||
("rating>=4", "alice", ["The Hobbit"]),
|
||||
("is:reading", "alice", ["The Fellowship"]),
|
||||
("is:read", "alice", ["The Hobbit", "Dune"]),
|
||||
("is:read", "alice", ["Dune", "The Hobbit"]),
|
||||
("is:wished", "alice", ["Programming Book"]),
|
||||
# Sorting
|
||||
(
|
||||
"sort:added-desc",
|
||||
"",
|
||||
["Programming Book", "Dune", "The Fellowship", "The Hobbit"],
|
||||
),
|
||||
(
|
||||
"sort:added-asc",
|
||||
"",
|
||||
["The Hobbit", "The Fellowship", "Dune", "Programming Book"],
|
||||
),
|
||||
(
|
||||
"sort:read-desc",
|
||||
"alice",
|
||||
["Dune", "The Hobbit", "Programming Book", "The Fellowship"],
|
||||
),
|
||||
(
|
||||
"sort:owner-asc",
|
||||
"",
|
||||
["Dune", "The Fellowship", "The Hobbit", "Programming Book"],
|
||||
),
|
||||
(
|
||||
"sort:rating-desc",
|
||||
"alice",
|
||||
["The Hobbit", "Dune", "Programming Book", "The Fellowship"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_book_search_advanced_queries(
|
||||
@@ -383,7 +409,7 @@ class TestBookSearchCommand:
|
||||
actual_titles = [book["title"] for book in search_results]
|
||||
|
||||
# Verify expected titles are present (order doesn't matter)
|
||||
assert set(expected_titles) == set(actual_titles), (
|
||||
assert expected_titles == actual_titles, (
|
||||
f"Query '{query}' expected {expected_titles}, got {actual_titles}"
|
||||
)
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from hxbooks.search import (
|
||||
IsOperatorValue,
|
||||
QueryParser,
|
||||
SearchQuery,
|
||||
SortDirection,
|
||||
_convert_value, # noqa: PLC2701
|
||||
)
|
||||
|
||||
@@ -267,6 +268,21 @@ class TestTypeConversion:
|
||||
result = _convert_value(Field.IS, "invalid-status")
|
||||
assert result == IsOperatorValue.UNKNOWN
|
||||
|
||||
def test_convert_sort_field(self, parser: QueryParser) -> None:
|
||||
"""Test converting values for 'sort' field."""
|
||||
result = _convert_value(Field.SORT, "added")
|
||||
assert result == (Field.ADDED_DATE, SortDirection.ASC)
|
||||
|
||||
result = _convert_value(Field.SORT, "added-desc")
|
||||
assert result == (Field.ADDED_DATE, SortDirection.DESC)
|
||||
|
||||
# Invalid field or direction should fallback to a default value
|
||||
result = _convert_value(Field.SORT, "added-invalid")
|
||||
assert result == (Field.SORT, SortDirection.ASC)
|
||||
|
||||
result = _convert_value(Field.SORT, "invalid-asc")
|
||||
assert result == (Field.SORT, SortDirection.ASC)
|
||||
|
||||
|
||||
class TestParsingEdgeCases:
|
||||
"""Test edge cases and error handling in query parsing."""
|
||||
|
||||
Reference in New Issue
Block a user