Linted and formatted everything new

This commit is contained in:
2026-03-16 02:42:23 +01:00
parent 40ca08359f
commit d427cec8d5
18 changed files with 1410 additions and 1209 deletions

15
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,15 @@
repos:
- repo: https://github.com/astral-sh/uv-pre-commit
# uv version.
rev: 0.10.10
hooks:
- id: uv-lock
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.15.6
hooks:
# Run the linter.
- id: ruff-check
# Run the formatter.
- id: ruff-format

View File

@@ -12,32 +12,31 @@ config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')
logger = logging.getLogger("alembic.env")
def get_engine():
try:
# this works with Flask-SQLAlchemy<3 and Alchemical
return current_app.extensions['migrate'].db.get_engine()
except (TypeError, AttributeError):
return current_app.extensions["migrate"].db.get_engine()
except TypeError, AttributeError:
# this works with Flask-SQLAlchemy>=3
return current_app.extensions['migrate'].db.engine
return current_app.extensions["migrate"].db.engine
def get_engine_url():
try:
return get_engine().url.render_as_string(hide_password=False).replace(
'%', '%%')
return get_engine().url.render_as_string(hide_password=False).replace("%", "%%")
except AttributeError:
return str(get_engine().url).replace('%', '%%')
return str(get_engine().url).replace("%", "%%")
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option('sqlalchemy.url', get_engine_url())
target_db = current_app.extensions['migrate'].db
config.set_main_option("sqlalchemy.url", get_engine_url())
target_db = current_app.extensions["migrate"].db
# other values from the config, defined by the needs of env.py,
# can be acquired:
@@ -46,7 +45,7 @@ target_db = current_app.extensions['migrate'].db
def get_metadata():
if hasattr(target_db, 'metadatas'):
if hasattr(target_db, "metadatas"):
return target_db.metadatas[None]
return target_db.metadata
@@ -64,9 +63,7 @@ def run_migrations_offline():
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=get_metadata(), literal_binds=True
)
context.configure(url=url, target_metadata=get_metadata(), literal_binds=True)
with context.begin_transaction():
context.run_migrations()
@@ -84,13 +81,13 @@ def run_migrations_online():
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives):
if getattr(config.cmd_opts, 'autogenerate', False):
if getattr(config.cmd_opts, "autogenerate", False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
logger.info('No changes in schema detected.')
logger.info("No changes in schema detected.")
conf_args = current_app.extensions['migrate'].configure_args
conf_args = current_app.extensions["migrate"].configure_args
if conf_args.get("process_revision_directives") is None:
conf_args["process_revision_directives"] = process_revision_directives
@@ -98,9 +95,7 @@ def run_migrations_online():
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=get_metadata(),
**conf_args
connection=connection, target_metadata=get_metadata(), **conf_args
)
with context.begin_transaction():

View File

@@ -5,12 +5,13 @@ Revises:
Create Date: 2026-03-14 22:51:20.059755
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '75e81e4ab7b6'
revision = "75e81e4ab7b6"
down_revision = None
branch_labels = None
depends_on = None
@@ -18,90 +19,125 @@ depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('author',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=200), nullable=False),
sa.PrimaryKeyConstraint('id')
op.create_table(
"author",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=200), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table('genre',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=100), nullable=False),
sa.PrimaryKeyConstraint('id')
op.create_table(
"genre",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=100), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table('user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('saved_searches', sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint('id')
op.create_table(
"user",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("username", sa.String(), nullable=False),
sa.Column("saved_searches", sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table('book',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(length=500), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('first_published', sa.Integer(), nullable=True),
sa.Column('edition', sa.String(length=200), nullable=False),
sa.Column('publisher', sa.String(length=200), nullable=False),
sa.Column('isbn', sa.String(length=20), nullable=False),
sa.Column('notes', sa.String(), nullable=False),
sa.Column('added_date', sa.DateTime(), nullable=False),
sa.Column('bought_date', sa.Date(), nullable=True),
sa.Column('location_place', sa.String(length=100), nullable=False),
sa.Column('location_bookshelf', sa.String(length=100), nullable=False),
sa.Column('location_shelf', sa.Integer(), nullable=True),
sa.Column('loaned_to', sa.String(length=200), nullable=False),
sa.Column('loaned_date', sa.Date(), nullable=True),
sa.Column('owner_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
op.create_table(
"book",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(length=500), nullable=False),
sa.Column("description", sa.String(), nullable=False),
sa.Column("first_published", sa.Integer(), nullable=True),
sa.Column("edition", sa.String(length=200), nullable=False),
sa.Column("publisher", sa.String(length=200), nullable=False),
sa.Column("isbn", sa.String(length=20), nullable=False),
sa.Column("notes", sa.String(), nullable=False),
sa.Column("added_date", sa.DateTime(), nullable=False),
sa.Column("bought_date", sa.Date(), nullable=True),
sa.Column("location_place", sa.String(length=100), nullable=False),
sa.Column("location_bookshelf", sa.String(length=100), nullable=False),
sa.Column("location_shelf", sa.Integer(), nullable=True),
sa.Column("loaned_to", sa.String(length=200), nullable=False),
sa.Column("loaned_date", sa.Date(), nullable=True),
sa.Column("owner_id", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["owner_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table('book_author',
sa.Column('book_id', sa.Integer(), nullable=False),
sa.Column('author_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['author_id'], ['author.id'], ),
sa.ForeignKeyConstraint(['book_id'], ['book.id'], ),
sa.PrimaryKeyConstraint('book_id', 'author_id')
op.create_table(
"book_author",
sa.Column("book_id", sa.Integer(), nullable=False),
sa.Column("author_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["author_id"],
["author.id"],
),
sa.ForeignKeyConstraint(
["book_id"],
["book.id"],
),
sa.PrimaryKeyConstraint("book_id", "author_id"),
)
op.create_table('book_genre',
sa.Column('book_id', sa.Integer(), nullable=False),
sa.Column('genre_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['book_id'], ['book.id'], ),
sa.ForeignKeyConstraint(['genre_id'], ['genre.id'], ),
sa.PrimaryKeyConstraint('book_id', 'genre_id')
op.create_table(
"book_genre",
sa.Column("book_id", sa.Integer(), nullable=False),
sa.Column("genre_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["book_id"],
["book.id"],
),
sa.ForeignKeyConstraint(
["genre_id"],
["genre.id"],
),
sa.PrimaryKeyConstraint("book_id", "genre_id"),
)
op.create_table('reading',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('start_date', sa.Date(), nullable=False),
sa.Column('end_date', sa.Date(), nullable=True),
sa.Column('finished', sa.Boolean(), nullable=False),
sa.Column('dropped', sa.Boolean(), nullable=False),
sa.Column('rating', sa.Integer(), nullable=True),
sa.Column('comments', sa.String(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('book_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['book_id'], ['book.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
op.create_table(
"reading",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("start_date", sa.Date(), nullable=False),
sa.Column("end_date", sa.Date(), nullable=True),
sa.Column("finished", sa.Boolean(), nullable=False),
sa.Column("dropped", sa.Boolean(), nullable=False),
sa.Column("rating", sa.Integer(), nullable=True),
sa.Column("comments", sa.String(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("book_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["book_id"],
["book.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table('wishlist',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('wishlisted_date', sa.Date(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('book_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['book_id'], ['book.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
op.create_table(
"wishlist",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("wishlisted_date", sa.Date(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("book_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["book_id"],
["book.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('wishlist')
op.drop_table('reading')
op.drop_table('book_genre')
op.drop_table('book_author')
op.drop_table('book')
op.drop_table('user')
op.drop_table('genre')
op.drop_table('author')
op.drop_table("wishlist")
op.drop_table("reading")
op.drop_table("book_genre")
op.drop_table("book_author")
op.drop_table("book")
op.drop_table("user")
op.drop_table("genre")
op.drop_table("author")
# ### end Alembic commands ###

View File

@@ -15,7 +15,6 @@ dependencies = [
"jinja2-fragments>=1.11.0",
"pydantic>=2.12.5",
"pyparsing>=3.3.2",
"pytest>=9.0.2",
"requests>=2.32.5",
"sqlalchemy>=2.0.48",
]
@@ -27,5 +26,38 @@ hxbooks = "hxbooks.cli:cli"
requires = ["uv_build>=0.10.10,<0.11.0"]
build-backend = "uv_build"
[dependency-groups]
dev = [
"pre-commit>=4.5.1",
"pytest>=9.0.2",
"ruff>=0.15.6",
"ty>=0.0.23",
]
[tool.pytest.ini_options]
addopts = ["-v", "--tb=short"]
[tool.ruff]
preview = true
exclude = [
"migrations/**",
"src/hxbooks/book.py",
"src/hxbooks/util.py",
"src/hxbooks/auth.py",
"src/hxbooks/gbooks.py",
]
[tool.ruff.lint]
select = ["E", "F", "B", "C90", "UP", "RUF", "FURB", "PL", "ANN"]
ignore = ["PLR09", "PLR2004", "E501", "C901", "PLC1901"]
per-file-ignores = { "tests/**.py" = ["PLR6301"] }
[tool.ty.src]
exclude = [
"migrations/**",
"src/hxbooks/book.py",
"src/hxbooks/util.py",
"src/hxbooks/auth.py",
"src/hxbooks/gbooks.py",
"src/hxbooks/htmx.py",
]

View File

@@ -1,54 +0,0 @@
import os
from pathlib import Path
from typing import Optional
from flask import Flask
from flask_migrate import Migrate
from . import auth, book, db
from .htmx import htmx
# Get the project root (parent of src/)
PROJECT_ROOT = Path(__file__).parent.parent.parent
def create_app(test_config: Optional[dict] = None) -> Flask:
# Set instance folder to project root/instance
app = Flask(__name__, instance_path=str(PROJECT_ROOT / "instance"))
app.config.from_mapping(
SECRET_KEY="dev",
# Put database in project root
SQLALCHEMY_DATABASE_URI=f"sqlite:///{PROJECT_ROOT / 'hxbooks.sqlite'}",
)
if test_config is None:
# load the instance config, if it exists, when not testing
app.config.from_pyfile("config.py", silent=True)
else:
# load the test config if passed in
app.config.from_mapping(test_config)
# ensure the instance folder exists
try:
os.makedirs(app.instance_path)
except OSError:
pass
db.init_app(app)
htmx.init_app(app)
# Initialize migrations
migrate = Migrate(app, db.db)
app.register_blueprint(auth.bp)
app.register_blueprint(book.bp)
app.add_url_rule("/", endpoint="books.books")
return app
if __name__ == "__main__":
app = create_app()
app.run()

View File

@@ -1,6 +1,6 @@
import livereload # type: ignore
from hxbooks import create_app
from hxbooks.app import create_app
app = create_app()
app.debug = True

48
src/hxbooks/app.py Normal file
View File

@@ -0,0 +1,48 @@
import os
from pathlib import Path
from flask import Flask
from flask_migrate import Migrate
from . import auth, book, db
from .htmx import htmx
# Get the project root (parent of src/)
PROJECT_ROOT = Path(__file__).parent.parent.parent
def create_app(test_config: dict | None = None) -> Flask:
# Set instance folder to project root/instance
app = Flask(__name__, instance_path=str(PROJECT_ROOT / "instance"))
app.config.from_mapping(
SECRET_KEY="dev",
# Put database in project root
SQLALCHEMY_DATABASE_URI=f"sqlite:///{PROJECT_ROOT / 'hxbooks.sqlite'}",
)
if test_config is None:
# load the instance config, if it exists, when not testing
app.config.from_pyfile("config.py", silent=True)
else:
# load the test config if passed in
app.config.from_mapping(test_config)
# ensure the instance folder exists
try:
os.makedirs(app.instance_path)
except OSError:
pass
db.init_app(app)
htmx.init_app(app)
# Initialize migrations
Migrate(app, db.db)
app.register_blueprint(auth.bp)
app.register_blueprint(book.bp)
app.add_url_rule("/", endpoint="books.books")
return app

View File

@@ -64,19 +64,19 @@ ResultColumn = Literal[
class SearchRequestSchema(BaseModel, extra="forbid"):
q: str = ""
wishlisted: Optional[bool] = None
read: Optional[bool] = None
reading: Optional[bool] = None
dropped: Optional[bool] = None
bought_start: Optional[date] = None
bought_end: Optional[date] = None
started_reading_start: Optional[date] = None
started_reading_end: Optional[date] = None
finished_reading_start: Optional[date] = None
finished_reading_end: Optional[date] = None
wishlisted: bool | None = None
read: bool | None = None
reading: bool | None = None
dropped: bool | None = None
bought_start: date | None = None
bought_end: date | None = None
started_reading_start: date | None = None
started_reading_end: date | None = None
finished_reading_start: date | None = None
finished_reading_end: date | None = None
sort_by: ResultColumn = "title"
sort_order: Literal["asc", "desc"] = "asc"
saved_search: Optional[str] = None
saved_search: str | None = None
@field_validator(
"wishlisted",
@@ -104,13 +104,13 @@ class BookResultSchema(BaseModel):
authors: list[str]
genres: list[str]
publisher: str
first_published: Optional[int]
first_published: int | None
edition: str
added: datetime
description: str
notes: str
isbn: str
owner: Optional[str]
owner: str | None
bought: date
location: str
loaned_to: str
@@ -119,8 +119,8 @@ class BookResultSchema(BaseModel):
read: bool
reading: bool
dropped: bool
started_reading: Optional[date]
finished_reading: Optional[date]
started_reading: date | None
finished_reading: date | None
@bp.route("", methods=["GET"])
@@ -377,12 +377,10 @@ def get_default_searches(username: str) -> dict[str, SearchRequestSchema]:
def get_saved_searches(user: User) -> dict[str, SearchRequestSchema]:
searches = get_default_searches(user.username).copy()
searches.update(
{
name: SearchRequestSchema.model_validate(value)
for name, value in user.saved_searches.items()
}
)
searches.update({
name: SearchRequestSchema.model_validate(value)
for name, value in user.saved_searches.items()
})
for name, search in searches.items():
search.saved_search = name
return searches
@@ -390,14 +388,14 @@ def get_saved_searches(user: User) -> dict[str, SearchRequestSchema]:
class BookRequestSchema(BaseModel):
title: str = Field(min_length=1)
first_published: Optional[int] = None
first_published: int | None = None
edition: str = ""
notes: str = ""
isbn: str = ""
authors: list[str] = []
genres: list[str] = []
publisher: str = ""
owner_id: Optional[int] = None
owner_id: int | None = None
bought: date = Field(default_factory=datetime.today)
location: str = "billy salon"
loaned_to: str = ""
@@ -456,7 +454,8 @@ def book(id: int) -> str | Response:
"users": db.session.execute(select(User)).scalars().all(),
"genres": get_distinct_json_list_values(Book.genres),
"authors": get_distinct_json_list_values(Book.authors),
"locations": db.session.execute(select(Book.location).distinct())
"locations": db.session
.execute(select(Book.location).distinct())
.scalars()
.all(),
"wished_by": [wishlist.user.username for wishlist in book.wished_by],
@@ -481,10 +480,10 @@ def readings_new(id: int) -> str:
class ReadingRequestSchema(BaseModel):
start_date: date = Field(default_factory=datetime.today)
end_date: Optional[date] = None
end_date: date | None = None
finished: bool = False
dropped: bool = False
rating: Optional[int] = None
rating: int | None = None
comments: str = ""
user_id: int
book_id: int

View File

@@ -7,14 +7,14 @@ while keeping business logic separate from web interface concerns.
import json
import sys
from pathlib import Path
from typing import Optional
import click
from flask import Flask
from . import create_app
from .services import BookService, ReadingService, WishlistService
from . import library
from .app import create_app
from .db import db
from .models import Author, Book, Genre, Reading, User, Wishlist
def get_app() -> Flask:
@@ -24,8 +24,6 @@ def get_app() -> Flask:
def ensure_user_exists(app: Flask, username: str) -> int:
"""Ensure a user exists and return their ID."""
from .db import db
from .models import User
with app.app_context():
user = db.session.execute(
@@ -43,31 +41,31 @@ def ensure_user_exists(app: Flask, username: str) -> int:
@click.group()
@click.version_option()
def cli():
def cli() -> None:
"""HXBooks - Personal library management system."""
pass
@cli.group()
def book():
def book() -> None:
"""Book management commands."""
pass
@cli.group()
def reading():
def reading() -> None:
"""Reading tracking commands."""
pass
@cli.group()
def wishlist():
def wishlist() -> None:
"""Wishlist management commands."""
pass
@cli.group()
def db():
@cli.group("db")
def db_group() -> None:
"""Database management commands."""
pass
@@ -89,26 +87,24 @@ def db():
def add_book(
title: str,
owner: str,
authors: Optional[str] = None,
genres: Optional[str] = None,
isbn: Optional[str] = None,
publisher: Optional[str] = None,
edition: Optional[str] = None,
place: Optional[str] = None,
bookshelf: Optional[str] = None,
shelf: Optional[int] = None,
description: Optional[str] = None,
notes: Optional[str] = None,
):
authors: str | None = None,
genres: str | None = None,
isbn: str | None = None,
publisher: str | None = None,
edition: str | None = None,
place: str | None = None,
bookshelf: str | None = None,
shelf: int | None = None,
description: str | None = None,
notes: str | None = None,
) -> None:
"""Add a new book to the library."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = BookService()
try:
book = service.create_book(
book = library.create_book(
title=title,
owner_id=user_id,
authors=authors.split(",") if authors else None,
@@ -142,21 +138,19 @@ def add_book(
)
@click.option("--limit", type=int, default=50, help="Maximum number of books to show")
def list_books(
owner: Optional[str] = None,
place: Optional[str] = None,
bookshelf: Optional[str] = None,
shelf: Optional[int] = None,
owner: str | None = None,
place: str | None = None,
bookshelf: str | None = None,
shelf: int | None = None,
output_format: str = "table",
limit: int = 50,
):
) -> None:
"""List books in the library."""
app = get_app()
with app.app_context():
service = BookService()
try:
books = service.search_books(
books = library.search_books(
owner_username=owner,
location_place=place,
location_bookshelf=bookshelf,
@@ -167,17 +161,15 @@ def list_books(
if output_format == "json":
book_data = []
for book in books:
book_data.append(
{
"id": book.id,
"title": book.title,
"authors": [a.name for a in book.authors],
"genres": [g.name for g in book.genres],
"owner": book.owner.username if book.owner else None,
"location": f"{book.location_place}/{book.location_bookshelf}/{book.location_shelf}",
"isbn": book.isbn,
}
)
book_data.append({
"id": book.id,
"title": book.title,
"authors": [a.name for a in book.authors],
"genres": [g.name for g in book.genres],
"owner": book.owner.username if book.owner else None,
"location": f"{book.location_place}/{book.location_bookshelf}/{book.location_shelf}",
"isbn": book.isbn,
})
click.echo(json.dumps(book_data, indent=2))
else:
# Table format
@@ -215,20 +207,16 @@ def list_books(
@click.option("--limit", type=int, default=20, help="Maximum number of results")
def search_books(
query: str,
owner: Optional[str] = None,
owner: str | None = None,
output_format: str = "table",
limit: int = 20,
):
) -> None:
"""Search books using query language (e.g., 'genre:thriller read>=2025-01-01')."""
app = get_app()
with app.app_context():
book_service = BookService()
try:
results = book_service.search_books_advanced(
query_string=query, limit=limit
)
results = library.search_books_advanced(query_string=query, limit=limit)
if output_format == "json":
click.echo(json.dumps(results, indent=2))
@@ -263,19 +251,17 @@ def search_books(
def import_book(
isbn: str,
owner: str,
place: Optional[str] = None,
bookshelf: Optional[str] = None,
shelf: Optional[int] = None,
):
place: str | None = None,
bookshelf: str | None = None,
shelf: int | None = None,
) -> None:
"""Import book data from ISBN using Google Books API."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = BookService()
try:
book = service.import_book_from_isbn(
book = library.import_book_from_isbn(
isbn=isbn,
owner_id=user_id,
location_place=place,
@@ -294,16 +280,14 @@ def import_book(
@reading.command("start")
@click.argument("book_id", type=int)
@click.option("--owner", required=True, help="Username of reader")
def start_reading(book_id: int, owner: str):
def start_reading(book_id: int, owner: str) -> None:
"""Start a new reading session for a book."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = ReadingService()
try:
reading_session = service.start_reading(book_id=book_id, user_id=user_id)
reading_session = library.start_reading(book_id=book_id, user_id=user_id)
click.echo(
f"Started reading session {reading_session.id} for book {book_id}"
)
@@ -317,16 +301,14 @@ def start_reading(book_id: int, owner: str):
@click.option("--rating", type=click.IntRange(1, 5), help="Rating from 1-5")
@click.option("--comments", help="Reading comments")
def finish_reading(
reading_id: int, rating: Optional[int] = None, comments: Optional[str] = None
):
reading_id: int, rating: int | None = None, comments: str | None = None
) -> None:
"""Finish a reading session."""
app = get_app()
with app.app_context():
service = ReadingService()
try:
reading_session = service.finish_reading(
reading_session = library.finish_reading(
reading_id=reading_id,
rating=rating,
comments=comments,
@@ -343,15 +325,13 @@ def finish_reading(
@reading.command("drop")
@click.argument("reading_id", type=int)
@click.option("--comments", help="Comments about why dropped")
def drop_reading(reading_id: int, comments: Optional[str] = None):
def drop_reading(reading_id: int, comments: str | None = None) -> None:
"""Mark a reading session as dropped."""
app = get_app()
with app.app_context():
service = ReadingService()
try:
reading_session = service.drop_reading(
reading_session = library.drop_reading(
reading_id=reading_id, comments=comments
)
book_title = reading_session.book.title
@@ -371,38 +351,36 @@ def drop_reading(reading_id: int, comments: Optional[str] = None):
default="table",
help="Output format",
)
def list_readings(owner: str, current: bool = False, output_format: str = "table"):
def list_readings(
owner: str, current: bool = False, output_format: str = "table"
) -> None:
"""List reading sessions."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = ReadingService()
try:
if current:
readings = service.get_current_readings(user_id=user_id)
readings = library.get_current_readings(user_id=user_id)
else:
readings = service.get_reading_history(user_id=user_id)
readings = library.get_reading_history(user_id=user_id)
if output_format == "json":
reading_data = []
for reading in readings:
reading_data.append(
{
"id": reading.id,
"book_id": reading.book_id,
"book_title": reading.book.title,
"start_date": reading.start_date.isoformat(),
"end_date": reading.end_date.isoformat()
if reading.end_date
else None,
"finished": reading.finished,
"dropped": reading.dropped,
"rating": reading.rating,
"comments": reading.comments,
}
)
reading_data.append({
"id": reading.id,
"book_id": reading.book_id,
"book_title": reading.book.title,
"start_date": reading.start_date.isoformat(),
"end_date": reading.end_date.isoformat()
if reading.end_date
else None,
"finished": reading.finished,
"dropped": reading.dropped,
"rating": reading.rating,
"comments": reading.comments,
})
click.echo(json.dumps(reading_data, indent=2))
else:
# Table format
@@ -437,16 +415,14 @@ def list_readings(owner: str, current: bool = False, output_format: str = "table
@wishlist.command("add")
@click.argument("book_id", type=int)
@click.option("--owner", required=True, help="Username")
def add_to_wishlist(book_id: int, owner: str):
def add_to_wishlist(book_id: int, owner: str) -> None:
"""Add a book to wishlist."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = WishlistService()
try:
wishlist_item = service.add_to_wishlist(book_id=book_id, user_id=user_id)
wishlist_item = library.add_to_wishlist(book_id=book_id, user_id=user_id)
book_title = wishlist_item.book.title
click.echo(f"Added '{book_title}' to wishlist")
except Exception as e:
@@ -457,16 +433,14 @@ def add_to_wishlist(book_id: int, owner: str):
@wishlist.command("remove")
@click.argument("book_id", type=int)
@click.option("--owner", required=True, help="Username")
def remove_from_wishlist(book_id: int, owner: str):
def remove_from_wishlist(book_id: int, owner: str) -> None:
"""Remove a book from wishlist."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = WishlistService()
try:
if service.remove_from_wishlist(book_id=book_id, user_id=user_id):
if library.remove_from_wishlist(book_id=book_id, user_id=user_id):
click.echo(f"Removed book {book_id} from wishlist")
else:
click.echo(f"Book {book_id} was not in wishlist")
@@ -484,28 +458,24 @@ def remove_from_wishlist(book_id: int, owner: str):
default="table",
help="Output format",
)
def list_wishlist(owner: str, output_format: str = "table"):
def list_wishlist(owner: str, output_format: str = "table") -> None:
"""Show user's wishlist."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
service = WishlistService()
try:
wishlist_items = service.get_wishlist(user_id=user_id)
wishlist_items = library.get_wishlist(user_id=user_id)
if output_format == "json":
wishlist_data = []
for item in wishlist_items:
wishlist_data.append(
{
"book_id": item.book_id,
"title": item.book.title,
"authors": [author.name for author in item.book.authors],
"wishlisted_date": item.wishlisted_date.isoformat(),
}
)
wishlist_data.append({
"book_id": item.book_id,
"title": item.book.title,
"authors": [author.name for author in item.book.authors],
"wishlisted_date": item.wishlisted_date.isoformat(),
})
click.echo(json.dumps(wishlist_data, indent=2))
else:
# Table format
@@ -531,28 +501,24 @@ def list_wishlist(owner: str, output_format: str = "table"):
# Database commands
@db.command("init")
def init_db():
@db_group.command("init")
def init_db() -> None:
"""Initialize the database."""
app = get_app()
with app.app_context():
from .db import db
db.create_all()
click.echo("Database initialized.")
@db.command("seed")
@db_group.command("seed")
@click.option("--owner", default="test_user", help="Default owner for seed data")
def seed_db(owner: str):
def seed_db(owner: str) -> None:
"""Create some sample data for testing."""
app = get_app()
user_id = ensure_user_exists(app, owner)
with app.app_context():
book_service = BookService()
sample_books = [
{
"title": "The Hobbit",
@@ -589,7 +555,7 @@ def seed_db(owner: str):
created_books = []
for book_data in sample_books:
try:
book = book_service.create_book(owner_id=user_id, **book_data)
book = library.create_book(owner_id=user_id, **book_data) # ty:ignore[invalid-argument-type]
created_books.append(book)
click.echo(f"Created: {book.title}")
except Exception as e:
@@ -598,15 +564,12 @@ def seed_db(owner: str):
click.echo(f"Created {len(created_books)} sample books for user '{owner}'")
@db.command("status")
def db_status():
@db_group.command("status")
def db_status() -> None:
"""Show database status and statistics."""
app = get_app()
with app.app_context():
from .db import db
from .models import Author, Book, Genre, Reading, User, Wishlist
try:
book_count = db.session.execute(db.select(db.func.count(Book.id))).scalar()
author_count = db.session.execute(

View File

@@ -1,5 +1,5 @@
from datetime import date, datetime
from typing import Any, Optional
from typing import Any
import requests
from pydantic import BaseModel, field_validator
@@ -53,7 +53,7 @@ class GoogleBook(BaseModel):
title: str
authors: list[str] = []
publisher: str = ""
publishedDate: Optional[date | int] = None
publishedDate: date | int | None = None
description: str = ""
industryIdentifiers: list[dict[str, str]] = []
pageCount: int = 0

698
src/hxbooks/library.py Normal file
View File

@@ -0,0 +1,698 @@
"""
Business logic services for HXBooks.
Clean service layer for book management, reading tracking, and wishlist operations.
Separated from web interface concerns to enable both CLI and web access.
"""
from collections.abc import Sequence
from datetime import date, datetime
from sqlalchemy import ColumnElement, and_, or_
from sqlalchemy.orm import InstrumentedAttribute, joinedload
from hxbooks.search import QueryParser, ValueT
from .db import db
from .gbooks import fetch_google_book_data
from .models import Author, Book, Genre, Reading, User, Wishlist
from .search import ComparisonOperator, Field, FieldFilter
def create_book(
title: str,
owner_id: int | None = None,
authors: list[str] | None = None,
genres: list[str] | None = None,
isbn: str | None = None,
publisher: str | None = None,
edition: str | None = None,
description: str | None = None,
notes: str | None = None,
location_place: str | None = None,
location_bookshelf: str | None = None,
location_shelf: int | None = None,
first_published: int | None = None,
bought_date: date | None = None,
) -> Book:
"""Create a new book with the given details."""
book = Book(
title=title,
owner_id=owner_id,
isbn=isbn or "",
publisher=publisher or "",
edition=edition or "",
description=description or "",
notes=notes or "",
location_place=location_place or "",
location_bookshelf=location_bookshelf or "",
location_shelf=location_shelf,
first_published=first_published,
bought_date=bought_date,
)
db.session.add(book)
# Handle authors
if authors:
for author_name in [a_strip for a in authors if (a_strip := a.strip())]:
author = _get_or_create_author(author_name)
book.authors.append(author)
# Handle genres
if genres:
for genre_name in [g_strip for g in genres if (g_strip := g.strip())]:
genre = _get_or_create_genre(genre_name)
book.genres.append(genre)
db.session.commit()
return book
def get_book(book_id: int) -> Book | None:
"""Get a book by ID with all relationships loaded."""
return db.session.execute(
db
.select(Book)
.options(
joinedload(Book.authors),
joinedload(Book.genres),
joinedload(Book.owner),
)
.filter(Book.id == book_id)
).scalar_one_or_none()
def update_book(
book_id: int,
title: str | None = None,
authors: list[str] | None = None,
genres: list[str] | None = None,
isbn: str | None = None,
publisher: str | None = None,
edition: str | None = None,
description: str | None = None,
notes: str | None = None,
location_place: str | None = None,
location_bookshelf: str | None = None,
location_shelf: int | None = None,
first_published: int | None = None,
bought_date: date | None = None,
) -> Book | None:
"""Update a book with new details."""
book = get_book(book_id)
if not book:
return None
# Update scalar fields
if title is not None:
book.title = title
if isbn is not None:
book.isbn = isbn
if publisher is not None:
book.publisher = publisher
if edition is not None:
book.edition = edition
if description is not None:
book.description = description
if notes is not None:
book.notes = notes
if location_place is not None:
book.location_place = location_place
if location_bookshelf is not None:
book.location_bookshelf = location_bookshelf
if location_shelf is not None:
book.location_shelf = location_shelf
if first_published is not None:
book.first_published = first_published
if bought_date is not None:
book.bought_date = bought_date
# Update authors
if authors is not None:
book.authors.clear()
for author_name in [a_strip for a in authors if (a_strip := a.strip())]:
author = _get_or_create_author(author_name)
book.authors.append(author)
# Update genres
if genres is not None:
book.genres.clear()
for genre_name in [g_strip for g in genres if (g_strip := g.strip())]:
genre = _get_or_create_genre(genre_name)
book.genres.append(genre)
db.session.commit()
return book
def delete_book(book_id: int) -> bool:
"""Delete a book and all related data."""
book = get_book(book_id)
if not book:
return False
db.session.delete(book)
db.session.commit()
return True
def search_books(
text_query: str | None = None,
owner_username: str | None = None,
location_place: str | None = None,
location_bookshelf: str | None = None,
location_shelf: int | None = None,
author_name: str | None = None,
genre_name: str | None = None,
isbn: str | None = None,
limit: int = 50,
) -> Sequence[Book]:
"""
Search books with various filters.
For now implements basic filtering - advanced query parsing will be added later.
"""
query = db.select(Book).options(
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
)
conditions = []
# Text search across multiple fields
if text_query:
text_query = text_query.strip()
if text_query:
text_conditions = []
# Search in title, description, notes
text_conditions.extend((
Book.title.icontains(text_query),
Book.description.icontains(text_query),
Book.notes.icontains(text_query),
Book.publisher.icontains(text_query),
Book.authors.any(Author.name.icontains(text_query)),
Book.genres.any(Genre.name.icontains(text_query)),
))
conditions.append(or_(*text_conditions))
# Owner filter
if owner_username:
conditions.append(Book.owner.has(User.username == owner_username))
# Location filters
if location_place:
conditions.append(Book.location_place.icontains(location_place))
if location_bookshelf:
conditions.append(Book.location_bookshelf.icontains(location_bookshelf))
if location_shelf is not None:
conditions.append(Book.location_shelf == location_shelf)
# Author filter
if author_name:
conditions.append(Book.authors.any(Author.name.icontains(author_name)))
# Genre filter
if genre_name:
conditions.append(Book.genres.any(Genre.name.icontains(genre_name)))
# ISBN filter
if isbn:
conditions.append(Book.isbn == isbn)
# Apply all conditions
if conditions:
query = query.filter(and_(*conditions))
query = query.distinct().limit(limit)
result = db.session.execute(query)
return result.scalars().unique().all()
query_parser = QueryParser()
def search_books_advanced(query_string: str, limit: int = 50) -> Sequence[Book]:
"""Advanced search with field filters supporting comparison operators."""
parsed_query = query_parser.parse(query_string)
query = db.select(Book).options(
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
)
conditions = []
# Text search across multiple fields (same as basic search)
if parsed_query.text_terms:
for text_query in [
t_strip for t in parsed_query.text_terms if (t_strip := t.strip())
]:
text_conditions = []
# Search in title, description, notes
text_conditions.extend((
Book.title.icontains(text_query),
Book.description.icontains(text_query),
Book.notes.icontains(text_query),
Book.publisher.icontains(text_query),
Book.authors.any(Author.name.icontains(text_query)),
Book.genres.any(Genre.name.icontains(text_query)),
))
conditions.append(or_(*text_conditions))
# Advanced field filters
if parsed_query.field_filters:
for field_filter in parsed_query.field_filters:
condition = _build_field_condition(field_filter)
if condition is not None:
if field_filter.negated:
condition = ~condition
conditions.append(condition)
# Apply all conditions
if conditions:
query = query.filter(and_(*conditions))
query = query.distinct().limit(limit)
result = db.session.execute(query)
# return result.scalars().unique().all()
results = []
for book in result.scalars().unique().all():
results.append({
"id": book.id,
"title": book.title,
"authors": [author.name for author in book.authors],
"genres": [genre.name for genre in book.genres],
"owner": book.owner.username if book.owner else None,
"isbn": book.isbn,
"publisher": book.publisher,
"description": book.description,
"location": {
"place": book.location_place,
"bookshelf": book.location_bookshelf,
"shelf": book.location_shelf,
},
"loaned_to": book.loaned_to,
"loaned_date": book.loaned_date.isoformat() if book.loaned_date else None,
"added_date": book.added_date.isoformat(),
"bought_date": book.bought_date.isoformat() if book.bought_date else None,
})
return results
def _build_field_condition(field_filter: FieldFilter) -> ColumnElement | None:
"""
Build a SQLAlchemy condition for a field filter.
"""
field = field_filter.field
operator = field_filter.operator
value = field_filter.value
# Map field names to Book attributes or special handling
if field == Field.TITLE:
field_attr = Book.title
elif field == Field.AUTHOR:
return Book.authors.any(_apply_operator(Author.name, operator, value))
elif field == Field.GENRE:
return Book.genres.any(_apply_operator(Genre.name, operator, value))
elif field == Field.ISBN:
field_attr = Book.isbn
elif field == Field.PLACE:
field_attr = Book.location_place
elif field == Field.BOOKSHELF:
field_attr = Book.location_bookshelf
elif field == Field.SHELF:
field_attr = Book.location_shelf
elif field == Field.ADDED_DATE:
field_attr = Book.added_date
elif field == Field.BOUGHT_DATE:
field_attr = Book.bought_date
elif field == Field.LOANED_DATE:
field_attr = Book.loaned_date
elif field == Field.OWNER:
return Book.owner.has(_apply_operator(User.username, operator, value))
else:
# Unknown field, skip
return None
condition = _apply_operator(field_attr, operator, value)
return condition
def _apply_operator(
field_attr: InstrumentedAttribute, operator: ComparisonOperator, value: ValueT
) -> ColumnElement:
"""Apply a comparison operator to a field attribute."""
if operator == ComparisonOperator.EQUALS:
if isinstance(value, str):
return field_attr.icontains(value) # Case-insensitive contains for strings
else:
return field_attr == value
elif operator == ComparisonOperator.GREATER:
return field_attr > value
elif operator == ComparisonOperator.GREATER_EQUAL:
return field_attr >= value
elif operator == ComparisonOperator.LESS:
return field_attr < value
elif operator == ComparisonOperator.LESS_EQUAL:
return field_attr <= value
elif operator == ComparisonOperator.NOT_EQUALS:
if isinstance(value, str):
return ~field_attr.icontains(value)
else:
return field_attr != value
else:
# Default to equals
return field_attr == value
def import_book_from_isbn(
isbn: str,
owner_id: int | None = None,
location_place: str | None = None,
location_bookshelf: str | None = None,
location_shelf: int | None = None,
) -> Book:
"""Import book data from Google Books API using ISBN."""
google_book_data = fetch_google_book_data(isbn)
if not google_book_data:
raise ValueError(f"No book data found for ISBN: {isbn}")
# Convert Google Books data to our format
authors = []
if google_book_data.authors:
authors = google_book_data.authors
genres = []
if google_book_data.categories:
genres = google_book_data.categories
first_published = None
if google_book_data.publishedDate:
if isinstance(google_book_data.publishedDate, date):
first_published = google_book_data.publishedDate.year
elif isinstance(google_book_data.publishedDate, int):
first_published = google_book_data.publishedDate
return create_book(
title=google_book_data.title,
owner_id=owner_id,
authors=authors,
genres=genres,
isbn=isbn,
publisher=google_book_data.publisher or "",
description=google_book_data.description or "",
first_published=first_published,
location_place=location_place,
location_bookshelf=location_bookshelf,
location_shelf=location_shelf,
)
def get_books_by_location(
place: str, bookshelf: str | None = None, shelf: int | None = None
) -> Sequence[Book]:
"""Get all books at a specific location."""
return search_books(
location_place=place,
location_bookshelf=bookshelf,
location_shelf=shelf,
limit=1000, # Large limit for location queries
)
def _get_or_create_author(name: str) -> Author:
"""Get existing author or create a new one."""
author = db.session.execute(
db.select(Author).filter(Author.name == name)
).scalar_one_or_none()
if author is None:
author = Author(name=name)
db.session.add(author)
# Don't commit here - let the caller handle the transaction
return author
def _get_or_create_genre(name: str) -> Genre:
"""Get existing genre or create a new one."""
genre = db.session.execute(
db.select(Genre).filter(Genre.name == name)
).scalar_one_or_none()
if genre is None:
genre = Genre(name=name)
db.session.add(genre)
# Don't commit here - let the caller handle the transaction
return genre
def start_reading(
book_id: int, user_id: int, start_date: date | None = None
) -> Reading:
"""Start a new reading session."""
# Check if book exists
book = db.session.get(Book, book_id)
if not book:
raise ValueError(f"Book not found: {book_id}")
# Check if user exists
user = db.session.get(User, user_id)
if not user:
raise ValueError(f"User not found: {user_id}")
# Check if already reading this book
existing_reading = db.session.execute(
db.select(Reading).filter(
and_(
Reading.book_id == book_id,
Reading.user_id == user_id,
Reading.end_date.is_(None), # Not finished yet
)
)
).scalar_one_or_none()
if existing_reading:
raise ValueError(
f"Already reading this book (reading session {existing_reading.id})"
)
reading = Reading(
book_id=book_id,
user_id=user_id,
start_date=start_date or datetime.now().date(),
)
db.session.add(reading)
db.session.commit()
return reading
def finish_reading(
reading_id: int,
rating: int | None = None,
comments: str | None = None,
end_date: date | None = None,
) -> Reading:
"""Finish a reading session."""
reading = db.session.execute(
db
.select(Reading)
.options(joinedload(Reading.book))
.filter(Reading.id == reading_id)
).scalar_one_or_none()
if not reading:
raise ValueError(f"Reading session not found: {reading_id}")
if reading.end_date is not None:
raise ValueError(f"Reading session {reading_id} is already finished")
reading.end_date = end_date or datetime.now().date()
reading.finished = True
reading.dropped = False
if rating is not None:
if not (1 <= rating <= 5):
raise ValueError("Rating must be between 1 and 5")
reading.rating = rating
if comments is not None:
reading.comments = comments
db.session.commit()
return reading
def drop_reading(
reading_id: int,
comments: str | None = None,
end_date: date | None = None,
) -> Reading:
"""Mark a reading session as dropped."""
reading = db.session.execute(
db
.select(Reading)
.options(joinedload(Reading.book))
.filter(Reading.id == reading_id)
).scalar_one_or_none()
if not reading:
raise ValueError(f"Reading session not found: {reading_id}")
if reading.end_date is not None:
raise ValueError(f"Reading session {reading_id} is already finished")
reading.end_date = end_date or datetime.now().date()
reading.finished = False
reading.dropped = True
if comments is not None:
reading.comments = comments
db.session.commit()
return reading
def get_current_readings(user_id: int) -> Sequence[Reading]:
"""Get all current (unfinished) readings for a user."""
return (
db.session
.execute(
db
.select(Reading)
.options(joinedload(Reading.book).joinedload(Book.authors))
.filter(
and_(
Reading.user_id == user_id,
Reading.end_date.is_(None),
)
)
.order_by(Reading.start_date.desc())
)
.scalars()
.unique()
.all()
)
def get_reading_history(user_id: int, limit: int = 50) -> Sequence[Reading]:
"""Get reading history for a user."""
return (
db.session
.execute(
db
.select(Reading)
.options(joinedload(Reading.book).joinedload(Book.authors))
.filter(Reading.user_id == user_id)
.order_by(Reading.start_date.desc())
.limit(limit)
)
.scalars()
.unique()
.all()
)
def add_to_wishlist(book_id: int, user_id: int) -> Wishlist:
"""Add a book to user's wishlist."""
# Check if book exists
book = db.session.get(Book, book_id)
if not book:
raise ValueError(f"Book not found: {book_id}")
# Check if user exists
user = db.session.get(User, user_id)
if not user:
raise ValueError(f"User not found: {user_id}")
# Check if already in wishlist
existing = db.session.execute(
db.select(Wishlist).filter(
and_(
Wishlist.book_id == book_id,
Wishlist.user_id == user_id,
)
)
).scalar_one_or_none()
if existing:
raise ValueError("Book is already in wishlist")
wishlist_item = Wishlist(
book_id=book_id,
user_id=user_id,
)
db.session.add(wishlist_item)
db.session.commit()
return wishlist_item
def remove_from_wishlist(book_id: int, user_id: int) -> bool:
"""Remove a book from user's wishlist."""
wishlist_item = db.session.execute(
db.select(Wishlist).filter(
and_(
Wishlist.book_id == book_id,
Wishlist.user_id == user_id,
)
)
).scalar_one_or_none()
if not wishlist_item:
return False
db.session.delete(wishlist_item)
db.session.commit()
return True
def get_wishlist(user_id: int) -> Sequence[Wishlist]:
"""Get user's wishlist."""
return (
db.session
.execute(
db
.select(Wishlist)
.options(joinedload(Wishlist.book).joinedload(Book.authors))
.filter(Wishlist.user_id == user_id)
.order_by(Wishlist.wishlisted_date.desc())
)
.scalars()
.unique()
.all()
)
def create_user(username: str) -> User:
"""Create a new user."""
# Check if username already exists
existing = db.session.execute(
db.select(User).filter(User.username == username)
).scalar_one_or_none()
if existing:
raise ValueError(f"Username '{username}' already exists")
user = User(username=username)
db.session.add(user)
db.session.commit()
return user
def get_user_by_username(username: str) -> User | None:
"""Get a user by username."""
return db.session.execute(
db.select(User).filter(User.username == username)
).scalar_one_or_none()
def list_users() -> Sequence[User]:
"""List all users."""
return db.session.execute(db.select(User).order_by(User.username)).scalars().all()

View File

@@ -1,5 +1,4 @@
from datetime import date, datetime
from typing import Optional
from sqlalchemy import JSON, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
@@ -7,67 +6,67 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from .db import db
class User(db.Model): # type: ignore[name-defined]
class User(db.Model): # ty:ignore[unsupported-base]
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column()
saved_searches: Mapped[dict] = mapped_column(JSON, default=dict)
readings: Mapped[list["Reading"]] = relationship(back_populates="user")
owned_books: Mapped[list["Book"]] = relationship(back_populates="owner")
wishes: Mapped[list["Wishlist"]] = relationship(back_populates="user")
readings: Mapped[list[Reading]] = relationship(back_populates="user")
owned_books: Mapped[list[Book]] = relationship(back_populates="owner")
wishes: Mapped[list[Wishlist]] = relationship(back_populates="user")
class Author(db.Model): # type: ignore[name-defined]
class Author(db.Model): # ty:ignore[unsupported-base]
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(200))
books: Mapped[list["Book"]] = relationship(
books: Mapped[list[Book]] = relationship(
secondary="book_author", back_populates="authors"
)
class Genre(db.Model): # type: ignore[name-defined]
class Genre(db.Model): # ty:ignore[unsupported-base]
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
books: Mapped[list["Book"]] = relationship(
books: Mapped[list[Book]] = relationship(
secondary="book_genre", back_populates="genres"
)
class BookAuthor(db.Model): # type: ignore[name-defined]
class BookAuthor(db.Model): # ty:ignore[unsupported-base]
__tablename__ = "book_author"
book_id: Mapped[int] = mapped_column(ForeignKey("book.id"), primary_key=True)
author_id: Mapped[int] = mapped_column(ForeignKey("author.id"), primary_key=True)
class BookGenre(db.Model): # type: ignore[name-defined]
class BookGenre(db.Model): # ty:ignore[unsupported-base]
__tablename__ = "book_genre"
book_id: Mapped[int] = mapped_column(ForeignKey("book.id"), primary_key=True)
genre_id: Mapped[int] = mapped_column(ForeignKey("genre.id"), primary_key=True)
class Book(db.Model): # type: ignore[name-defined]
class Book(db.Model): # ty:ignore[unsupported-base]
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(500), default="")
description: Mapped[str] = mapped_column(default="")
first_published: Mapped[Optional[int]] = mapped_column(default=None)
first_published: Mapped[int | None] = mapped_column(default=None)
edition: Mapped[str] = mapped_column(String(200), default="")
publisher: Mapped[str] = mapped_column(String(200), default="")
isbn: Mapped[str] = mapped_column(String(20), default="")
notes: Mapped[str] = mapped_column(default="")
added_date: Mapped[datetime] = mapped_column(default=datetime.now)
bought_date: Mapped[Optional[date]] = mapped_column(default=None)
bought_date: Mapped[date | None] = mapped_column(default=None)
# Location hierarchy
location_place: Mapped[str] = mapped_column(String(100), default="")
location_bookshelf: Mapped[str] = mapped_column(String(100), default="")
location_shelf: Mapped[Optional[int]] = mapped_column(default=None)
location_shelf: Mapped[int | None] = mapped_column(default=None)
# Loaning
loaned_to: Mapped[str] = mapped_column(String(200), default="")
loaned_date: Mapped[Optional[date]] = mapped_column(default=None)
loaned_date: Mapped[date | None] = mapped_column(default=None)
# Relationships
owner_id: Mapped[Optional[int]] = mapped_column(ForeignKey("user.id"))
owner: Mapped[Optional[User]] = relationship(back_populates="owned_books")
owner_id: Mapped[int | None] = mapped_column(ForeignKey("user.id"))
owner: Mapped[User | None] = relationship(back_populates="owned_books")
authors: Mapped[list[Author]] = relationship(
secondary="book_author", back_populates="books"
@@ -76,21 +75,21 @@ class Book(db.Model): # type: ignore[name-defined]
secondary="book_genre", back_populates="books"
)
readings: Mapped[list["Reading"]] = relationship(
readings: Mapped[list[Reading]] = relationship(
back_populates="book", cascade="delete, delete-orphan"
)
wished_by: Mapped[list["Wishlist"]] = relationship(
wished_by: Mapped[list[Wishlist]] = relationship(
back_populates="book", cascade="delete, delete-orphan"
)
class Reading(db.Model): # type: ignore[name-defined]
class Reading(db.Model): # ty:ignore[unsupported-base]
id: Mapped[int] = mapped_column(primary_key=True)
start_date: Mapped[date] = mapped_column(default=lambda: datetime.now().date())
end_date: Mapped[Optional[date]] = mapped_column(default=None)
end_date: Mapped[date | None] = mapped_column(default=None)
finished: Mapped[bool] = mapped_column(default=False)
dropped: Mapped[bool] = mapped_column(default=False)
rating: Mapped[Optional[int]] = mapped_column(default=None)
rating: Mapped[int | None] = mapped_column(default=None)
comments: Mapped[str] = mapped_column(default="")
user_id: Mapped[int] = mapped_column(ForeignKey("user.id"))
@@ -100,7 +99,7 @@ class Reading(db.Model): # type: ignore[name-defined]
book: Mapped[Book] = relationship(back_populates="readings")
class Wishlist(db.Model): # type: ignore[name-defined]
class Wishlist(db.Model): # ty:ignore[unsupported-base]
id: Mapped[int] = mapped_column(primary_key=True)
wishlisted_date: Mapped[date] = mapped_column(default=lambda: datetime.now().date())

View File

@@ -2,14 +2,12 @@
Search functionality for HXBooks.
Provides query parsing and search logic for finding books with advanced syntax.
Currently implements basic search - will be enhanced with pyparsing for advanced queries.
Currently implements basic search - will be enhanced with pyparsing for advanced queries
"""
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import StrEnum
from re import A
from typing import Any, Dict, List, Optional, Union
import pyparsing as pp
@@ -44,13 +42,16 @@ class Field(StrEnum):
OWNER = "owner"
ValueT = str | int | float | date
@dataclass
class FieldFilter:
"""Represents a field-specific search filter."""
field: Field
operator: ComparisonOperator
value: Union[str, int, float, date]
value: ValueT
negated: bool = False
@@ -58,8 +59,8 @@ class FieldFilter:
class SearchQuery:
"""Enhanced structured representation of a search query."""
text_terms: List[str] = field(default_factory=list)
field_filters: List[FieldFilter] = field(default_factory=list)
text_terms: list[str] = field(default_factory=list)
field_filters: list[FieldFilter] = field(default_factory=list)
boolean_operator: str = "AND" # Default to AND for multiple terms
@@ -77,11 +78,11 @@ class QueryParser:
- Parentheses: (genre:fantasy OR genre:scifi) AND rating>=4
"""
def __init__(self):
def __init__(self) -> None:
"""Initialize the pyparsing grammar."""
self._build_grammar()
def _build_grammar(self):
def _build_grammar(self) -> None:
"""Build the pyparsing grammar for the query language."""
# Basic tokens
@@ -110,9 +111,9 @@ class QueryParser:
text_term = quoted_string | pp.Regex(r'[^\s():"]+(?![:\<\>=!])')
# Boolean operators
and_op = pp.CaselessKeyword("AND")
or_op = pp.CaselessKeyword("OR")
not_op = pp.CaselessKeyword("NOT")
# and_op = pp.CaselessKeyword("AND")
# or_op = pp.CaselessKeyword("OR")
# not_op = pp.CaselessKeyword("NOT")
# Basic search element
search_element = field_filter | text_term
@@ -132,7 +133,7 @@ class QueryParser:
try:
parsed_elements = self.grammar.parse_string(query_string, parse_all=True)
except pp.ParseException as e:
except pp.ParseException:
# If parsing fails, fall back to simple text search
return SearchQuery(text_terms=[query_string])
@@ -158,7 +159,7 @@ class QueryParser:
operator = ComparisonOperator.EQUALS
# Convert value to appropriate type
value = self._convert_value(field, value_str)
value = _convert_value(field, value_str)
field_filters.append(
FieldFilter(
@@ -171,32 +172,31 @@ class QueryParser:
return SearchQuery(text_terms=text_terms, field_filters=field_filters)
def _convert_value(
self, field: Field, value_str: str
) -> Union[str, int, float, date]:
"""Convert string value to appropriate type based on field."""
# Date fields
if field in [
Field.READ_DATE,
Field.BOUGHT_DATE,
Field.ADDED_DATE,
Field.LOANED_DATE,
]:
try:
return datetime.strptime(value_str, "%Y-%m-%d").date()
except ValueError:
return value_str
def _convert_value(field: Field, value_str: str) -> str | int | float | date:
"""Convert string value to appropriate type based on field."""
# Numeric fields
if field in [Field.RATING, Field.SHELF, Field.YEAR]:
try:
if "." in value_str:
return float(value_str)
else:
return int(value_str)
except ValueError:
return value_str
# Date fields
if field in {
Field.READ_DATE,
Field.BOUGHT_DATE,
Field.ADDED_DATE,
Field.LOANED_DATE,
}:
try:
return datetime.strptime(value_str, "%Y-%m-%d").date()
except ValueError:
return value_str
# String fields (default)
return value_str
# Numeric fields
if field in {Field.RATING, Field.SHELF, Field.YEAR}:
try:
if "." in value_str:
return float(value_str)
else:
return int(value_str)
except ValueError:
return value_str
# String fields (default)
return value_str

View File

@@ -1,738 +0,0 @@
"""
Business logic services for HXBooks.
Clean service layer for book management, reading tracking, and wishlist operations.
Separated from web interface concerns to enable both CLI and web access.
"""
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Sequence, Union
from sqlalchemy import and_, or_, text
from sqlalchemy.orm import joinedload
from hxbooks.search import QueryParser
from .db import db
from .gbooks import fetch_google_book_data
from .models import Author, Book, Genre, Reading, User, Wishlist
from .search import ComparisonOperator, Field, FieldFilter
class BookService:
"""Service for book-related operations."""
def __init__(self):
self.query_parser = QueryParser()
def create_book(
self,
title: str,
owner_id: Optional[int] = None,
authors: Optional[List[str]] = None,
genres: Optional[List[str]] = None,
isbn: Optional[str] = None,
publisher: Optional[str] = None,
edition: Optional[str] = None,
description: Optional[str] = None,
notes: Optional[str] = None,
location_place: Optional[str] = None,
location_bookshelf: Optional[str] = None,
location_shelf: Optional[int] = None,
first_published: Optional[int] = None,
bought_date: Optional[date] = None,
) -> Book:
"""Create a new book with the given details."""
book = Book(
title=title,
owner_id=owner_id,
isbn=isbn or "",
publisher=publisher or "",
edition=edition or "",
description=description or "",
notes=notes or "",
location_place=location_place or "",
location_bookshelf=location_bookshelf or "",
location_shelf=location_shelf,
first_published=first_published,
bought_date=bought_date,
)
db.session.add(book)
# Handle authors
if authors:
for author_name in authors:
author_name = author_name.strip()
if author_name:
author = self._get_or_create_author(author_name)
book.authors.append(author)
# Handle genres
if genres:
for genre_name in genres:
genre_name = genre_name.strip()
if genre_name:
genre = self._get_or_create_genre(genre_name)
book.genres.append(genre)
db.session.commit()
return book
def get_book(self, book_id: int) -> Optional[Book]:
"""Get a book by ID with all relationships loaded."""
return db.session.execute(
db.select(Book)
.options(
joinedload(Book.authors),
joinedload(Book.genres),
joinedload(Book.owner),
)
.filter(Book.id == book_id)
).scalar_one_or_none()
def update_book(
self,
book_id: int,
title: Optional[str] = None,
authors: Optional[List[str]] = None,
genres: Optional[List[str]] = None,
isbn: Optional[str] = None,
publisher: Optional[str] = None,
edition: Optional[str] = None,
description: Optional[str] = None,
notes: Optional[str] = None,
location_place: Optional[str] = None,
location_bookshelf: Optional[str] = None,
location_shelf: Optional[int] = None,
first_published: Optional[int] = None,
bought_date: Optional[date] = None,
) -> Optional[Book]:
"""Update a book with new details."""
book = self.get_book(book_id)
if not book:
return None
# Update scalar fields
if title is not None:
book.title = title
if isbn is not None:
book.isbn = isbn
if publisher is not None:
book.publisher = publisher
if edition is not None:
book.edition = edition
if description is not None:
book.description = description
if notes is not None:
book.notes = notes
if location_place is not None:
book.location_place = location_place
if location_bookshelf is not None:
book.location_bookshelf = location_bookshelf
if location_shelf is not None:
book.location_shelf = location_shelf
if first_published is not None:
book.first_published = first_published
if bought_date is not None:
book.bought_date = bought_date
# Update authors
if authors is not None:
book.authors.clear()
for author_name in authors:
author_name = author_name.strip()
if author_name:
author = self._get_or_create_author(author_name)
book.authors.append(author)
# Update genres
if genres is not None:
book.genres.clear()
for genre_name in genres:
genre_name = genre_name.strip()
if genre_name:
genre = self._get_or_create_genre(genre_name)
book.genres.append(genre)
db.session.commit()
return book
def delete_book(self, book_id: int) -> bool:
"""Delete a book and all related data."""
book = self.get_book(book_id)
if not book:
return False
db.session.delete(book)
db.session.commit()
return True
def search_books(
self,
text_query: Optional[str] = None,
owner_username: Optional[str] = None,
location_place: Optional[str] = None,
location_bookshelf: Optional[str] = None,
location_shelf: Optional[int] = None,
author_name: Optional[str] = None,
genre_name: Optional[str] = None,
isbn: Optional[str] = None,
limit: int = 50,
) -> Sequence[Book]:
"""
Search books with various filters.
For now implements basic filtering - advanced query parsing will be added later.
"""
query = db.select(Book).options(
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
)
conditions = []
# Text search across multiple fields
if text_query:
text_query = text_query.strip()
if text_query:
# Create aliases to avoid table name conflicts
author_alias = db.aliased(Author)
genre_alias = db.aliased(Genre)
text_conditions = []
# Search in title, description, notes
text_conditions.append(Book.title.icontains(text_query))
text_conditions.append(Book.description.icontains(text_query))
text_conditions.append(Book.notes.icontains(text_query))
text_conditions.append(Book.publisher.icontains(text_query))
# Search in authors and genres via subqueries to avoid cartesian products
author_subquery = (
db.select(Book.id)
.join(Book.authors)
.filter(Author.name.icontains(text_query))
)
genre_subquery = (
db.select(Book.id)
.join(Book.genres)
.filter(Genre.name.icontains(text_query))
)
text_conditions.append(Book.id.in_(author_subquery))
text_conditions.append(Book.id.in_(genre_subquery))
conditions.append(or_(*text_conditions))
# Owner filter
if owner_username:
query = query.join(Book.owner)
conditions.append(User.username == owner_username)
# Location filters
if location_place:
conditions.append(Book.location_place.icontains(location_place))
if location_bookshelf:
conditions.append(Book.location_bookshelf.icontains(location_bookshelf))
if location_shelf is not None:
conditions.append(Book.location_shelf == location_shelf)
# Author filter
if author_name:
author_subquery = (
db.select(Book.id)
.join(Book.authors)
.filter(Author.name.icontains(author_name))
)
conditions.append(Book.id.in_(author_subquery))
# Genre filter
if genre_name:
genre_subquery = (
db.select(Book.id)
.join(Book.genres)
.filter(Genre.name.icontains(genre_name))
)
conditions.append(Book.id.in_(genre_subquery))
# ISBN filter
if isbn:
conditions.append(Book.isbn == isbn)
# Apply all conditions
if conditions:
query = query.filter(and_(*conditions))
query = query.distinct().limit(limit)
result = db.session.execute(query)
return result.scalars().unique().all()
def search_books_advanced(
self, query_string: str, limit: int = 50
) -> Sequence[Book]:
"""Advanced search with field filters supporting comparison operators."""
parsed_query = self.query_parser.parse(query_string)
query = db.select(Book).options(
joinedload(Book.authors), joinedload(Book.genres), joinedload(Book.owner)
)
conditions = []
# Text search across multiple fields (same as basic search)
if parsed_query.text_terms:
for text_query in parsed_query.text_terms:
text_query = text_query.strip()
if text_query:
text_conditions = []
# Search in title, description, notes
text_conditions.append(Book.title.icontains(text_query))
text_conditions.append(Book.description.icontains(text_query))
text_conditions.append(Book.notes.icontains(text_query))
text_conditions.append(Book.publisher.icontains(text_query))
# Search in authors and genres via subqueries
author_subquery = (
db.select(Book.id)
.join(Book.authors)
.filter(Author.name.icontains(text_query))
)
genre_subquery = (
db.select(Book.id)
.join(Book.genres)
.filter(Genre.name.icontains(text_query))
)
text_conditions.append(Book.id.in_(author_subquery))
text_conditions.append(Book.id.in_(genre_subquery))
conditions.append(or_(*text_conditions))
# Advanced field filters
if parsed_query.field_filters:
for field_filter in parsed_query.field_filters:
condition = self._build_field_condition(field_filter)
if condition is not None:
if field_filter.negated:
condition = ~condition
conditions.append(condition)
# Apply all conditions
if conditions:
query = query.filter(and_(*conditions))
query = query.distinct().limit(limit)
result = db.session.execute(query)
# return result.scalars().unique().all()
results = []
for book in result.scalars().unique().all():
results.append(
{
"id": book.id,
"title": book.title,
"authors": [author.name for author in book.authors],
"genres": [genre.name for genre in book.genres],
"owner": book.owner.username if book.owner else None,
"isbn": book.isbn,
"publisher": book.publisher,
"description": book.description,
"location": {
"place": book.location_place,
"bookshelf": book.location_bookshelf,
"shelf": book.location_shelf,
},
"loaned_to": book.loaned_to,
"loaned_date": book.loaned_date.isoformat()
if book.loaned_date
else None,
"added_date": book.added_date.isoformat(),
"bought_date": book.bought_date.isoformat()
if book.bought_date
else None,
}
)
return results
def _build_field_condition(self, field_filter: FieldFilter):
"""
Build a SQLAlchemy condition for a field filter.
"""
field = field_filter.field
operator = field_filter.operator
value = field_filter.value
# Map field names to Book attributes or special handling
if field == Field.TITLE:
field_attr = Book.title
elif field == Field.AUTHOR:
return Book.authors.any(self._apply_operator(Author.name, operator, value))
elif field == Field.GENRE:
return Book.genres.any(self._apply_operator(Genre.name, operator, value))
elif field == Field.ISBN:
field_attr = Book.isbn
elif field == Field.PLACE:
field_attr = Book.location_place
elif field == Field.BOOKSHELF:
field_attr = Book.location_bookshelf
elif field == Field.SHELF:
field_attr = Book.location_shelf
elif field == Field.ADDED_DATE:
field_attr = Book.added_date
elif field == Field.BOUGHT_DATE:
field_attr = Book.bought_date
elif field == Field.LOANED_DATE:
field_attr = Book.loaned_date
elif field == Field.OWNER:
return Book.owner.has(self._apply_operator(User.username, operator, value))
else:
# Unknown field, skip
return None
condition = self._apply_operator(field_attr, operator, value)
return condition
def _apply_operator(self, field_attr, operator, value):
"""
Apply a comparison operator to a field attribute.
"""
if operator == ComparisonOperator.EQUALS:
if isinstance(value, str):
return field_attr.icontains(
value
) # Case-insensitive contains for strings
else:
return field_attr == value
elif operator == ComparisonOperator.GREATER:
return field_attr > value
elif operator == ComparisonOperator.GREATER_EQUAL:
return field_attr >= value
elif operator == ComparisonOperator.LESS:
return field_attr < value
elif operator == ComparisonOperator.LESS_EQUAL:
return field_attr <= value
elif operator == ComparisonOperator.NOT_EQUALS:
if isinstance(value, str):
return ~field_attr.icontains(value)
else:
return field_attr != value
else:
# Default to equals
return field_attr == value
def import_book_from_isbn(
self,
isbn: str,
owner_id: Optional[int] = None,
location_place: Optional[str] = None,
location_bookshelf: Optional[str] = None,
location_shelf: Optional[int] = None,
) -> Book:
"""Import book data from Google Books API using ISBN."""
google_book_data = fetch_google_book_data(isbn)
if not google_book_data:
raise ValueError(f"No book data found for ISBN: {isbn}")
# Convert Google Books data to our format
authors = []
if google_book_data.authors:
authors = google_book_data.authors
genres = []
if google_book_data.categories:
genres = google_book_data.categories
return self.create_book(
title=google_book_data.title,
owner_id=owner_id,
authors=authors,
genres=genres,
isbn=isbn,
publisher=google_book_data.publisher or "",
description=google_book_data.description or "",
first_published=google_book_data.published_year,
location_place=location_place,
location_bookshelf=location_bookshelf,
location_shelf=location_shelf,
)
def get_books_by_location(
self, place: str, bookshelf: Optional[str] = None, shelf: Optional[int] = None
) -> Sequence[Book]:
"""Get all books at a specific location."""
return self.search_books(
location_place=place,
location_bookshelf=bookshelf,
location_shelf=shelf,
limit=1000, # Large limit for location queries
)
def _get_or_create_author(self, name: str) -> Author:
"""Get existing author or create a new one."""
author = db.session.execute(
db.select(Author).filter(Author.name == name)
).scalar_one_or_none()
if author is None:
author = Author(name=name)
db.session.add(author)
# Don't commit here - let the caller handle the transaction
return author
def _get_or_create_genre(self, name: str) -> Genre:
"""Get existing genre or create a new one."""
genre = db.session.execute(
db.select(Genre).filter(Genre.name == name)
).scalar_one_or_none()
if genre is None:
genre = Genre(name=name)
db.session.add(genre)
# Don't commit here - let the caller handle the transaction
return genre
class ReadingService:
"""Service for reading-related operations."""
def start_reading(
self, book_id: int, user_id: int, start_date: Optional[date] = None
) -> Reading:
"""Start a new reading session."""
# Check if book exists
book = db.session.get(Book, book_id)
if not book:
raise ValueError(f"Book not found: {book_id}")
# Check if user exists
user = db.session.get(User, user_id)
if not user:
raise ValueError(f"User not found: {user_id}")
# Check if already reading this book
existing_reading = db.session.execute(
db.select(Reading).filter(
and_(
Reading.book_id == book_id,
Reading.user_id == user_id,
Reading.end_date.is_(None), # Not finished yet
)
)
).scalar_one_or_none()
if existing_reading:
raise ValueError(
f"Already reading this book (reading session {existing_reading.id})"
)
reading = Reading(
book_id=book_id,
user_id=user_id,
start_date=start_date or datetime.now().date(),
)
db.session.add(reading)
db.session.commit()
return reading
def finish_reading(
self,
reading_id: int,
rating: Optional[int] = None,
comments: Optional[str] = None,
end_date: Optional[date] = None,
) -> Reading:
"""Finish a reading session."""
reading = db.session.execute(
db.select(Reading)
.options(joinedload(Reading.book))
.filter(Reading.id == reading_id)
).scalar_one_or_none()
if not reading:
raise ValueError(f"Reading session not found: {reading_id}")
if reading.end_date is not None:
raise ValueError(f"Reading session {reading_id} is already finished")
reading.end_date = end_date or datetime.now().date()
reading.finished = True
reading.dropped = False
if rating is not None:
if not (1 <= rating <= 5):
raise ValueError("Rating must be between 1 and 5")
reading.rating = rating
if comments is not None:
reading.comments = comments
db.session.commit()
return reading
def drop_reading(
self,
reading_id: int,
comments: Optional[str] = None,
end_date: Optional[date] = None,
) -> Reading:
"""Mark a reading session as dropped."""
reading = db.session.execute(
db.select(Reading)
.options(joinedload(Reading.book))
.filter(Reading.id == reading_id)
).scalar_one_or_none()
if not reading:
raise ValueError(f"Reading session not found: {reading_id}")
if reading.end_date is not None:
raise ValueError(f"Reading session {reading_id} is already finished")
reading.end_date = end_date or datetime.now().date()
reading.finished = False
reading.dropped = True
if comments is not None:
reading.comments = comments
db.session.commit()
return reading
def get_current_readings(self, user_id: int) -> Sequence[Reading]:
"""Get all current (unfinished) readings for a user."""
return (
db.session.execute(
db.select(Reading)
.options(joinedload(Reading.book).joinedload(Book.authors))
.filter(
and_(
Reading.user_id == user_id,
Reading.end_date.is_(None),
)
)
.order_by(Reading.start_date.desc())
)
.scalars()
.unique()
.all()
)
def get_reading_history(self, user_id: int, limit: int = 50) -> Sequence[Reading]:
"""Get reading history for a user."""
return (
db.session.execute(
db.select(Reading)
.options(joinedload(Reading.book).joinedload(Book.authors))
.filter(Reading.user_id == user_id)
.order_by(Reading.start_date.desc())
.limit(limit)
)
.scalars()
.unique()
.all()
)
class WishlistService:
"""Service for wishlist operations."""
def add_to_wishlist(self, book_id: int, user_id: int) -> Wishlist:
"""Add a book to user's wishlist."""
# Check if book exists
book = db.session.get(Book, book_id)
if not book:
raise ValueError(f"Book not found: {book_id}")
# Check if user exists
user = db.session.get(User, user_id)
if not user:
raise ValueError(f"User not found: {user_id}")
# Check if already in wishlist
existing = db.session.execute(
db.select(Wishlist).filter(
and_(
Wishlist.book_id == book_id,
Wishlist.user_id == user_id,
)
)
).scalar_one_or_none()
if existing:
raise ValueError("Book is already in wishlist")
wishlist_item = Wishlist(
book_id=book_id,
user_id=user_id,
)
db.session.add(wishlist_item)
db.session.commit()
return wishlist_item
def remove_from_wishlist(self, book_id: int, user_id: int) -> bool:
"""Remove a book from user's wishlist."""
wishlist_item = db.session.execute(
db.select(Wishlist).filter(
and_(
Wishlist.book_id == book_id,
Wishlist.user_id == user_id,
)
)
).scalar_one_or_none()
if not wishlist_item:
return False
db.session.delete(wishlist_item)
db.session.commit()
return True
def get_wishlist(self, user_id: int) -> Sequence[Wishlist]:
"""Get user's wishlist."""
return (
db.session.execute(
db.select(Wishlist)
.options(joinedload(Wishlist.book).joinedload(Book.authors))
.filter(Wishlist.user_id == user_id)
.order_by(Wishlist.wishlisted_date.desc())
)
.scalars()
.unique()
.all()
)
class UserService:
"""Service for user operations."""
def create_user(self, username: str) -> User:
"""Create a new user."""
# Check if username already exists
existing = db.session.execute(
db.select(User).filter(User.username == username)
).scalar_one_or_none()
if existing:
raise ValueError(f"Username '{username}' already exists")
user = User(username=username)
db.session.add(user)
db.session.commit()
return user
def get_user_by_username(self, username: str) -> Optional[User]:
"""Get a user by username."""
return db.session.execute(
db.select(User).filter(User.username == username)
).scalar_one_or_none()
def list_users(self) -> Sequence[User]:
"""List all users."""
return (
db.session.execute(db.select(User).order_by(User.username)).scalars().all()
)

View File

@@ -4,16 +4,17 @@ Test configuration and fixtures for HXBooks.
Provides isolated test database, Flask app instances, and CLI testing utilities.
"""
import tempfile
from collections.abc import Generator
from pathlib import Path
from typing import Generator
import pytest
from click.testing import CliRunner
from flask import Flask
from flask.testing import FlaskClient
from sqlalchemy.orm import Session
from hxbooks import cli, create_app
from hxbooks import cli
from hxbooks.app import create_app
from hxbooks.db import db
from hxbooks.models import User
@@ -64,7 +65,7 @@ def test_user(app: Flask) -> User:
@pytest.fixture
def db_session(app: Flask):
def db_session(app: Flask) -> Generator[Session]:
"""Create database session for direct database testing."""
with app.app_context():
yield db.session

View File

@@ -6,22 +6,21 @@ Tests all CLI commands for correct behavior, database integration, and output fo
import json
import re
import tempfile
from datetime import date
from pathlib import Path
import pytest
from click.testing import CliRunner
from flask import Flask
from hxbooks.cli import cli
from hxbooks.db import db
from hxbooks.models import Author, Book, Genre, Reading, User, Wishlist
from hxbooks.models import Author, Book, Genre, Reading, User
class TestBookAddCommand:
"""Test the 'hxbooks book add' command."""
def test_book_add_basic(self, app, cli_runner):
def test_book_add_basic(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test basic book addition with title and owner."""
# Run the CLI command
result = cli_runner.invoke(
@@ -70,7 +69,8 @@ class TestBookAddCommand:
# Check book was created with correct fields
books = (
db.session.execute(db.select(Book).join(Book.authors).join(Book.genres))
db.session
.execute(db.select(Book).join(Book.authors).join(Book.genres))
.unique()
.scalars()
.all()
@@ -106,7 +106,7 @@ class TestBookAddCommand:
assert book in genre.books
assert genre in book.genres
def test_book_add_minimal_fields(self, app, cli_runner):
def test_book_add_minimal_fields(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test book addition with only required fields."""
result = cli_runner.invoke(
cli, ["book", "add", "Minimal Book", "--owner", "alice"]
@@ -124,7 +124,9 @@ class TestBookAddCommand:
assert len(book.authors) == 0 # No authors provided
assert len(book.genres) == 0 # No genres provided
def test_book_add_missing_owner_fails(self, app, cli_runner):
def test_book_add_missing_owner_fails(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test that book addition fails when owner is not provided."""
result = cli_runner.invoke(
cli,
@@ -144,14 +146,14 @@ class TestBookAddCommand:
class TestBookListCommand:
"""Test the 'hxbooks book list' command."""
def test_book_list_empty(self, app, cli_runner):
def test_book_list_empty(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing books when database is empty."""
result = cli_runner.invoke(cli, ["book", "list"])
assert result.exit_code == 0
assert "No books found." in result.output
def test_book_list_with_books(self, app, cli_runner):
def test_book_list_with_books(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing books in table format."""
# Add test data
cli_runner.invoke(
@@ -172,7 +174,7 @@ class TestBookListCommand:
assert "alice" in result.output
assert "bob" in result.output
def test_book_list_json_format(self, app, cli_runner):
def test_book_list_json_format(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing books in JSON format."""
# Add test data
cli_runner.invoke(
@@ -201,7 +203,7 @@ class TestBookListCommand:
assert book["owner"] == "alice"
assert book["isbn"] == "1234567890"
def test_book_list_filter_by_owner(self, app, cli_runner):
def test_book_list_filter_by_owner(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test filtering books by owner."""
# Add books for different owners
cli_runner.invoke(cli, ["book", "add", "Alice Book", "--owner", "alice"])
@@ -213,7 +215,9 @@ class TestBookListCommand:
assert "Alice Book" in result.output
assert "Bob Book" not in result.output
def test_book_list_filter_by_location(self, app, cli_runner):
def test_book_list_filter_by_location(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test filtering books by location."""
# Add books in different locations
cli_runner.invoke(
@@ -271,7 +275,7 @@ class TestBookListCommand:
class TestBookSearchCommand:
"""Test the 'hxbooks book search' command."""
def test_book_search_basic(self, app, cli_runner):
def test_book_search_basic(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test basic book search functionality."""
# Add test books
cli_runner.invoke(
@@ -309,14 +313,14 @@ class TestBookSearchCommand:
assert "The Hobbit" in result.output
assert "Dune" not in result.output
def test_book_search_no_results(self, app, cli_runner):
def test_book_search_no_results(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test search with no matching results."""
result = cli_runner.invoke(cli, ["book", "search", "nonexistent"])
assert result.exit_code == 0
assert "No books found." in result.output
def test_book_search_json_format(self, app, cli_runner):
def test_book_search_json_format(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test book search with JSON output."""
cli_runner.invoke(
cli,
@@ -378,8 +382,8 @@ class TestBookSearchCommand:
],
)
def test_book_search_advanced_queries(
self, app, cli_runner, query, expected_titles
):
self, app: Flask, cli_runner: CliRunner, query: str, expected_titles: list[str]
) -> None:
"""Test advanced search queries with various field filters."""
# Set up comprehensive test data
self._setup_search_test_data(app, cli_runner)
@@ -400,7 +404,7 @@ class TestBookSearchCommand:
f"Query '{query}' expected {expected_titles}, got {actual_titles}"
)
def _setup_search_test_data(self, app, cli_runner):
def _setup_search_test_data(self, app: Flask, cli_runner: CliRunner) -> None:
"""Set up comprehensive test data for advanced search testing."""
# Book 1: The Hobbit - Fantasy, high rating, shelf 1, home
cli_runner.invoke(
@@ -517,7 +521,8 @@ class TestBookSearchCommand:
with app.app_context():
# Get reading session IDs
readings = (
db.session.execute(db.select(Reading).order_by(Reading.id))
db.session
.execute(db.select(Reading).order_by(Reading.id))
.scalars()
.all()
)
@@ -539,16 +544,20 @@ class TestBookSearchCommand:
# Update one book with bought_date for date filter testing
with app.app_context():
prog_book = db.session.get(Book, prog_id)
assert prog_book is not None
prog_book.bought_date = date(2025, 12, 1) # Before 2026-01-01
prog_book.first_published = 2000
hobbit_book = db.session.get(Book, hobbit_id)
assert hobbit_book is not None
hobbit_book.first_published = 1937
fellowship_book = db.session.get(Book, fellowship_id)
assert fellowship_book is not None
fellowship_book.first_published = 1954
dune_book = db.session.get(Book, dune_id)
assert dune_book is not None
dune_book.first_published = 1965
db.session.commit()
@@ -557,7 +566,7 @@ class TestBookSearchCommand:
class TestReadingCommands:
"""Test reading-related CLI commands."""
def test_reading_start_basic(self, app, cli_runner):
def test_reading_start_basic(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test starting a reading session."""
# Add a book first
result = cli_runner.invoke(
@@ -566,7 +575,6 @@ class TestReadingCommands:
assert result.exit_code == 0
# Extract book ID from output
import re
book_id_match = re.search(r"ID: (\d+)", result.output)
assert book_id_match
@@ -578,10 +586,12 @@ class TestReadingCommands:
)
assert result.exit_code == 0
assert f"Started reading session" in result.output
assert "Started reading session" in result.output
assert f"for book {book_id}" in result.output
def test_reading_finish_with_rating(self, app, cli_runner):
def test_reading_finish_with_rating(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test finishing a reading session with rating."""
# Add book and start reading
cli_runner.invoke(cli, ["book", "add", "Test Book", "--owner", "alice"])
@@ -597,7 +607,6 @@ class TestReadingCommands:
assert result.exit_code == 0
# Extract reading session ID
import re
reading_id_match = re.search(r"Started reading session (\d+)", result.output)
assert reading_id_match
@@ -621,7 +630,7 @@ class TestReadingCommands:
assert "Finished reading: Test Book" in result.output
assert "Rating: 4/5" in result.output
def test_reading_drop(self, app, cli_runner):
def test_reading_drop(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test dropping a reading session."""
# Add book and start reading
cli_runner.invoke(cli, ["book", "add", "Boring Book", "--owner", "alice"])
@@ -634,9 +643,8 @@ class TestReadingCommands:
cli, ["reading", "start", str(book_id), "--owner", "alice"]
)
import re
reading_id_match = re.search(r"Started reading session (\d+)", result.output)
assert reading_id_match is not None
reading_id = reading_id_match.group(1)
# Drop the reading
@@ -647,7 +655,7 @@ class TestReadingCommands:
assert result.exit_code == 0
assert "Dropped reading: Boring Book" in result.output
def test_reading_list_current(self, app, cli_runner):
def test_reading_list_current(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing current (unfinished) readings."""
# Add book and start reading
cli_runner.invoke(cli, ["book", "add", "Current Book", "--owner", "alice"])
@@ -666,7 +674,7 @@ class TestReadingCommands:
assert "Current Book" in result.output
assert "Reading" in result.output
def test_reading_list_json_format(self, app, cli_runner):
def test_reading_list_json_format(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing readings in JSON format."""
# Add book and start reading
cli_runner.invoke(cli, ["book", "add", "JSON Book", "--owner", "alice"])
@@ -692,7 +700,7 @@ class TestReadingCommands:
class TestWishlistCommands:
"""Test wishlist-related CLI commands."""
def test_wishlist_add(self, app, cli_runner):
def test_wishlist_add(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test adding a book to wishlist."""
# Add a book first
cli_runner.invoke(cli, ["book", "add", "Desired Book", "--owner", "alice"])
@@ -708,7 +716,7 @@ class TestWishlistCommands:
assert result.exit_code == 0
assert "Added 'Desired Book' to wishlist" in result.output
def test_wishlist_remove(self, app, cli_runner):
def test_wishlist_remove(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test removing a book from wishlist."""
# Add book and add to wishlist
cli_runner.invoke(cli, ["book", "add", "Unwanted Book", "--owner", "alice"])
@@ -726,7 +734,9 @@ class TestWishlistCommands:
assert result.exit_code == 0
assert f"Removed book {book_id} from wishlist" in result.output
def test_wishlist_remove_not_in_list(self, app, cli_runner):
def test_wishlist_remove_not_in_list(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test removing a book that's not in wishlist."""
# Add book but don't add to wishlist
cli_runner.invoke(cli, ["book", "add", "Not Wished Book", "--owner", "alice"])
@@ -742,14 +752,14 @@ class TestWishlistCommands:
assert result.exit_code == 0
assert f"Book {book_id} was not in wishlist" in result.output
def test_wishlist_list_empty(self, app, cli_runner):
def test_wishlist_list_empty(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing empty wishlist."""
result = cli_runner.invoke(cli, ["wishlist", "list", "--owner", "alice"])
assert result.exit_code == 0
assert "Wishlist is empty." in result.output
def test_wishlist_list_with_items(self, app, cli_runner):
def test_wishlist_list_with_items(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing wishlist with items."""
# Add books and add to wishlist
cli_runner.invoke(
@@ -793,7 +803,7 @@ class TestWishlistCommands:
assert "Author One" in result.output
assert "Author Two" in result.output
def test_wishlist_list_json_format(self, app, cli_runner):
def test_wishlist_list_json_format(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test listing wishlist in JSON format."""
cli_runner.invoke(
cli,
@@ -829,14 +839,14 @@ class TestWishlistCommands:
class TestDatabaseCommands:
"""Test database management CLI commands."""
def test_db_init(self, app, cli_runner):
def test_db_init(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test database initialization."""
result = cli_runner.invoke(cli, ["db", "init"])
assert result.exit_code == 0
assert "Database initialized." in result.output
def test_db_seed(self, app, cli_runner):
def test_db_seed(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test database seeding with sample data."""
result = cli_runner.invoke(cli, ["db", "seed", "--owner", "test_owner"])
@@ -855,7 +865,7 @@ class TestDatabaseCommands:
assert "Dune" in titles
assert "The Pragmatic Programmer" in titles
def test_db_status_empty(self, app, cli_runner):
def test_db_status_empty(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test database status with empty database."""
result = cli_runner.invoke(cli, ["db", "status"])
@@ -868,7 +878,7 @@ class TestDatabaseCommands:
assert "Reading sessions: 0" in result.output
assert "Wishlist items: 0" in result.output
def test_db_status_with_data(self, app, cli_runner):
def test_db_status_with_data(self, app: Flask, cli_runner: CliRunner) -> None:
"""Test database status with sample data."""
# Add some test data
cli_runner.invoke(
@@ -908,21 +918,27 @@ class TestDatabaseCommands:
class TestErrorScenarios:
"""Test error handling and edge cases."""
def test_reading_start_invalid_book_id(self, app, cli_runner):
def test_reading_start_invalid_book_id(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test starting reading with non-existent book ID."""
result = cli_runner.invoke(cli, ["reading", "start", "999", "--owner", "alice"])
assert result.exit_code == 1
assert "Error starting reading:" in result.output
def test_wishlist_add_invalid_book_id(self, app, cli_runner):
def test_wishlist_add_invalid_book_id(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test adding non-existent book to wishlist."""
result = cli_runner.invoke(cli, ["wishlist", "add", "999", "--owner", "alice"])
assert result.exit_code == 1
assert "Error adding to wishlist:" in result.output
def test_reading_finish_invalid_reading_id(self, app, cli_runner):
def test_reading_finish_invalid_reading_id(
self, app: Flask, cli_runner: CliRunner
) -> None:
"""Test finishing non-existent reading session."""
result = cli_runner.invoke(cli, ["reading", "finish", "999"])

View File

@@ -6,16 +6,15 @@ field filters, and edge case handling.
"""
from datetime import date
from typing import List
import pytest
from hxbooks.search import (
ComparisonOperator,
Field,
FieldFilter,
QueryParser,
SearchQuery,
_convert_value, # noqa: PLC2701
)
@@ -28,31 +27,31 @@ def parser() -> QueryParser:
class TestQueryParser:
"""Test the QueryParser class functionality."""
def test_parse_empty_query(self, parser: QueryParser):
def test_parse_empty_query(self, parser: QueryParser) -> None:
"""Test parsing an empty query string."""
result = parser.parse("")
assert result.text_terms == []
assert result.field_filters == []
def test_parse_whitespace_only(self, parser: QueryParser):
def test_parse_whitespace_only(self, parser: QueryParser) -> None:
"""Test parsing a query with only whitespace."""
result = parser.parse(" \t\n ")
assert result.text_terms == []
assert result.field_filters == []
def test_parse_simple_text_terms(self, parser: QueryParser):
def test_parse_simple_text_terms(self, parser: QueryParser) -> None:
"""Test parsing simple text search terms."""
result = parser.parse("hobbit tolkien")
assert result.text_terms == ["hobbit", "tolkien"]
assert result.field_filters == []
def test_parse_quoted_text_terms(self, parser: QueryParser):
def test_parse_quoted_text_terms(self, parser: QueryParser) -> None:
"""Test parsing quoted text search terms."""
result = parser.parse('"the hobbit" tolkien')
assert result.text_terms == ["the hobbit", "tolkien"]
assert result.field_filters == []
def test_parse_quoted_text_with_spaces(self, parser: QueryParser):
def test_parse_quoted_text_with_spaces(self, parser: QueryParser) -> None:
"""Test parsing quoted text containing multiple spaces."""
result = parser.parse('"lord of the rings"')
assert result.text_terms == ["lord of the rings"]
@@ -62,7 +61,7 @@ class TestQueryParser:
class TestFieldFilters:
"""Test field filter parsing."""
def test_parse_title_filter(self, parser: QueryParser):
def test_parse_title_filter(self, parser: QueryParser) -> None:
"""Test parsing title field filter."""
result = parser.parse("title:hobbit")
assert len(result.field_filters) == 1
@@ -72,7 +71,7 @@ class TestFieldFilters:
assert filter.value == "hobbit"
assert filter.negated is False
def test_parse_quoted_title_filter(self, parser: QueryParser):
def test_parse_quoted_title_filter(self, parser: QueryParser) -> None:
"""Test parsing quoted title field filter."""
result = parser.parse('title:"the hobbit"')
assert len(result.field_filters) == 1
@@ -80,7 +79,7 @@ class TestFieldFilters:
assert filter.field == Field.TITLE
assert filter.value == "the hobbit"
def test_parse_author_filter(self, parser: QueryParser):
def test_parse_author_filter(self, parser: QueryParser) -> None:
"""Test parsing author field filter."""
result = parser.parse("author:tolkien")
assert len(result.field_filters) == 1
@@ -88,7 +87,7 @@ class TestFieldFilters:
assert filter.field == Field.AUTHOR
assert filter.value == "tolkien"
def test_parse_negated_filter(self, parser: QueryParser):
def test_parse_negated_filter(self, parser: QueryParser) -> None:
"""Test parsing negated field filter."""
result = parser.parse("-genre:romance")
assert len(result.field_filters) == 1
@@ -97,7 +96,7 @@ class TestFieldFilters:
assert filter.value == "romance"
assert filter.negated is True
def test_parse_multiple_filters(self, parser: QueryParser):
def test_parse_multiple_filters(self, parser: QueryParser) -> None:
"""Test parsing multiple field filters."""
result = parser.parse("author:tolkien genre:fantasy")
assert len(result.field_filters) == 2
@@ -108,7 +107,7 @@ class TestFieldFilters:
genre_filter = next(f for f in result.field_filters if f.field == Field.GENRE)
assert genre_filter.value == "fantasy"
def test_parse_mixed_filters_and_text(self, parser: QueryParser):
def test_parse_mixed_filters_and_text(self, parser: QueryParser) -> None:
"""Test parsing mix of field filters and text terms."""
result = parser.parse('epic author:tolkien "middle earth"')
assert "epic" in result.text_terms
@@ -133,8 +132,11 @@ class TestComparisonOperators:
],
)
def test_parse_comparison_operators(
self, parser: QueryParser, operator_str, expected_operator
):
self,
parser: QueryParser,
operator_str: str,
expected_operator: ComparisonOperator,
) -> None:
"""Test parsing all supported comparison operators."""
query = f"rating{operator_str}4"
result = parser.parse(query)
@@ -145,7 +147,7 @@ class TestComparisonOperators:
assert filter.operator == expected_operator
assert filter.value == 4
def test_parse_date_comparison(self, parser: QueryParser):
def test_parse_date_comparison(self, parser: QueryParser) -> None:
"""Test parsing date comparison operators."""
result = parser.parse("added>=2026-03-15")
assert len(result.field_filters) == 1
@@ -154,7 +156,7 @@ class TestComparisonOperators:
assert filter.operator == ComparisonOperator.GREATER_EQUAL
assert filter.value == date(2026, 3, 15)
def test_parse_numeric_comparison(self, parser: QueryParser):
def test_parse_numeric_comparison(self, parser: QueryParser) -> None:
"""Test parsing numeric comparison operators."""
result = parser.parse("shelf>2")
assert len(result.field_filters) == 1
@@ -167,79 +169,77 @@ class TestComparisonOperators:
class TestTypeConversion:
"""Test the _convert_value method for different field types."""
def test_convert_date_field_valid(self, parser: QueryParser):
def test_convert_date_field_valid(self, parser: QueryParser) -> None:
"""Test converting valid date strings for date fields."""
result = parser._convert_value(Field.BOUGHT_DATE, "2026-03-15")
result = _convert_value(Field.BOUGHT_DATE, "2026-03-15")
assert result == date(2026, 3, 15)
result = parser._convert_value(Field.READ_DATE, "2025-12-31")
result = _convert_value(Field.READ_DATE, "2025-12-31")
assert result == date(2025, 12, 31)
result = parser._convert_value(Field.ADDED_DATE, "2024-01-01")
result = _convert_value(Field.ADDED_DATE, "2024-01-01")
assert result == date(2024, 1, 1)
def test_convert_date_field_invalid(self, parser: QueryParser):
def test_convert_date_field_invalid(self, parser: QueryParser) -> None:
"""Test converting invalid date strings falls back to string."""
result = parser._convert_value(Field.BOUGHT_DATE, "invalid-date")
result = _convert_value(Field.BOUGHT_DATE, "invalid-date")
assert result == "invalid-date"
result = parser._convert_value(
Field.READ_DATE, "2026-13-45"
) # Invalid month/day
result = _convert_value(Field.READ_DATE, "2026-13-45") # Invalid month/day
assert result == "2026-13-45"
result = parser._convert_value(Field.ADDED_DATE, "not-a-date")
result = _convert_value(Field.ADDED_DATE, "not-a-date")
assert result == "not-a-date"
def test_convert_numeric_field_integers(self, parser: QueryParser):
def test_convert_numeric_field_integers(self, parser: QueryParser) -> None:
"""Test converting integer strings for numeric fields."""
result = parser._convert_value(Field.RATING, "5")
result = _convert_value(Field.RATING, "5")
assert result == 5
assert isinstance(result, int)
result = parser._convert_value(Field.SHELF, "10")
result = _convert_value(Field.SHELF, "10")
assert result == 10
result = parser._convert_value(Field.YEAR, "2026")
result = _convert_value(Field.YEAR, "2026")
assert result == 2026
def test_convert_numeric_field_floats(self, parser: QueryParser):
def test_convert_numeric_field_floats(self, parser: QueryParser) -> None:
"""Test converting float strings for numeric fields."""
result = parser._convert_value(Field.RATING, "4.5")
assert result == 4.5
result = _convert_value(Field.RATING, "4.5")
assert result == pytest.approx(4.5)
assert isinstance(result, float)
result = parser._convert_value(Field.SHELF, "2.0")
assert result == 2.0
result = _convert_value(Field.SHELF, "2.0")
assert result == pytest.approx(2.0)
def test_convert_numeric_field_invalid(self, parser: QueryParser):
def test_convert_numeric_field_invalid(self, parser: QueryParser) -> None:
"""Test converting invalid numeric strings falls back to string."""
result = parser._convert_value(Field.RATING, "not-a-number")
result = _convert_value(Field.RATING, "not-a-number")
assert result == "not-a-number"
result = parser._convert_value(Field.SHELF, "abc")
result = _convert_value(Field.SHELF, "abc")
assert result == "abc"
result = parser._convert_value(Field.YEAR, "twenty-twenty-six")
result = _convert_value(Field.YEAR, "twenty-twenty-six")
assert result == "twenty-twenty-six"
def test_convert_string_fields(self, parser: QueryParser):
def test_convert_string_fields(self, parser: QueryParser) -> None:
"""Test converting values for string fields returns as-is."""
result = parser._convert_value(Field.TITLE, "The Hobbit")
result = _convert_value(Field.TITLE, "The Hobbit")
assert result == "The Hobbit"
result = parser._convert_value(Field.AUTHOR, "Tolkien")
result = _convert_value(Field.AUTHOR, "Tolkien")
assert result == "Tolkien"
result = parser._convert_value(Field.GENRE, "Fantasy")
result = _convert_value(Field.GENRE, "Fantasy")
assert result == "Fantasy"
# Even things that look like dates/numbers should stay as strings for string fields
result = parser._convert_value(Field.TITLE, "2026-03-15")
result = _convert_value(Field.TITLE, "2026-03-15")
assert result == "2026-03-15"
assert isinstance(result, str)
result = parser._convert_value(Field.AUTHOR, "123")
result = _convert_value(Field.AUTHOR, "123")
assert result == "123"
assert isinstance(result, str)
@@ -247,13 +247,13 @@ class TestTypeConversion:
class TestParsingEdgeCases:
"""Test edge cases and error handling in query parsing."""
def test_parse_invalid_field_name(self, parser: QueryParser):
def test_parse_invalid_field_name(self, parser: QueryParser) -> None:
"""Test parsing with invalid field names falls back to text search."""
result = parser.parse("invalid_field:value")
# Should fall back to treating the whole thing as text
assert len(result.text_terms) >= 1 or len(result.field_filters) == 0
def test_parse_mixed_quotes_and_operators(self, parser: QueryParser):
def test_parse_mixed_quotes_and_operators(self, parser: QueryParser) -> None:
"""Test parsing complex queries with quotes and operators."""
result = parser.parse('title:"The Lord" author:tolkien rating>=4')
@@ -278,35 +278,36 @@ class TestParsingEdgeCases:
assert rating_filter.value == 4
assert rating_filter.operator == ComparisonOperator.GREATER_EQUAL
def test_parse_escaped_quotes(self, parser: QueryParser):
def test_parse_escaped_quotes(self, parser: QueryParser) -> None:
"""Test parsing strings with escaped quotes."""
result = parser.parse(r'title:"She said \"hello\""')
if result.field_filters:
# If parsing succeeds, check the escaped quote handling
filter = result.field_filters[0]
assert isinstance(filter.value, str)
assert "hello" in filter.value
# If parsing fails, it should fall back gracefully
def test_parse_special_characters(self, parser: QueryParser):
def test_parse_special_characters(self, parser: QueryParser) -> None:
"""Test parsing queries with special characters."""
result = parser.parse("title:C++ author:Stroustrup")
# Should handle the + characters gracefully
assert len(result.field_filters) >= 1 or len(result.text_terms) >= 1
def test_parse_very_long_query(self, parser: QueryParser):
def test_parse_very_long_query(self, parser: QueryParser) -> None:
"""Test parsing very long query strings."""
long_value = "a" * 1000
result = parser.parse(f"title:{long_value}")
# Should handle long strings without crashing
assert isinstance(result, SearchQuery)
def test_parse_unicode_characters(self, parser: QueryParser):
def test_parse_unicode_characters(self, parser: QueryParser) -> None:
"""Test parsing queries with unicode characters."""
result = parser.parse("title:Café author:José")
# Should handle unicode gracefully
assert isinstance(result, SearchQuery)
def test_fallback_behavior_on_parse_error(self, parser: QueryParser):
def test_fallback_behavior_on_parse_error(self, parser: QueryParser) -> None:
"""Test that invalid syntax falls back to text search."""
# Construct a query that should cause parse errors
invalid_queries = [
@@ -327,7 +328,7 @@ class TestParsingEdgeCases:
class TestComplexQueries:
"""Test parsing of complex, real-world query examples."""
def test_parse_realistic_book_search(self, parser: QueryParser):
def test_parse_realistic_book_search(self, parser: QueryParser) -> None:
"""Test parsing realistic book search queries."""
result = parser.parse(
'author:tolkien genre:fantasy -genre:romance rating>=4 "middle earth"'
@@ -363,7 +364,7 @@ class TestComplexQueries:
assert romance_filter.value == "romance"
assert romance_filter.negated is True
def test_parse_location_and_date_filters(self, parser: QueryParser):
def test_parse_location_and_date_filters(self, parser: QueryParser) -> None:
"""Test parsing location and date-based queries."""
result = parser.parse("place:home bookshelf:fantasy shelf>=2 added>=2026-01-01")
@@ -372,21 +373,24 @@ class TestComplexQueries:
place_filter = next(
(f for f in result.field_filters if f.field == Field.PLACE), None
)
assert place_filter is not None
assert place_filter.value == "home"
shelf_filter = next(
(f for f in result.field_filters if f.field == Field.SHELF), None
)
assert shelf_filter is not None
assert shelf_filter.value == 2
assert shelf_filter.operator == ComparisonOperator.GREATER_EQUAL
added_filter = next(
(f for f in result.field_filters if f.field == Field.ADDED_DATE), None
)
assert added_filter is not None
assert added_filter.value == date(2026, 1, 1)
assert added_filter.operator == ComparisonOperator.GREATER_EQUAL
def test_parse_mixed_types_comprehensive(self, parser: QueryParser):
def test_parse_mixed_types_comprehensive(self, parser: QueryParser) -> None:
"""Test parsing query with all major field types."""
query = 'title:"Complex Book" author:Author year=2020 rating>=4 bought<=2025-12-31 -genre:boring epic adventure'
result = parser.parse(query)

191
uv.lock generated
View File

@@ -43,6 +43,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9a/3c/c17fb3ca2d9c3acff52e30b309f538586f9f5b9c9cf454f3845fc9af4881/certifi-2026.2.25-py3-none-any.whl", hash = "sha256:027692e4402ad994f1c42e52a4997a9763c646b73e4096e4d5d6db8af1d6f0fa", size = 153684, upload-time = "2026-02-25T02:54:15.766Z" },
]
[[package]]
name = "cfgv"
version = "3.5.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/4e/b5/721b8799b04bf9afe054a3899c6cf4e880fcf8563cc71c15610242490a0c/cfgv-3.5.0.tar.gz", hash = "sha256:d5b1034354820651caa73ede66a6294d6e95c1b00acc5e9b098e917404669132", size = 7334, upload-time = "2025-11-19T20:55:51.612Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445, upload-time = "2025-11-19T20:55:50.744Z" },
]
[[package]]
name = "charset-normalizer"
version = "3.4.5"
@@ -89,6 +98,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "distlib"
version = "0.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/96/8e/709914eb2b5749865801041647dc7f4e6d00b549cfe88b65ca192995f07c/distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d", size = 614605, upload-time = "2025-07-17T16:52:00.465Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/33/6b/e0547afaf41bf2c42e52430072fa5658766e3d65bd4b03a563d1b6336f57/distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16", size = 469047, upload-time = "2025-07-17T16:51:58.613Z" },
]
[[package]]
name = "filelock"
version = "3.25.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/b8/00651a0f559862f3bb7d6f7477b192afe3f583cc5e26403b44e59a55ab34/filelock-3.25.2.tar.gz", hash = "sha256:b64ece2b38f4ca29dd3e810287aa8c48182bbecd1ae6e9ae126c9b35f1382694", size = 40480, upload-time = "2026-03-11T20:45:38.487Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/a5/842ae8f0c08b61d6484b52f99a03510a3a72d23141942d216ebe81fefbce/filelock-3.25.2-py3-none-any.whl", hash = "sha256:ca8afb0da15f229774c9ad1b455ed96e85a81373065fb10446672f64444ddf70", size = 26759, upload-time = "2026-03-11T20:45:37.437Z" },
]
[[package]]
name = "flask"
version = "3.1.3"
@@ -195,11 +222,18 @@ dependencies = [
{ name = "jinja2-fragments" },
{ name = "pydantic" },
{ name = "pyparsing" },
{ name = "pytest" },
{ name = "requests" },
{ name = "sqlalchemy" },
]
[package.dev-dependencies]
dev = [
{ name = "pre-commit" },
{ name = "pytest" },
{ name = "ruff" },
{ name = "ty" },
]
[package.metadata]
requires-dist = [
{ name = "alembic", specifier = ">=1.13.0" },
@@ -212,11 +246,27 @@ requires-dist = [
{ name = "jinja2-fragments", specifier = ">=1.11.0" },
{ name = "pydantic", specifier = ">=2.12.5" },
{ name = "pyparsing", specifier = ">=3.3.2" },
{ name = "pytest", specifier = ">=9.0.2" },
{ name = "requests", specifier = ">=2.32.5" },
{ name = "sqlalchemy", specifier = ">=2.0.48" },
]
[package.metadata.requires-dev]
dev = [
{ name = "pre-commit", specifier = ">=4.5.1" },
{ name = "pytest", specifier = ">=9.0.2" },
{ name = "ruff", specifier = ">=0.15.6" },
{ name = "ty", specifier = ">=0.0.23" },
]
[[package]]
name = "identify"
version = "2.6.18"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/46/c4/7fb4db12296cdb11893d61c92048fe617ee853f8523b9b296ac03b43757e/identify-2.6.18.tar.gz", hash = "sha256:873ac56a5e3fd63e7438a7ecbc4d91aca692eb3fefa4534db2b7913f3fc352fd", size = 99580, upload-time = "2026-03-15T18:39:50.319Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/46/33/92ef41c6fad0233e41d3d84ba8e8ad18d1780f1e5d99b3c683e6d7f98b63/identify-2.6.18-py2.py3-none-any.whl", hash = "sha256:8db9d3c8ea9079db92cafb0ebf97abdc09d52e97f4dcf773a2e694048b7cd737", size = 99394, upload-time = "2026-03-15T18:39:48.915Z" },
]
[[package]]
name = "idna"
version = "3.11"
@@ -310,6 +360,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/70/bc/6f1c2f612465f5fa89b95bead1f44dcb607670fd42891d8fdcd5d039f4f4/markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa", size = 14146, upload-time = "2025-09-27T18:37:28.327Z" },
]
[[package]]
name = "nodeenv"
version = "1.10.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/24/bf/d1bda4f6168e0b2e9e5958945e01910052158313224ada5ce1fb2e1113b8/nodeenv-1.10.0.tar.gz", hash = "sha256:996c191ad80897d076bdfba80a41994c2b47c68e224c542b48feba42ba00f8bb", size = 55611, upload-time = "2025-12-20T14:08:54.006Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/88/b2/d0896bdcdc8d28a7fc5717c305f1a861c26e18c05047949fb371034d98bd/nodeenv-1.10.0-py2.py3-none-any.whl", hash = "sha256:5bb13e3eed2923615535339b3c620e76779af4cb4c6a90deccc9e36b274d3827", size = 23438, upload-time = "2025-12-20T14:08:52.782Z" },
]
[[package]]
name = "packaging"
version = "26.0"
@@ -319,6 +378,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529", size = 74366, upload-time = "2026-01-21T20:50:37.788Z" },
]
[[package]]
name = "platformdirs"
version = "4.9.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/56/8d4c30c8a1d07013911a8fdbd8f89440ef9f08d07a1b50ab8ca8be5a20f9/platformdirs-4.9.4.tar.gz", hash = "sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934", size = 28737, upload-time = "2026-03-05T18:34:13.271Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/63/d7/97f7e3a6abb67d8080dd406fd4df842c2be0efaf712d1c899c32a075027c/platformdirs-4.9.4-py3-none-any.whl", hash = "sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868", size = 21216, upload-time = "2026-03-05T18:34:12.172Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
@@ -328,6 +396,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pre-commit"
version = "4.5.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cfgv" },
{ name = "identify" },
{ name = "nodeenv" },
{ name = "pyyaml" },
{ name = "virtualenv" },
]
sdist = { url = "https://files.pythonhosted.org/packages/40/f1/6d86a29246dfd2e9b6237f0b5823717f60cad94d47ddc26afa916d21f525/pre_commit-4.5.1.tar.gz", hash = "sha256:eb545fcff725875197837263e977ea257a402056661f09dae08e4b149b030a61", size = 198232, upload-time = "2025-12-16T21:14:33.552Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5d/19/fd3ef348460c80af7bb4669ea7926651d1f95c23ff2df18b9d24bab4f3fa/pre_commit-4.5.1-py2.py3-none-any.whl", hash = "sha256:3b3afd891e97337708c1674210f8eba659b52a38ea5f822ff142d10786221f77", size = 226437, upload-time = "2025-12-16T21:14:32.409Z" },
]
[[package]]
name = "pydantic"
version = "2.12.5"
@@ -416,6 +500,45 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
]
[[package]]
name = "python-discovery"
version = "1.1.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "filelock" },
{ name = "platformdirs" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d7/7e/9f3b0dd3a074a6c3e1e79f35e465b1f2ee4b262d619de00cfce523cc9b24/python_discovery-1.1.3.tar.gz", hash = "sha256:7acca36e818cd88e9b2ba03e045ad7e93e1713e29c6bbfba5d90202310b7baa5", size = 56945, upload-time = "2026-03-10T15:08:15.038Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e7/80/73211fc5bfbfc562369b4aa61dc1e4bf07dc7b34df7b317e4539316b809c/python_discovery-1.1.3-py3-none-any.whl", hash = "sha256:90e795f0121bc84572e737c9aa9966311b9fde44ffb88a5953b3ec9b31c6945e", size = 31485, upload-time = "2026-03-10T15:08:13.06Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/05/8e/961c0007c59b8dd7729d542c61a4d537767a59645b82a0b521206e1e25c2/pyyaml-6.0.3.tar.gz", hash = "sha256:d76623373421df22fb4cf8817020cbb7ef15c725b9d5e45f17e189bfc384190f", size = 130960, upload-time = "2025-09-25T21:33:16.546Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9d/8c/f4bd7f6465179953d3ac9bc44ac1a8a3e6122cf8ada906b4f96c60172d43/pyyaml-6.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:8d1fab6bb153a416f9aeb4b8763bc0f22a5586065f86f7664fc23339fc1c1fac", size = 181814, upload-time = "2025-09-25T21:32:35.712Z" },
{ url = "https://files.pythonhosted.org/packages/bd/9c/4d95bb87eb2063d20db7b60faa3840c1b18025517ae857371c4dd55a6b3a/pyyaml-6.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:34d5fcd24b8445fadc33f9cf348c1047101756fd760b4dacb5c3e99755703310", size = 173809, upload-time = "2025-09-25T21:32:36.789Z" },
{ url = "https://files.pythonhosted.org/packages/92/b5/47e807c2623074914e29dabd16cbbdd4bf5e9b2db9f8090fa64411fc5382/pyyaml-6.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:501a031947e3a9025ed4405a168e6ef5ae3126c59f90ce0cd6f2bfc477be31b7", size = 766454, upload-time = "2025-09-25T21:32:37.966Z" },
{ url = "https://files.pythonhosted.org/packages/02/9e/e5e9b168be58564121efb3de6859c452fccde0ab093d8438905899a3a483/pyyaml-6.0.3-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:b3bc83488de33889877a0f2543ade9f70c67d66d9ebb4ac959502e12de895788", size = 836355, upload-time = "2025-09-25T21:32:39.178Z" },
{ url = "https://files.pythonhosted.org/packages/88/f9/16491d7ed2a919954993e48aa941b200f38040928474c9e85ea9e64222c3/pyyaml-6.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c458b6d084f9b935061bc36216e8a69a7e293a2f1e68bf956dcd9e6cbcd143f5", size = 794175, upload-time = "2025-09-25T21:32:40.865Z" },
{ url = "https://files.pythonhosted.org/packages/dd/3f/5989debef34dc6397317802b527dbbafb2b4760878a53d4166579111411e/pyyaml-6.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:7c6610def4f163542a622a73fb39f534f8c101d690126992300bf3207eab9764", size = 755228, upload-time = "2025-09-25T21:32:42.084Z" },
{ url = "https://files.pythonhosted.org/packages/d7/ce/af88a49043cd2e265be63d083fc75b27b6ed062f5f9fd6cdc223ad62f03e/pyyaml-6.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:5190d403f121660ce8d1d2c1bb2ef1bd05b5f68533fc5c2ea899bd15f4399b35", size = 789194, upload-time = "2025-09-25T21:32:43.362Z" },
{ url = "https://files.pythonhosted.org/packages/23/20/bb6982b26a40bb43951265ba29d4c246ef0ff59c9fdcdf0ed04e0687de4d/pyyaml-6.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:4a2e8cebe2ff6ab7d1050ecd59c25d4c8bd7e6f400f5f82b96557ac0abafd0ac", size = 156429, upload-time = "2025-09-25T21:32:57.844Z" },
{ url = "https://files.pythonhosted.org/packages/f4/f4/a4541072bb9422c8a883ab55255f918fa378ecf083f5b85e87fc2b4eda1b/pyyaml-6.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:93dda82c9c22deb0a405ea4dc5f2d0cda384168e466364dec6255b293923b2f3", size = 143912, upload-time = "2025-09-25T21:32:59.247Z" },
{ url = "https://files.pythonhosted.org/packages/7c/f9/07dd09ae774e4616edf6cda684ee78f97777bdd15847253637a6f052a62f/pyyaml-6.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:02893d100e99e03eda1c8fd5c441d8c60103fd175728e23e431db1b589cf5ab3", size = 189108, upload-time = "2025-09-25T21:32:44.377Z" },
{ url = "https://files.pythonhosted.org/packages/4e/78/8d08c9fb7ce09ad8c38ad533c1191cf27f7ae1effe5bb9400a46d9437fcf/pyyaml-6.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:c1ff362665ae507275af2853520967820d9124984e0f7466736aea23d8611fba", size = 183641, upload-time = "2025-09-25T21:32:45.407Z" },
{ url = "https://files.pythonhosted.org/packages/7b/5b/3babb19104a46945cf816d047db2788bcaf8c94527a805610b0289a01c6b/pyyaml-6.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6adc77889b628398debc7b65c073bcb99c4a0237b248cacaf3fe8a557563ef6c", size = 831901, upload-time = "2025-09-25T21:32:48.83Z" },
{ url = "https://files.pythonhosted.org/packages/8b/cc/dff0684d8dc44da4d22a13f35f073d558c268780ce3c6ba1b87055bb0b87/pyyaml-6.0.3-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a80cb027f6b349846a3bf6d73b5e95e782175e52f22108cfa17876aaeff93702", size = 861132, upload-time = "2025-09-25T21:32:50.149Z" },
{ url = "https://files.pythonhosted.org/packages/b1/5e/f77dc6b9036943e285ba76b49e118d9ea929885becb0a29ba8a7c75e29fe/pyyaml-6.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:00c4bdeba853cc34e7dd471f16b4114f4162dc03e6b7afcc2128711f0eca823c", size = 839261, upload-time = "2025-09-25T21:32:51.808Z" },
{ url = "https://files.pythonhosted.org/packages/ce/88/a9db1376aa2a228197c58b37302f284b5617f56a5d959fd1763fb1675ce6/pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:66e1674c3ef6f541c35191caae2d429b967b99e02040f5ba928632d9a7f0f065", size = 805272, upload-time = "2025-09-25T21:32:52.941Z" },
{ url = "https://files.pythonhosted.org/packages/da/92/1446574745d74df0c92e6aa4a7b0b3130706a4142b2d1a5869f2eaa423c6/pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:16249ee61e95f858e83976573de0f5b2893b3677ba71c9dd36b9cf8be9ac6d65", size = 829923, upload-time = "2025-09-25T21:32:54.537Z" },
{ url = "https://files.pythonhosted.org/packages/f0/7a/1c7270340330e575b92f397352af856a8c06f230aa3e76f86b39d01b416a/pyyaml-6.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4ad1906908f2f5ae4e5a8ddfce73c320c2a1429ec52eafd27138b7f1cbe341c9", size = 174062, upload-time = "2025-09-25T21:32:55.767Z" },
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
]
[[package]]
name = "requests"
version = "2.32.5"
@@ -431,6 +554,31 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" },
]
[[package]]
name = "ruff"
version = "0.15.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/51/df/f8629c19c5318601d3121e230f74cbee7a3732339c52b21daa2b82ef9c7d/ruff-0.15.6.tar.gz", hash = "sha256:8394c7bb153a4e3811a4ecdacd4a8e6a4fa8097028119160dffecdcdf9b56ae4", size = 4597916, upload-time = "2026-03-12T23:05:47.51Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9e/2f/4e03a7e5ce99b517e98d3b4951f411de2b0fa8348d39cf446671adcce9a2/ruff-0.15.6-py3-none-linux_armv6l.whl", hash = "sha256:7c98c3b16407b2cf3d0f2b80c80187384bc92c6774d85fefa913ecd941256fff", size = 10508953, upload-time = "2026-03-12T23:05:17.246Z" },
{ url = "https://files.pythonhosted.org/packages/70/60/55bcdc3e9f80bcf39edf0cd272da6fa511a3d94d5a0dd9e0adf76ceebdb4/ruff-0.15.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:ee7dcfaad8b282a284df4aa6ddc2741b3f4a18b0555d626805555a820ea181c3", size = 10942257, upload-time = "2026-03-12T23:05:23.076Z" },
{ url = "https://files.pythonhosted.org/packages/e7/f9/005c29bd1726c0f492bfa215e95154cf480574140cb5f867c797c18c790b/ruff-0.15.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:3bd9967851a25f038fc8b9ae88a7fbd1b609f30349231dffaa37b6804923c4bb", size = 10322683, upload-time = "2026-03-12T23:05:33.738Z" },
{ url = "https://files.pythonhosted.org/packages/5f/74/2f861f5fd7cbb2146bddb5501450300ce41562da36d21868c69b7a828169/ruff-0.15.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:13f4594b04e42cd24a41da653886b04d2ff87adbf57497ed4f728b0e8a4866f8", size = 10660986, upload-time = "2026-03-12T23:05:53.245Z" },
{ url = "https://files.pythonhosted.org/packages/c1/a1/309f2364a424eccb763cdafc49df843c282609f47fe53aa83f38272389e0/ruff-0.15.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:e2ed8aea2f3fe57886d3f00ea5b8aae5bf68d5e195f487f037a955ff9fbaac9e", size = 10332177, upload-time = "2026-03-12T23:05:56.145Z" },
{ url = "https://files.pythonhosted.org/packages/30/41/7ebf1d32658b4bab20f8ac80972fb19cd4e2c6b78552be263a680edc55ac/ruff-0.15.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:70789d3e7830b848b548aae96766431c0dc01a6c78c13381f423bf7076c66d15", size = 11170783, upload-time = "2026-03-12T23:06:01.742Z" },
{ url = "https://files.pythonhosted.org/packages/76/be/6d488f6adca047df82cd62c304638bcb00821c36bd4881cfca221561fdfc/ruff-0.15.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:542aaf1de3154cea088ced5a819ce872611256ffe2498e750bbae5247a8114e9", size = 12044201, upload-time = "2026-03-12T23:05:28.697Z" },
{ url = "https://files.pythonhosted.org/packages/71/68/e6f125df4af7e6d0b498f8d373274794bc5156b324e8ab4bf5c1b4fc0ec7/ruff-0.15.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1c22e6f02c16cfac3888aa636e9eba857254d15bbacc9906c9689fdecb1953ab", size = 11421561, upload-time = "2026-03-12T23:05:31.236Z" },
{ url = "https://files.pythonhosted.org/packages/f1/9f/f85ef5fd01a52e0b472b26dc1b4bd228b8f6f0435975442ffa4741278703/ruff-0.15.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:98893c4c0aadc8e448cfa315bd0cc343a5323d740fe5f28ef8a3f9e21b381f7e", size = 11310928, upload-time = "2026-03-12T23:05:45.288Z" },
{ url = "https://files.pythonhosted.org/packages/8c/26/b75f8c421f5654304b89471ed384ae8c7f42b4dff58fa6ce1626d7f2b59a/ruff-0.15.6-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:70d263770d234912374493e8cc1e7385c5d49376e41dfa51c5c3453169dc581c", size = 11235186, upload-time = "2026-03-12T23:05:50.677Z" },
{ url = "https://files.pythonhosted.org/packages/fc/d4/d5a6d065962ff7a68a86c9b4f5500f7d101a0792078de636526c0edd40da/ruff-0.15.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:55a1ad63c5a6e54b1f21b7514dfadc0c7fb40093fa22e95143cf3f64ebdcd512", size = 10635231, upload-time = "2026-03-12T23:05:37.044Z" },
{ url = "https://files.pythonhosted.org/packages/d6/56/7c3acf3d50910375349016cf33de24be021532042afbed87942858992491/ruff-0.15.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8dc473ba093c5ec238bb1e7429ee676dca24643c471e11fbaa8a857925b061c0", size = 10340357, upload-time = "2026-03-12T23:06:04.748Z" },
{ url = "https://files.pythonhosted.org/packages/06/54/6faa39e9c1033ff6a3b6e76b5df536931cd30caf64988e112bbf91ef5ce5/ruff-0.15.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:85b042377c2a5561131767974617006f99f7e13c63c111b998f29fc1e58a4cfb", size = 10860583, upload-time = "2026-03-12T23:05:58.978Z" },
{ url = "https://files.pythonhosted.org/packages/cb/1e/509a201b843b4dfb0b32acdedf68d951d3377988cae43949ba4c4133a96a/ruff-0.15.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:cef49e30bc5a86a6a92098a7fbf6e467a234d90b63305d6f3ec01225a9d092e0", size = 11410976, upload-time = "2026-03-12T23:05:39.955Z" },
{ url = "https://files.pythonhosted.org/packages/6c/25/3fc9114abf979a41673ce877c08016f8e660ad6cf508c3957f537d2e9fa9/ruff-0.15.6-py3-none-win32.whl", hash = "sha256:bbf67d39832404812a2d23020dda68fee7f18ce15654e96fb1d3ad21a5fe436c", size = 10616872, upload-time = "2026-03-12T23:05:42.451Z" },
{ url = "https://files.pythonhosted.org/packages/89/7a/09ece68445ceac348df06e08bf75db72d0e8427765b96c9c0ffabc1be1d9/ruff-0.15.6-py3-none-win_amd64.whl", hash = "sha256:aee25bc84c2f1007ecb5037dff75cef00414fdf17c23f07dc13e577883dca406", size = 11787271, upload-time = "2026-03-12T23:05:20.168Z" },
{ url = "https://files.pythonhosted.org/packages/7f/d0/578c47dd68152ddddddf31cd7fc67dc30b7cdf639a86275fda821b0d9d98/ruff-0.15.6-py3-none-win_arm64.whl", hash = "sha256:c34de3dd0b0ba203be50ae70f5910b17188556630e2178fd7d79fc030eb0d837", size = 11060497, upload-time = "2026-03-12T23:05:25.968Z" },
]
[[package]]
name = "sqlalchemy"
version = "2.0.48"
@@ -457,6 +605,30 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/46/2c/9664130905f03db57961b8980b05cab624afd114bf2be2576628a9f22da4/sqlalchemy-2.0.48-py3-none-any.whl", hash = "sha256:a66fe406437dd65cacd96a72689a3aaaecaebbcd62d81c5ac1c0fdbeac835096", size = 1940202, upload-time = "2026-03-02T15:52:43.285Z" },
]
[[package]]
name = "ty"
version = "0.0.23"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/75/ba/d3c998ff4cf6b5d75b39356db55fe1b7caceecc522b9586174e6a5dee6f7/ty-0.0.23.tar.gz", hash = "sha256:5fb05db58f202af366f80ef70f806e48f5237807fe424ec787c9f289e3f3a4ef", size = 5341461, upload-time = "2026-03-13T12:34:23.125Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f4/21/aab32603dfdfacd4819e52fa8c6074e7bd578218a5142729452fc6a62db6/ty-0.0.23-py3-none-linux_armv6l.whl", hash = "sha256:e810eef1a5f1cfc0731a58af8d2f334906a96835829767aed00026f1334a8dd7", size = 10329096, upload-time = "2026-03-13T12:34:09.432Z" },
{ url = "https://files.pythonhosted.org/packages/9f/a9/dd3287a82dce3df546ec560296208d4905dcf06346b6e18c2f3c63523bd1/ty-0.0.23-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e43d36bd89a151ddcad01acaeff7dcc507cb73ff164c1878d2d11549d39a061c", size = 10156631, upload-time = "2026-03-13T12:34:53.122Z" },
{ url = "https://files.pythonhosted.org/packages/0f/01/3f25909b02fac29bb0a62b2251f8d62e65d697781ffa4cf6b47a4c075c85/ty-0.0.23-py3-none-macosx_11_0_arm64.whl", hash = "sha256:bd6a340969577b4645f231572c4e46012acba2d10d4c0c6570fe1ab74e76ae00", size = 9653211, upload-time = "2026-03-13T12:34:15.049Z" },
{ url = "https://files.pythonhosted.org/packages/d5/60/bfc0479572a6f4b90501c869635faf8d84c8c68ffc5dd87d04f049affabc/ty-0.0.23-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:341441783e626eeb7b1ec2160432956aed5734932ab2d1c26f94d0c98b229937", size = 10156143, upload-time = "2026-03-13T12:34:34.468Z" },
{ url = "https://files.pythonhosted.org/packages/3a/81/8a93e923535a340f54bea20ff196f6b2787782b2f2f399bd191c4bc132d6/ty-0.0.23-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8ce1dc66c26d4167e2c78d12fa870ef5a7ec9cc344d2baaa6243297cfa88bd52", size = 10136632, upload-time = "2026-03-13T12:34:28.832Z" },
{ url = "https://files.pythonhosted.org/packages/da/cb/2ac81c850c58acc9f976814404d28389c9c1c939676e32287b9cff61381e/ty-0.0.23-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bae1e7a294bf8528836f7617dc5c360ea2dddb63789fc9471ae6753534adca05", size = 10655025, upload-time = "2026-03-13T12:34:37.105Z" },
{ url = "https://files.pythonhosted.org/packages/b5/9b/bac771774c198c318ae699fc013d8cd99ed9caf993f661fba11238759244/ty-0.0.23-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d2b162768764d9dc177c83fb497a51532bb67cbebe57b8fa0f2668436bf53f3c", size = 11230107, upload-time = "2026-03-13T12:34:20.751Z" },
{ url = "https://files.pythonhosted.org/packages/14/09/7644fb0e297265e18243f878aca343593323b9bb19ed5278dcbc63781be0/ty-0.0.23-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d28384e48ca03b34e4e2beee0e230c39bbfb68994bb44927fec61ef3642900da", size = 10934177, upload-time = "2026-03-13T12:34:17.904Z" },
{ url = "https://files.pythonhosted.org/packages/18/14/69a25a0cad493fb6a947302471b579a03516a3b00e7bece77fdc6b4afb9b/ty-0.0.23-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:559d9a299df793cb7a7902caed5eda8a720ff69164c31c979673e928f02251ee", size = 10752487, upload-time = "2026-03-13T12:34:31.785Z" },
{ url = "https://files.pythonhosted.org/packages/9d/2a/42fc3cbccf95af0a62308ebed67e084798ab7a85ef073c9986ef18032743/ty-0.0.23-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:32a7b8a14a98e1d20a9d8d2af23637ed7efdb297ac1fa2450b8e465d05b94482", size = 10133007, upload-time = "2026-03-13T12:34:42.838Z" },
{ url = "https://files.pythonhosted.org/packages/e1/69/307833f1b52fa3670e0a1d496e43ef7df556ecde838192d3fcb9b35e360d/ty-0.0.23-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:6f803b9b9cca87af793467973b9abdd4b83e6b96d9b5e749d662cff7ead70b6d", size = 10169698, upload-time = "2026-03-13T12:34:12.351Z" },
{ url = "https://files.pythonhosted.org/packages/89/ae/5dd379ec22d0b1cba410d7af31c366fcedff191d5b867145913a64889f66/ty-0.0.23-py3-none-musllinux_1_2_i686.whl", hash = "sha256:4a0bf086ec8e2197b7ea7ebfcf4be36cb6a52b235f8be61647ef1b2d99d6ffd3", size = 10346080, upload-time = "2026-03-13T12:34:40.012Z" },
{ url = "https://files.pythonhosted.org/packages/98/c7/dfc83203d37998620bba9c4873a080c8850a784a8a46f56f8163c5b4e320/ty-0.0.23-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:252539c3fcd7aeb9b8d5c14e2040682c3e1d7ff640906d63fd2c4ce35865a4ba", size = 10848162, upload-time = "2026-03-13T12:34:45.421Z" },
{ url = "https://files.pythonhosted.org/packages/89/08/05481511cfbcc1fd834b6c67aaae090cb609a079189ddf2032139ccfc490/ty-0.0.23-py3-none-win32.whl", hash = "sha256:51b591d19eef23bbc3807aef77d38fa1f003c354e1da908aa80ea2dca0993f77", size = 9748283, upload-time = "2026-03-13T12:34:50.607Z" },
{ url = "https://files.pythonhosted.org/packages/31/2e/eaed4ff5c85e857a02415084c394e02c30476b65e158eec1938fdaa9a205/ty-0.0.23-py3-none-win_amd64.whl", hash = "sha256:1e137e955f05c501cfbb81dd2190c8fb7d01ec037c7e287024129c722a83c9ad", size = 10698355, upload-time = "2026-03-13T12:34:26.134Z" },
{ url = "https://files.pythonhosted.org/packages/91/29/b32cb7b4c7d56b9ed50117f8ad6e45834aec293e4cb14749daab4e9236d5/ty-0.0.23-py3-none-win_arm64.whl", hash = "sha256:a0399bd13fd2cd6683fd0a2d59b9355155d46546d8203e152c556ddbdeb20842", size = 10155890, upload-time = "2026-03-13T12:34:48.082Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
@@ -487,6 +659,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
]
[[package]]
name = "virtualenv"
version = "21.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "distlib" },
{ name = "filelock" },
{ name = "platformdirs" },
{ name = "python-discovery" },
]
sdist = { url = "https://files.pythonhosted.org/packages/aa/92/58199fe10049f9703c2666e809c4f686c54ef0a68b0f6afccf518c0b1eb9/virtualenv-21.2.0.tar.gz", hash = "sha256:1720dc3a62ef5b443092e3f499228599045d7fea4c79199770499df8becf9098", size = 5840618, upload-time = "2026-03-09T17:24:38.013Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c6/59/7d02447a55b2e55755011a647479041bc92a82e143f96a8195cb33bd0a1c/virtualenv-21.2.0-py3-none-any.whl", hash = "sha256:1bd755b504931164a5a496d217c014d098426cddc79363ad66ac78125f9d908f", size = 5825084, upload-time = "2026-03-09T17:24:35.378Z" },
]
[[package]]
name = "werkzeug"
version = "3.1.6"