Merge pull request #1 from yariv245/goozali_scrap

Goozali scrap
pull/231/head
Yariv Menachem 2024-12-25 14:01:01 +02:00 committed by GitHub
commit 0189ecb0ff
26 changed files with 30152 additions and 196 deletions

View File

@ -1,10 +1,14 @@
from __future__ import annotations
from datetime import datetime
from threading import Lock
import pandas as pd
from typing import Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from jobspy.scrapers.site import Site
from .scrapers.goozali import GoozaliScraper
from .jobs import JobPost, JobType, Location
from .scrapers.utils import set_logger_level, extract_salary, create_logger
from .scrapers.indeed import IndeedScraper
@ -12,7 +16,7 @@ from .scrapers.ziprecruiter import ZipRecruiterScraper
from .scrapers.glassdoor import GlassdoorScraper
from .scrapers.google import GoogleJobsScraper
from .scrapers.linkedin import LinkedInScraper
from .scrapers import SalarySource, ScraperInput, Site, JobResponse, Country
from .scrapers import SalarySource, ScraperInput, JobResponse, Country
from .scrapers.exceptions import (
LinkedInException,
IndeedException,
@ -21,6 +25,7 @@ from .scrapers.exceptions import (
GoogleJobsException,
)
def scrape_jobs(
site_name: str | list[str] | Site | list[Site] | None = None,
search_term: str | None = None,
@ -55,6 +60,7 @@ def scrape_jobs(
Site.ZIP_RECRUITER: ZipRecruiterScraper,
Site.GLASSDOOR: GlassdoorScraper,
Site.GOOGLE: GoogleJobsScraper,
Site.GOOZALI: GoozaliScraper,
}
set_logger_level(verbose)
@ -83,7 +89,6 @@ def scrape_jobs(
return site_types
country_enum = Country.from_string(country_indeed)
scraper_input = ScraperInput(
site_type=get_site_type(),
country=country_enum,
@ -100,7 +105,7 @@ def scrape_jobs(
results_wanted=results_wanted,
linkedin_company_ids=linkedin_company_ids,
offset=offset,
hours_old=hours_old,
hours_old=hours_old
)
def scrape_site(site: Site) -> Tuple[str, JobResponse]:
@ -114,150 +119,33 @@ def scrape_jobs(
site_to_jobs_dict = {}
merged_jobs: list[JobPost] = []
lock = Lock()
def worker(site):
logger = create_logger(f"Worker {site}")
logger.info("Starting")
try:
site_val, scraped_info = scrape_site(site)
# Add the scraped jobs to the merged list
merged_jobs.extend(scraped_info.jobs) # Assuming scraped_info has 'jobs' as a list
with lock:
merged_jobs.extend(scraped_info.jobs)
logger.info("Finished")
return site_val, scraped_info
except Exception as e:
logger.error(f"Error: {e}")
return None, None
with ThreadPoolExecutor() as executor:
with ThreadPoolExecutor(max_workers=5) as executor:
logger = create_logger("ThreadPoolExecutor")
future_to_site = {
executor.submit(worker, site): site for site in scraper_input.site_type
}
# An iterator over the given futures that yields each as it completes.
for future in as_completed(future_to_site):
try:
site_value, scraped_data = future.result()
if site_value and scraped_data:
site_to_jobs_dict[site_value] = scraped_data
except Exception as e:
logger.error(f"Future Error occurred: {e}")
return merged_jobs
def convert_to_annual(job_data: dict):
if job_data["interval"] == "hourly":
job_data["min_amount"] *= 2080
job_data["max_amount"] *= 2080
if job_data["interval"] == "monthly":
job_data["min_amount"] *= 12
job_data["max_amount"] *= 12
if job_data["interval"] == "weekly":
job_data["min_amount"] *= 52
job_data["max_amount"] *= 52
if job_data["interval"] == "daily":
job_data["min_amount"] *= 260
job_data["max_amount"] *= 260
job_data["interval"] = "yearly"
jobs_dfs: list[pd.DataFrame] = []
for site, job_response in site_to_jobs_dict.items():
for job in job_response.jobs:
job_data = job.dict()
job_url = job_data["job_url"]
job_data["job_url_hyper"] = f'<a href="{job_url}">{job_url}</a>'
job_data["site"] = site
job_data["company"] = job_data["company_name"]
job_data["job_type"] = (
", ".join(job_type.value[0] for job_type in job_data["job_type"])
if job_data["job_type"]
else None
)
job_data["emails"] = (
", ".join(job_data["emails"]) if job_data["emails"] else None
)
if job_data["location"]:
job_data["location"] = Location(
**job_data["location"]
).display_location()
compensation_obj = job_data.get("compensation")
if compensation_obj and isinstance(compensation_obj, dict):
job_data["interval"] = (
compensation_obj.get("interval").value
if compensation_obj.get("interval")
else None
)
job_data["min_amount"] = compensation_obj.get("min_amount")
job_data["max_amount"] = compensation_obj.get("max_amount")
job_data["currency"] = compensation_obj.get("currency", "USD")
job_data["salary_source"] = SalarySource.DIRECT_DATA.value
if enforce_annual_salary and (
job_data["interval"]
and job_data["interval"] != "yearly"
and job_data["min_amount"]
and job_data["max_amount"]
):
convert_to_annual(job_data)
else:
if country_enum == Country.USA:
(
job_data["interval"],
job_data["min_amount"],
job_data["max_amount"],
job_data["currency"],
) = extract_salary(
job_data["description"],
enforce_annual_salary=enforce_annual_salary,
)
job_data["salary_source"] = SalarySource.DESCRIPTION.value
job_data["salary_source"] = (
job_data["salary_source"]
if "min_amount" in job_data and job_data["min_amount"]
else None
)
job_df = pd.DataFrame([job_data])
jobs_dfs.append(job_df)
if jobs_dfs:
# Step 1: Filter out all-NA columns from each DataFrame before concatenation
filtered_dfs = [df.dropna(axis=1, how="all") for df in jobs_dfs]
# Step 2: Concatenate the filtered DataFrames
jobs_df = pd.concat(filtered_dfs, ignore_index=True)
# Desired column order
desired_order = [
"id",
"site",
"job_url_hyper" if hyperlinks else "job_url",
"job_url_direct",
"title",
"company",
"location",
"date_posted",
"job_type",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]
# Step 3: Ensure all desired columns are present, adding missing ones as empty
for column in desired_order:
if column not in jobs_df.columns:
jobs_df[column] = None # Add missing columns as empty
# Reorder the DataFrame according to the desired order
jobs_df = jobs_df[desired_order]
# Step 4: Sort the DataFrame as required
return jobs_df.sort_values(
by=["site", "date_posted"], ascending=[True, False]
).reset_index(drop=True)
else:
return pd.DataFrame()
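For reference, a minimal call that exercises the new Goozali site, modeled on the updated src/main.py further down (with this change scrape_jobs returns the merged list of JobPost objects rather than a pandas DataFrame):

from jobspy import Site, scrape_jobs

jobs = scrape_jobs(
    site_name=[Site.GOOZALI],  # Site enums and plain strings are both accepted
    search_term="software engineer",
    locations=["Tel Aviv, Israel", "Central, Israel"],
    results_wanted=200,
    hours_old=48,
    country_indeed="israel",
)
print(f"Found {len(jobs)} jobs")  # jobs is a list[JobPost]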

View File

@ -2,6 +2,7 @@ import os
from typing import List
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
import pymongo
from jobspy.jobs import JobPost

View File

@ -185,6 +185,7 @@ class Location(BaseModel):
country: Country | str | None = None
city: Optional[str] = None
state: Optional[str] = None
text: Optional[str] = None
def display_location(self) -> str:
location_parts = []
@ -253,6 +254,12 @@ class DescriptionFormat(Enum):
class JobPost(BaseModel):
# def __init__(self, obj):
# super().__init__()
# for key, value in obj.items():
# setattr(self, key, value)
id: str | None = None
title: str
company_name: str | None
@ -271,6 +278,7 @@ class JobPost(BaseModel):
emails: list[str] | None = None
is_remote: bool | None = None
listing_type: str | None = None
field: str | None = None
# linkedin specific
job_level: str | None = None

View File

@ -1,30 +1,44 @@
import asyncio
from db.job_repository import JobRepository
from jobspy import scrape_jobs
import re
from jobspy import Site, scrape_jobs
from jobspy.db.job_repository import JobRepository
from jobspy.jobs import JobPost
from jobspy.scrapers.utils import create_logger
from jobspy.telegram_bot import TelegramBot
logger = create_logger("Main")
filter_by_title: list[str] = ["test", "qa", "Lead", "Full-Stack", "Full Stack", "Fullstack", "Frontend", "Front-end", "Front End", "DevOps", "Physical", "Staff",
"automation", "BI", "Principal", "Architect", "Android", "Machine Learning", "Student", "Data Engineer", "DevSecOps"]
def filter_jobs_by_title_name(job: JobPost):
for filter_title in filter_by_title:
if re.search(filter_title, job.title, re.IGNORECASE):
logger.info(f"job filtered out by title: {job.id} , {
job.title} , found {filter_title}")
return False
return True
async def main():
telegramBot = TelegramBot()
jobRepository = JobRepository()
# sites_to_scrap = [Site.LINKEDIN, Site.GLASSDOOR, Site.INDEED, Site.GOOZALI]
sites_to_scrap = [Site.LINKEDIN]
# sites_to_scrap = [Site.GOOZALI]
jobs = scrape_jobs(
# site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"],
site_name=["indeed"],
site_name=sites_to_scrap,
search_term="software engineer",
google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
location="Central, Israel",
# locations=["Rehovot"],
locations=["Tel Aviv, Israel", "Ramat Gan, Israel",
"Central, Israel", "Rehovot ,Israel"],
results_wanted=200,
hours_old=200,
country_indeed='israel',
hours_old=48,
country_indeed='israel'
)
print(f"Found {len(jobs)} jobs")
logger.info(f"Found {len(jobs)} jobs")
jobs = list(filter(filter_jobs_by_title_name, jobs))
newJobs = jobRepository.insertManyIfNotFound(jobs)
for newJob in newJobs:
await telegramBot.sendJob(newJob)
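For clarity, filter_jobs_by_title_name keeps a job only when none of the blacklist terms above matches its title, case-insensitively. A standalone sketch of the same check, with an abbreviated term list:

import re

filter_by_title = ["test", "qa", "Lead", "DevOps"]  # abbreviated from the list above

def title_passes(title: str) -> bool:
    # Equivalent to filter_jobs_by_title_name, minus the logging and the JobPost wrapper
    return not any(re.search(term, title, re.IGNORECASE) for term in filter_by_title)

print(title_passes("Senior Backend Engineer"))  # True  -> kept
print(title_passes("QA Automation Engineer"))   # False -> filtered out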

View File

@ -2,6 +2,8 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from jobspy.scrapers.site import Site
from ..jobs import (
Enum,
BaseModel,
@ -12,14 +14,6 @@ from ..jobs import (
)
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
class SalarySource(Enum):
DIRECT_DATA = "direct_data"
DESCRIPTION = "description"

View File

@ -37,6 +37,7 @@ from ...jobs import (
logger = create_logger("Glassdoor")
class GlassdoorScraper(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
@ -62,7 +63,8 @@ class GlassdoorScraper(Scraper):
:return: JobResponse containing a list of jobs.
"""
self.scraper_input = scraper_input
self.scraper_input.results_wanted = min(900, scraper_input.results_wanted)
self.scraper_input.results_wanted = min(
900, scraper_input.results_wanted)
self.base_url = self.scraper_input.country.get_glassdoor_url()
self.session = create_session(
@ -71,16 +73,17 @@ class GlassdoorScraper(Scraper):
token = self._get_csrf_token()
headers["gd-csrf-token"] = token if token else fallback_token
self.session.headers.update(headers)
job_list: list[JobPost] = [];
job_list: list[JobPost] = []
for location in scraper_input.locations:
glassDoorLocatiions: List[GlassDoorLocationResponse] = self._get_locations(
location, scraper_input.is_remote
)
for glassDoorLocatiion in glassDoorLocatiions:
logger.info(f"Location: {glassDoorLocatiion.longName}")
locationType = get_location_type(glassDoorLocatiion);
locationId = get_location_id(glassDoorLocatiion);
jobs_temp = self.get_jobs(scraper_input,locationId,locationType);
locationType = get_location_type(glassDoorLocatiion)
locationId = get_location_id(glassDoorLocatiion)
jobs_temp = self.get_jobs(
scraper_input, locationId, locationType)
if (jobs_temp is not None and len(jobs_temp) > 1):
job_list.extend(jobs_temp)
return JobResponse(jobs=job_list)
@ -99,7 +102,8 @@ class GlassdoorScraper(Scraper):
jobs = []
self.scraper_input = scraper_input
try:
payload = self._add_payload(location_id, location_type, page_num, cursor)
payload = self._add_payload(
location_id, location_type, page_num, cursor)
response = self.session.post(
f"{self.base_url}/graph",
timeout_seconds=15,
@ -107,9 +111,11 @@ class GlassdoorScraper(Scraper):
)
if response.status_code != 200:
exc_msg = f"bad response status code: {response.status_code}"
logger.error(f"GlassdoorException : {exc_msg}")
raise GlassdoorException(exc_msg)
res_json = response.json()[0]
if "errors" in res_json:
logger.error("Error encountered in API response")
raise ValueError("Error encountered in API response")
except (
requests.exceptions.ReadTimeout,
@ -132,7 +138,9 @@ class GlassdoorScraper(Scraper):
if job_post:
jobs.append(job_post)
except Exception as exc:
raise GlassdoorException(f"Glassdoor generated an exception: {exc}")
logger.error(f"Glassdoor generated an exception: {exc}")
raise GlassdoorException(
f"Glassdoor generated an exception: {exc}")
return jobs, self.get_cursor_for_page(
res_json["data"]["jobListings"]["paginationCursors"], page_num + 1
@ -150,7 +158,8 @@ class GlassdoorScraper(Scraper):
cursor = None
range_start = 1 + (scraper_input.offset // self.jobs_per_page)
tot_pages = (scraper_input.results_wanted // self.jobs_per_page) + 2
tot_pages = (scraper_input.results_wanted //
self.jobs_per_page) + 2
range_end = min(tot_pages, self.max_pages + 1)
for page in range(range_start, range_end):
logger.info(f"search page: {page} / {range_end-1}")
@ -174,7 +183,8 @@ class GlassdoorScraper(Scraper):
"""
Fetches csrf token needed for API by visiting a generic page
"""
res = self.session.get(f"{self.base_url}/Job/computer-science-jobs.htm")
res = self.session.get(
f"{self.base_url}/Job/computer-science-jobs.htm")
pattern = r'"token":\s*"([^"]+)"'
matches = re.findall(pattern, res.text)
token = None
@ -234,7 +244,8 @@ class GlassdoorScraper(Scraper):
compensation=compensation,
is_remote=is_remote,
description=description,
emails=extract_emails_from_text(description) if description else None,
emails=extract_emails_from_text(
description) if description else None,
company_logo=company_logo,
listing_type=listing_type,
)
@ -280,7 +291,8 @@ class GlassdoorScraper(Scraper):
def _get_location(self, location: str, is_remote: bool) -> (int, str):
if not location or is_remote:
return "11047", "STATE" # remote options
url = f"{self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
url = f"{
self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
res = self.session.get(url)
if res.status_code != 200:
if res.status_code == 429:
@ -290,7 +302,8 @@ class GlassdoorScraper(Scraper):
else:
err = f"Glassdoor response status code {res.status_code}"
err += f" - {res.text}"
logger.error(f"Glassdoor response status code {res.status_code}")
logger.error(f"Glassdoor response status code {
res.status_code}")
return None, None
items = res.json()
@ -309,12 +322,14 @@ class GlassdoorScraper(Scraper):
# Example string 'Tel Aviv, Israel'
def get_city_from_location(self, location: str) -> str:
return location.split(',')[0].strip() # Replace space with %2 to get "Tel%2Aviv"
# Replace space with %2 to get "Tel%2Aviv"
return location.split(',')[0].strip()
def _get_locations(self, location: str, is_remote: bool) -> List[GlassDoorLocationResponse]:
if not location or is_remote:
return "11047", "STATE" # remote options
url = f"{self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
url = f"{
self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
res = self.session.get(url)
if res.status_code != 200:
if res.status_code == 429:
@ -324,7 +339,8 @@ class GlassdoorScraper(Scraper):
else:
err = f"Glassdoor response status code {res.status_code}"
err += f" - {res.text}"
logger.error(f"Glassdoor response status code {res.status_code}")
logger.error(f"Glassdoor response status code {
res.status_code}")
return None, None
formatted_city = self.get_city_from_location(location)
items: List[GlassDoorLocationResponse] = [
@ -334,10 +350,11 @@ class GlassdoorScraper(Scraper):
item for item in items if item.label is not None and formatted_city in item.label
]
if not items:
logger.error(f"location not found in Glassdoor: {location}")
logger.error(f"ValueError: Location '{
location}' not found on Glassdoor")
# raise ValueError(f"Location '{location}' not found on Glassdoor")
return items;
return items
def _add_payload(
self,
@ -351,9 +368,11 @@ class GlassdoorScraper(Scraper):
fromage = max(self.scraper_input.hours_old // 24, 1)
filter_params = []
if self.scraper_input.easy_apply:
filter_params.append({"filterKey": "applicationType", "values": "1"})
filter_params.append(
{"filterKey": "applicationType", "values": "1"})
if fromage:
filter_params.append({"filterKey": "fromAge", "values": str(fromage)})
filter_params.append(
{"filterKey": "fromAge", "values": str(fromage)})
payload = {
"operationName": "JobSearchResultsQuery",
"variables": {
@ -373,7 +392,8 @@ class GlassdoorScraper(Scraper):
}
if self.scraper_input.job_type:
payload["variables"]["filterParams"].append(
{"filterKey": "jobType", "values": self.scraper_input.job_type.value[0]}
{"filterKey": "jobType",
"values": self.scraper_input.job_type.value[0]}
)
return json.dumps([payload])

View File

@ -0,0 +1,109 @@
from datetime import datetime
import json
from jobspy.jobs import JobPost, Location
from jobspy.scrapers.goozali.model import GoozaliColumnTypeOptions, GoozaliResponse, GoozaliRow, GoozaliColumn, GoozaliColumnChoice, GoozaliResponseData
from .constants import job_post_column_to_goozali_column, job_post_column_names
# Mapping function to convert parsed dictionary into GoozaliResponseData
class GoozaliMapper:
def _map_dict_to_goozali_response_column_choice(self, column_choices: dict) -> dict[str, GoozaliColumnChoice]:
# Create a dictionary to store GoozaliColumnChoice objects
goolzali_column_choices: dict[str, GoozaliColumnChoice] = {}
# Map the data to GoozaliColumnChoice instances
for key, value in column_choices.items():
goolzali_column_choices[key] = GoozaliColumnChoice(
id=value['id'],
name=value['name'],
# Using get to safely access 'color', it may not always be present
color=value.get('color', "")
)
return goolzali_column_choices
def _map_dict_to_goozali_response_column_type_option(self, type_options: dict) -> GoozaliColumnTypeOptions:
goozali_type_options = GoozaliColumnTypeOptions(
typeOptions=type_options)
if goozali_type_options.choices:
goozali_type_options.choices = self._map_dict_to_goozali_response_column_choice(
goozali_type_options.choices)
return goozali_type_options
def _map_dict_to_goozali_response_columns(self, columns: list) -> list[GoozaliColumn]:
goozali_columns: list[GoozaliColumn] = []
for column in columns:
goozali_column = GoozaliColumn(**column)
if goozali_column.typeOptions:
goozali_column.typeOptions = self._map_dict_to_goozali_response_column_type_option(
goozali_column.typeOptions)
goozali_columns.append(goozali_column)
return goozali_columns
def _map_dict_to_goozali_response_data(self, data: dict) -> GoozaliResponseData:
columns = self._map_dict_to_goozali_response_columns(data['columns'])
rows = [GoozaliRow(**row) for row in data['rows']]
return GoozaliResponseData(
applicationId=data['applicationId'],
id=data['id'],
name=data['name'],
columns=columns,
primaryColumnId=data['primaryColumnId'],
meaningfulColumnOrder=data['meaningfulColumnOrder'],
viewOrder=data['viewOrder'],
rows=rows
)
# Updated map response function
def map_response_to_goozali_response(self, response) -> GoozaliResponse:
# Check the response content (this is a bytes object)
response_content = response.content
# Decode the byte content to a string
decoded_content = response_content.decode('utf-8')
# Now you can parse the decoded content as JSON
data = json.loads(decoded_content)
# Convert the 'data' dictionary into GoozaliResponseData object
data_obj = self._map_dict_to_goozali_response_data(data['data'])
# Return a new GoozaliResponse with msg and the converted data
return GoozaliResponse(msg=data['msg'], data=data_obj)
def get_value_by_job_post_Id(self, job_post_column: str, row: GoozaliRow, dict_column_name_to_column: dict[str, GoozaliColumn]):
goozali_column_name = job_post_column_to_goozali_column[job_post_column]
column = dict_column_name_to_column[goozali_column_name]
value = row.cellValuesByColumnId[column.id]
if (job_post_column == "location"):
location = Location(text="Not Found")
if type(value) is list:
location_text = column.typeOptions.choices[value[0]].name
location.text = location_text
return location
if (job_post_column == "company_industry"):
if type(value) is list:
value = column.typeOptions.choices[value[0]].name
if (job_post_column == "date_posted"):
return datetime.fromisoformat(value.replace("Z", "")).date()
if (job_post_column == "field"):
value = column.typeOptions.choices[value].name
return str(value)
def map_goozali_response_to_job_post(self, row: GoozaliRow, dict_column_name_to_column: dict[str, GoozaliColumn]) -> JobPost:
temp = {}
for col in job_post_column_names:
value = self.get_value_by_job_post_Id(
col, row, dict_column_name_to_column)
temp[col] = value
return JobPost.model_validate(temp)
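A short offline usage sketch, following src/tests/test_goozali.py: parse a saved shared-view payload with GoozaliMapper and map the first row to a JobPost (the JSON path and encoding are illustrative):

import json

from jobspy.scrapers.goozali.GoozaliMapper import GoozaliMapper
from jobspy.scrapers.goozali.constants import extract_goozali_column_name
from jobspy.scrapers.utils import create_dict_by_key_and_value

mapper = GoozaliMapper()
with open("goozali_response_example.json", "r", encoding="ISO-8859-1") as f:
    data = mapper._map_dict_to_goozali_response_data(json.load(f)["data"])

# Build a "column name -> GoozaliColumn" lookup, then map a single row
columns_by_name = create_dict_by_key_and_value(data.columns, extract_goozali_column_name)
job_post = mapper.map_goozali_response_to_job_post(data.rows[0], columns_by_name)
print(job_post.title, job_post.company_name)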

View File

@ -0,0 +1,52 @@
from datetime import datetime, timedelta
from jobspy.scrapers.goozali.model import GoozaliRow, GoozaliColumn, GoozaliColumnChoice
from jobspy.scrapers.utils import create_logger
# Mapping function to convert parsed dictionary into GoozaliResponseData
logger = create_logger("GoozaliScrapperComponent")
class GoozaliScrapperComponent:
def __init__(self):
pass
# Function to filter GoozaliRows based on hours old
def filter_rows_by_column_choice(self, rows: list[GoozaliRow], column: GoozaliColumn, column_choice: GoozaliColumnChoice) -> list[GoozaliRow]:
return [
row for row in rows
if row.cellValuesByColumnId[column.id] == column_choice.id
]
def filter_rows_by_hours(self, rows: list[GoozaliRow], hours: int) -> list[GoozaliRow]:
# Current time
now = datetime.now()
# Calculate the time delta for the given hours
time_delta = timedelta(hours=hours)
# Filter rows
filtered_rows = [
row for row in rows
if now - row.createdTime <= time_delta
]
return filtered_rows
def find_column(self, columns: list[GoozaliColumn], column_name: str) -> GoozaliColumn:
for column in columns:
if (column.name == column_name):
return column
def find_choice_from_column(self, column: GoozaliColumn, choice_name: str) -> GoozaliColumnChoice:
if not column.typeOptions.choices:
logger.exception(f"Choices for column {column.name} doesn't exist")
raise Exception(f"Choices for column {column.name} doesn't exist")
for key, choice in column.typeOptions.choices.items():
if (choice.name == choice_name):
return choice
logger.exception(f"Can't find {choice_name} for column {column.name}")
raise Exception(f"Can't find {choice_name} for column {column.name}")

View File

@ -0,0 +1,96 @@
"""
jobspy.scrapers.Goozali
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Goozali.
"""
from __future__ import annotations
from .. import Scraper, ScraperInput
from jobspy.scrapers.goozali.GoozaliMapper import GoozaliMapper
from jobspy.scrapers.goozali.GoozaliScrapperComponent import GoozaliScrapperComponent
from jobspy.scrapers.goozali.constants import extract_goozali_column_name, job_post_column_to_goozali_column
from jobspy.scrapers.goozali.model import GoozaliColumn, GoozaliFieldChoice, GoozaliPartRequest, GoozaliFullRequest
from jobspy.scrapers.site import Site
from ..utils import create_dict_by_key_and_value, create_session, create_logger
from ...jobs import (
JobPost,
JobResponse,
)
logger = create_logger("Goozali")
class GoozaliScraper(Scraper):
delay = 3
band_delay = 4
jobs_per_page = 25
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes GoozaliScraper with the Goozali job search url
"""
super().__init__(site=Site.GOOZALI, proxies=proxies, ca_cert=ca_cert)
self.session = create_session(
proxies=self.proxies,
ca_cert=ca_cert,
is_tls=False,
has_retry=True,
delay=5,
clear_cookies=False,
)
self.mapper = GoozaliMapper()
self.base_url = "https://airtable.com/v0.3/view/{view_id}/readSharedViewData"
self.component = GoozaliScrapperComponent()
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Goozali for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
self.scraper_input = scraper_input
job_list: list[JobPost] = []
full_request = GoozaliFullRequest(self.base_url)
part_request = GoozaliPartRequest(self.base_url)
try:
response = self.session.get(
url=full_request.url,
params=full_request.params,
timeout=10,
headers=full_request.headers,
cookies=full_request.cookies)
logger.info(f"response: {str(response)}")
if (response.status_code != 200):
logger.error(f"Status code: {response.status_code}, Error: {
str(response.text)}")
return JobResponse(jobs=job_list)
except Exception as e:
logger.error(f"Exception: {str(e)}")
return JobResponse(jobs=job_list)
# model the response with models
goozali_response = self.mapper.map_response_to_goozali_response(
response=response)
# suggestion: create groupby field and then filter by hours
# filter result by Field
column = self.component.find_column(
goozali_response.data.columns, job_post_column_to_goozali_column["field"])
column_choice = self.component.find_choice_from_column(
column, GoozaliFieldChoice.SOFTWARE_ENGINEERING.value)
filtered_rows_by_column_choice = self.component.filter_rows_by_column_choice(
goozali_response.data.rows, column, column_choice)
filtered_rows_by_age_and_column_choice = self.component.filter_rows_by_hours(
filtered_rows_by_column_choice, scraper_input.hours_old)
dict_column_name_to_column: dict[str, GoozaliColumn] = create_dict_by_key_and_value(
goozali_response.data.columns, extract_goozali_column_name)
# map to JobResponse Object
for row in filtered_rows_by_age_and_column_choice:
job_post = self.mapper.map_goozali_response_to_job_post(
row, dict_column_name_to_column)
job_list.append(job_post)
return JobResponse(jobs=job_list)

View File

@ -0,0 +1,29 @@
from jobspy.scrapers.goozali.model import GoozaliColumn
job_post_column_to_goozali_column = {
"date_posted": "Discovered",
"field": "Field",
"title": "Job Title",
"job_url": "Position Link",
"company_name": "Company",
"description": "Requirements",
"location": "Location",
"company_industry": "Company Industry",
"id": "Job ID"
}
job_post_column_names = ["id",
"date_posted",
"field",
"title",
"job_url",
"company_name",
"description",
"location",
"company_industry"]
# Key mapper: Extract 'name' as the key
def extract_goozali_column_name(column): return column.name if isinstance(
column, GoozaliColumn) else None
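A tiny illustration of how these constants resolve JobPost fields to Goozali/Airtable column names (the values are exactly the entries of the mapping above):

from jobspy.scrapers.goozali.constants import (
    job_post_column_names,
    job_post_column_to_goozali_column,
)

for field in job_post_column_names:
    print(field, "->", job_post_column_to_goozali_column[field])
# e.g. "title -> Job Title", "date_posted -> Discovered", "job_url -> Position Link"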

View File

@ -0,0 +1,20 @@
from typing import Optional
from jobspy.scrapers.goozali.model import GoozaliColumnTypeOptions
class GoozaliColumn:
def __init__(self, id: str, name: str, description: Optional[str], type: str, typeOptions: GoozaliColumnTypeOptions,
default: Optional[str], initialCreatedTime: str, initialCreatedByUserId: str,
lastModifiedTime: str, lastModifiedByUserId: str, isEditableFromSync: bool):
self.id = id
self.name = name
self.description = description
self.type = type
self.typeOptions = typeOptions
self.default = default
self.initialCreatedTime = initialCreatedTime
self.initialCreatedByUserId = initialCreatedByUserId
self.lastModifiedTime = lastModifiedTime
self.lastModifiedByUserId = lastModifiedByUserId
self.isEditableFromSync = isEditableFromSync

View File

@ -0,0 +1,8 @@
from typing import Optional
class GoozaliColumnChoice:
def __init__(self, id: str, name: str, color: Optional[str] = None):
self.id = id
self.name = name
self.color = color

View File

@ -0,0 +1,23 @@
from jobspy.scrapers.goozali.model import GoozaliColumnChoice
class GoozaliColumnTypeOptions:
def __init__(self, typeOptions: dict):
self.choiceOrder = typeOptions.get("choiceOrder", [])
self.choices: dict[str, GoozaliColumnChoice] = typeOptions.get(
"choices", {})
self.disableColors = typeOptions.get("disableColors", False)
self.dateFormat = typeOptions.get("dateFormat", "")
self.isDateTime = typeOptions.get("isDateTime", False)
self.timeZone = typeOptions.get("timeZone", "")
self.shouldDisplayTimeZone = typeOptions.get(
"shouldDisplayTimeZone", False)
self.formulaTextParsed = typeOptions.get("formulaTextParsed", "")
self.dependencies = typeOptions.get("dependencies", [])
self.resultType = typeOptions.get("resultType", "")
self.resultIsArray = typeOptions.get("resultIsArray", False)

View File

@ -0,0 +1,31 @@
from enum import Enum
class GoozaliFieldChoice(Enum):
PRODUCT_MANAGEMENT = "Product Management"
DATA_ANALYST = "Data Analyst"
DATA_SCIENCE_ML_ALGORITHMS = "Data Science, ML & Algorithms"
SOFTWARE_ENGINEERING = "Software Engineering"
QA = "QA"
CYBERSECURITY = "Cybersecurity"
IT_AND_SYSTEM_ADMINISTRATION = "IT and System Administration"
FRONTEND_DEVELOPMENT = "Frontend Development"
DEVOPS = "DevOps"
UI_UX_DESIGN_CONTENT = "UI/UX, Design & Content"
HR_RECRUITMENT = "HR & Recruitment"
MOBILE_DEVELOPMENT = "Mobile Development"
HARDWARE_ENGINEERING = "Hardware Engineering"
EMBEDDED_LOW_LEVEL_FIRMWARE_ENGINEERING = "Embedded, Low Level & Firmware Engineering"
CUSTOMER_SUCCESS = "Customer Success"
PROJECT_MANAGEMENT = "Project Management"
OPERATIONS = "Operations"
FINANCE = "Finance"
SYSTEMS_ENGINEERING = "Systems Engineering"
MARKETING = "Marketing"
SALES = "Sales"
COMPLIANCE_LEGAL_POLICY = "Compliance, Legal & Policy"
C_LEVEL = "C-Level"
BUSINESS_DEVELOPMENT = "Business Development"
MECHANICAL_ENGINEERING = "Mechanical Engineering"
NATURAL_SCIENCE = "Natural Science"
OTHER = "Other"

View File

@ -0,0 +1,73 @@
import json
class GoozaliFullRequest():
def __init__(self, base_url: str):
self.view_id: str = "viwIOzPYaUGxlA0Jd"
self.url = base_url.format(view_id=self.view_id)
self.application_id: str = "appwewqLk7iUY4azc"
self.air_table_page_load_id: str = "pglqAAzFDZEWCEC7s"
self.stringifiedObjectParams = {
"shouldUseNestedResponseFormat": "true"}
self.cookies: dict[str, str] = {}
self.request_id: str = "req4q4tKw3woEEWxw&"
self.share_id: str = "shrQBuWjXd0YgPqV6"
self.signature: str = "be8bd40c133f051f929ebab311c416013f5af0d5acae4264575b88ccf051ee59"
self.headers = self._generate_headers()
self.params = self._generate_params()
self.cookies = {}
def _generate_params(self) -> dict[str, str]:
access_policy = self._generate_access_policy()
return {
"stringifiedObjectParams": self.stringifiedObjectParams,
"request_id": self.request_id,
"accessPolicy": access_policy
}
def _generate_headers(self) -> dict[str, str]:
return {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9,he-IL;q=0.8,he;q=0.7',
'priority': 'u=1, i',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'x-airtable-accept-msgpack': 'true',
'x-airtable-application-id': self.application_id,
'x-airtable-inter-service-client': 'webClient',
'x-airtable-page-load-id': self.air_table_page_load_id,
'x-early-prefetch': 'true',
'x-requested-with': 'XMLHttpRequest',
'x-time-zone': 'Asia/Jerusalem',
'x-user-locale': 'en'
}
def _generate_access_policy(self) -> str:
"""
Generates a JSON string for access policy.
"""
access_policy = {
"allowedActions": [
{"modelClassName": "view", "modelIdSelector": self.view_id,
"action": "readSharedViewData"},
{"modelClassName": "view", "modelIdSelector": self.view_id,
"action": "getMetadataForPrinting"},
{"modelClassName": "view", "modelIdSelector": self.view_id,
"action": "readSignedAttachmentUrls"},
{"modelClassName": "row", "modelIdSelector": f"rows *[displayedInView={self.view_id}]",
"action": "createDocumentPreviewSession"}
],
"shareId": self.share_id,
"applicationId": self.application_id,
"generationNumber": 0,
"expires": "2025-01-02T00:00:00.000Z",
"signature": self.signature
}
# Convert to a JSON string
return json.dumps(access_policy)
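Illustrative sketch of how GoozaliFullRequest is consumed (see GoozaliScraper.scrape): plain requests is used here instead of the scraper's create_session helper, and the hard-coded share/request IDs and signature above may expire, so a 200 response is not guaranteed:

import requests

from jobspy.scrapers.goozali.model import GoozaliFullRequest

base_url = "https://airtable.com/v0.3/view/{view_id}/readSharedViewData"
full_request = GoozaliFullRequest(base_url)

response = requests.get(
    full_request.url,
    params=full_request.params,
    headers=full_request.headers,
    cookies=full_request.cookies,
    timeout=10,
)
print(response.status_code)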

View File

@ -0,0 +1,74 @@
import json
class GoozaliPartRequest():
def __init__(self, base_url: str):
self.view_id: str = "viwNRSqqmqZLP0a3C"
self.url = base_url.format(view_id=self.view_id)
self.application_id: str = "app7OQjqEzTtCRq7u"
self.air_table_page_load_id: str = "pglG8mlPvtT0UiBaN"
self.stringifiedObjectParams = {
"shouldUseNestedResponseFormat": "true"}
self.session_id: str = ""
self.cookies: dict[str, str] = {}
self.request_id: str = "requFlC1ueInFAWHe"
self.share_id: str = "shrNtlFxOG2ag1kyB"
self.signature: str = "64689d9701d871b8f3a3fe8ad01de23c06421011eb92a8816399a9e2a869b523"
self.headers = self._generate_headers()
self.params = self._generate_params()
self.cookies = {}
def _generate_params(self) -> dict[str, str]:
access_policy = self._generate_access_policy()
return {
"stringifiedObjectParams": self.stringifiedObjectParams,
"request_id": self.request_id,
"accessPolicy": access_policy
}
def _generate_headers(self) -> dict[str, str]:
return {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9,he-IL;q=0.8,he;q=0.7',
'priority': 'u=1, i',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'x-airtable-accept-msgpack': 'true',
'x-airtable-application-id': self.application_id,
'x-airtable-inter-service-client': 'webClient',
'x-airtable-page-load-id': self.air_table_page_load_id,
'x-early-prefetch': 'true',
'x-requested-with': 'XMLHttpRequest',
'x-time-zone': 'Asia/Jerusalem',
'x-user-locale': 'en'
}
def _generate_access_policy(self) -> str:
"""
Generates a JSON string for access policy.
"""
access_policy = {
"allowedActions": [
{"modelClassName": "view", "modelIdSelector": self.view_id,
"action": "readSharedViewData"},
{"modelClassName": "view", "modelIdSelector": self.view_id,
"action": "getMetadataForPrinting"},
{"modelClassName": "view", "modelIdSelector": self.view_id,
"action": "readSignedAttachmentUrls"},
{"modelClassName": "row", "modelIdSelector": f"rows *[displayedInView={self.view_id}]",
"action": "createDocumentPreviewSession"}
],
"shareId": self.share_id,
"applicationId": self.application_id,
"generationNumber": 0,
"expires": "2025-01-02T00:00:00.000Z",
"signature": self.signature
}
# Convert to a JSON string
return json.dumps(access_policy)

View File

@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
class GoozaliRequest(ABC):
@abstractmethod
def create(self):
"""Abstract method to be implemented in subclasses."""
pass

View File

@ -0,0 +1,7 @@
from jobspy.scrapers.goozali.model import GoozaliResponseData
class GoozaliResponse:
def __init__(self, msg: str, data: GoozaliResponseData):
self.msg = msg
self.data = data

View File

@ -0,0 +1,14 @@
from jobspy.scrapers.goozali.model import GoozaliRow, GoozaliColumn
class GoozaliResponseData:
def __init__(self, applicationId: str, id: str, name: str, columns: list[GoozaliColumn], primaryColumnId: str,
meaningfulColumnOrder: list[dict[str, str]], viewOrder: list[str], rows: list[GoozaliRow]):
self.applicationId = applicationId
self.id = id
self.name = name
self.columns = columns
self.primaryColumnId = primaryColumnId
self.meaningfulColumnOrder = meaningfulColumnOrder
self.viewOrder = viewOrder
self.rows = rows

View File

@ -0,0 +1,10 @@
from datetime import datetime
from typing import Dict, List
class GoozaliRow:
def __init__(self, id: str, createdTime: str, cellValuesByColumnId: Dict[str, List[str]]):
self.id = id
self.createdTime = datetime.strptime(
createdTime, '%Y-%m-%dT%H:%M:%S.%fZ')
self.cellValuesByColumnId = cellValuesByColumnId

View File

@ -0,0 +1,9 @@
from .GoozaliRow import GoozaliRow
from .GoozaliResponse import GoozaliResponse
from .GoozaliColumn import GoozaliColumn
from .GoozaliPartRequest import GoozaliPartRequest
from .GoozaliFullRequest import GoozaliFullRequest
from .GoozaliColumnTypeOptions import GoozaliColumnTypeOptions
from .GoozaliFieldChoice import GoozaliFieldChoice
from .GoozaliResponseData import GoozaliResponseData
from .GoozaliColumnChoice import GoozaliColumnChoice

View File

@ -0,0 +1,10 @@
from enum import Enum
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
GOOZALI = "goozali"

View File

@ -1,4 +1,5 @@
from __future__ import annotations
from typing import Callable, TypeVar, List, Dict, Optional
import re
import logging
@ -283,3 +284,40 @@ def extract_job_type(description: str):
listing_types.append(key)
return listing_types if listing_types else None
K = TypeVar('K') # Key type
V = TypeVar('V') # Value type
def create_dict_by_key_and_value(
values: List[V],
key_mapper: Callable[[V], K],
value_mapper: Optional[Callable[[V], V]] = None
) -> Dict[K, V]:
"""
Create a dictionary by mapping keys and optionally mapping values.
:param values: List of input values
:param key_mapper: Function to map a value to a key
:param value_mapper: Optional function to map a value to a transformed value
:return: A dictionary with mapped keys and values
"""
result = {}
for value in values:
key = key_mapper(value)
result[key] = value_mapper(value) if value_mapper else value
return result
# Example usage:
# values = [
# {"id": 1, "name": "Alice"},
# {"id": 2, "name": "Bob"},
# {"id": 3, "name": "Charlie"}
# ]
# Key mapper: Extract 'id' as the key
# key_mapper = lambda x: x["id"]
# Value mapper: Extract 'name' as the value
# value_mapper = lambda x: x["name"]
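The commented example above, made runnable (the import path assumes the package layout in this PR):

from jobspy.scrapers.utils import create_dict_by_key_and_value

values = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Charlie"},
]

by_id = create_dict_by_key_and_value(values, key_mapper=lambda x: x["id"])
names_by_id = create_dict_by_key_and_value(
    values, key_mapper=lambda x: x["id"], value_mapper=lambda x: x["name"])

print(by_id[2])        # {'id': 2, 'name': 'Bob'}
print(names_by_id[2])  # Bob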

View File

@ -3,9 +3,12 @@ from dotenv import load_dotenv
from telegram import Bot
from jobspy.jobs import JobPost
from jobspy.scrapers.utils import create_logger
load_dotenv()
logger = create_logger("TelegramBot")
class TelegramBot:
@ -26,6 +29,7 @@ class TelegramBot:
f"Link: {job.job_url}\n"
try:
await self.bot.sendMessage(chat_id=self.chatId, text=message)
print(f"Sent job to Telegram: {job.id}")
logger.info(f"Sent job to Telegram: {job.id}")
except Exception as e:
print(f"Failed to send job to Telegram: {e}")
logger.error(f"Failed to send job to Telegram: {job.id}")
logger.error(f"Error: {e}")

File diff suppressed because it is too large

src/tests/test_goozali.py (new file, 51 lines)
View File

@ -0,0 +1,51 @@
import json
import os
from jobspy.jobs import JobPost
from jobspy.scrapers.goozali.GoozaliMapper import GoozaliMapper
from jobspy.scrapers.goozali.GoozaliScrapperComponent import GoozaliScrapperComponent
from jobspy.scrapers.goozali.constants import extract_goozali_column_name, job_post_column_to_goozali_column
from jobspy.scrapers.goozali.model import GoozaliColumn, GoozaliFieldChoice, GoozaliResponseData
from jobspy.scrapers.utils import create_dict_by_key_and_value
# URL Example
# https://airtable.com/v0.3/view/viwagEIbkfz2iMsLU/readSharedViewData?stringifiedObjectParams=%7B%22shouldUseNestedResponseFormat%22%3Atrue%7D&requestId=reqXyRSHWlXyiRgY9&accessPolicy=%7B%22allowedActions%22%3A%5B%7B%22modelClassName%22%3A%22view%22%2C%22modelIdSelector%22%3A%22viwagEIbkfz2iMsLU%22%2C%22action%22%3A%22readSharedViewData%22%7D%2C%7B%22modelClassName%22%3A%22view%22%2C%22modelIdSelector%22%3A%22viwagEIbkfz2iMsLU%22%2C%22action%22%3A%22getMetadataForPrinting%22%7D%2C%7B%22modelClassName%22%3A%22view%22%2C%22modelIdSelector%22%3A%22viwagEIbkfz2iMsLU%22%2C%22action%22%3A%22readSignedAttachmentUrls%22%7D%2C%7B%22modelClassName%22%3A%22row%22%2C%22modelIdSelector%22%3A%22rows%20*%5BdisplayedInView%3DviwagEIbkfz2iMsLU%5D%22%2C%22action%22%3A%22createDocumentPreviewSession%22%7D%5D%2C%22shareId%22%3A%22shr97tl6luEk4Ca9R%22%2C%22applicationId%22%3A%22app5sYJyDgcRbJWYU%22%2C%22generationNumber%22%3A0%2C%22expires%22%3A%222025-01-02T00%3A00%3A00.000Z%22%2C%22signature%22%3A%223aa292ee44d15aa75d9506200329e413653471f89e000fa370ef9fa38393070a%22%7D
try:
current_directory = os.getcwd()
file_path = os.path.join(current_directory, 'src',
'tests', 'goozali_response_example.json')
with open(file_path, 'r', encoding='ISO-8859-1') as file:
test_json_response = json.load(file)
print(test_json_response['msg']) # Output: Success
mapper = GoozaliMapper()
response_data: GoozaliResponseData = mapper._map_dict_to_goozali_response_data(
test_json_response['data'])
print("ya gever!!")
component = GoozaliScrapperComponent()
hours_old = 200
column = component.find_column(
response_data.columns, job_post_column_to_goozali_column["field"])
column_choice = component.find_choice_from_column(
column, GoozaliFieldChoice.SOFTWARE_ENGINEERING.value)
filtered_rows_by_column_choice = component.filter_rows_by_column_choice(
response_data.rows, column, column_choice)
filtered_rows_by_age_and_column_choice = component.filter_rows_by_hours(
filtered_rows_by_column_choice, hours_old)
dict_column_name_to_column: dict[str, GoozaliColumn] = create_dict_by_key_and_value(
response_data.columns, extract_goozali_column_name)
response: list[JobPost] = []
for row in filtered_rows_by_age_and_column_choice:
job_post = mapper.map_goozali_response_to_job_post(
row, dict_column_name_to_column)
response.append(job_post)
print("kingggggg")
except FileNotFoundError:
print("The file was not found.")
except json.JSONDecodeError:
print("There was an error decoding the JSON data.")
except UnicodeDecodeError as e:
print(f"Unicode decode error: {e}")