- data quality and clean up code

This commit is contained in:
Zachary Hampton
2025-07-15 13:51:47 -07:00
parent 6c6243eba4
commit 145c337b55
6 changed files with 555 additions and 405 deletions

View File

@@ -146,6 +146,7 @@ class Agent(Entity):
phones: list[dict] | AgentPhone | None = None
email: str | None = None
href: str | None = None
state_license: str | None = Field(None, description="Advertiser agent state license number")
class Office(Entity):
@@ -197,7 +198,7 @@ class Property(BaseModel):
days_on_mls: int | None = Field(None, description="An integer value determined by the MLS to calculate days on market")
description: Description | None = None
tags: list[str] | None = None
details: list[dict] | None = None
details: list[HomeDetails] | None = None
latitude: float | None = None
longitude: float | None = None
@@ -208,7 +209,7 @@ class Property(BaseModel):
assessed_value: int | None = None
estimated_value: int | None = None
tax: int | None = None
tax_history: list[dict] | None = None
tax_history: list[TaxHistory] | None = None
advertisers: Advertisers | None = None
@@ -228,7 +229,7 @@ class Property(BaseModel):
tax_record: TaxRecord | None = None
parcel_info: dict | None = None # Keep as dict for flexibility
current_estimates: list[PropertyEstimate] | None = None
estimates: dict | None = None # Keep as dict for flexibility
estimates: HomeEstimates | None = None
photos: list[dict] | None = None # Keep as dict for photo structure
flags: HomeFlags | None = Field(None, description="Home flags for Listing/Property")
@@ -294,6 +295,22 @@ class Popularity(BaseModel):
periods: list[PopularityPeriod] | None = None
class Assessment(BaseModel):
building: int | None = None
land: int | None = None
total: int | None = None
class TaxHistory(BaseModel):
assessment: Assessment | None = None
market: Assessment | None = Field(None, description="Market values as provided by the county or local taxing/assessment authority")
appraisal: Assessment | None = Field(None, description="Appraised value given by taxing authority")
value: Assessment | None = Field(None, description="Value closest to current market value used for assessment by county or local taxing authorities")
tax: int | None = None
year: int | None = None
assessed_year: int | None = Field(None, description="Assessment year for which taxes were billed")
class TaxRecord(BaseModel):
cl_id: str | None = None
public_record_id: str | None = None
@@ -302,12 +319,22 @@ class TaxRecord(BaseModel):
tax_parcel_id: str | None = None
class EstimateSource(BaseModel):
type: str | None = Field(None, description="Type of the avm vendor, list of values: corelogic, collateral, quantarium")
name: str | None = Field(None, description="Name of the avm vendor")
class PropertyEstimate(BaseModel):
estimate: int | None = None
estimate_high: int | None = None
estimate_low: int | None = None
date: datetime | None = None
estimate: int | None = Field(None, description="Estimated value of a property")
estimate_high: int | None = Field(None, description="Estimated high value of a property")
estimate_low: int | None = Field(None, description="Estimated low value of a property")
date: datetime | None = Field(None, description="Date of estimation")
is_best_home_value: bool | None = None
source: EstimateSource | None = Field(None, description="Source of the latest estimate value")
class HomeEstimates(BaseModel):
current_values: list[PropertyEstimate] | None = Field(None, description="Current valuation and best value for home from multiple AVM vendors")
class PropertyDetails(BaseModel):
@@ -316,6 +343,12 @@ class PropertyDetails(BaseModel):
parent_category: str | None = None
class HomeDetails(BaseModel):
category: str | None = None
text: list[str] | None = None
parent_category: str | None = None
class UnitDescription(BaseModel):
baths_consolidated: str | None = None
baths: float | None = None # Changed to float to handle values like 2.5

View File

@@ -11,7 +11,7 @@ import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from json import JSONDecodeError
from typing import Dict, Union, Optional
from typing import Dict, Union
from tenacity import (
retry,
@@ -23,18 +23,15 @@ from tenacity import (
from .. import Scraper
from ..models import (
Property,
Address,
ListingType,
Description,
PropertyType,
Agent,
Broker,
Builder,
Advertisers,
Office,
ReturnType
)
from .queries import GENERAL_RESULTS_QUERY, SEARCH_HOMES_DATA, HOMES_DATA, HOME_FRAGMENT
from .processors import (
process_property,
process_extra_property_details,
get_key
)
class RealtorScraper(Scraper):
@@ -122,140 +119,12 @@ class RealtorScraper(Scraper):
property_info = response_json["data"]["home"]
if self.return_type != ReturnType.raw:
return [self.process_property(property_info)]
return [process_property(property_info, self.mls_only, self.extra_property_data,
self.exclude_pending, self.listing_type, get_key, process_extra_property_details)]
else:
return [property_info]
@staticmethod
def process_advertisers(advertisers: list[dict] | None) -> Advertisers | None:
if not advertisers:
return None
def _parse_fulfillment_id(fulfillment_id: str | None) -> str | None:
return fulfillment_id if fulfillment_id and fulfillment_id != "0" else None
processed_advertisers = Advertisers()
for advertiser in advertisers:
advertiser_type = advertiser.get("type")
if advertiser_type == "seller": #: agent
processed_advertisers.agent = Agent(
uuid=_parse_fulfillment_id(advertiser.get("fulfillment_id")),
nrds_id=advertiser.get("nrds_id"),
mls_set=advertiser.get("mls_set"),
name=advertiser.get("name"),
email=advertiser.get("email"),
phones=advertiser.get("phones"),
)
if advertiser.get("broker") and advertiser["broker"].get("name"): #: has a broker
processed_advertisers.broker = Broker(
uuid=_parse_fulfillment_id(advertiser["broker"].get("fulfillment_id")),
name=advertiser["broker"].get("name"),
)
if advertiser.get("office"): #: has an office
processed_advertisers.office = Office(
uuid=_parse_fulfillment_id(advertiser["office"].get("fulfillment_id")),
mls_set=advertiser["office"].get("mls_set"),
name=advertiser["office"].get("name"),
email=advertiser["office"].get("email"),
phones=advertiser["office"].get("phones"),
)
if advertiser_type == "community": #: could be builder
if advertiser.get("builder"):
processed_advertisers.builder = Builder(
uuid=_parse_fulfillment_id(advertiser["builder"].get("fulfillment_id")),
name=advertiser["builder"].get("name"),
)
return processed_advertisers
def process_property(self, result: dict) -> Property | None:
mls = result["source"].get("id") if "source" in result and isinstance(result["source"], dict) else None
if not mls and self.mls_only:
return
able_to_get_lat_long = (
result
and result.get("location")
and result["location"].get("address")
and result["location"]["address"].get("coordinate")
)
is_pending = result["flags"].get("is_pending")
is_contingent = result["flags"].get("is_contingent")
if (is_pending or is_contingent) and (self.exclude_pending and self.listing_type != ListingType.PENDING):
return
property_id = result["property_id"]
prop_details = self.process_extra_property_details(result) if self.extra_property_data else {}
property_estimates_root = result.get("current_estimates") or result.get("estimates", {}).get("currentValues")
estimated_value = self.get_key(property_estimates_root, [0, "estimate"])
advertisers = self.process_advertisers(result.get("advertisers"))
realty_property = Property(
mls=mls,
mls_id=(
result["source"].get("listing_id")
if "source" in result and isinstance(result["source"], dict)
else None
),
property_url=result["href"],
property_id=property_id,
listing_id=result.get("listing_id"),
permalink=result.get("permalink"),
status=("PENDING" if is_pending else "CONTINGENT" if is_contingent else result["status"].upper()),
list_price=result["list_price"],
list_price_min=result["list_price_min"],
list_price_max=result["list_price_max"],
list_date=(datetime.fromisoformat(result["list_date"].split("T")[0]) if result.get("list_date") else None),
prc_sqft=result.get("price_per_sqft"),
last_sold_date=(datetime.fromisoformat(result["last_sold_date"]) if result.get("last_sold_date") else None),
pending_date=(datetime.fromisoformat(result["pending_date"].split("T")[0]) if result.get("pending_date") else None),
new_construction=result["flags"].get("is_new_construction") is True,
hoa_fee=(result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None),
latitude=(result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None),
longitude=(result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None),
address=self._parse_address(result, search_type="general_search"),
description=self._parse_description(result),
neighborhoods=self._parse_neighborhoods(result),
county=(result["location"]["county"].get("name") if result["location"]["county"] else None),
fips_code=(result["location"]["county"].get("fips_code") if result["location"]["county"] else None),
days_on_mls=self.calculate_days_on_mls(result),
nearby_schools=prop_details.get("schools"),
assessed_value=prop_details.get("assessed_value"),
estimated_value=estimated_value if estimated_value else None,
advertisers=advertisers,
tax=prop_details.get("tax"),
tax_history=prop_details.get("tax_history"),
# Additional fields from GraphQL
mls_status=result.get("mls_status"),
last_sold_price=result.get("last_sold_price"),
tags=result.get("tags"),
details=result.get("details"),
open_houses=self._parse_open_houses(result.get("open_houses")),
pet_policy=result.get("pet_policy"),
units=self._parse_units(result.get("units")),
monthly_fees=result.get("monthly_fees"),
one_time_fees=result.get("one_time_fees"),
parking=result.get("parking"),
terms=result.get("terms"),
popularity=result.get("popularity"),
tax_record=self._parse_tax_record(result.get("tax_record")),
parcel_info=result.get("location", {}).get("parcel"),
current_estimates=self._parse_current_estimates(result.get("current_estimates")),
estimates=result.get("estimates"),
photos=result.get("photos"),
flags=result.get("flags"),
)
return realty_property
def general_search(self, variables: dict, search_type: str) -> Dict[str, Union[int, Union[list[Property], list[dict]]]]:
"""
@@ -425,7 +294,8 @@ class RealtorScraper(Scraper):
if self.return_type != ReturnType.raw:
with ThreadPoolExecutor(max_workers=self.NUM_PROPERTY_WORKERS) as executor:
futures = [executor.submit(self.process_property, result) for result in properties_list]
futures = [executor.submit(process_property, result, self.mls_only, self.extra_property_data,
self.exclude_pending, self.listing_type, get_key, process_extra_property_details) for result in properties_list]
for future in as_completed(futures):
result = future.result()
@@ -510,54 +380,7 @@ class RealtorScraper(Scraper):
return homes
@staticmethod
def get_key(data: dict, keys: list):
try:
value = data
for key in keys:
value = value[key]
return value or {}
except (KeyError, TypeError, IndexError):
return {}
def process_extra_property_details(self, result: dict) -> dict:
schools = self.get_key(result, ["nearbySchools", "schools"])
assessed_value = self.get_key(result, ["taxHistory", 0, "assessment", "total"])
tax_history = self.get_key(result, ["taxHistory"])
schools = [school["district"]["name"] for school in schools if school["district"].get("name")]
# Process tax history
latest_tax = None
processed_tax_history = None
if tax_history and isinstance(tax_history, list):
tax_history = sorted(tax_history, key=lambda x: x.get("year", 0), reverse=True)
if tax_history and "tax" in tax_history[0]:
latest_tax = tax_history[0]["tax"]
processed_tax_history = []
for entry in tax_history:
if "year" in entry and "tax" in entry:
processed_entry = {
"year": entry["year"],
"tax": entry["tax"],
}
if "assessment" in entry and isinstance(entry["assessment"], dict):
processed_entry["assessment"] = {
"building": entry["assessment"].get("building"),
"land": entry["assessment"].get("land"),
"total": entry["assessment"].get("total"),
}
processed_tax_history.append(processed_entry)
return {
"schools": schools if schools else None,
"assessed_value": assessed_value if assessed_value else None,
"tax": latest_tax,
"tax_history": processed_tax_history,
}
@retry(
retry=retry_if_exception_type(JSONDecodeError),
@@ -594,213 +417,4 @@ class RealtorScraper(Scraper):
properties = data["data"]
return {data.replace('home_', ''): properties[data] for data in properties if properties[data]}
@staticmethod
def _parse_neighborhoods(result: dict) -> Optional[str]:
neighborhoods_list = []
neighborhoods = result["location"].get("neighborhoods", [])
if neighborhoods:
for neighborhood in neighborhoods:
name = neighborhood.get("name")
if name:
neighborhoods_list.append(name)
return ", ".join(neighborhoods_list) if neighborhoods_list else None
@staticmethod
def handle_none_safely(address_part):
if address_part is None:
return ""
return address_part
@staticmethod
def _parse_address(result: dict, search_type):
if search_type == "general_search":
address = result["location"]["address"]
else:
address = result["address"]
return Address(
full_line=address.get("line"),
street=" ".join(
part
for part in [
address.get("street_number"),
address.get("street_direction"),
address.get("street_name"),
address.get("street_suffix"),
]
if part is not None
).strip(),
unit=address["unit"],
city=address["city"],
state=address["state_code"],
zip=address["postal_code"],
# Additional address fields
street_direction=address.get("street_direction"),
street_number=address.get("street_number"),
street_name=address.get("street_name"),
street_suffix=address.get("street_suffix"),
)
@staticmethod
def _parse_description(result: dict) -> Description | None:
if not result:
return None
description_data = result.get("description", {})
if description_data is None or not isinstance(description_data, dict):
description_data = {}
style = description_data.get("type", "")
if style is not None:
style = style.upper()
primary_photo = None
if (primary_photo_info := result.get("primary_photo")) and (
primary_photo_href := primary_photo_info.get("href")
):
primary_photo = primary_photo_href.replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75")
return Description(
primary_photo=primary_photo,
alt_photos=RealtorScraper.process_alt_photos(result.get("photos", [])),
style=(PropertyType.__getitem__(style) if style and style in PropertyType.__members__ else None),
beds=description_data.get("beds"),
baths_full=description_data.get("baths_full"),
baths_half=description_data.get("baths_half"),
sqft=description_data.get("sqft"),
lot_sqft=description_data.get("lot_sqft"),
sold_price=(
result.get("last_sold_price") or description_data.get("sold_price")
if result.get("last_sold_date") or result["list_price"] != description_data.get("sold_price")
else None
), #: has a sold date or list and sold price are different
year_built=description_data.get("year_built"),
garage=description_data.get("garage"),
stories=description_data.get("stories"),
text=description_data.get("text"),
# Additional description fields
name=description_data.get("name"),
type=description_data.get("type"),
)
@staticmethod
def calculate_days_on_mls(result: dict) -> Optional[int]:
list_date_str = result.get("list_date")
list_date = datetime.strptime(list_date_str.split("T")[0], "%Y-%m-%d") if list_date_str else None
last_sold_date_str = result.get("last_sold_date")
last_sold_date = datetime.strptime(last_sold_date_str, "%Y-%m-%d") if last_sold_date_str else None
today = datetime.now()
if list_date:
if result["status"] == "sold":
if last_sold_date:
days = (last_sold_date - list_date).days
if days >= 0:
return days
elif result["status"] in ("for_sale", "for_rent"):
days = (today - list_date).days
if days >= 0:
return days
@staticmethod
def process_alt_photos(photos_info: list[dict]) -> list[str] | None:
if not photos_info:
return None
return [
photo_info["href"].replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75")
for photo_info in photos_info
if photo_info.get("href")
]
@staticmethod
def _parse_open_houses(open_houses_data: list[dict] | None) -> list[dict] | None:
"""Parse open houses data and convert date strings to datetime objects"""
if not open_houses_data:
return None
parsed_open_houses = []
for oh in open_houses_data:
parsed_oh = oh.copy()
# Parse start_date and end_date
if parsed_oh.get("start_date"):
try:
parsed_oh["start_date"] = datetime.fromisoformat(parsed_oh["start_date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_oh["start_date"] = None
if parsed_oh.get("end_date"):
try:
parsed_oh["end_date"] = datetime.fromisoformat(parsed_oh["end_date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_oh["end_date"] = None
parsed_open_houses.append(parsed_oh)
return parsed_open_houses
@staticmethod
def _parse_units(units_data: list[dict] | None) -> list[dict] | None:
"""Parse units data and convert date strings to datetime objects"""
if not units_data:
return None
parsed_units = []
for unit in units_data:
parsed_unit = unit.copy()
# Parse availability date
if parsed_unit.get("availability") and parsed_unit["availability"].get("date"):
try:
parsed_unit["availability"]["date"] = datetime.fromisoformat(parsed_unit["availability"]["date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_unit["availability"]["date"] = None
parsed_units.append(parsed_unit)
return parsed_units
@staticmethod
def _parse_tax_record(tax_record_data: dict | None) -> dict | None:
"""Parse tax record data and convert date strings to datetime objects"""
if not tax_record_data:
return None
parsed_tax_record = tax_record_data.copy()
# Parse last_update_date
if parsed_tax_record.get("last_update_date"):
try:
parsed_tax_record["last_update_date"] = datetime.fromisoformat(parsed_tax_record["last_update_date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_tax_record["last_update_date"] = None
return parsed_tax_record
@staticmethod
def _parse_current_estimates(estimates_data: list[dict] | None) -> list[dict] | None:
"""Parse current estimates data and convert date strings to datetime objects"""
if not estimates_data:
return None
parsed_estimates = []
for estimate in estimates_data:
parsed_estimate = estimate.copy()
# Parse date
if parsed_estimate.get("date"):
try:
parsed_estimate["date"] = datetime.fromisoformat(parsed_estimate["date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_estimate["date"] = None
parsed_estimates.append(parsed_estimate)
return parsed_estimates

View File

@@ -0,0 +1,279 @@
"""
Parsers for realtor.com data processing
"""
from datetime import datetime
from typing import Optional
from ..models import Address, Description, PropertyType
def parse_open_houses(open_houses_data: list[dict] | None) -> list[dict] | None:
"""Parse open houses data and convert date strings to datetime objects"""
if not open_houses_data:
return None
parsed_open_houses = []
for oh in open_houses_data:
parsed_oh = oh.copy()
# Parse start_date and end_date
if parsed_oh.get("start_date"):
try:
parsed_oh["start_date"] = datetime.fromisoformat(parsed_oh["start_date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_oh["start_date"] = None
if parsed_oh.get("end_date"):
try:
parsed_oh["end_date"] = datetime.fromisoformat(parsed_oh["end_date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_oh["end_date"] = None
parsed_open_houses.append(parsed_oh)
return parsed_open_houses
def parse_units(units_data: list[dict] | None) -> list[dict] | None:
"""Parse units data and convert date strings to datetime objects"""
if not units_data:
return None
parsed_units = []
for unit in units_data:
parsed_unit = unit.copy()
# Parse availability date
if parsed_unit.get("availability") and parsed_unit["availability"].get("date"):
try:
parsed_unit["availability"]["date"] = datetime.fromisoformat(parsed_unit["availability"]["date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_unit["availability"]["date"] = None
parsed_units.append(parsed_unit)
return parsed_units
def parse_tax_record(tax_record_data: dict | None) -> dict | None:
"""Parse tax record data and convert date strings to datetime objects"""
if not tax_record_data:
return None
parsed_tax_record = tax_record_data.copy()
# Parse last_update_date
if parsed_tax_record.get("last_update_date"):
try:
parsed_tax_record["last_update_date"] = datetime.fromisoformat(parsed_tax_record["last_update_date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_tax_record["last_update_date"] = None
return parsed_tax_record
def parse_current_estimates(estimates_data: list[dict] | None) -> list[dict] | None:
"""Parse current estimates data and convert date strings to datetime objects"""
if not estimates_data:
return None
parsed_estimates = []
for estimate in estimates_data:
parsed_estimate = estimate.copy()
# Parse date
if parsed_estimate.get("date"):
try:
parsed_estimate["date"] = datetime.fromisoformat(parsed_estimate["date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_estimate["date"] = None
# Parse source information
if parsed_estimate.get("source"):
source_data = parsed_estimate["source"]
parsed_estimate["source"] = {
"type": source_data.get("type"),
"name": source_data.get("name")
}
parsed_estimates.append(parsed_estimate)
return parsed_estimates
def parse_estimates(estimates_data: dict | None) -> dict | None:
"""Parse estimates data and convert date strings to datetime objects"""
if not estimates_data:
return None
parsed_estimates = estimates_data.copy()
# Parse current_values (which is aliased as currentValues in GraphQL)
current_values = parsed_estimates.get("currentValues") or parsed_estimates.get("current_values")
if current_values:
parsed_current_values = []
for estimate in current_values:
parsed_estimate = estimate.copy()
# Parse date
if parsed_estimate.get("date"):
try:
parsed_estimate["date"] = datetime.fromisoformat(parsed_estimate["date"].replace("Z", "+00:00"))
except (ValueError, AttributeError):
parsed_estimate["date"] = None
# Parse source information
if parsed_estimate.get("source"):
source_data = parsed_estimate["source"]
parsed_estimate["source"] = {
"type": source_data.get("type"),
"name": source_data.get("name")
}
# Convert GraphQL aliases to Pydantic field names
if "estimateHigh" in parsed_estimate:
parsed_estimate["estimate_high"] = parsed_estimate.pop("estimateHigh")
if "estimateLow" in parsed_estimate:
parsed_estimate["estimate_low"] = parsed_estimate.pop("estimateLow")
if "isBestHomeValue" in parsed_estimate:
parsed_estimate["is_best_home_value"] = parsed_estimate.pop("isBestHomeValue")
parsed_current_values.append(parsed_estimate)
parsed_estimates["current_values"] = parsed_current_values
# Remove the GraphQL alias if it exists
if "currentValues" in parsed_estimates:
del parsed_estimates["currentValues"]
return parsed_estimates
def parse_neighborhoods(result: dict) -> Optional[str]:
"""Parse neighborhoods from location data"""
neighborhoods_list = []
neighborhoods = result["location"].get("neighborhoods", [])
if neighborhoods:
for neighborhood in neighborhoods:
name = neighborhood.get("name")
if name:
neighborhoods_list.append(name)
return ", ".join(neighborhoods_list) if neighborhoods_list else None
def handle_none_safely(address_part):
"""Handle None values safely for address parts"""
if address_part is None:
return ""
return address_part
def parse_address(result: dict, search_type: str) -> Address:
"""Parse address data from result"""
if search_type == "general_search":
address = result["location"]["address"]
else:
address = result["address"]
return Address(
full_line=address.get("line"),
street=" ".join(
part
for part in [
address.get("street_number"),
address.get("street_direction"),
address.get("street_name"),
address.get("street_suffix"),
]
if part is not None
).strip(),
unit=address["unit"],
city=address["city"],
state=address["state_code"],
zip=address["postal_code"],
# Additional address fields
street_direction=address.get("street_direction"),
street_number=address.get("street_number"),
street_name=address.get("street_name"),
street_suffix=address.get("street_suffix"),
)
def parse_description(result: dict) -> Description | None:
"""Parse description data from result"""
if not result:
return None
description_data = result.get("description", {})
if description_data is None or not isinstance(description_data, dict):
description_data = {}
style = description_data.get("type", "")
if style is not None:
style = style.upper()
primary_photo = None
if (primary_photo_info := result.get("primary_photo")) and (
primary_photo_href := primary_photo_info.get("href")
):
primary_photo = primary_photo_href.replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75")
return Description(
primary_photo=primary_photo,
alt_photos=process_alt_photos(result.get("photos", [])),
style=(PropertyType.__getitem__(style) if style and style in PropertyType.__members__ else None),
beds=description_data.get("beds"),
baths_full=description_data.get("baths_full"),
baths_half=description_data.get("baths_half"),
sqft=description_data.get("sqft"),
lot_sqft=description_data.get("lot_sqft"),
sold_price=(
result.get("last_sold_price") or description_data.get("sold_price")
if result.get("last_sold_date") or result["list_price"] != description_data.get("sold_price")
else None
), #: has a sold date or list and sold price are different
year_built=description_data.get("year_built"),
garage=description_data.get("garage"),
stories=description_data.get("stories"),
text=description_data.get("text"),
# Additional description fields
name=description_data.get("name"),
type=description_data.get("type"),
)
def calculate_days_on_mls(result: dict) -> Optional[int]:
"""Calculate days on MLS from result data"""
list_date_str = result.get("list_date")
list_date = datetime.strptime(list_date_str.split("T")[0], "%Y-%m-%d") if list_date_str else None
last_sold_date_str = result.get("last_sold_date")
last_sold_date = datetime.strptime(last_sold_date_str, "%Y-%m-%d") if last_sold_date_str else None
today = datetime.now()
if list_date:
if result["status"] == "sold":
if last_sold_date:
days = (last_sold_date - list_date).days
if days >= 0:
return days
elif result["status"] in ("for_sale", "for_rent"):
days = (today - list_date).days
if days >= 0:
return days
def process_alt_photos(photos_info: list[dict]) -> list[str] | None:
"""Process alternative photos from photos info"""
if not photos_info:
return None
return [
photo_info["href"].replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75")
for photo_info in photos_info
if photo_info.get("href")
]

View File

@@ -0,0 +1,224 @@
"""
Processors for realtor.com property data processing
"""
from datetime import datetime
from typing import Optional
from ..models import (
Property,
ListingType,
Agent,
Broker,
Builder,
Advertisers,
Office,
ReturnType
)
from .parsers import (
parse_open_houses,
parse_units,
parse_tax_record,
parse_current_estimates,
parse_estimates,
parse_neighborhoods,
parse_address,
parse_description,
calculate_days_on_mls,
process_alt_photos
)
def process_advertisers(advertisers: list[dict] | None) -> Advertisers | None:
"""Process advertisers data from GraphQL response"""
if not advertisers:
return None
def _parse_fulfillment_id(fulfillment_id: str | None) -> str | None:
return fulfillment_id if fulfillment_id and fulfillment_id != "0" else None
processed_advertisers = Advertisers()
for advertiser in advertisers:
advertiser_type = advertiser.get("type")
if advertiser_type == "seller": #: agent
processed_advertisers.agent = Agent(
uuid=_parse_fulfillment_id(advertiser.get("fulfillment_id")),
nrds_id=advertiser.get("nrds_id"),
mls_set=advertiser.get("mls_set"),
name=advertiser.get("name"),
email=advertiser.get("email"),
phones=advertiser.get("phones"),
state_license=advertiser.get("state_license"),
)
if advertiser.get("broker") and advertiser["broker"].get("name"): #: has a broker
processed_advertisers.broker = Broker(
uuid=_parse_fulfillment_id(advertiser["broker"].get("fulfillment_id")),
name=advertiser["broker"].get("name"),
)
if advertiser.get("office"): #: has an office
processed_advertisers.office = Office(
uuid=_parse_fulfillment_id(advertiser["office"].get("fulfillment_id")),
mls_set=advertiser["office"].get("mls_set"),
name=advertiser["office"].get("name"),
email=advertiser["office"].get("email"),
phones=advertiser["office"].get("phones"),
)
if advertiser_type == "community": #: could be builder
if advertiser.get("builder"):
processed_advertisers.builder = Builder(
uuid=_parse_fulfillment_id(advertiser["builder"].get("fulfillment_id")),
name=advertiser["builder"].get("name"),
)
return processed_advertisers
def process_property(result: dict, mls_only: bool = False, extra_property_data: bool = False,
exclude_pending: bool = False, listing_type: ListingType = ListingType.FOR_SALE,
get_key_func=None, process_extra_property_details_func=None) -> Property | None:
"""Process property data from GraphQL response"""
mls = result["source"].get("id") if "source" in result and isinstance(result["source"], dict) else None
if not mls and mls_only:
return None
able_to_get_lat_long = (
result
and result.get("location")
and result["location"].get("address")
and result["location"]["address"].get("coordinate")
)
is_pending = result["flags"].get("is_pending")
is_contingent = result["flags"].get("is_contingent")
if (is_pending or is_contingent) and (exclude_pending and listing_type != ListingType.PENDING):
return None
property_id = result["property_id"]
prop_details = process_extra_property_details_func(result) if extra_property_data and process_extra_property_details_func else {}
property_estimates_root = result.get("current_estimates") or result.get("estimates", {}).get("currentValues")
estimated_value = get_key_func(property_estimates_root, [0, "estimate"]) if get_key_func else None
advertisers = process_advertisers(result.get("advertisers"))
realty_property = Property(
mls=mls,
mls_id=(
result["source"].get("listing_id")
if "source" in result and isinstance(result["source"], dict)
else None
),
property_url=result["href"],
property_id=property_id,
listing_id=result.get("listing_id"),
permalink=result.get("permalink"),
status=("PENDING" if is_pending else "CONTINGENT" if is_contingent else result["status"].upper()),
list_price=result["list_price"],
list_price_min=result["list_price_min"],
list_price_max=result["list_price_max"],
list_date=(datetime.fromisoformat(result["list_date"].split("T")[0]) if result.get("list_date") else None),
prc_sqft=result.get("price_per_sqft"),
last_sold_date=(datetime.fromisoformat(result["last_sold_date"]) if result.get("last_sold_date") else None),
pending_date=(datetime.fromisoformat(result["pending_date"].split("T")[0]) if result.get("pending_date") else None),
new_construction=result["flags"].get("is_new_construction") is True,
hoa_fee=(result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None),
latitude=(result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None),
longitude=(result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None),
address=parse_address(result, search_type="general_search"),
description=parse_description(result),
neighborhoods=parse_neighborhoods(result),
county=(result["location"]["county"].get("name") if result["location"]["county"] else None),
fips_code=(result["location"]["county"].get("fips_code") if result["location"]["county"] else None),
days_on_mls=calculate_days_on_mls(result),
nearby_schools=prop_details.get("schools"),
assessed_value=prop_details.get("assessed_value"),
estimated_value=estimated_value if estimated_value else None,
advertisers=advertisers,
tax=prop_details.get("tax"),
tax_history=prop_details.get("tax_history"),
# Additional fields from GraphQL
mls_status=result.get("mls_status"),
last_sold_price=result.get("last_sold_price"),
tags=result.get("tags"),
details=result.get("details"),
open_houses=parse_open_houses(result.get("open_houses")),
pet_policy=result.get("pet_policy"),
units=parse_units(result.get("units")),
monthly_fees=result.get("monthly_fees"),
one_time_fees=result.get("one_time_fees"),
parking=result.get("parking"),
terms=result.get("terms"),
popularity=result.get("popularity"),
tax_record=parse_tax_record(result.get("tax_record")),
parcel_info=result.get("location", {}).get("parcel"),
current_estimates=parse_current_estimates(result.get("current_estimates")),
estimates=parse_estimates(result.get("estimates")),
photos=result.get("photos"),
flags=result.get("flags"),
)
return realty_property
def process_extra_property_details(result: dict, get_key_func=None) -> dict:
"""Process extra property details from GraphQL response"""
if get_key_func:
schools = get_key_func(result, ["nearbySchools", "schools"])
assessed_value = get_key_func(result, ["taxHistory", 0, "assessment", "total"])
tax_history = get_key_func(result, ["taxHistory"])
else:
nearby_schools = result.get("nearbySchools")
schools = nearby_schools.get("schools", []) if nearby_schools else []
tax_history_data = result.get("taxHistory", [])
assessed_value = tax_history_data[0]["assessment"]["total"] if tax_history_data and tax_history_data[0].get("assessment", {}).get("total") else None
tax_history = tax_history_data
if schools:
schools = [school["district"]["name"] for school in schools if school["district"].get("name")]
# Process tax history
latest_tax = None
processed_tax_history = None
if tax_history and isinstance(tax_history, list):
tax_history = sorted(tax_history, key=lambda x: x.get("year", 0), reverse=True)
if tax_history and "tax" in tax_history[0]:
latest_tax = tax_history[0]["tax"]
processed_tax_history = []
for entry in tax_history:
if "year" in entry and "tax" in entry:
processed_entry = {
"year": entry["year"],
"tax": entry["tax"],
}
if "assessment" in entry and isinstance(entry["assessment"], dict):
processed_entry["assessment"] = {
"building": entry["assessment"].get("building"),
"land": entry["assessment"].get("land"),
"total": entry["assessment"].get("total"),
}
processed_tax_history.append(processed_entry)
return {
"schools": schools if schools else None,
"assessed_value": assessed_value if assessed_value else None,
"tax": latest_tax,
"tax_history": processed_tax_history,
}
def get_key(data: dict, keys: list):
"""Get nested key from dictionary safely"""
try:
value = data
for key in keys:
value = value[key]
return value or {}
except (KeyError, TypeError, IndexError):
return {}

View File

@@ -15,13 +15,13 @@ ordered_properties = [
"mls_status",
"text",
"style",
"formatted_address",
"full_street_line",
"street",
"unit",
"city",
"state",
"zip_code",
"formatted_address",
"beds",
"full_baths",
"half_baths",