diff --git a/homeharvest/core/scrapers/models.py b/homeharvest/core/scrapers/models.py index 5cf144b..5db299d 100644 --- a/homeharvest/core/scrapers/models.py +++ b/homeharvest/core/scrapers/models.py @@ -146,6 +146,7 @@ class Agent(Entity): phones: list[dict] | AgentPhone | None = None email: str | None = None href: str | None = None + state_license: str | None = Field(None, description="Advertiser agent state license number") class Office(Entity): @@ -197,7 +198,7 @@ class Property(BaseModel): days_on_mls: int | None = Field(None, description="An integer value determined by the MLS to calculate days on market") description: Description | None = None tags: list[str] | None = None - details: list[dict] | None = None + details: list[HomeDetails] | None = None latitude: float | None = None longitude: float | None = None @@ -208,7 +209,7 @@ class Property(BaseModel): assessed_value: int | None = None estimated_value: int | None = None tax: int | None = None - tax_history: list[dict] | None = None + tax_history: list[TaxHistory] | None = None advertisers: Advertisers | None = None @@ -228,7 +229,7 @@ class Property(BaseModel): tax_record: TaxRecord | None = None parcel_info: dict | None = None # Keep as dict for flexibility current_estimates: list[PropertyEstimate] | None = None - estimates: dict | None = None # Keep as dict for flexibility + estimates: HomeEstimates | None = None photos: list[dict] | None = None # Keep as dict for photo structure flags: HomeFlags | None = Field(None, description="Home flags for Listing/Property") @@ -294,6 +295,22 @@ class Popularity(BaseModel): periods: list[PopularityPeriod] | None = None +class Assessment(BaseModel): + building: int | None = None + land: int | None = None + total: int | None = None + + +class TaxHistory(BaseModel): + assessment: Assessment | None = None + market: Assessment | None = Field(None, description="Market values as provided by the county or local taxing/assessment authority") + appraisal: Assessment | None 
= Field(None, description="Appraised value given by taxing authority") + value: Assessment | None = Field(None, description="Value closest to current market value used for assessment by county or local taxing authorities") + tax: int | None = None + year: int | None = None + assessed_year: int | None = Field(None, description="Assessment year for which taxes were billed") + + class TaxRecord(BaseModel): cl_id: str | None = None public_record_id: str | None = None @@ -302,12 +319,22 @@ class TaxRecord(BaseModel): tax_parcel_id: str | None = None +class EstimateSource(BaseModel): + type: str | None = Field(None, description="Type of the avm vendor, list of values: corelogic, collateral, quantarium") + name: str | None = Field(None, description="Name of the avm vendor") + + class PropertyEstimate(BaseModel): - estimate: int | None = None - estimate_high: int | None = None - estimate_low: int | None = None - date: datetime | None = None + estimate: int | None = Field(None, description="Estimated value of a property") + estimate_high: int | None = Field(None, description="Estimated high value of a property") + estimate_low: int | None = Field(None, description="Estimated low value of a property") + date: datetime | None = Field(None, description="Date of estimation") is_best_home_value: bool | None = None + source: EstimateSource | None = Field(None, description="Source of the latest estimate value") + + +class HomeEstimates(BaseModel): + current_values: list[PropertyEstimate] | None = Field(None, description="Current valuation and best value for home from multiple AVM vendors") class PropertyDetails(BaseModel): @@ -316,6 +343,12 @@ class PropertyDetails(BaseModel): parent_category: str | None = None +class HomeDetails(BaseModel): + category: str | None = None + text: list[str] | None = None + parent_category: str | None = None + + class UnitDescription(BaseModel): baths_consolidated: str | None = None baths: float | None = None # Changed to float to handle values like 
2.5 diff --git a/homeharvest/core/scrapers/realtor/__init__.py b/homeharvest/core/scrapers/realtor/__init__.py index d19683a..85adec4 100644 --- a/homeharvest/core/scrapers/realtor/__init__.py +++ b/homeharvest/core/scrapers/realtor/__init__.py @@ -11,7 +11,7 @@ import json from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from json import JSONDecodeError -from typing import Dict, Union, Optional +from typing import Dict, Union from tenacity import ( retry, @@ -23,18 +23,15 @@ from tenacity import ( from .. import Scraper from ..models import ( Property, - Address, ListingType, - Description, - PropertyType, - Agent, - Broker, - Builder, - Advertisers, - Office, ReturnType ) from .queries import GENERAL_RESULTS_QUERY, SEARCH_HOMES_DATA, HOMES_DATA, HOME_FRAGMENT +from .processors import ( + process_property, + process_extra_property_details, + get_key +) class RealtorScraper(Scraper): @@ -122,140 +119,12 @@ class RealtorScraper(Scraper): property_info = response_json["data"]["home"] if self.return_type != ReturnType.raw: - return [self.process_property(property_info)] + return [process_property(property_info, self.mls_only, self.extra_property_data, + self.exclude_pending, self.listing_type, get_key, process_extra_property_details)] else: return [property_info] - @staticmethod - def process_advertisers(advertisers: list[dict] | None) -> Advertisers | None: - if not advertisers: - return None - def _parse_fulfillment_id(fulfillment_id: str | None) -> str | None: - return fulfillment_id if fulfillment_id and fulfillment_id != "0" else None - - processed_advertisers = Advertisers() - - for advertiser in advertisers: - advertiser_type = advertiser.get("type") - if advertiser_type == "seller": #: agent - processed_advertisers.agent = Agent( - uuid=_parse_fulfillment_id(advertiser.get("fulfillment_id")), - nrds_id=advertiser.get("nrds_id"), - mls_set=advertiser.get("mls_set"), - name=advertiser.get("name"), - 
email=advertiser.get("email"), - phones=advertiser.get("phones"), - ) - - if advertiser.get("broker") and advertiser["broker"].get("name"): #: has a broker - processed_advertisers.broker = Broker( - uuid=_parse_fulfillment_id(advertiser["broker"].get("fulfillment_id")), - name=advertiser["broker"].get("name"), - ) - - if advertiser.get("office"): #: has an office - processed_advertisers.office = Office( - uuid=_parse_fulfillment_id(advertiser["office"].get("fulfillment_id")), - mls_set=advertiser["office"].get("mls_set"), - name=advertiser["office"].get("name"), - email=advertiser["office"].get("email"), - phones=advertiser["office"].get("phones"), - ) - - if advertiser_type == "community": #: could be builder - if advertiser.get("builder"): - processed_advertisers.builder = Builder( - uuid=_parse_fulfillment_id(advertiser["builder"].get("fulfillment_id")), - name=advertiser["builder"].get("name"), - ) - - return processed_advertisers - - def process_property(self, result: dict) -> Property | None: - mls = result["source"].get("id") if "source" in result and isinstance(result["source"], dict) else None - - if not mls and self.mls_only: - return - - able_to_get_lat_long = ( - result - and result.get("location") - and result["location"].get("address") - and result["location"]["address"].get("coordinate") - ) - - is_pending = result["flags"].get("is_pending") - is_contingent = result["flags"].get("is_contingent") - - if (is_pending or is_contingent) and (self.exclude_pending and self.listing_type != ListingType.PENDING): - return - - property_id = result["property_id"] - prop_details = self.process_extra_property_details(result) if self.extra_property_data else {} - - property_estimates_root = result.get("current_estimates") or result.get("estimates", {}).get("currentValues") - estimated_value = self.get_key(property_estimates_root, [0, "estimate"]) - - advertisers = self.process_advertisers(result.get("advertisers")) - - realty_property = Property( - mls=mls, - 
mls_id=( - result["source"].get("listing_id") - if "source" in result and isinstance(result["source"], dict) - else None - ), - property_url=result["href"], - property_id=property_id, - listing_id=result.get("listing_id"), - permalink=result.get("permalink"), - status=("PENDING" if is_pending else "CONTINGENT" if is_contingent else result["status"].upper()), - list_price=result["list_price"], - list_price_min=result["list_price_min"], - list_price_max=result["list_price_max"], - list_date=(datetime.fromisoformat(result["list_date"].split("T")[0]) if result.get("list_date") else None), - prc_sqft=result.get("price_per_sqft"), - last_sold_date=(datetime.fromisoformat(result["last_sold_date"]) if result.get("last_sold_date") else None), - pending_date=(datetime.fromisoformat(result["pending_date"].split("T")[0]) if result.get("pending_date") else None), - new_construction=result["flags"].get("is_new_construction") is True, - hoa_fee=(result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None), - latitude=(result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None), - longitude=(result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None), - address=self._parse_address(result, search_type="general_search"), - description=self._parse_description(result), - neighborhoods=self._parse_neighborhoods(result), - county=(result["location"]["county"].get("name") if result["location"]["county"] else None), - fips_code=(result["location"]["county"].get("fips_code") if result["location"]["county"] else None), - days_on_mls=self.calculate_days_on_mls(result), - nearby_schools=prop_details.get("schools"), - assessed_value=prop_details.get("assessed_value"), - estimated_value=estimated_value if estimated_value else None, - advertisers=advertisers, - tax=prop_details.get("tax"), - tax_history=prop_details.get("tax_history"), - - # Additional fields from GraphQL - 
mls_status=result.get("mls_status"), - last_sold_price=result.get("last_sold_price"), - tags=result.get("tags"), - details=result.get("details"), - open_houses=self._parse_open_houses(result.get("open_houses")), - pet_policy=result.get("pet_policy"), - units=self._parse_units(result.get("units")), - monthly_fees=result.get("monthly_fees"), - one_time_fees=result.get("one_time_fees"), - parking=result.get("parking"), - terms=result.get("terms"), - popularity=result.get("popularity"), - tax_record=self._parse_tax_record(result.get("tax_record")), - parcel_info=result.get("location", {}).get("parcel"), - current_estimates=self._parse_current_estimates(result.get("current_estimates")), - estimates=result.get("estimates"), - photos=result.get("photos"), - flags=result.get("flags"), - ) - return realty_property def general_search(self, variables: dict, search_type: str) -> Dict[str, Union[int, Union[list[Property], list[dict]]]]: """ @@ -425,7 +294,8 @@ class RealtorScraper(Scraper): if self.return_type != ReturnType.raw: with ThreadPoolExecutor(max_workers=self.NUM_PROPERTY_WORKERS) as executor: - futures = [executor.submit(self.process_property, result) for result in properties_list] + futures = [executor.submit(process_property, result, self.mls_only, self.extra_property_data, + self.exclude_pending, self.listing_type, get_key, process_extra_property_details) for result in properties_list] for future in as_completed(futures): result = future.result() @@ -510,54 +380,7 @@ class RealtorScraper(Scraper): return homes - @staticmethod - def get_key(data: dict, keys: list): - try: - value = data - for key in keys: - value = value[key] - return value or {} - except (KeyError, TypeError, IndexError): - return {} - - def process_extra_property_details(self, result: dict) -> dict: - schools = self.get_key(result, ["nearbySchools", "schools"]) - assessed_value = self.get_key(result, ["taxHistory", 0, "assessment", "total"]) - tax_history = self.get_key(result, ["taxHistory"]) - 
- schools = [school["district"]["name"] for school in schools if school["district"].get("name")] - - # Process tax history - latest_tax = None - processed_tax_history = None - if tax_history and isinstance(tax_history, list): - tax_history = sorted(tax_history, key=lambda x: x.get("year", 0), reverse=True) - - if tax_history and "tax" in tax_history[0]: - latest_tax = tax_history[0]["tax"] - - processed_tax_history = [] - for entry in tax_history: - if "year" in entry and "tax" in entry: - processed_entry = { - "year": entry["year"], - "tax": entry["tax"], - } - if "assessment" in entry and isinstance(entry["assessment"], dict): - processed_entry["assessment"] = { - "building": entry["assessment"].get("building"), - "land": entry["assessment"].get("land"), - "total": entry["assessment"].get("total"), - } - processed_tax_history.append(processed_entry) - - return { - "schools": schools if schools else None, - "assessed_value": assessed_value if assessed_value else None, - "tax": latest_tax, - "tax_history": processed_tax_history, - } @retry( retry=retry_if_exception_type(JSONDecodeError), @@ -594,213 +417,4 @@ class RealtorScraper(Scraper): properties = data["data"] return {data.replace('home_', ''): properties[data] for data in properties if properties[data]} - @staticmethod - def _parse_neighborhoods(result: dict) -> Optional[str]: - neighborhoods_list = [] - neighborhoods = result["location"].get("neighborhoods", []) - if neighborhoods: - for neighborhood in neighborhoods: - name = neighborhood.get("name") - if name: - neighborhoods_list.append(name) - - return ", ".join(neighborhoods_list) if neighborhoods_list else None - - @staticmethod - def handle_none_safely(address_part): - if address_part is None: - return "" - - return address_part - - @staticmethod - def _parse_address(result: dict, search_type): - if search_type == "general_search": - address = result["location"]["address"] - else: - address = result["address"] - - return Address( - 
full_line=address.get("line"), - street=" ".join( - part - for part in [ - address.get("street_number"), - address.get("street_direction"), - address.get("street_name"), - address.get("street_suffix"), - ] - if part is not None - ).strip(), - unit=address["unit"], - city=address["city"], - state=address["state_code"], - zip=address["postal_code"], - - # Additional address fields - street_direction=address.get("street_direction"), - street_number=address.get("street_number"), - street_name=address.get("street_name"), - street_suffix=address.get("street_suffix"), - ) - - @staticmethod - def _parse_description(result: dict) -> Description | None: - if not result: - return None - - description_data = result.get("description", {}) - - if description_data is None or not isinstance(description_data, dict): - description_data = {} - - style = description_data.get("type", "") - if style is not None: - style = style.upper() - - primary_photo = None - if (primary_photo_info := result.get("primary_photo")) and ( - primary_photo_href := primary_photo_info.get("href") - ): - primary_photo = primary_photo_href.replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75") - - return Description( - primary_photo=primary_photo, - alt_photos=RealtorScraper.process_alt_photos(result.get("photos", [])), - style=(PropertyType.__getitem__(style) if style and style in PropertyType.__members__ else None), - beds=description_data.get("beds"), - baths_full=description_data.get("baths_full"), - baths_half=description_data.get("baths_half"), - sqft=description_data.get("sqft"), - lot_sqft=description_data.get("lot_sqft"), - sold_price=( - result.get("last_sold_price") or description_data.get("sold_price") - if result.get("last_sold_date") or result["list_price"] != description_data.get("sold_price") - else None - ), #: has a sold date or list and sold price are different - year_built=description_data.get("year_built"), - garage=description_data.get("garage"), - stories=description_data.get("stories"), 
- text=description_data.get("text"), - - # Additional description fields - name=description_data.get("name"), - type=description_data.get("type"), - ) - - @staticmethod - def calculate_days_on_mls(result: dict) -> Optional[int]: - list_date_str = result.get("list_date") - list_date = datetime.strptime(list_date_str.split("T")[0], "%Y-%m-%d") if list_date_str else None - last_sold_date_str = result.get("last_sold_date") - last_sold_date = datetime.strptime(last_sold_date_str, "%Y-%m-%d") if last_sold_date_str else None - today = datetime.now() - - if list_date: - if result["status"] == "sold": - if last_sold_date: - days = (last_sold_date - list_date).days - if days >= 0: - return days - elif result["status"] in ("for_sale", "for_rent"): - days = (today - list_date).days - if days >= 0: - return days - - @staticmethod - def process_alt_photos(photos_info: list[dict]) -> list[str] | None: - if not photos_info: - return None - - return [ - photo_info["href"].replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75") - for photo_info in photos_info - if photo_info.get("href") - ] - - @staticmethod - def _parse_open_houses(open_houses_data: list[dict] | None) -> list[dict] | None: - """Parse open houses data and convert date strings to datetime objects""" - if not open_houses_data: - return None - - parsed_open_houses = [] - for oh in open_houses_data: - parsed_oh = oh.copy() - - # Parse start_date and end_date - if parsed_oh.get("start_date"): - try: - parsed_oh["start_date"] = datetime.fromisoformat(parsed_oh["start_date"].replace("Z", "+00:00")) - except (ValueError, AttributeError): - parsed_oh["start_date"] = None - - if parsed_oh.get("end_date"): - try: - parsed_oh["end_date"] = datetime.fromisoformat(parsed_oh["end_date"].replace("Z", "+00:00")) - except (ValueError, AttributeError): - parsed_oh["end_date"] = None - - parsed_open_houses.append(parsed_oh) - - return parsed_open_houses - - @staticmethod - def _parse_units(units_data: list[dict] | None) -> list[dict] | 
None: - """Parse units data and convert date strings to datetime objects""" - if not units_data: - return None - - parsed_units = [] - for unit in units_data: - parsed_unit = unit.copy() - - # Parse availability date - if parsed_unit.get("availability") and parsed_unit["availability"].get("date"): - try: - parsed_unit["availability"]["date"] = datetime.fromisoformat(parsed_unit["availability"]["date"].replace("Z", "+00:00")) - except (ValueError, AttributeError): - parsed_unit["availability"]["date"] = None - - parsed_units.append(parsed_unit) - - return parsed_units - - @staticmethod - def _parse_tax_record(tax_record_data: dict | None) -> dict | None: - """Parse tax record data and convert date strings to datetime objects""" - if not tax_record_data: - return None - - parsed_tax_record = tax_record_data.copy() - - # Parse last_update_date - if parsed_tax_record.get("last_update_date"): - try: - parsed_tax_record["last_update_date"] = datetime.fromisoformat(parsed_tax_record["last_update_date"].replace("Z", "+00:00")) - except (ValueError, AttributeError): - parsed_tax_record["last_update_date"] = None - - return parsed_tax_record - - @staticmethod - def _parse_current_estimates(estimates_data: list[dict] | None) -> list[dict] | None: - """Parse current estimates data and convert date strings to datetime objects""" - if not estimates_data: - return None - - parsed_estimates = [] - for estimate in estimates_data: - parsed_estimate = estimate.copy() - - # Parse date - if parsed_estimate.get("date"): - try: - parsed_estimate["date"] = datetime.fromisoformat(parsed_estimate["date"].replace("Z", "+00:00")) - except (ValueError, AttributeError): - parsed_estimate["date"] = None - - parsed_estimates.append(parsed_estimate) - - return parsed_estimates diff --git a/homeharvest/core/scrapers/realtor/parsers.py b/homeharvest/core/scrapers/realtor/parsers.py new file mode 100644 index 0000000..07905a1 --- /dev/null +++ b/homeharvest/core/scrapers/realtor/parsers.py @@ -0,0 
+1,279 @@ +""" +Parsers for realtor.com data processing +""" + +from datetime import datetime +from typing import Optional +from ..models import Address, Description, PropertyType + + +def parse_open_houses(open_houses_data: list[dict] | None) -> list[dict] | None: + """Parse open houses data and convert date strings to datetime objects""" + if not open_houses_data: + return None + + parsed_open_houses = [] + for oh in open_houses_data: + parsed_oh = oh.copy() + + # Parse start_date and end_date + if parsed_oh.get("start_date"): + try: + parsed_oh["start_date"] = datetime.fromisoformat(parsed_oh["start_date"].replace("Z", "+00:00")) + except (ValueError, AttributeError): + parsed_oh["start_date"] = None + + if parsed_oh.get("end_date"): + try: + parsed_oh["end_date"] = datetime.fromisoformat(parsed_oh["end_date"].replace("Z", "+00:00")) + except (ValueError, AttributeError): + parsed_oh["end_date"] = None + + parsed_open_houses.append(parsed_oh) + + return parsed_open_houses + + +def parse_units(units_data: list[dict] | None) -> list[dict] | None: + """Parse units data and convert date strings to datetime objects""" + if not units_data: + return None + + parsed_units = [] + for unit in units_data: + parsed_unit = unit.copy() + + # Parse availability date + if parsed_unit.get("availability") and parsed_unit["availability"].get("date"): + try: + parsed_unit["availability"]["date"] = datetime.fromisoformat(parsed_unit["availability"]["date"].replace("Z", "+00:00")) + except (ValueError, AttributeError): + parsed_unit["availability"]["date"] = None + + parsed_units.append(parsed_unit) + + return parsed_units + + +def parse_tax_record(tax_record_data: dict | None) -> dict | None: + """Parse tax record data and convert date strings to datetime objects""" + if not tax_record_data: + return None + + parsed_tax_record = tax_record_data.copy() + + # Parse last_update_date + if parsed_tax_record.get("last_update_date"): + try: + parsed_tax_record["last_update_date"] = 
datetime.fromisoformat(parsed_tax_record["last_update_date"].replace("Z", "+00:00")) + except (ValueError, AttributeError): + parsed_tax_record["last_update_date"] = None + + return parsed_tax_record + + +def parse_current_estimates(estimates_data: list[dict] | None) -> list[dict] | None: + """Parse current estimates data and convert date strings to datetime objects""" + if not estimates_data: + return None + + parsed_estimates = [] + for estimate in estimates_data: + parsed_estimate = estimate.copy() + + # Parse date + if parsed_estimate.get("date"): + try: + parsed_estimate["date"] = datetime.fromisoformat(parsed_estimate["date"].replace("Z", "+00:00")) + except (ValueError, AttributeError): + parsed_estimate["date"] = None + + # Parse source information + if parsed_estimate.get("source"): + source_data = parsed_estimate["source"] + parsed_estimate["source"] = { + "type": source_data.get("type"), + "name": source_data.get("name") + } + + parsed_estimates.append(parsed_estimate) + + return parsed_estimates + + +def parse_estimates(estimates_data: dict | None) -> dict | None: + """Parse estimates data and convert date strings to datetime objects""" + if not estimates_data: + return None + + parsed_estimates = estimates_data.copy() + + # Parse current_values (which is aliased as currentValues in GraphQL) + current_values = parsed_estimates.get("currentValues") or parsed_estimates.get("current_values") + if current_values: + parsed_current_values = [] + for estimate in current_values: + parsed_estimate = estimate.copy() + + # Parse date + if parsed_estimate.get("date"): + try: + parsed_estimate["date"] = datetime.fromisoformat(parsed_estimate["date"].replace("Z", "+00:00")) + except (ValueError, AttributeError): + parsed_estimate["date"] = None + + # Parse source information + if parsed_estimate.get("source"): + source_data = parsed_estimate["source"] + parsed_estimate["source"] = { + "type": source_data.get("type"), + "name": source_data.get("name") + } + + # 
Convert GraphQL aliases to Pydantic field names + if "estimateHigh" in parsed_estimate: + parsed_estimate["estimate_high"] = parsed_estimate.pop("estimateHigh") + if "estimateLow" in parsed_estimate: + parsed_estimate["estimate_low"] = parsed_estimate.pop("estimateLow") + if "isBestHomeValue" in parsed_estimate: + parsed_estimate["is_best_home_value"] = parsed_estimate.pop("isBestHomeValue") + + parsed_current_values.append(parsed_estimate) + + parsed_estimates["current_values"] = parsed_current_values + + # Remove the GraphQL alias if it exists + if "currentValues" in parsed_estimates: + del parsed_estimates["currentValues"] + + return parsed_estimates + + +def parse_neighborhoods(result: dict) -> Optional[str]: + """Parse neighborhoods from location data""" + neighborhoods_list = [] + neighborhoods = result["location"].get("neighborhoods", []) + + if neighborhoods: + for neighborhood in neighborhoods: + name = neighborhood.get("name") + if name: + neighborhoods_list.append(name) + + return ", ".join(neighborhoods_list) if neighborhoods_list else None + + +def handle_none_safely(address_part): + """Handle None values safely for address parts""" + if address_part is None: + return "" + return address_part + + +def parse_address(result: dict, search_type: str) -> Address: + """Parse address data from result""" + if search_type == "general_search": + address = result["location"]["address"] + else: + address = result["address"] + + return Address( + full_line=address.get("line"), + street=" ".join( + part + for part in [ + address.get("street_number"), + address.get("street_direction"), + address.get("street_name"), + address.get("street_suffix"), + ] + if part is not None + ).strip(), + unit=address["unit"], + city=address["city"], + state=address["state_code"], + zip=address["postal_code"], + + # Additional address fields + street_direction=address.get("street_direction"), + street_number=address.get("street_number"), + street_name=address.get("street_name"), + 
street_suffix=address.get("street_suffix"), + ) + + +def parse_description(result: dict) -> Description | None: + """Parse description data from result""" + if not result: + return None + + description_data = result.get("description", {}) + + if description_data is None or not isinstance(description_data, dict): + description_data = {} + + style = description_data.get("type", "") + if style is not None: + style = style.upper() + + primary_photo = None + if (primary_photo_info := result.get("primary_photo")) and ( + primary_photo_href := primary_photo_info.get("href") + ): + primary_photo = primary_photo_href.replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75") + + return Description( + primary_photo=primary_photo, + alt_photos=process_alt_photos(result.get("photos", [])), + style=(PropertyType.__getitem__(style) if style and style in PropertyType.__members__ else None), + beds=description_data.get("beds"), + baths_full=description_data.get("baths_full"), + baths_half=description_data.get("baths_half"), + sqft=description_data.get("sqft"), + lot_sqft=description_data.get("lot_sqft"), + sold_price=( + result.get("last_sold_price") or description_data.get("sold_price") + if result.get("last_sold_date") or result["list_price"] != description_data.get("sold_price") + else None + ), #: has a sold date or list and sold price are different + year_built=description_data.get("year_built"), + garage=description_data.get("garage"), + stories=description_data.get("stories"), + text=description_data.get("text"), + + # Additional description fields + name=description_data.get("name"), + type=description_data.get("type"), + ) + + +def calculate_days_on_mls(result: dict) -> Optional[int]: + """Calculate days on MLS from result data""" + list_date_str = result.get("list_date") + list_date = datetime.strptime(list_date_str.split("T")[0], "%Y-%m-%d") if list_date_str else None + last_sold_date_str = result.get("last_sold_date") + last_sold_date = datetime.strptime(last_sold_date_str, 
"%Y-%m-%d") if last_sold_date_str else None + today = datetime.now() + + if list_date: + if result["status"] == "sold": + if last_sold_date: + days = (last_sold_date - list_date).days + if days >= 0: + return days + elif result["status"] in ("for_sale", "for_rent"): + days = (today - list_date).days + if days >= 0: + return days + + +def process_alt_photos(photos_info: list[dict]) -> list[str] | None: + """Process alternative photos from photos info""" + if not photos_info: + return None + + return [ + photo_info["href"].replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75") + for photo_info in photos_info + if photo_info.get("href") + ] \ No newline at end of file diff --git a/homeharvest/core/scrapers/realtor/processors.py b/homeharvest/core/scrapers/realtor/processors.py new file mode 100644 index 0000000..0fc5af6 --- /dev/null +++ b/homeharvest/core/scrapers/realtor/processors.py @@ -0,0 +1,224 @@ +""" +Processors for realtor.com property data processing +""" + +from datetime import datetime +from typing import Optional +from ..models import ( + Property, + ListingType, + Agent, + Broker, + Builder, + Advertisers, + Office, + ReturnType +) +from .parsers import ( + parse_open_houses, + parse_units, + parse_tax_record, + parse_current_estimates, + parse_estimates, + parse_neighborhoods, + parse_address, + parse_description, + calculate_days_on_mls, + process_alt_photos +) + + +def process_advertisers(advertisers: list[dict] | None) -> Advertisers | None: + """Process advertisers data from GraphQL response""" + if not advertisers: + return None + + def _parse_fulfillment_id(fulfillment_id: str | None) -> str | None: + return fulfillment_id if fulfillment_id and fulfillment_id != "0" else None + + processed_advertisers = Advertisers() + + for advertiser in advertisers: + advertiser_type = advertiser.get("type") + if advertiser_type == "seller": #: agent + processed_advertisers.agent = Agent( + uuid=_parse_fulfillment_id(advertiser.get("fulfillment_id")), + 
nrds_id=advertiser.get("nrds_id"), + mls_set=advertiser.get("mls_set"), + name=advertiser.get("name"), + email=advertiser.get("email"), + phones=advertiser.get("phones"), + state_license=advertiser.get("state_license"), + ) + + if advertiser.get("broker") and advertiser["broker"].get("name"): #: has a broker + processed_advertisers.broker = Broker( + uuid=_parse_fulfillment_id(advertiser["broker"].get("fulfillment_id")), + name=advertiser["broker"].get("name"), + ) + + if advertiser.get("office"): #: has an office + processed_advertisers.office = Office( + uuid=_parse_fulfillment_id(advertiser["office"].get("fulfillment_id")), + mls_set=advertiser["office"].get("mls_set"), + name=advertiser["office"].get("name"), + email=advertiser["office"].get("email"), + phones=advertiser["office"].get("phones"), + ) + + if advertiser_type == "community": #: could be builder + if advertiser.get("builder"): + processed_advertisers.builder = Builder( + uuid=_parse_fulfillment_id(advertiser["builder"].get("fulfillment_id")), + name=advertiser["builder"].get("name"), + ) + + return processed_advertisers + + +def process_property(result: dict, mls_only: bool = False, extra_property_data: bool = False, + exclude_pending: bool = False, listing_type: ListingType = ListingType.FOR_SALE, + get_key_func=None, process_extra_property_details_func=None) -> Property | None: + """Process property data from GraphQL response""" + mls = result["source"].get("id") if "source" in result and isinstance(result["source"], dict) else None + + if not mls and mls_only: + return None + + able_to_get_lat_long = ( + result + and result.get("location") + and result["location"].get("address") + and result["location"]["address"].get("coordinate") + ) + + is_pending = result["flags"].get("is_pending") + is_contingent = result["flags"].get("is_contingent") + + if (is_pending or is_contingent) and (exclude_pending and listing_type != ListingType.PENDING): + return None + + property_id = result["property_id"] + 
prop_details = process_extra_property_details_func(result) if extra_property_data and process_extra_property_details_func else {} + + property_estimates_root = result.get("current_estimates") or result.get("estimates", {}).get("currentValues") + estimated_value = get_key_func(property_estimates_root, [0, "estimate"]) if get_key_func else None + + advertisers = process_advertisers(result.get("advertisers")) + + realty_property = Property( + mls=mls, + mls_id=( + result["source"].get("listing_id") + if "source" in result and isinstance(result["source"], dict) + else None + ), + property_url=result["href"], + property_id=property_id, + listing_id=result.get("listing_id"), + permalink=result.get("permalink"), + status=("PENDING" if is_pending else "CONTINGENT" if is_contingent else result["status"].upper()), + list_price=result["list_price"], + list_price_min=result["list_price_min"], + list_price_max=result["list_price_max"], + list_date=(datetime.fromisoformat(result["list_date"].split("T")[0]) if result.get("list_date") else None), + prc_sqft=result.get("price_per_sqft"), + last_sold_date=(datetime.fromisoformat(result["last_sold_date"]) if result.get("last_sold_date") else None), + pending_date=(datetime.fromisoformat(result["pending_date"].split("T")[0]) if result.get("pending_date") else None), + new_construction=result["flags"].get("is_new_construction") is True, + hoa_fee=(result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None), + latitude=(result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None), + longitude=(result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None), + address=parse_address(result, search_type="general_search"), + description=parse_description(result), + neighborhoods=parse_neighborhoods(result), + county=(result["location"]["county"].get("name") if result["location"]["county"] else None), + fips_code=(result["location"]["county"].get("fips_code") 
def process_extra_property_details(result: dict, get_key_func=None) -> dict:
    """Process extra property details from GraphQL response.

    Reads the ``nearbySchools`` and ``taxHistory`` keys of ``result`` and
    condenses them into a flat summary.

    Args:
        result: Raw property payload.
        get_key_func: Optional nested-lookup callable invoked as
            ``get_key_func(data, keys)``. When omitted (or falsy) the payload
            is read directly with ``dict.get``.

    Returns:
        A dict with four keys:
            ``schools``: list of school district names, or ``None`` if empty.
            ``assessed_value``: ``assessment.total`` of the first raw
                ``taxHistory`` entry, or ``None`` if missing/falsy.
            ``tax``: ``tax`` of the entry with the highest ``year``, or
                ``None``.
            ``tax_history``: entries (newest year first) that carry both
                ``year`` and ``tax``, or ``None`` when there is no history.
    """
    if get_key_func:
        schools = get_key_func(result, ["nearbySchools", "schools"])
        assessed_value = get_key_func(result, ["taxHistory", 0, "assessment", "total"])
        tax_history = get_key_func(result, ["taxHistory"])
    else:
        # `or` fallbacks cover keys that are present but explicitly null.
        nearby_schools = result.get("nearbySchools") or {}
        schools = nearby_schools.get("schools") or []
        tax_history = result.get("taxHistory") or []
        # Taken from the first entry as delivered, i.e. before any sorting.
        has_entries = isinstance(tax_history, list) and bool(tax_history)
        assessed_value = _assessment_total(tax_history[0]) if has_entries else None

    district_names = []
    if schools and isinstance(schools, list):
        for school in schools:
            district = school.get("district") if isinstance(school, dict) else None
            name = district.get("name") if isinstance(district, dict) else None
            if name:
                district_names.append(name)

    # Process tax history
    latest_tax = None
    processed_tax_history = None
    if tax_history and isinstance(tax_history, list):
        # Skip malformed (non-dict) entries; treat a missing or null year as 0
        # so the sort never compares None with an int.
        entries = sorted(
            (entry for entry in tax_history if isinstance(entry, dict)),
            key=lambda entry: entry.get("year") or 0,
            reverse=True,
        )

        if entries and "tax" in entries[0]:
            latest_tax = entries[0]["tax"]

        processed_tax_history = []
        for entry in entries:
            if "year" not in entry or "tax" not in entry:
                continue
            processed_entry = {
                "year": entry["year"],
                "tax": entry["tax"],
            }
            assessment = entry.get("assessment")
            if isinstance(assessment, dict):
                processed_entry["assessment"] = {
                    "building": assessment.get("building"),
                    "land": assessment.get("land"),
                    "total": assessment.get("total"),
                }
            processed_tax_history.append(processed_entry)

    return {
        "schools": district_names if district_names else None,
        "assessed_value": assessed_value if assessed_value else None,
        "tax": latest_tax,
        "tax_history": processed_tax_history,
    }


def _assessment_total(entry):
    """Return ``entry["assessment"]["total"]`` or ``None`` if any level is absent."""
    assessment = entry.get("assessment") if isinstance(entry, dict) else None
    return assessment.get("total") if isinstance(assessment, dict) else None


def get_key(data: dict, keys: list):
    """Get nested key from dictionary safely.

    Walks ``data`` by indexing with each element of ``keys`` in turn (dict
    keys or sequence indexes).

    Returns:
        The value found at the end of the path. An empty dict is returned
        instead when the path cannot be followed (``KeyError``, ``TypeError``
        or ``IndexError``) or when the value found is falsy (``None``, ``0``,
        ``""``, empty containers).
    """
    try:
        value = data
        for key in keys:
            value = value[key]
        # Falsy results are normalised to {} so callers can chain lookups.
        return value or {}
    except (KeyError, TypeError, IndexError):
        return {}
properties for {search_type}" - assert len(raw_ids) > 0, f"raw should return properties for {search_type}" + assert len(raw_ids) > 0, f"raw should return properties for {search_type}" \ No newline at end of file