[enh]: make last_x_days generic

add mls_only
make radius generic
This commit is contained in:
Cullen Watson
2023-10-04 10:11:53 -05:00
parent 51bde20c3c
commit c4870677c2
9 changed files with 220 additions and 201 deletions

View File

@@ -1,103 +1,41 @@
import warnings
import pandas as pd
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from .core.scrapers import ScraperInput
from .utils import process_result, ordered_properties
from .utils import process_result, ordered_properties, validate_input
from .core.scrapers.realtor import RealtorScraper
from .core.scrapers.models import ListingType, Property, SiteName
from .exceptions import InvalidListingType
_scrapers = {
"realtor.com": RealtorScraper,
}
def _validate_input(listing_type: str) -> None:
if listing_type.upper() not in ListingType.__members__:
raise InvalidListingType(f"Provided listing type, '{listing_type}', does not exist.")
def _scrape_single_site(location: str, site_name: str, listing_type: str, radius: float, proxy: str = None, sold_last_x_days: int = None) -> pd.DataFrame:
"""
Helper function to scrape a single site.
"""
_validate_input(listing_type)
scraper_input = ScraperInput(
location=location,
listing_type=ListingType[listing_type.upper()],
site_name=SiteName.get_by_value(site_name.lower()),
proxy=proxy,
radius=radius,
sold_last_x_days=sold_last_x_days
)
site = _scrapers[site_name.lower()](scraper_input)
results = site.search()
print(f"found {len(results)}")
properties_dfs = [process_result(result) for result in results]
if not properties_dfs:
return pd.DataFrame()
return pd.concat(properties_dfs, ignore_index=True, axis=0)[ordered_properties]
from .core.scrapers.models import ListingType
from .exceptions import InvalidListingType, NoResultsFound
def scrape_property(
location: str,
listing_type: str = "for_sale",
radius: float = None,
sold_last_x_days: int = None,
mls_only: bool = False,
last_x_days: int = None,
proxy: str = None,
) -> pd.DataFrame:
"""
Scrape properties from Realtor.com based on a given location and listing type.
:param location: US Location (e.g. 'San Francisco, CA', 'Cook County, IL', '85281', '2530 Al Lipscomb Way')
:param listing_type: Listing type (e.g. 'for_sale', 'for_rent', 'sold'). Default is 'for_sale'.
:param radius: Radius in miles to find comparable properties on individual addresses. Optional.
:param sold_last_x_days: Number of past days to filter sold properties. Optional.
:param proxy: Proxy IP address to be used for scraping. Optional.
:returns: pd.DataFrame containing properties
"""
site_name = "realtor.com"
validate_input(listing_type)
if site_name is None:
site_name = list(_scrapers.keys())
scraper_input = ScraperInput(
location=location,
listing_type=ListingType[listing_type.upper()],
proxy=proxy,
radius=radius,
mls_only=mls_only,
last_x_days=last_x_days,
)
if not isinstance(site_name, list):
site_name = [site_name]
site = RealtorScraper(scraper_input)
results = site.search()
results = []
properties_dfs = [process_result(result) for result in results]
if not properties_dfs:
raise NoResultsFound("no results found for the query")
if len(site_name) == 1:
final_df = _scrape_single_site(location, site_name[0], listing_type, radius, proxy, sold_last_x_days)
results.append(final_df)
else:
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(_scrape_single_site, location, s_name, listing_type, radius, proxy, sold_last_x_days): s_name
for s_name in site_name
}
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
results = [df for df in results if not df.empty and not df.isna().all().all()]
if not results:
return pd.DataFrame()
final_df = pd.concat(results, ignore_index=True)
columns_to_track = ["Street", "Unit", "Zip"]
#: validate they exist, otherwise create them
for col in columns_to_track:
if col not in final_df.columns:
final_df[col] = None
return final_df
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
return pd.concat(properties_dfs, ignore_index=True, axis=0)[ordered_properties]

View File

@@ -5,7 +5,9 @@ from homeharvest import scrape_property
def main():
parser = argparse.ArgumentParser(description="Home Harvest Property Scraper")
parser.add_argument("location", type=str, help="Location to scrape (e.g., San Francisco, CA)")
parser.add_argument(
"location", type=str, help="Location to scrape (e.g., San Francisco, CA)"
)
parser.add_argument(
"-l",
@@ -33,21 +35,41 @@ def main():
help="Name of the output file (without extension)",
)
parser.add_argument("-p", "--proxy", type=str, default=None, help="Proxy to use for scraping")
parser.add_argument("-d", "--days", type=int, default=None, help="Sold in last _ days filter.")
parser.add_argument(
"-p", "--proxy", type=str, default=None, help="Proxy to use for scraping"
)
parser.add_argument(
"-d",
"--days",
type=int,
default=None,
help="Sold/listed in last _ days filter.",
)
parser.add_argument(
"-r",
"--sold-properties-radius",
dest="sold_properties_radius", # This makes sure the parsed argument is stored as radius_for_comps in args
"--radius",
type=float,
default=None,
help="Get comparable properties within _ (eg. 0.0) miles. Only applicable for individual addresses."
help="Get comparable properties within _ (eg. 0.0) miles. Only applicable for individual addresses.",
)
parser.add_argument(
"-m",
"--mls_only",
action="store_true",
help="If set, fetches only MLS listings.",
)
args = parser.parse_args()
result = scrape_property(args.location, args.listing_type, radius_for_comps=args.radius_for_comps, proxy=args.proxy)
result = scrape_property(
args.location,
args.listing_type,
radius=args.radius,
proxy=args.proxy,
mls_only=args.mls_only,
last_x_days=args.days,
)
if not args.filename:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

View File

@@ -8,14 +8,18 @@ from .models import Property, ListingType, SiteName
class ScraperInput:
location: str
listing_type: ListingType
site_name: SiteName
radius: float | None = None
mls_only: bool | None = None
proxy: str | None = None
sold_last_x_days: int | None = None
last_x_days: int | None = None
class Scraper:
def __init__(self, scraper_input: ScraperInput, session: requests.Session | tls_client.Session = None):
def __init__(
self,
scraper_input: ScraperInput,
session: requests.Session | tls_client.Session = None,
):
self.location = scraper_input.location
self.listing_type = scraper_input.listing_type
@@ -30,9 +34,9 @@ class Scraper:
self.session.proxies.update(proxies)
self.listing_type = scraper_input.listing_type
self.site_name = scraper_input.site_name
self.radius = scraper_input.radius
self.sold_last_x_days = scraper_input.sold_last_x_days
self.last_x_days = scraper_input.last_x_days
self.mls_only = scraper_input.mls_only
def search(self) -> list[Property]:
...

View File

@@ -106,12 +106,16 @@ class RealtorScraper(Scraper):
Property(
mls_id=property_id,
property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}",
address=self._parse_address(property_info, search_type="handle_address"),
description=self._parse_description(property_info)
address=self._parse_address(
property_info, search_type="handle_address"
),
description=self._parse_description(property_info),
)
]
def general_search(self, variables: dict, search_type: str) -> Dict[str, Union[int, list[Property]]]:
def general_search(
self, variables: dict, search_type: str
) -> Dict[str, Union[int, list[Property]]]:
"""
Handles a location area & returns a list of properties
"""
@@ -169,17 +173,23 @@ class RealtorScraper(Scraper):
}
}"""
sold_date_param = ('sold_date: { min: "$today-%sD" }' % self.sold_last_x_days
if self.listing_type == ListingType.SOLD and self.sold_last_x_days
else "")
sort_param = ('sort: [{ field: sold_date, direction: desc }]'
if self.listing_type == ListingType.SOLD
else 'sort: [{ field: list_date, direction: desc }]')
date_param = (
'sold_date: { min: "$today-%sD" }' % self.last_x_days
if self.listing_type == ListingType.SOLD and self.last_x_days
else (
'list_date: { min: "$today-%sD" }' % self.last_x_days
if self.last_x_days
else ""
)
)
sort_param = (
"sort: [{ field: sold_date, direction: desc }]"
if self.listing_type == ListingType.SOLD
else "sort: [{ field: list_date, direction: desc }]"
)
if search_type == "comps":
print('general - comps')
query = (
"""query Property_search(
query = """query Property_search(
$coordinates: [Float]!
$radius: String!
$offset: Int!,
@@ -197,16 +207,13 @@ class RealtorScraper(Scraper):
limit: 200
offset: $offset
) %s""" % (
self.listing_type.value.lower(),
sold_date_param,
sort_param,
results_query
)
self.listing_type.value.lower(),
date_param,
sort_param,
results_query,
)
else:
print('general - not comps')
query = (
"""query Home_search(
query = """query Home_search(
$city: String,
$county: [String],
$state_code: String,
@@ -225,13 +232,11 @@ class RealtorScraper(Scraper):
%s
limit: 200
offset: $offset
) %s"""
% (
self.listing_type.value.lower(),
sold_date_param,
sort_param,
results_query
)
) %s""" % (
self.listing_type.value.lower(),
date_param,
sort_param,
results_query,
)
payload = {
@@ -247,12 +252,12 @@ class RealtorScraper(Scraper):
properties: list[Property] = []
if (
response_json is None
or "data" not in response_json
or response_json["data"] is None
or search_key not in response_json["data"]
or response_json["data"][search_key] is None
or "results" not in response_json["data"][search_key]
response_json is None
or "data" not in response_json
or response_json["data"] is None
or search_key not in response_json["data"]
or response_json["data"][search_key] is None
or "results" not in response_json["data"][search_key]
):
return {"total": 0, "properties": []}
@@ -264,32 +269,44 @@ class RealtorScraper(Scraper):
else None
)
if not mls:
if not mls and self.mls_only:
continue
able_to_get_lat_long = result and result.get("location") and result["location"].get("address") and result["location"]["address"].get("coordinate")
able_to_get_lat_long = (
result
and result.get("location")
and result["location"].get("address")
and result["location"]["address"].get("coordinate")
)
realty_property = Property(
mls=mls,
mls_id=result["source"].get("listing_id") if "source" in result and isinstance(result["source"], dict) else None,
mls_id=result["source"].get("listing_id")
if "source" in result and isinstance(result["source"], dict)
else None,
property_url=f"{self.PROPERTY_URL}{result['property_id']}",
status=result["status"].upper(),
list_price=result["list_price"],
list_date=result["list_date"].split("T")[0] if result.get("list_date") else None,
list_date=result["list_date"].split("T")[0]
if result.get("list_date")
else None,
prc_sqft=result.get("price_per_sqft"),
last_sold_date=result.get("last_sold_date"),
hoa_fee=result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None,
latitude=result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None,
longitude=result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None,
hoa_fee=result["hoa"]["fee"]
if result.get("hoa") and isinstance(result["hoa"], dict)
else None,
latitude=result["location"]["address"]["coordinate"].get("lat")
if able_to_get_lat_long
else None,
longitude=result["location"]["address"]["coordinate"].get("lon")
if able_to_get_lat_long
else None,
address=self._parse_address(result, search_type="general_search"),
neighborhoods=self._parse_neighborhoods(result),
description=self._parse_description(result)
description=self._parse_description(result),
)
properties.append(realty_property)
# print(response_json["data"]["property_search"], variables["offset"])
# print(response_json["data"]["home_search"]["total"], variables["offset"])
return {
"total": response_json["data"][search_key]["total"],
"properties": properties,
@@ -304,14 +321,13 @@ class RealtorScraper(Scraper):
}
search_type = "comps" if self.radius and location_type == "address" else "area"
print(search_type)
if location_type == "address":
if not self.radius: #: single address search, non comps
if not self.radius: #: single address search, non comps
property_id = location_info["mpr_id"]
search_variables |= {"property_id": property_id}
return self.handle_address(property_id)
else: #: general search, comps (radius)
else: #: general search, comps (radius)
coordinates = list(location_info["centroid"].values())
search_variables |= {
"coordinates": coordinates,
@@ -370,10 +386,10 @@ class RealtorScraper(Scraper):
)
return Address(
street=f"{result['address']['street_number']} {result['address']['street_name']} {result['address']['street_suffix']}",
unit=result['address']['unit'],
city=result['address']['city'],
state=result['address']['state_code'],
zip=result['address']['postal_code'],
unit=result["address"]["unit"],
city=result["address"]["city"],
state=result["address"]["state_code"],
zip=result["address"]["postal_code"],
)
@staticmethod
@@ -390,4 +406,4 @@ class RealtorScraper(Scraper):
year_built=description_data.get("year_built"),
garage=description_data.get("garage"),
stories=description_data.get("stories"),
)
)

View File

@@ -1,4 +1,4 @@
from .core.scrapers.models import Property
from .core.scrapers.models import Property, ListingType
import pandas as pd
ordered_properties = [
@@ -73,4 +73,11 @@ def process_result(result: Property) -> pd.DataFrame:
properties_df = pd.DataFrame([prop_data])
properties_df = properties_df.reindex(columns=ordered_properties)
return properties_df[ordered_properties]
return properties_df[ordered_properties]
def validate_input(listing_type: str) -> None:
if listing_type.upper() not in ListingType.__members__:
raise InvalidListingType(
f"Provided listing type, '{listing_type}', does not exist."
)