Source code for django_api_orm.base

"""Synchronous base classes for APIModel, QuerySet, and Manager."""

from collections.abc import Iterator
from typing import Any, Generic, TypeVar, Union

from pydantic import BaseModel

from .client import ServiceClient
from .exceptions import DoesNotExist, MultipleObjectsReturned

T = TypeVar("T", bound="APIModel")


[docs] class QuerySet(Generic[T]): """Django-like QuerySet for filtering and retrieving API resources. Provides lazy evaluation with result caching and chainable filter methods. Example: >>> queryset = Policy.objects.filter(status='active') >>> queryset = queryset.order_by('-created_at') >>> for policy in queryset: # Executes query here ... print(policy.policy_number) """
[docs] def __init__(self, model_class: type[T], manager: "Manager[T]") -> None: """Initialize QuerySet. Args: model_class: The model class this QuerySet represents manager: The manager that created this QuerySet """ self.model_class = model_class self.manager = manager # Query parameters self._filters: dict[str, Any] = {} self._excludes: dict[str, Any] = {} self._order_by_fields: list[str] = [] self._limit: int | None = None self._offset: int | None = None # Result caching self._result_cache: list[T] | None = None self._fetched = False
def _clone(self) -> "QuerySet[T]": """Create a copy of this QuerySet for chaining. Returns: A new QuerySet with the same parameters """ qs = QuerySet(self.model_class, self.manager) qs._filters = self._filters.copy() qs._excludes = self._excludes.copy() qs._order_by_fields = self._order_by_fields.copy() qs._limit = self._limit qs._offset = self._offset return qs def _build_params(self) -> dict[str, Any]: """Build query parameters for the API request. Returns: Dictionary of query parameters """ params: dict[str, Any] = {} # Add filters params.update(self._filters) # Add excludes (if API supports it) for key, value in self._excludes.items(): params[f"exclude_{key}"] = value # Add ordering if self._order_by_fields: params["ordering"] = ",".join(self._order_by_fields) # Add pagination if self._limit is not None: params["limit"] = self._limit if self._offset is not None: params["offset"] = self._offset return params def _fetch(self) -> None: """Execute the query and cache results.""" if self._fetched: return params = self._build_params() response = self.manager.client.get(self.manager.get_endpoint(), params=params) # Parse response data data = response.data # Handle paginated responses (Django REST framework style) if isinstance(data, dict) and "results" in data: results = data["results"] elif isinstance(data, list): results = data else: results = [data] # Convert to model instances self._result_cache = [ self.model_class.from_api(item, client=self.manager.client) # type: ignore[misc] for item in results ] self._fetched = True # Filtering methods
[docs] def filter(self, **kwargs: Any) -> "QuerySet[T]": """Filter QuerySet by given parameters. Args: **kwargs: Field lookups (e.g., status='active', id=123) Returns: New QuerySet with filters applied Example: >>> Policy.objects.filter(status='active', premium_amount__gte=1000) """ qs = self._clone() qs._filters.update(kwargs) return qs
[docs] def exclude(self, **kwargs: Any) -> "QuerySet[T]": """Exclude results matching given parameters. Args: **kwargs: Field lookups to exclude Returns: New QuerySet with exclusions applied Example: >>> Policy.objects.exclude(status='cancelled') """ qs = self._clone() qs._excludes.update(kwargs) return qs
[docs] def all(self) -> "QuerySet[T]": """Return a copy of this QuerySet. Returns: New QuerySet (clone) """ return self._clone()
# Ordering methods
[docs] def order_by(self, *fields: str) -> "QuerySet[T]": """Order results by given fields. Args: *fields: Field names (prefix with '-' for descending) Returns: New QuerySet with ordering applied Example: >>> Policy.objects.order_by('-created_at', 'policy_number') """ qs = self._clone() qs._order_by_fields = list(fields) return qs
# Slicing and limiting methods
[docs] def first(self) -> T | None: """Get the first result or None. Returns: First model instance or None Example: >>> policy = Policy.objects.filter(status='active').first() """ qs = self._clone() qs._limit = 1 qs._fetch() return qs._result_cache[0] if qs._result_cache else None
[docs] def last(self) -> T | None: """Get the last result or None. Returns: Last model instance or None """ qs = self._clone() # If ordering specified, reverse it and get first if qs._order_by_fields: qs._order_by_fields = [ f[1:] if f.startswith("-") else f"-{f}" for f in qs._order_by_fields ] qs._limit = 1 # If no ordering, need to fetch all and get last qs._fetch() return qs._result_cache[-1] if qs._result_cache else None
[docs] def __getitem__(self, key: int | slice) -> Union[T, "QuerySet[T]"]: """Support slicing and indexing. Args: key: Integer index or slice Returns: Model instance (for int) or QuerySet (for slice) Example: >>> policies = Policy.objects.all() >>> first = policies[0] # Get first >>> subset = policies[10:20] # Get slice """ if isinstance(key, int): # Get single item by index qs = self._clone() qs._offset = key qs._limit = 1 qs._fetch() if not qs._result_cache: raise IndexError("QuerySet index out of range") return qs._result_cache[0] elif isinstance(key, slice): # Get slice qs = self._clone() if key.start is not None: qs._offset = key.start if key.stop is not None: if key.start is not None: qs._limit = key.stop - key.start else: qs._limit = key.stop return qs else: raise TypeError("QuerySet indices must be integers or slices")
# Retrieval methods
[docs] def get(self, **kwargs: Any) -> T: """Get a single object matching the criteria. Args: **kwargs: Field lookups Returns: Single model instance Raises: DoesNotExist: If no results found MultipleObjectsReturned: If multiple results found Example: >>> policy = Policy.objects.get(id=123) """ qs = self.filter(**kwargs) qs._limit = 2 # Fetch 2 to detect multiple qs._fetch() if not qs._result_cache: raise DoesNotExist(f"{self.model_class.__name__} matching query does not exist") if len(qs._result_cache) > 1: raise MultipleObjectsReturned( f"get() returned more than one {self.model_class.__name__}" ) return qs._result_cache[0]
[docs] def exists(self) -> bool: """Check if any results exist. Returns: True if results exist, False otherwise Example: >>> if Policy.objects.filter(status='active').exists(): ... print("Active policies found") """ qs = self._clone() qs._limit = 1 qs._fetch() return bool(qs._result_cache)
[docs] def count(self) -> int: """Get count of results. Returns: Number of results Example: >>> count = Policy.objects.filter(status='active').count() """ # Try to get count from API without fetching all results params = self._build_params() params["count_only"] = "true" try: response = self.manager.client.get(self.manager.get_endpoint(), params=params) # Try to get count from response if isinstance(response.data, dict): if "count" in response.data: return int(response.data["count"]) elif "total" in response.data: return int(response.data["total"]) except Exception: pass # Fallback: fetch and count self._fetch() return len(self._result_cache) if self._result_cache else 0
# Iteration support
[docs] def __iter__(self) -> Iterator[T]: """Make QuerySet iterable. Returns: Iterator over model instances Example: >>> for policy in Policy.objects.filter(status='active'): ... print(policy.policy_number) """ self._fetch() return iter(self._result_cache or [])
[docs] def __len__(self) -> int: """Get length of results. Returns: Number of cached results """ self._fetch() return len(self._result_cache) if self._result_cache else 0
# Value extraction methods
[docs] def values(self, *fields: str) -> list[dict[str, Any]]: """Return list of dictionaries instead of model instances. Args: *fields: Field names to include (all if not specified) Returns: List of dictionaries Example: >>> policies = Policy.objects.values('id', 'policy_number') >>> # [{'id': 1, 'policy_number': 'POL-001'}, ...] """ self._fetch() if not self._result_cache: return [] results = [] for obj in self._result_cache: data = obj.to_dict() if fields: data = {k: v for k, v in data.items() if k in fields} results.append(data) return results
[docs] def values_list(self, *fields: str, flat: bool = False) -> list[Any]: """Return list of tuples instead of model instances. Args: *fields: Field names to include flat: If True and one field, return flat list Returns: List of tuples (or flat list if flat=True) Example: >>> ids = Policy.objects.values_list('id', flat=True) >>> # [1, 2, 3, ...] >>> pairs = Policy.objects.values_list('id', 'policy_number') >>> # [(1, 'POL-001'), (2, 'POL-002'), ...] """ self._fetch() if not self._result_cache: return [] if flat and len(fields) != 1: raise ValueError("'flat' is only valid when one field is specified") results = [] for obj in self._result_cache: data = obj.to_dict() if flat: results.append(data.get(fields[0])) else: values = tuple(data.get(field) for field in fields) results.append(values) return results
[docs] def __repr__(self) -> str: """String representation of QuerySet.""" if self._result_cache is not None: return f"<QuerySet {list(self._result_cache)}>" return f"<QuerySet for {self.model_class.__name__}>"
[docs] class Manager(Generic[T]): """Django-like Manager for model querying. Handles creation and retrieval of model instances. Example: >>> class Policy(APIModel): ... objects = Manager() >>> policies = Policy.objects.filter(status='active') """
[docs] def __init__(self, model_class: type[T], client: ServiceClient) -> None: """Initialize Manager. Args: model_class: The model class this manager handles client: HTTP client for API requests """ self.model_class = model_class self.client = client
[docs] def get_endpoint(self) -> str: """Get the API endpoint for this model. Returns: API endpoint path """ return self.model_class.get_endpoint()
# Query methods (delegate to QuerySet)
[docs] def all(self) -> QuerySet[T]: """Get all objects. Returns: QuerySet of all objects Example: >>> all_policies = Policy.objects.all() """ return QuerySet(self.model_class, self)
[docs] def filter(self, **kwargs: Any) -> QuerySet[T]: """Filter objects by criteria. Args: **kwargs: Filter parameters Returns: Filtered QuerySet Example: >>> active_policies = Policy.objects.filter(status='active') """ return self.all().filter(**kwargs)
[docs] def exclude(self, **kwargs: Any) -> QuerySet[T]: """Exclude objects matching criteria. Args: **kwargs: Exclusion parameters Returns: Filtered QuerySet """ return self.all().exclude(**kwargs)
[docs] def get(self, **kwargs: Any) -> T: """Get a single object. Args: **kwargs: Lookup parameters Returns: Single model instance Raises: DoesNotExist: If not found MultipleObjectsReturned: If multiple found Example: >>> policy = Policy.objects.get(id=123) """ return self.all().get(**kwargs)
[docs] def order_by(self, *fields: str) -> QuerySet[T]: """Order results by given fields. Args: *fields: Field names to order by (prefix with '-' for descending) Returns: Ordered QuerySet Example: >>> policies = Policy.objects.order_by('-created_at') """ return self.all().order_by(*fields)
[docs] def first(self) -> T | None: """Get the first object or None. Returns: First model instance or None Example: >>> policy = Policy.objects.first() """ return self.all().first()
[docs] def last(self) -> T | None: """Get the last object or None. Returns: Last model instance or None Example: >>> policy = Policy.objects.last() """ return self.all().last()
[docs] def exists(self) -> bool: """Check if any objects exist. Returns: True if results exist, False otherwise Example: >>> has_policies = Policy.objects.filter(status='active').exists() """ return self.all().exists()
[docs] def values(self, *fields: str) -> list[dict[str, Any]]: """Return list of dictionaries instead of model instances. Args: *fields: Field names to include (all if not specified) Returns: List of dictionaries Example: >>> policies = Policy.objects.values('id', 'policy_number') """ return self.all().values(*fields)
[docs] def values_list(self, *fields: str, flat: bool = False) -> list[Any]: """Return list of tuples instead of model instances. Args: *fields: Field names to include flat: If True and one field, return flat list Returns: List of tuples (or flat list if flat=True) Example: >>> ids = Policy.objects.values_list('id', flat=True) """ return self.all().values_list(*fields, flat=flat)
[docs] def count(self) -> int: """Count the number of objects. Returns: Number of objects Example: >>> total = Policy.objects.count() """ return self.all().count()
# Creation methods
[docs] def create(self, **kwargs: Any) -> T: """Create a new object in the API. Args: **kwargs: Field values Returns: Created model instance Example: >>> policy = Policy.objects.create( ... policy_number='POL-001', ... premium_amount=1500.00 ... ) """ # Validate with Pydantic schema schema_class = self.model_class.get_schema_class() validated_data = schema_class(**kwargs) data = validated_data.model_dump(mode="json", exclude_unset=True) # Make API request response = self.client.post(self.get_endpoint(), data=data) # Return model instance return self.model_class.from_api(response.data, client=self.client) # type: ignore[return-value]
[docs] def get_or_create( self, defaults: dict[str, Any] | None = None, **kwargs: Any ) -> tuple[T, bool]: """Get an existing object or create a new one. Args: defaults: Values to use when creating **kwargs: Lookup parameters Returns: Tuple of (object, created) where created is a boolean Example: >>> policy, created = Policy.objects.get_or_create( ... policy_number='POL-001', ... defaults={'premium_amount': 1500.00} ... ) """ try: obj = self.get(**kwargs) return obj, False except DoesNotExist: create_data = kwargs.copy() if defaults: create_data.update(defaults) obj = self.create(**create_data) return obj, True
[docs] def update_or_create( self, defaults: dict[str, Any] | None = None, **kwargs: Any ) -> tuple[T, bool]: """Update an existing object or create a new one. Args: defaults: Values to update/create with **kwargs: Lookup parameters Returns: Tuple of (object, created) where created is a boolean Example: >>> policy, created = Policy.objects.update_or_create( ... policy_number='POL-001', ... defaults={'premium_amount': 2000.00} ... ) """ try: obj = self.get(**kwargs) # Update object if defaults: for key, value in defaults.items(): setattr(obj, key, value) obj.save(update_fields=list(defaults.keys())) return obj, False except DoesNotExist: create_data = kwargs.copy() if defaults: create_data.update(defaults) obj = self.create(**create_data) return obj, True
[docs] def bulk_create(self, objs: list[dict[str, Any]]) -> list[T]: """Create multiple objects in bulk. Args: objs: List of dictionaries with object data Returns: List of created model instances Example: >>> policies = Policy.objects.bulk_create([ ... {'policy_number': 'POL-001', 'premium_amount': 1500}, ... {'policy_number': 'POL-002', 'premium_amount': 2000}, ... ]) """ created_objects = [] for obj_data in objs: obj = self.create(**obj_data) created_objects.append(obj) return created_objects
[docs] class APIModel: """Base class for API models. Provides Django ORM-like interface for working with API resources. Must be subclassed with _schema_class and _endpoint defined. Example: >>> class PolicySchema(BaseModel): ... id: int ... policy_number: str ... premium_amount: float >>> >>> class Policy(APIModel): ... _schema_class = PolicySchema ... _endpoint = "/api/v1/policies/" >>> >>> # Register with client >>> Policy.objects = Manager(Policy, client) """ # Class attributes (to be overridden in subclasses) _schema_class: type[BaseModel] _endpoint: str objects: Manager["APIModel"]
[docs] def __init__(self, **kwargs: Any) -> None: """Initialize model instance. Args: **kwargs: Field values or _pydantic_instance """ # Check if we received a pydantic instance if "_pydantic_instance" in kwargs: self._pydantic_instance: BaseModel = kwargs["_pydantic_instance"] self._client: ServiceClient | None = kwargs.get("_client") else: # Validate and create pydantic instance schema_class = self.get_schema_class() self._pydantic_instance = schema_class(**kwargs) self._client = kwargs.get("_client") # Set attributes from pydantic instance (preserving nested Pydantic models) for key in self._pydantic_instance.model_fields.keys(): object.__setattr__(self, key, getattr(self._pydantic_instance, key))
[docs] def __setattr__(self, name: str, value: Any) -> None: """Set attribute and update internal pydantic instance. Args: name: Attribute name value: Attribute value """ # Set the attribute normally object.__setattr__(self, name, value) # If it's a model field and we have a pydantic instance, update it if ( name not in ("_pydantic_instance", "_client") and hasattr(self, "_pydantic_instance") and name in self._pydantic_instance.model_fields ): # Get current data and update with new value current_data = self._pydantic_instance.model_dump() current_data[name] = value # Recreate pydantic instance with updated data schema_class = self.get_schema_class() self._pydantic_instance = schema_class(**current_data)
[docs] @classmethod def get_endpoint(cls) -> str: """Get the API endpoint for this model. Returns: API endpoint path """ return cls._endpoint
[docs] @classmethod def get_schema_class(cls) -> type[BaseModel]: """Get the Pydantic schema class for this model. Returns: Pydantic BaseModel class """ return cls._schema_class
[docs] @classmethod def from_api(cls, data: dict[str, Any], client: ServiceClient | None = None) -> "APIModel": """Create model instance from API response data. Args: data: API response data client: HTTP client Returns: Model instance """ schema_class = cls.get_schema_class() pydantic_instance = schema_class(**data) return cls(_pydantic_instance=pydantic_instance, _client=client)
[docs] def to_dict(self, exclude_unset: bool = False, exclude_none: bool = False) -> dict[str, Any]: """Convert model to dictionary. Args: exclude_unset: Exclude fields that weren't explicitly set exclude_none: Exclude fields with None values Returns: Dictionary representation """ return self._pydantic_instance.model_dump( mode="json", exclude_unset=exclude_unset, exclude_none=exclude_none )
[docs] def model_dump(self, **kwargs: Any) -> dict[str, Any]: """Alias for to_dict for Pydantic compatibility. Args: **kwargs: Arguments to pass to model_dump Returns: Dictionary representation """ return self._pydantic_instance.model_dump(**kwargs)
[docs] def save(self, update_fields: list[str] | None = None) -> None: """Save this instance to the API. Creates or updates based on whether the object has an ID. Args: update_fields: List of fields to update (None = all fields) Raises: APIException: If client is not set Example: >>> policy = Policy(policy_number='POL-001', premium_amount=1500) >>> policy.save() # Creates new >>> policy.premium_amount = 2000 >>> policy.save(update_fields=['premium_amount']) # Updates """ from .exceptions import APIException if not self._client: raise APIException("Cannot save: no client configured for this instance") # Get data to send data = self.to_dict(exclude_unset=True) schema_class = self.get_schema_class() # Filter by update_fields if specified if update_fields: data = {k: v for k, v in data.items() if k in update_fields} # Skip validation for partial updates - API will validate else: # Validate full data schema_class(**data) # Determine if update or create if hasattr(self, "id") and self.id: # Update existing endpoint = f"{self.get_endpoint()}{self.id}/" response = self._client.patch(endpoint, data=data) else: # Create new response = self._client.post(self.get_endpoint(), data=data) # Update instance with response data pydantic_instance = schema_class(**response.data) self._pydantic_instance = pydantic_instance # Update attributes (preserving nested Pydantic models) for key in pydantic_instance.model_fields.keys(): object.__setattr__(self, key, getattr(pydantic_instance, key))
[docs] def delete(self) -> None: """Delete this instance from the API. Raises: APIException: If client is not set or object has no ID Example: >>> policy = Policy.objects.get(id=123) >>> policy.delete() """ from .exceptions import APIException if not self._client: raise APIException("Cannot delete: no client configured") if not hasattr(self, "id") or not self.id: raise APIException("Cannot delete: object has no id") endpoint = f"{self.get_endpoint()}{self.id}/" self._client.delete(endpoint)
[docs] def refresh_from_api(self) -> None: """Refresh this instance's data from the API. Raises: APIException: If client is not set or object has no ID Example: >>> policy = Policy.objects.get(id=123) >>> # ... time passes, data may have changed ... >>> policy.refresh_from_api() # Reload from API """ from .exceptions import APIException if not self._client: raise APIException("Cannot refresh: no client configured") if not hasattr(self, "id") or not self.id: raise APIException("Cannot refresh: object has no id") endpoint = f"{self.get_endpoint()}{self.id}/" response = self._client.get(endpoint) # Update instance with fresh data schema_class = self.get_schema_class() pydantic_instance = schema_class(**response.data) self._pydantic_instance = pydantic_instance # Update attributes (preserving nested Pydantic models) for key in pydantic_instance.model_fields.keys(): object.__setattr__(self, key, getattr(pydantic_instance, key))
[docs] def __repr__(self) -> str: """String representation.""" return f"<{self.__class__.__name__}: {self._pydantic_instance}>"
[docs] def __str__(self) -> str: """Human-readable string.""" return repr(self)
[docs] def register_models(client: ServiceClient, *model_classes: type[APIModel]) -> None: """Register model classes with a sync client. This assigns a Manager instance to each model's 'objects' attribute. Args: client: ServiceClient instance *model_classes: Model classes to register Example: >>> client = ServiceClient(base_url="https://api.example.com") >>> register_models(client, Policy, Claim, Broker) >>> policies = Policy.objects.filter(status='active') """ for model_class in model_classes: model_class.objects = Manager(model_class, client)