enh: add agent name/phone (#66)

This commit is contained in:
Cullen Watson
2024-04-16 14:55:44 -05:00
committed by GitHub
parent 1f47fc3b7e
commit 0bdf56568e
13 changed files with 407 additions and 158 deletions

View File

@@ -5,9 +5,7 @@ from homeharvest import scrape_property
def main():
parser = argparse.ArgumentParser(description="Home Harvest Property Scraper")
parser.add_argument(
"location", type=str, help="Location to scrape (e.g., San Francisco, CA)"
)
parser.add_argument("location", type=str, help="Location to scrape (e.g., San Francisco, CA)")
parser.add_argument(
"-l",
@@ -35,9 +33,7 @@ def main():
help="Name of the output file (without extension)",
)
parser.add_argument(
"-p", "--proxy", type=str, default=None, help="Proxy to use for scraping"
)
parser.add_argument("-p", "--proxy", type=str, default=None, help="Proxy to use for scraping")
parser.add_argument(
"-d",
"--days",

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass
import requests
import uuid
from .models import Property, ListingType, SiteName
@@ -27,6 +28,12 @@ class Scraper:
if not session:
self.session = requests.Session()
self.session.headers.update(
{
"auth": f"Bearer {self.get_access_token()}",
"apollographql-client-name": "com.move.Realtor-apollo-ios",
}
)
else:
self.session = session
@@ -43,12 +50,26 @@ class Scraper:
self.date_to = scraper_input.date_to
self.foreclosure = scraper_input.foreclosure
def search(self) -> list[Property]:
...
def search(self) -> list[Property]: ...
@staticmethod
def _parse_home(home) -> Property:
...
def _parse_home(home) -> Property: ...
def handle_location(self):
...
def handle_location(self): ...
def get_access_token(self):
url = "https://graph.realtor.com/auth/token"
payload = f'{{"client_app_id":"rdc_mobile_native,24.20.4.149916,iphone","device_id":"{str(uuid.uuid4()).upper()}","grant_type":"device_mobile"}}'
headers = {
"Host": "graph.realtor.com",
"x-client-version": "24.20.4.149916",
"accept": "*/*",
"content-type": "Application/json",
"user-agent": "Realtor.com/24.20.4.149916 CFNetwork/1410.0.3 Darwin/22.6.0",
"accept-language": "en-US,en;q=0.9",
}
response = requests.post(url, headers=headers, data=payload)
data = response.json()
return data["access_token"]

View File

@@ -69,6 +69,12 @@ class Description:
stories: int | None = None
@dataclass
class Agent:
name: str | None = None
phone: str | None = None
@dataclass
class Property:
property_url: str
@@ -89,3 +95,5 @@ class Property:
latitude: float | None = None
longitude: float | None = None
neighborhoods: Optional[str] = None
agents: list[Agent] = None

View File

@@ -4,18 +4,21 @@ homeharvest.realtor.__init__
This module implements the scraper for realtor.com
"""
from datetime import datetime
from typing import Dict, Union, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from .. import Scraper
from ..models import Property, Address, ListingType, Description, PropertyType
from ..models import Property, Address, ListingType, Description, PropertyType, Agent
class RealtorScraper(Scraper):
SEARCH_GQL_URL = "https://www.realtor.com/api/v1/rdc_search_srp?client_id=rdc-search-new-communities&schema=vesta"
PROPERTY_URL = "https://www.realtor.com/realestateandhomes-detail/"
PROPERTY_GQL = "https://graph.realtor.com/graphql"
ADDRESS_AUTOCOMPLETE_URL = "https://parser-external.geo.moveaws.com/suggest"
NUM_PROPERTY_WORKERS = 20
def __init__(self, scraper_input):
super().__init__(scraper_input)
@@ -110,15 +113,17 @@ class RealtorScraper(Scraper):
)
able_to_get_lat_long = (
property_info
and property_info.get("address")
and property_info["address"].get("location")
and property_info["address"]["location"].get("coordinate")
property_info
and property_info.get("address")
and property_info["address"].get("location")
and property_info["address"]["location"].get("coordinate")
)
list_date_str = (
property_info["basic"]["list_date"].split("T")[0] if property_info["basic"].get("list_date") else None
)
last_sold_date_str = (
property_info["basic"]["sold_date"].split("T")[0] if property_info["basic"].get("sold_date") else None
)
list_date_str = property_info["basic"]["list_date"].split("T")[0] if property_info["basic"].get(
"list_date") else None
last_sold_date_str = property_info["basic"]["sold_date"].split("T")[0] if property_info["basic"].get(
"sold_date") else None
pending_date_str = property_info["pending_date"].split("T")[0] if property_info.get("pending_date") else None
list_date = datetime.strptime(list_date_str, "%Y-%m-%d") if list_date_str else None
@@ -131,33 +136,31 @@ class RealtorScraper(Scraper):
if list_date:
if status == "sold" and last_sold_date:
days_on_mls = (last_sold_date - list_date).days
elif status in ('for_sale', 'for_rent'):
elif status in ("for_sale", "for_rent"):
days_on_mls = (today - list_date).days
if days_on_mls and days_on_mls < 0:
days_on_mls = None
listing = Property(
mls=mls,
mls_id=property_info["source"].get("listing_id")
if "source" in property_info and isinstance(property_info["source"], dict)
else None,
mls_id=(
property_info["source"].get("listing_id")
if "source" in property_info and isinstance(property_info["source"], dict)
else None
),
property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}",
status=property_info["basic"]["status"].upper(),
list_price=property_info["basic"]["price"],
list_date=list_date,
prc_sqft=property_info["basic"].get("price")
/ property_info["basic"].get("sqft")
if property_info["basic"].get("price")
and property_info["basic"].get("sqft")
else None,
prc_sqft=(
property_info["basic"].get("price") / property_info["basic"].get("sqft")
if property_info["basic"].get("price") and property_info["basic"].get("sqft")
else None
),
last_sold_date=last_sold_date,
pending_date=pending_date,
latitude=property_info["address"]["location"]["coordinate"].get("lat")
if able_to_get_lat_long
else None,
longitude=property_info["address"]["location"]["coordinate"].get("lon")
if able_to_get_lat_long
else None,
latitude=property_info["address"]["location"]["coordinate"].get("lat") if able_to_get_lat_long else None,
longitude=property_info["address"]["location"]["coordinate"].get("lon") if able_to_get_lat_long else None,
address=self._parse_address(property_info, search_type="handle_listing"),
description=Description(
alt_photos=self.process_alt_photos(property_info.get("media", {}).get("photos", [])),
@@ -172,7 +175,7 @@ class RealtorScraper(Scraper):
garage=property_info["details"].get("garage"),
stories=property_info["details"].get("stories"),
),
days_on_mls=days_on_mls
days_on_mls=days_on_mls,
)
return [listing]
@@ -281,16 +284,12 @@ class RealtorScraper(Scraper):
Property(
mls_id=property_id,
property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}",
address=self._parse_address(
property_info, search_type="handle_address"
),
address=self._parse_address(property_info, search_type="handle_address"),
description=self._parse_description(property_info),
)
]
def general_search(
self, variables: dict, search_type: str
) -> Dict[str, Union[int, list[Property]]]:
def general_search(self, variables: dict, search_type: str) -> Dict[str, Union[int, list[Property]]]:
"""
Handles a location area & returns a list of properties
"""
@@ -380,17 +379,15 @@ class RealtorScraper(Scraper):
)
pending_or_contingent_param = (
"or_filters: { contingent: true, pending: true }"
if self.listing_type == ListingType.PENDING
else ""
"or_filters: { contingent: true, pending: true }" if self.listing_type == ListingType.PENDING else ""
)
listing_type = ListingType.FOR_SALE if self.listing_type == ListingType.PENDING else self.listing_type
is_foreclosure = ""
if variables.get('foreclosure') is True:
if variables.get("foreclosure") is True:
is_foreclosure = "foreclosure: true"
elif variables.get('foreclosure') is False:
elif variables.get("foreclosure") is False:
is_foreclosure = "foreclosure: false"
if search_type == "comps": #: comps search, came from an address
@@ -400,11 +397,11 @@ class RealtorScraper(Scraper):
$offset: Int!,
) {
home_search(
query: {
query: {
%s
nearby: {
coordinates: $coordinates
radius: $radius
radius: $radius
}
status: %s
%s
@@ -453,7 +450,7 @@ class RealtorScraper(Scraper):
)
else: #: general search, came from an address
query = (
"""query Property_search(
"""query Property_search(
$property_id: [ID]!
$offset: Int!,
) {
@@ -464,7 +461,7 @@ class RealtorScraper(Scraper):
limit: 1
offset: $offset
) %s"""
% results_query
% results_query
)
payload = {
@@ -480,64 +477,72 @@ class RealtorScraper(Scraper):
properties: list[Property] = []
if (
response_json is None
or "data" not in response_json
or response_json["data"] is None
or search_key not in response_json["data"]
or response_json["data"][search_key] is None
or "results" not in response_json["data"][search_key]
response_json is None
or "data" not in response_json
or response_json["data"] is None
or search_key not in response_json["data"]
or response_json["data"][search_key] is None
or "results" not in response_json["data"][search_key]
):
return {"total": 0, "properties": []}
for result in response_json["data"][search_key]["results"]:
mls = (
result["source"].get("id")
if "source" in result and isinstance(result["source"], dict)
else None
)
def process_property(result: dict) -> Property | None:
mls = result["source"].get("id") if "source" in result and isinstance(result["source"], dict) else None
if not mls and self.mls_only:
continue
return
able_to_get_lat_long = (
result
and result.get("location")
and result["location"].get("address")
and result["location"]["address"].get("coordinate")
result
and result.get("location")
and result["location"].get("address")
and result["location"]["address"].get("coordinate")
)
is_pending = result["flags"].get("is_pending") or result["flags"].get("is_contingent")
if is_pending and self.listing_type != ListingType.PENDING:
continue
return
property_id = result["property_id"]
agents = self.get_agents(property_id)
realty_property = Property(
mls=mls,
mls_id=result["source"].get("listing_id")
if "source" in result and isinstance(result["source"], dict)
else None,
property_url=f"{self.PROPERTY_URL}{result['property_id']}" if self.listing_type != ListingType.FOR_RENT else f"{self.PROPERTY_URL}M{result['property_id']}?listing_status=rental",
mls_id=(
result["source"].get("listing_id")
if "source" in result and isinstance(result["source"], dict)
else None
),
property_url=(
f"{self.PROPERTY_URL}{property_id}"
if self.listing_type != ListingType.FOR_RENT
else f"{self.PROPERTY_URL}M{property_id}?listing_status=rental"
),
status="PENDING" if is_pending else result["status"].upper(),
list_price=result["list_price"],
list_date=result["list_date"].split("T")[0]
if result.get("list_date")
else None,
list_date=result["list_date"].split("T")[0] if result.get("list_date") else None,
prc_sqft=result.get("price_per_sqft"),
last_sold_date=result.get("last_sold_date"),
hoa_fee=result["hoa"]["fee"]
if result.get("hoa") and isinstance(result["hoa"], dict)
else None,
latitude=result["location"]["address"]["coordinate"].get("lat")
if able_to_get_lat_long
else None,
longitude=result["location"]["address"]["coordinate"].get("lon")
if able_to_get_lat_long
else None,
hoa_fee=result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None,
latitude=result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None,
longitude=result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None,
address=self._parse_address(result, search_type="general_search"),
description=self._parse_description(result),
days_on_mls=self.calculate_days_on_mls(result)
days_on_mls=self.calculate_days_on_mls(result),
agents=agents,
)
properties.append(realty_property)
return realty_property
with ThreadPoolExecutor(max_workers=self.NUM_PROPERTY_WORKERS) as executor:
futures = [
executor.submit(process_property, result) for result in response_json["data"][search_key]["results"]
]
for future in as_completed(futures):
result = future.result()
if result:
properties.append(result)
return {
"total": response_json["data"][search_key]["total"],
@@ -558,18 +563,14 @@ class RealtorScraper(Scraper):
search_type = (
"comps"
if self.radius and location_type == "address"
else "address"
if location_type == "address" and not self.radius
else "area"
else "address" if location_type == "address" and not self.radius else "area"
)
if location_type == "address":
if not self.radius: #: single address search, non comps
property_id = location_info["mpr_id"]
search_variables |= {"property_id": property_id}
gql_results = self.general_search(
search_variables, search_type=search_type
)
gql_results = self.general_search(search_variables, search_type=search_type)
if gql_results["total"] == 0:
listing_id = self.get_latest_listing_id(property_id)
if listing_id is None:
@@ -603,7 +604,7 @@ class RealtorScraper(Scraper):
}
if self.foreclosure:
search_variables['foreclosure'] = self.foreclosure
search_variables["foreclosure"] = self.foreclosure
result = self.general_search(search_variables, search_type=search_type)
total = result["total"]
@@ -624,6 +625,19 @@ class RealtorScraper(Scraper):
return homes
def get_agents(self, property_id: str) -> list[Agent]:
payload = f'{{"query":"query GetHome($property_id: ID!) {{\\n home(property_id: $property_id) {{\\n __typename\\n\\n consumerAdvertisers: consumer_advertisers {{\\n __typename\\n type\\n advertiserId: advertiser_id\\n name\\n phone\\n type\\n href\\n slogan\\n photo {{\\n __typename\\n href\\n }}\\n showRealtorLogo: show_realtor_logo\\n hours\\n }}\\n\\n\\n }}\\n}}\\n","variables":{{"property_id":"{property_id}"}}}}'
response = self.session.post(self.PROPERTY_GQL, data=payload)
data = response.json()
try:
ads = data["data"]["home"]["consumerAdvertisers"]
except (KeyError, TypeError):
return []
agents = [Agent(name=ad["name"], phone=ad["phone"]) for ad in ads]
return agents
@staticmethod
def _parse_neighborhoods(result: dict) -> Optional[str]:
neighborhoods_list = []
@@ -646,17 +660,19 @@ class RealtorScraper(Scraper):
def _parse_address(self, result: dict, search_type):
if search_type == "general_search":
address = result['location']['address']
address = result["location"]["address"]
else:
address = result["address"]
return Address(
street=" ".join([
self.handle_none_safely(address.get('street_number')),
self.handle_none_safely(address.get('street_direction')),
self.handle_none_safely(address.get('street_name')),
self.handle_none_safely(address.get('street_suffix')),
]).strip(),
street=" ".join(
[
self.handle_none_safely(address.get("street_number")),
self.handle_none_safely(address.get("street_direction")),
self.handle_none_safely(address.get("street_name")),
self.handle_none_safely(address.get("street_suffix")),
]
).strip(),
unit=address["unit"],
city=address["city"],
state=address["state_code"],
@@ -705,12 +721,12 @@ class RealtorScraper(Scraper):
today = datetime.now()
if list_date:
if result["status"] == 'sold':
if result["status"] == "sold":
if last_sold_date:
days = (last_sold_date - list_date).days
if days >= 0:
return days
elif result["status"] in ('for_sale', 'for_rent'):
elif result["status"] in ("for_sale", "for_rent"):
days = (today - list_date).days
if days >= 0:
return days

View File

@@ -1,5 +1,6 @@
class InvalidListingType(Exception):
"""Raised when a provided listing type is does not exist."""
class InvalidDate(Exception):
"""Raised when only one of date_from or date_to is provided or not in the correct format. ex: 2023-10-23 """
"""Raised when only one of date_from or date_to is provided or not in the correct format. ex: 2023-10-23"""

View File

@@ -31,6 +31,9 @@ ordered_properties = [
"stories",
"hoa_fee",
"parking_garage",
"agent",
"broker",
"broker_phone",
"primary_photo",
"alt_photos",
]
@@ -48,6 +51,14 @@ def process_result(result: Property) -> pd.DataFrame:
prop_data["state"] = address_data.state
prop_data["zip_code"] = address_data.zip
if "agents" in prop_data:
agents = prop_data["agents"]
if agents:
prop_data["agent"] = agents[0].name
if len(agents) > 1:
prop_data["broker"] = agents[1].name
prop_data["broker_phone"] = agents[1].phone
prop_data["price_per_sqft"] = prop_data["prc_sqft"]
description = result.description
@@ -72,9 +83,7 @@ def process_result(result: Property) -> pd.DataFrame:
def validate_input(listing_type: str) -> None:
if listing_type.upper() not in ListingType.__members__:
raise InvalidListingType(
f"Provided listing type, '{listing_type}', does not exist."
)
raise InvalidListingType(f"Provided listing type, '{listing_type}', does not exist.")
def validate_dates(date_from: str | None, date_to: str | None) -> None: