enh: add agent

pull/68/head
Cullen 2024-04-16 15:07:50 -05:00
parent 1f47fc3b7e
commit 7037dc9b15
2 changed files with 135 additions and 93 deletions

21
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,21 @@
---
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- id: check-yaml
- repo: https://github.com/adrienverge/yamllint
rev: v1.29.0
hooks:
- id: yamllint
verbose: true # create awareness of linter findings
args: ["-d", "{extends: relaxed, rules: {line-length: {max: 120}}}"]
- repo: https://github.com/psf/black
rev: 24.2.0
hooks:
- id: black
language_version: python
args: [--line-length=120, --quiet]

View File

@ -4,18 +4,21 @@ homeharvest.realtor.__init__
This module implements the scraper for realtor.com This module implements the scraper for realtor.com
""" """
from datetime import datetime from datetime import datetime
from typing import Dict, Union, Optional from typing import Dict, Union, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from .. import Scraper from .. import Scraper
from ..models import Property, Address, ListingType, Description, PropertyType from ..models import Property, Address, ListingType, Description, PropertyType, Agent
class RealtorScraper(Scraper): class RealtorScraper(Scraper):
SEARCH_GQL_URL = "https://www.realtor.com/api/v1/rdc_search_srp?client_id=rdc-search-new-communities&schema=vesta" SEARCH_GQL_URL = "https://www.realtor.com/api/v1/rdc_search_srp?client_id=rdc-search-new-communities&schema=vesta"
PROPERTY_URL = "https://www.realtor.com/realestateandhomes-detail/" PROPERTY_URL = "https://www.realtor.com/realestateandhomes-detail/"
PROPERTY_GQL = "https://graph.realtor.com/graphql"
ADDRESS_AUTOCOMPLETE_URL = "https://parser-external.geo.moveaws.com/suggest" ADDRESS_AUTOCOMPLETE_URL = "https://parser-external.geo.moveaws.com/suggest"
NUM_PROPERTY_WORKERS = 20
def __init__(self, scraper_input): def __init__(self, scraper_input):
super().__init__(scraper_input) super().__init__(scraper_input)
@ -110,15 +113,17 @@ class RealtorScraper(Scraper):
) )
able_to_get_lat_long = ( able_to_get_lat_long = (
property_info property_info
and property_info.get("address") and property_info.get("address")
and property_info["address"].get("location") and property_info["address"].get("location")
and property_info["address"]["location"].get("coordinate") and property_info["address"]["location"].get("coordinate")
)
list_date_str = (
property_info["basic"]["list_date"].split("T")[0] if property_info["basic"].get("list_date") else None
)
last_sold_date_str = (
property_info["basic"]["sold_date"].split("T")[0] if property_info["basic"].get("sold_date") else None
) )
list_date_str = property_info["basic"]["list_date"].split("T")[0] if property_info["basic"].get(
"list_date") else None
last_sold_date_str = property_info["basic"]["sold_date"].split("T")[0] if property_info["basic"].get(
"sold_date") else None
pending_date_str = property_info["pending_date"].split("T")[0] if property_info.get("pending_date") else None pending_date_str = property_info["pending_date"].split("T")[0] if property_info.get("pending_date") else None
list_date = datetime.strptime(list_date_str, "%Y-%m-%d") if list_date_str else None list_date = datetime.strptime(list_date_str, "%Y-%m-%d") if list_date_str else None
@ -131,33 +136,33 @@ class RealtorScraper(Scraper):
if list_date: if list_date:
if status == "sold" and last_sold_date: if status == "sold" and last_sold_date:
days_on_mls = (last_sold_date - list_date).days days_on_mls = (last_sold_date - list_date).days
elif status in ('for_sale', 'for_rent'): elif status in ("for_sale", "for_rent"):
days_on_mls = (today - list_date).days days_on_mls = (today - list_date).days
if days_on_mls and days_on_mls < 0: if days_on_mls and days_on_mls < 0:
days_on_mls = None days_on_mls = None
property_id = property_info["details"]["permalink"]
agents = self.get_agents(property_id)
listing = Property( listing = Property(
mls=mls, mls=mls,
mls_id=property_info["source"].get("listing_id") mls_id=(
if "source" in property_info and isinstance(property_info["source"], dict) property_info["source"].get("listing_id")
else None, if "source" in property_info and isinstance(property_info["source"], dict)
property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}", else None
),
property_url=f"{self.PROPERTY_URL}{property_id}",
status=property_info["basic"]["status"].upper(), status=property_info["basic"]["status"].upper(),
list_price=property_info["basic"]["price"], list_price=property_info["basic"]["price"],
list_date=list_date, list_date=list_date,
prc_sqft=property_info["basic"].get("price") prc_sqft=(
/ property_info["basic"].get("sqft") property_info["basic"].get("price") / property_info["basic"].get("sqft")
if property_info["basic"].get("price") if property_info["basic"].get("price") and property_info["basic"].get("sqft")
and property_info["basic"].get("sqft") else None
else None, ),
last_sold_date=last_sold_date, last_sold_date=last_sold_date,
pending_date=pending_date, pending_date=pending_date,
latitude=property_info["address"]["location"]["coordinate"].get("lat") latitude=property_info["address"]["location"]["coordinate"].get("lat") if able_to_get_lat_long else None,
if able_to_get_lat_long longitude=property_info["address"]["location"]["coordinate"].get("lon") if able_to_get_lat_long else None,
else None,
longitude=property_info["address"]["location"]["coordinate"].get("lon")
if able_to_get_lat_long
else None,
address=self._parse_address(property_info, search_type="handle_listing"), address=self._parse_address(property_info, search_type="handle_listing"),
description=Description( description=Description(
alt_photos=self.process_alt_photos(property_info.get("media", {}).get("photos", [])), alt_photos=self.process_alt_photos(property_info.get("media", {}).get("photos", [])),
@ -172,7 +177,8 @@ class RealtorScraper(Scraper):
garage=property_info["details"].get("garage"), garage=property_info["details"].get("garage"),
stories=property_info["details"].get("stories"), stories=property_info["details"].get("stories"),
), ),
days_on_mls=days_on_mls days_on_mls=days_on_mls,
agents=agents,
) )
return [listing] return [listing]
@ -266,6 +272,7 @@ class RealtorScraper(Scraper):
}""" }"""
variables = {"property_id": property_id} variables = {"property_id": property_id}
agents = self.get_agents(property_id)
payload = { payload = {
"query": query, "query": query,
@ -281,16 +288,13 @@ class RealtorScraper(Scraper):
Property( Property(
mls_id=property_id, mls_id=property_id,
property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}", property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}",
address=self._parse_address( address=self._parse_address(property_info, search_type="handle_address"),
property_info, search_type="handle_address"
),
description=self._parse_description(property_info), description=self._parse_description(property_info),
agents=agents,
) )
] ]
def general_search( def general_search(self, variables: dict, search_type: str) -> Dict[str, Union[int, list[Property]]]:
self, variables: dict, search_type: str
) -> Dict[str, Union[int, list[Property]]]:
""" """
Handles a location area & returns a list of properties Handles a location area & returns a list of properties
""" """
@ -380,17 +384,15 @@ class RealtorScraper(Scraper):
) )
pending_or_contingent_param = ( pending_or_contingent_param = (
"or_filters: { contingent: true, pending: true }" "or_filters: { contingent: true, pending: true }" if self.listing_type == ListingType.PENDING else ""
if self.listing_type == ListingType.PENDING
else ""
) )
listing_type = ListingType.FOR_SALE if self.listing_type == ListingType.PENDING else self.listing_type listing_type = ListingType.FOR_SALE if self.listing_type == ListingType.PENDING else self.listing_type
is_foreclosure = "" is_foreclosure = ""
if variables.get('foreclosure') is True: if variables.get("foreclosure") is True:
is_foreclosure = "foreclosure: true" is_foreclosure = "foreclosure: true"
elif variables.get('foreclosure') is False: elif variables.get("foreclosure") is False:
is_foreclosure = "foreclosure: false" is_foreclosure = "foreclosure: false"
if search_type == "comps": #: comps search, came from an address if search_type == "comps": #: comps search, came from an address
@ -400,11 +402,11 @@ class RealtorScraper(Scraper):
$offset: Int!, $offset: Int!,
) { ) {
home_search( home_search(
query: { query: {
%s %s
nearby: { nearby: {
coordinates: $coordinates coordinates: $coordinates
radius: $radius radius: $radius
} }
status: %s status: %s
%s %s
@ -453,7 +455,7 @@ class RealtorScraper(Scraper):
) )
else: #: general search, came from an address else: #: general search, came from an address
query = ( query = (
"""query Property_search( """query Property_search(
$property_id: [ID]! $property_id: [ID]!
$offset: Int!, $offset: Int!,
) { ) {
@ -464,7 +466,7 @@ class RealtorScraper(Scraper):
limit: 1 limit: 1
offset: $offset offset: $offset
) %s""" ) %s"""
% results_query % results_query
) )
payload = { payload = {
@ -480,64 +482,72 @@ class RealtorScraper(Scraper):
properties: list[Property] = [] properties: list[Property] = []
if ( if (
response_json is None response_json is None
or "data" not in response_json or "data" not in response_json
or response_json["data"] is None or response_json["data"] is None
or search_key not in response_json["data"] or search_key not in response_json["data"]
or response_json["data"][search_key] is None or response_json["data"][search_key] is None
or "results" not in response_json["data"][search_key] or "results" not in response_json["data"][search_key]
): ):
return {"total": 0, "properties": []} return {"total": 0, "properties": []}
for result in response_json["data"][search_key]["results"]: def process_property(result: dict) -> Property | None:
mls = ( mls = result["source"].get("id") if "source" in result and isinstance(result["source"], dict) else None
result["source"].get("id")
if "source" in result and isinstance(result["source"], dict)
else None
)
if not mls and self.mls_only: if not mls and self.mls_only:
continue return
able_to_get_lat_long = ( able_to_get_lat_long = (
result result
and result.get("location") and result.get("location")
and result["location"].get("address") and result["location"].get("address")
and result["location"]["address"].get("coordinate") and result["location"]["address"].get("coordinate")
) )
is_pending = result["flags"].get("is_pending") or result["flags"].get("is_contingent") is_pending = result["flags"].get("is_pending") or result["flags"].get("is_contingent")
if is_pending and self.listing_type != ListingType.PENDING: if is_pending and self.listing_type != ListingType.PENDING:
continue return
property_id = result["property_id"]
agents = self.get_agents(property_id)
realty_property = Property( realty_property = Property(
mls=mls, mls=mls,
mls_id=result["source"].get("listing_id") mls_id=(
if "source" in result and isinstance(result["source"], dict) result["source"].get("listing_id")
else None, if "source" in result and isinstance(result["source"], dict)
property_url=f"{self.PROPERTY_URL}{result['property_id']}" if self.listing_type != ListingType.FOR_RENT else f"{self.PROPERTY_URL}M{result['property_id']}?listing_status=rental", else None
),
property_url=(
f"{self.PROPERTY_URL}{property_id}"
if self.listing_type != ListingType.FOR_RENT
else f"{self.PROPERTY_URL}M{property_id}?listing_status=rental"
),
status="PENDING" if is_pending else result["status"].upper(), status="PENDING" if is_pending else result["status"].upper(),
list_price=result["list_price"], list_price=result["list_price"],
list_date=result["list_date"].split("T")[0] list_date=result["list_date"].split("T")[0] if result.get("list_date") else None,
if result.get("list_date")
else None,
prc_sqft=result.get("price_per_sqft"), prc_sqft=result.get("price_per_sqft"),
last_sold_date=result.get("last_sold_date"), last_sold_date=result.get("last_sold_date"),
hoa_fee=result["hoa"]["fee"] hoa_fee=result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None,
if result.get("hoa") and isinstance(result["hoa"], dict) latitude=result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None,
else None, longitude=result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None,
latitude=result["location"]["address"]["coordinate"].get("lat")
if able_to_get_lat_long
else None,
longitude=result["location"]["address"]["coordinate"].get("lon")
if able_to_get_lat_long
else None,
address=self._parse_address(result, search_type="general_search"), address=self._parse_address(result, search_type="general_search"),
description=self._parse_description(result), description=self._parse_description(result),
days_on_mls=self.calculate_days_on_mls(result) days_on_mls=self.calculate_days_on_mls(result),
agents=agents,
) )
properties.append(realty_property) return realty_property
with ThreadPoolExecutor(max_workers=self.NUM_PROPERTY_WORKERS) as executor:
futures = [
executor.submit(process_property, result) for result in response_json["data"][search_key]["results"]
]
for future in as_completed(futures):
result = future.result()
if result:
properties.append(result)
return { return {
"total": response_json["data"][search_key]["total"], "total": response_json["data"][search_key]["total"],
@ -558,18 +568,14 @@ class RealtorScraper(Scraper):
search_type = ( search_type = (
"comps" "comps"
if self.radius and location_type == "address" if self.radius and location_type == "address"
else "address" else "address" if location_type == "address" and not self.radius else "area"
if location_type == "address" and not self.radius
else "area"
) )
if location_type == "address": if location_type == "address":
if not self.radius: #: single address search, non comps if not self.radius: #: single address search, non comps
property_id = location_info["mpr_id"] property_id = location_info["mpr_id"]
search_variables |= {"property_id": property_id} search_variables |= {"property_id": property_id}
gql_results = self.general_search( gql_results = self.general_search(search_variables, search_type=search_type)
search_variables, search_type=search_type
)
if gql_results["total"] == 0: if gql_results["total"] == 0:
listing_id = self.get_latest_listing_id(property_id) listing_id = self.get_latest_listing_id(property_id)
if listing_id is None: if listing_id is None:
@ -603,7 +609,7 @@ class RealtorScraper(Scraper):
} }
if self.foreclosure: if self.foreclosure:
search_variables['foreclosure'] = self.foreclosure search_variables["foreclosure"] = self.foreclosure
result = self.general_search(search_variables, search_type=search_type) result = self.general_search(search_variables, search_type=search_type)
total = result["total"] total = result["total"]
@ -624,6 +630,19 @@ class RealtorScraper(Scraper):
return homes return homes
def get_agents(self, property_id: str) -> list[Agent]:
payload = f'{{"query":"query GetHome($property_id: ID!) {{\\n home(property_id: $property_id) {{\\n __typename\\n\\n consumerAdvertisers: consumer_advertisers {{\\n __typename\\n type\\n advertiserId: advertiser_id\\n name\\n phone\\n type\\n href\\n slogan\\n photo {{\\n __typename\\n href\\n }}\\n showRealtorLogo: show_realtor_logo\\n hours\\n }}\\n\\n\\n }}\\n}}\\n","variables":{{"property_id":"{property_id}"}}}}'
response = self.session.post(self.PROPERTY_GQL, data=payload)
data = response.json()
try:
ads = data["data"]["home"]["consumerAdvertisers"]
except (KeyError, TypeError):
return []
agents = [Agent(name=ad["name"], phone=ad["phone"]) for ad in ads]
return agents
@staticmethod @staticmethod
def _parse_neighborhoods(result: dict) -> Optional[str]: def _parse_neighborhoods(result: dict) -> Optional[str]:
neighborhoods_list = [] neighborhoods_list = []
@ -646,17 +665,19 @@ class RealtorScraper(Scraper):
def _parse_address(self, result: dict, search_type): def _parse_address(self, result: dict, search_type):
if search_type == "general_search": if search_type == "general_search":
address = result['location']['address'] address = result["location"]["address"]
else: else:
address = result["address"] address = result["address"]
return Address( return Address(
street=" ".join([ street=" ".join(
self.handle_none_safely(address.get('street_number')), [
self.handle_none_safely(address.get('street_direction')), self.handle_none_safely(address.get("street_number")),
self.handle_none_safely(address.get('street_name')), self.handle_none_safely(address.get("street_direction")),
self.handle_none_safely(address.get('street_suffix')), self.handle_none_safely(address.get("street_name")),
]).strip(), self.handle_none_safely(address.get("street_suffix")),
]
).strip(),
unit=address["unit"], unit=address["unit"],
city=address["city"], city=address["city"],
state=address["state_code"], state=address["state_code"],
@ -705,12 +726,12 @@ class RealtorScraper(Scraper):
today = datetime.now() today = datetime.now()
if list_date: if list_date:
if result["status"] == 'sold': if result["status"] == "sold":
if last_sold_date: if last_sold_date:
days = (last_sold_date - list_date).days days = (last_sold_date - list_date).days
if days >= 0: if days >= 0:
return days return days
elif result["status"] in ('for_sale', 'for_rent'): elif result["status"] in ("for_sale", "for_rent"):
days = (today - list_date).days days = (today - list_date).days
if days >= 0: if days >= 0:
return days return days