Extended search functionality

This commit is contained in:
2026-03-16 19:18:11 +01:00
parent d427cec8d5
commit 0083e3d896
5 changed files with 232 additions and 135 deletions

View File

@@ -73,7 +73,7 @@ def db_group() -> None:
# Book commands
@book.command("add")
@click.argument("title")
@click.option("--owner", required=True, help="Username of book owner")
@click.option("--owner", help="Username of book owner")
@click.option("--authors", help="Comma-separated list of authors")
@click.option("--genres", help="Comma-separated list of genres")
@click.option("--isbn", help="ISBN number")
@@ -100,7 +100,10 @@ def add_book(
) -> None:
"""Add a new book to the library."""
app = get_app()
user_id = ensure_user_exists(app, owner)
if owner:
user_id = ensure_user_exists(app, owner)
else:
user_id = None
with app.app_context():
try:
@@ -196,7 +199,9 @@ def list_books(
@book.command("search")
@click.argument("query")
@click.option("--owner", help="Filter by owner username")
@click.option(
"--username", help="Username to apply user-specific filters (e.g., for ratings)"
)
@click.option(
"--format",
"output_format",
@@ -207,7 +212,7 @@ def list_books(
@click.option("--limit", type=int, default=20, help="Maximum number of results")
def search_books(
query: str,
owner: str | None = None,
username: str | None = None,
output_format: str = "table",
limit: int = 20,
) -> None:
@@ -216,26 +221,51 @@ def search_books(
with app.app_context():
try:
results = library.search_books_advanced(query_string=query, limit=limit)
books = library.search_books_advanced(
query_string=query, limit=limit, username=username
)
if output_format == "json":
results = []
for book in books:
results.append({
"id": book.id,
"title": book.title,
"authors": [author.name for author in book.authors],
"genres": [genre.name for genre in book.genres],
"owner": book.owner.username if book.owner else None,
"isbn": book.isbn,
"publisher": book.publisher,
"description": book.description,
"location": {
"place": book.location_place,
"bookshelf": book.location_bookshelf,
"shelf": book.location_shelf,
},
"loaned_to": book.loaned_to,
"loaned_date": book.loaned_date.isoformat()
if book.loaned_date
else None,
"added_date": book.added_date.isoformat(),
"bought_date": book.bought_date.isoformat()
if book.bought_date
else None,
})
click.echo(json.dumps(results, indent=2))
else:
# Table format
if not results:
if not books:
click.echo("No books found.")
return
click.echo(f"{'ID':<4} {'Title':<35} {'Authors':<30}")
click.echo("-" * 72)
for book in results:
authors_str = ", ".join(book["authors"])[:27]
for book in books:
authors_str = ", ".join(a.name for a in book.authors)[:27]
if len(authors_str) == 27:
authors_str += "..."
click.echo(
f"{book['id']:<4} {book['title'][:32]:<35} {authors_str:<30}"
)
click.echo(f"{book.id:<4} {book.title[:32]:<35} {authors_str:<30}")
except Exception as e:
click.echo(f"Error searching books: {e}", err=True)

View File

@@ -7,11 +7,12 @@ Separated from web interface concerns to enable both CLI and web access.
from collections.abc import Sequence
from datetime import date, datetime
from typing import assert_never
from sqlalchemy import ColumnElement, and_, or_
from sqlalchemy.orm import InstrumentedAttribute, joinedload
from hxbooks.search import QueryParser, ValueT
from hxbooks.search import IsOperatorValue, QueryParser, ValueT
from .db import db
from .gbooks import fetch_google_book_data
@@ -232,7 +233,9 @@ def search_books(
query_parser = QueryParser()
def search_books_advanced(query_string: str, limit: int = 50) -> Sequence[Book]:
def search_books_advanced(
query_string: str, limit: int = 50, username: str | None = None
) -> Sequence[Book]:
"""Advanced search with field filters supporting comparison operators."""
parsed_query = query_parser.parse(query_string)
@@ -263,7 +266,7 @@ def search_books_advanced(query_string: str, limit: int = 50) -> Sequence[Book]:
# Advanced field filters
if parsed_query.field_filters:
for field_filter in parsed_query.field_filters:
condition = _build_field_condition(field_filter)
condition = _build_field_condition(field_filter, username)
if condition is not None:
if field_filter.negated:
@@ -277,33 +280,12 @@ def search_books_advanced(query_string: str, limit: int = 50) -> Sequence[Book]:
query = query.distinct().limit(limit)
result = db.session.execute(query)
# return result.scalars().unique().all()
results = []
for book in result.scalars().unique().all():
results.append({
"id": book.id,
"title": book.title,
"authors": [author.name for author in book.authors],
"genres": [genre.name for genre in book.genres],
"owner": book.owner.username if book.owner else None,
"isbn": book.isbn,
"publisher": book.publisher,
"description": book.description,
"location": {
"place": book.location_place,
"bookshelf": book.location_bookshelf,
"shelf": book.location_shelf,
},
"loaned_to": book.loaned_to,
"loaned_date": book.loaned_date.isoformat() if book.loaned_date else None,
"added_date": book.added_date.isoformat(),
"bought_date": book.bought_date.isoformat() if book.bought_date else None,
})
return results
return result.scalars().unique().all()
def _build_field_condition(field_filter: FieldFilter) -> ColumnElement | None:
def _build_field_condition(
field_filter: FieldFilter, username: str | None = None
) -> ColumnElement | None:
"""
Build a SQLAlchemy condition for a field filter.
"""
@@ -312,31 +294,77 @@ def _build_field_condition(field_filter: FieldFilter) -> ColumnElement | None:
value = field_filter.value
# Map field names to Book attributes or special handling
if field == Field.TITLE:
field_attr = Book.title
elif field == Field.AUTHOR:
return Book.authors.any(_apply_operator(Author.name, operator, value))
elif field == Field.GENRE:
return Book.genres.any(_apply_operator(Genre.name, operator, value))
elif field == Field.ISBN:
field_attr = Book.isbn
elif field == Field.PLACE:
field_attr = Book.location_place
elif field == Field.BOOKSHELF:
field_attr = Book.location_bookshelf
elif field == Field.SHELF:
field_attr = Book.location_shelf
elif field == Field.ADDED_DATE:
field_attr = Book.added_date
elif field == Field.BOUGHT_DATE:
field_attr = Book.bought_date
elif field == Field.LOANED_DATE:
field_attr = Book.loaned_date
elif field == Field.OWNER:
return Book.owner.has(_apply_operator(User.username, operator, value))
else:
# Unknown field, skip
return None
match field:
case Field.TITLE:
field_attr = Book.title
case Field.AUTHOR:
return Book.authors.any(_apply_operator(Author.name, operator, value))
case Field.GENRE:
return Book.genres.any(_apply_operator(Genre.name, operator, value))
case Field.ISBN:
field_attr = Book.isbn
case Field.PLACE:
field_attr = Book.location_place
case Field.BOOKSHELF:
field_attr = Book.location_bookshelf
case Field.SHELF:
field_attr = Book.location_shelf
case Field.ADDED_DATE:
field_attr = Book.added_date
case Field.BOUGHT_DATE:
field_attr = Book.bought_date
case Field.LOANED_DATE:
field_attr = Book.loaned_date
case Field.OWNER:
return Book.owner.has(_apply_operator(User.username, operator, value))
case Field.YEAR:
field_attr = Book.first_published
case Field.RATING:
any_condition = _apply_operator(Reading.rating, operator, value)
if username:
any_condition &= Reading.user.has(User.username == username)
return Book.readings.any(any_condition)
case Field.READ_DATE:
any_condition = _apply_operator(Reading.end_date, operator, value)
if username:
any_condition &= Reading.user.has(User.username == username)
return Book.readings.any(any_condition)
case Field.IS:
assert isinstance(value, IsOperatorValue)
match value:
case IsOperatorValue.LOANED:
return Book.loaned_to != ""
case IsOperatorValue.READING:
any_condition = Reading.end_date.is_(None)
if username:
any_condition &= Reading.user.has(User.username == username)
return Book.readings.any(any_condition)
case IsOperatorValue.READ:
any_condition = (~Reading.end_date.is_(None)) & Reading.dropped.is_(
False
)
if username:
any_condition &= Reading.user.has(User.username == username)
return Book.readings.any(any_condition)
case IsOperatorValue.DROPPED:
any_condition = (~Reading.end_date.is_(None)) & Reading.dropped.is_(
True
)
if username:
any_condition &= Reading.user.has(User.username == username)
return Book.readings.any(any_condition)
case IsOperatorValue.WISHED:
return Book.wished_by.any(
Wishlist.user.has(User.username == username)
if username
else None
)
case IsOperatorValue.UNKNOWN:
return None
case _:
assert_never(value)
case _:
assert_never(field)
condition = _apply_operator(field_attr, operator, value)
return condition

View File

@@ -8,6 +8,7 @@ Currently implements basic search - will be enhanced with pyparsing for advanced
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import StrEnum
from typing import assert_never
import pyparsing as pp
@@ -40,9 +41,21 @@ class Field(StrEnum):
ADDED_DATE = "added"
LOANED_DATE = "loaned"
OWNER = "owner"
IS = "is"
ValueT = str | int | float | date
class IsOperatorValue(StrEnum):
"""Supported values for 'is' operator."""
LOANED = "loaned"
READ = "read"
READING = "reading"
DROPPED = "dropped"
WISHED = "wished"
UNKNOWN = "_unknown_"
ValueT = str | int | float | date | IsOperatorValue
@dataclass
@@ -173,30 +186,40 @@ class QueryParser:
return SearchQuery(text_terms=text_terms, field_filters=field_filters)
def _convert_value(field: Field, value_str: str) -> str | int | float | date:
def _convert_value(field: Field, value_str: str) -> ValueT:
"""Convert string value to appropriate type based on field."""
# Date fields
if field in {
Field.READ_DATE,
Field.BOUGHT_DATE,
Field.ADDED_DATE,
Field.LOANED_DATE,
}:
try:
return datetime.strptime(value_str, "%Y-%m-%d").date()
except ValueError:
match field:
# Date fields
case Field.READ_DATE | Field.BOUGHT_DATE | Field.ADDED_DATE | Field.LOANED_DATE:
try:
return datetime.strptime(value_str, "%Y-%m-%d").date()
except ValueError:
return value_str
# Numeric fields
case Field.RATING | Field.SHELF | Field.YEAR:
try:
if "." in value_str:
return float(value_str)
else:
return int(value_str)
except ValueError:
return value_str
# String fields
case (
Field.OWNER
| Field.TITLE
| Field.AUTHOR
| Field.ISBN
| Field.GENRE
| Field.PLACE
| Field.BOOKSHELF
):
return value_str
# Numeric fields
if field in {Field.RATING, Field.SHELF, Field.YEAR}:
try:
if "." in value_str:
return float(value_str)
case Field.IS:
if value_str in IsOperatorValue:
return IsOperatorValue(value_str)
else:
return int(value_str)
except ValueError:
return value_str
# String fields (default)
return value_str
return IsOperatorValue.UNKNOWN
case _:
assert_never(field)

View File

@@ -124,24 +124,6 @@ class TestBookAddCommand:
assert len(book.authors) == 0 # No authors provided
assert len(book.genres) == 0 # No genres provided
def test_book_add_missing_owner_fails(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test that book addition fails when owner is not provided."""
result = cli_runner.invoke(
cli,
[
"book",
"add",
"Test Book",
# Missing --owner parameter
],
)
# Should fail with exit code 2 (Click validation error)
assert result.exit_code == 2
assert "Missing option '--owner'" in result.output
class TestBookListCommand:
"""Test the 'hxbooks book list' command."""
@@ -343,46 +325,46 @@ class TestBookSearchCommand:
# Results format depends on BookService.search_books_advanced implementation
@pytest.mark.parametrize(
"query,expected_titles",
"query,username,expected_titles",
[
# String field filters
("title:Hobbit", ["The Hobbit"]),
("author:Tolkien", ["The Hobbit", "The Fellowship"]),
("genre:Fantasy", ["The Hobbit", "The Fellowship"]),
("owner:alice", ["The Hobbit", "The Fellowship", "Dune"]),
("place:home", ["The Hobbit", "Programming Book"]),
("bookshelf:fantasy", ["The Hobbit", "The Fellowship"]),
("title:Hobbit", "", ["The Hobbit"]),
("author:Tolkien", "", ["The Hobbit", "The Fellowship"]),
("genre:Fantasy", "", ["The Hobbit", "The Fellowship"]),
("owner:alice", "", ["The Hobbit", "The Fellowship", "Dune"]),
("place:home", "", ["The Hobbit", "Programming Book"]),
("bookshelf:fantasy", "", ["The Hobbit", "The Fellowship"]),
# Numeric field filters
pytest.param(
"rating>=4",
["The Hobbit", "Programming Book"],
marks=pytest.mark.xfail(reason="Rating filter not implemented yet"),
),
pytest.param(
"rating=3",
["Dune"],
marks=pytest.mark.xfail(reason="Rating filter not implemented yet"),
),
("shelf>1", ["The Fellowship", "Programming Book"]),
(
"year>=1954",
["The Hobbit", "The Fellowship", "Dune", "Programming Book"],
),
("rating>=4", "", ["The Hobbit", "Programming Book"]),
("rating=3", "", ["Dune"]),
("shelf>1", "", ["The Fellowship", "Programming Book"]),
("year>=1954", "", ["The Fellowship", "Dune", "Programming Book"]),
# Date field filters
(
"added>=2026-03-15",
"",
["The Hobbit", "The Fellowship", "Dune", "Programming Book"],
),
("bought<2026-01-01", ["Programming Book"]),
("bought<2026-01-01", "", ["Programming Book"]),
# Negation
("-genre:Fantasy", ["Dune", "Programming Book"]),
("-owner:bob", ["The Hobbit", "The Fellowship", "Dune"]),
("-genre:Fantasy", "", ["Dune", "Programming Book"]),
("-owner:bob", "", ["The Hobbit", "The Fellowship", "Dune"]),
# Complex query with multiple filters
("-genre:Fantasy owner:alice", ["Dune"]),
("-genre:Fantasy owner:alice", "", ["Dune"]),
# User-specific queries
("rating>=4", "alice", ["The Hobbit"]),
("is:reading", "alice", ["The Fellowship"]),
("is:read", "alice", ["The Hobbit", "Dune"]),
("is:wished", "alice", ["Programming Book"]),
],
)
def test_book_search_advanced_queries(
self, app: Flask, cli_runner: CliRunner, query: str, expected_titles: list[str]
self,
app: Flask,
cli_runner: CliRunner,
query: str,
username: str,
expected_titles: list[str],
) -> None:
"""Test advanced search queries with various field filters."""
# Set up comprehensive test data
@@ -390,7 +372,8 @@ class TestBookSearchCommand:
# Execute the search query
result = cli_runner.invoke(
cli, ["book", "search", "--format", "json", "--", query]
cli,
["book", "search", "--format", "json", "--username", username, "--", query],
)
assert result.exit_code == 0, f"Search query '{query}' failed: {result.output}"
@@ -517,6 +500,9 @@ class TestBookSearchCommand:
cli_runner.invoke(cli, ["reading", "start", str(hobbit_id), "--owner", "alice"])
cli_runner.invoke(cli, ["reading", "start", str(dune_id), "--owner", "alice"])
cli_runner.invoke(cli, ["reading", "start", str(prog_id), "--owner", "bob"])
cli_runner.invoke(
cli, ["reading", "start", str(fellowship_id), "--owner", "alice"]
)
with app.app_context():
# Get reading session IDs
@@ -562,6 +548,12 @@ class TestBookSearchCommand:
db.session.commit()
# Add a book to wishlist
cli_runner.invoke(
cli,
["wishlist", "add", str(prog_id), "--owner", "alice"],
)
class TestReadingCommands:
"""Test reading-related CLI commands."""

View File

@@ -12,6 +12,7 @@ import pytest
from hxbooks.search import (
ComparisonOperator,
Field,
IsOperatorValue,
QueryParser,
SearchQuery,
_convert_value, # noqa: PLC2701
@@ -87,6 +88,14 @@ class TestFieldFilters:
assert filter.field == Field.AUTHOR
assert filter.value == "tolkien"
def test_parse_is_filter(self, parser: QueryParser) -> None:
"""Test parsing 'is' operator field filter."""
result = parser.parse("is:reading")
assert len(result.field_filters) == 1
filter = result.field_filters[0]
assert filter.field == Field.IS
assert filter.value == IsOperatorValue.READING
def test_parse_negated_filter(self, parser: QueryParser) -> None:
"""Test parsing negated field filter."""
result = parser.parse("-genre:romance")
@@ -243,6 +252,21 @@ class TestTypeConversion:
assert result == "123"
assert isinstance(result, str)
def test_convert_is_operator(self, parser: QueryParser) -> None:
"""Test converting values for 'is' operator fields."""
result = _convert_value(Field.IS, "reading")
assert result == IsOperatorValue.READING
result = _convert_value(Field.IS, "dropped")
assert result == IsOperatorValue.DROPPED
result = _convert_value(Field.IS, "wished")
assert result == IsOperatorValue.WISHED
# Invalid value should return UNKNOWN
result = _convert_value(Field.IS, "invalid-status")
assert result == IsOperatorValue.UNKNOWN
class TestParsingEdgeCases:
"""Test edge cases and error handling in query parsing."""