from __future__ import annotations import pandas as pd import warnings from datetime import datetime from .core.scrapers.models import Property, ListingType, Advertisers from .exceptions import InvalidListingType, InvalidDate ordered_properties = [ "property_url", "property_id", "listing_id", "permalink", "mls", "mls_id", "status", "mls_status", "text", "style", "formatted_address", "full_street_line", "street", "unit", "city", "state", "zip_code", "beds", "full_baths", "half_baths", "sqft", "year_built", "days_on_mls", "list_price", "list_price_min", "list_price_max", "list_date", "pending_date", "sold_price", "last_sold_date", "last_sold_price", "last_status_change_date", "last_update_date", "assessed_value", "estimated_value", "tax", "tax_history", "new_construction", "lot_sqft", "price_per_sqft", "latitude", "longitude", "neighborhoods", "county", "fips_code", "stories", "hoa_fee", "parking_garage", "agent_id", "agent_name", "agent_email", "agent_phones", "agent_mls_set", "agent_nrds_id", "broker_id", "broker_name", "builder_id", "builder_name", "office_id", "office_mls_set", "office_name", "office_email", "office_phones", "nearby_schools", "primary_photo", "alt_photos" ] def process_result(result: Property) -> pd.DataFrame: prop_data = {prop: None for prop in ordered_properties} prop_data.update(result.model_dump()) if "address" in prop_data and prop_data["address"]: address_data = prop_data["address"] prop_data["full_street_line"] = address_data.get("full_line") prop_data["street"] = address_data.get("street") prop_data["unit"] = address_data.get("unit") prop_data["city"] = address_data.get("city") prop_data["state"] = address_data.get("state") prop_data["zip_code"] = address_data.get("zip") prop_data["formatted_address"] = address_data.get("formatted_address") if "advertisers" in prop_data and prop_data.get("advertisers"): advertiser_data = prop_data["advertisers"] if advertiser_data.get("agent"): agent_data = advertiser_data["agent"] prop_data["agent_id"] = agent_data.get("uuid") prop_data["agent_name"] = agent_data.get("name") prop_data["agent_email"] = agent_data.get("email") prop_data["agent_phones"] = agent_data.get("phones") prop_data["agent_mls_set"] = agent_data.get("mls_set") prop_data["agent_nrds_id"] = agent_data.get("nrds_id") if advertiser_data.get("broker"): broker_data = advertiser_data["broker"] prop_data["broker_id"] = broker_data.get("uuid") prop_data["broker_name"] = broker_data.get("name") if advertiser_data.get("builder"): builder_data = advertiser_data["builder"] prop_data["builder_id"] = builder_data.get("uuid") prop_data["builder_name"] = builder_data.get("name") if advertiser_data.get("office"): office_data = advertiser_data["office"] prop_data["office_id"] = office_data.get("uuid") prop_data["office_name"] = office_data.get("name") prop_data["office_email"] = office_data.get("email") prop_data["office_phones"] = office_data.get("phones") prop_data["office_mls_set"] = office_data.get("mls_set") prop_data["price_per_sqft"] = prop_data["prc_sqft"] prop_data["nearby_schools"] = filter(None, prop_data["nearby_schools"]) if prop_data["nearby_schools"] else None prop_data["nearby_schools"] = ", ".join(set(prop_data["nearby_schools"])) if prop_data["nearby_schools"] else None # Convert datetime objects to strings for CSV (preserve full datetime including time) for date_field in ["list_date", "pending_date", "last_sold_date", "last_status_change_date"]: if prop_data.get(date_field): prop_data[date_field] = prop_data[date_field].strftime("%Y-%m-%d %H:%M:%S") if hasattr(prop_data[date_field], 'strftime') else prop_data[date_field] # Convert HttpUrl objects to strings for CSV if prop_data.get("property_url"): prop_data["property_url"] = str(prop_data["property_url"]) description = result.description if description: prop_data["primary_photo"] = str(description.primary_photo) if description.primary_photo else None prop_data["alt_photos"] = ", ".join(str(url) for url in description.alt_photos) if description.alt_photos else None prop_data["style"] = ( description.style if isinstance(description.style, str) else description.style.value if description.style else None ) prop_data["beds"] = description.beds prop_data["full_baths"] = description.baths_full prop_data["half_baths"] = description.baths_half prop_data["sqft"] = description.sqft prop_data["lot_sqft"] = description.lot_sqft prop_data["sold_price"] = description.sold_price prop_data["year_built"] = description.year_built prop_data["parking_garage"] = description.garage prop_data["stories"] = description.stories prop_data["text"] = description.text properties_df = pd.DataFrame([prop_data]) properties_df = properties_df.reindex(columns=ordered_properties) return properties_df[ordered_properties] def validate_input(listing_type: str | list[str] | None) -> None: if listing_type is None: return # None is valid - returns all types if isinstance(listing_type, list): for lt in listing_type: if lt.upper() not in ListingType.__members__: raise InvalidListingType(f"Provided listing type, '{lt}', does not exist.") else: if listing_type.upper() not in ListingType.__members__: raise InvalidListingType(f"Provided listing type, '{listing_type}', does not exist.") def validate_dates(date_from: str | None, date_to: str | None) -> None: if isinstance(date_from, str) != isinstance(date_to, str): raise InvalidDate("Both date_from and date_to must be provided together.") if date_from and date_to: try: # Use fromisoformat to accept both date and datetime strings date_from_str = date_from.replace('Z', '+00:00') if date_from.endswith('Z') else date_from date_to_str = date_to.replace('Z', '+00:00') if date_to.endswith('Z') else date_to date_from_obj = datetime.fromisoformat(date_from_str) date_to_obj = datetime.fromisoformat(date_to_str) if date_to_obj < date_from_obj: raise InvalidDate(f"date_to ('{date_to}') must be after date_from ('{date_from}').") except ValueError as e: # Provide specific guidance on the expected format raise InvalidDate( f"Invalid date format. Expected ISO 8601 format. " f"Examples: '2025-01-20' (date only) or '2025-01-20T14:30:00' (with time). " f"Got: date_from='{date_from}', date_to='{date_to}'. Error: {e}" ) def validate_limit(limit: int) -> None: #: 1 -> 10000 limit if limit is not None and (limit < 1 or limit > 10000): raise ValueError("Property limit must be between 1 and 10,000.") def validate_offset(offset: int, limit: int = 10000) -> None: """Validate offset parameter for pagination. Args: offset: Starting position for results pagination limit: Maximum number of results to fetch Raises: ValueError: If offset is invalid or if offset + limit exceeds API limit """ if offset is not None and offset < 0: raise ValueError("Offset must be non-negative (>= 0).") # Check if offset + limit exceeds API's hard limit of 10,000 if offset is not None and limit is not None and (offset + limit) > 10000: raise ValueError( f"offset ({offset}) + limit ({limit}) = {offset + limit} exceeds API maximum of 10,000. " f"The API cannot return results beyond position 10,000. " f"To fetch more results, narrow your search." ) # Warn if offset is not a multiple of 200 (API page size) if offset is not None and offset > 0 and offset % 200 != 0: warnings.warn( f"Offset should be a multiple of 200 (page size) for optimal performance. " f"Using offset {offset} may result in less efficient pagination.", UserWarning ) def validate_datetime(datetime_value) -> None: """Validate datetime value (accepts datetime objects or ISO 8601 strings).""" if datetime_value is None: return # Already a datetime object - valid from datetime import datetime as dt, date if isinstance(datetime_value, (dt, date)): return # Must be a string - validate ISO 8601 format if not isinstance(datetime_value, str): raise InvalidDate( f"Invalid datetime value. Expected datetime object, date object, or ISO 8601 string. " f"Got: {type(datetime_value).__name__}" ) try: # Try parsing as ISO 8601 datetime datetime.fromisoformat(datetime_value.replace('Z', '+00:00')) except (ValueError, AttributeError): raise InvalidDate( f"Invalid datetime format: '{datetime_value}'. " f"Expected ISO 8601 format (e.g., '2025-01-20T14:30:00' or '2025-01-20')." ) def validate_last_update_filters(updated_since: str | None, updated_in_past_hours: int | None) -> None: """Validate last_update_date filtering parameters.""" if updated_since and updated_in_past_hours: raise ValueError( "Cannot use both 'updated_since' and 'updated_in_past_hours' parameters together. " "Please use only one method to filter by last_update_date." ) # Validate updated_since format if provided if updated_since: validate_datetime(updated_since) # Validate updated_in_past_hours range if provided if updated_in_past_hours is not None: if updated_in_past_hours < 1: raise ValueError( f"updated_in_past_hours must be at least 1. Got: {updated_in_past_hours}" ) def validate_filters( beds_min: int | None = None, beds_max: int | None = None, baths_min: float | None = None, baths_max: float | None = None, sqft_min: int | None = None, sqft_max: int | None = None, price_min: int | None = None, price_max: int | None = None, lot_sqft_min: int | None = None, lot_sqft_max: int | None = None, year_built_min: int | None = None, year_built_max: int | None = None, ) -> None: """Validate that min values are less than max values for range filters.""" ranges = [ ("beds", beds_min, beds_max), ("baths", baths_min, baths_max), ("sqft", sqft_min, sqft_max), ("price", price_min, price_max), ("lot_sqft", lot_sqft_min, lot_sqft_max), ("year_built", year_built_min, year_built_max), ] for name, min_val, max_val in ranges: if min_val is not None and max_val is not None and min_val > max_val: raise ValueError(f"{name}_min ({min_val}) cannot be greater than {name}_max ({max_val}).") def validate_sort(sort_by: str | None, sort_direction: str | None = "desc") -> None: """Validate sort parameters.""" valid_sort_fields = ["list_date", "sold_date", "list_price", "sqft", "beds", "baths", "last_update_date"] valid_directions = ["asc", "desc"] if sort_by and sort_by not in valid_sort_fields: raise ValueError( f"Invalid sort_by value: '{sort_by}'. " f"Valid options: {', '.join(valid_sort_fields)}" ) if sort_direction and sort_direction not in valid_directions: raise ValueError( f"Invalid sort_direction value: '{sort_direction}'. " f"Valid options: {', '.join(valid_directions)}" ) def convert_to_datetime_string(value) -> str | None: """ Convert datetime object or string to ISO 8601 string format. Accepts: - datetime.datetime objects - datetime.date objects - ISO 8601 strings (returned as-is) - None (returns None) Returns ISO 8601 formatted string or None. """ if value is None: return None # Already a string - return as-is if isinstance(value, str): return value # datetime.datetime object from datetime import datetime, date if isinstance(value, datetime): return value.isoformat() # datetime.date object (convert to datetime at midnight) if isinstance(value, date): return datetime.combine(value, datetime.min.time()).isoformat() raise ValueError( f"Invalid datetime value. Expected datetime object, date object, or ISO 8601 string. " f"Got: {type(value).__name__}" ) def extract_timedelta_hours(value) -> int | None: """ Extract hours from int or timedelta object. Accepts: - int (returned as-is) - timedelta objects (converted to total hours) - None (returns None) Returns integer hours or None. """ if value is None: return None # Already an int - return as-is if isinstance(value, int): return value # timedelta object - convert to hours from datetime import timedelta if isinstance(value, timedelta): return int(value.total_seconds() / 3600) raise ValueError( f"Invalid past_hours value. Expected int or timedelta object. " f"Got: {type(value).__name__}" ) def extract_timedelta_days(value) -> int | None: """ Extract days from int or timedelta object. Accepts: - int (returned as-is) - timedelta objects (converted to total days) - None (returns None) Returns integer days or None. """ if value is None: return None # Already an int - return as-is if isinstance(value, int): return value # timedelta object - convert to days from datetime import timedelta if isinstance(value, timedelta): return int(value.total_seconds() / 86400) # 86400 seconds in a day raise ValueError( f"Invalid past_days value. Expected int or timedelta object. " f"Got: {type(value).__name__}" ) def detect_precision_and_convert(value): """ Detect if input has time precision and convert to ISO string. Accepts: - datetime.datetime objects → (ISO string, "hour") - datetime.date objects → (ISO string at midnight, "day") - ISO 8601 datetime strings with time → (string as-is, "hour") - Date-only strings "YYYY-MM-DD" → (string as-is, "day") - None → (None, None) Returns: tuple: (iso_string, precision) where precision is "day" or "hour" """ if value is None: return (None, None) from datetime import datetime as dt, date # datetime.datetime object - has time precision if isinstance(value, dt): return (value.isoformat(), "hour") # datetime.date object - day precision only if isinstance(value, date): # Convert to datetime at midnight return (dt.combine(value, dt.min.time()).isoformat(), "day") # String - detect if it has time component if isinstance(value, str): # ISO 8601 datetime with time component (has 'T' and time) if 'T' in value: return (value, "hour") # Date-only string else: return (value, "day") raise ValueError( f"Invalid date value. Expected datetime object, date object, or ISO 8601 string. " f"Got: {type(value).__name__}" )