added mongo to save new jobs; new datetime column on JobPost because mongo doesn't support date

pull/231/head
Yariv Menachem 2024-12-10 17:54:04 +02:00
parent d388211a92
commit b147ac7f85
4 changed files with 67 additions and 9 deletions
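
Context for the new column: PyMongo's BSON encoder accepts datetime.datetime but rejects a bare datetime.date, so a date-typed field cannot be written to MongoDB as-is. A minimal sketch of the failure and the workaround, assuming a local MongoDB instance and the jobs_database/jobs names used in this commit:

from datetime import date, datetime
from pymongo import MongoClient
from bson.errors import InvalidDocument

client = MongoClient("mongodb://localhost:27017/")
jobs_collection = client["jobs_database"]["jobs"]

try:
    # Fails: BSON has no date-only type, so PyMongo refuses datetime.date
    jobs_collection.insert_one({"date_posted": date.today()})
except InvalidDocument as exc:
    print(f"date rejected: {exc}")

# Works: datetime.datetime maps to the BSON date type
jobs_collection.insert_one({"datetime_posted": datetime.now()})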

View File

@@ -1,10 +1,13 @@
from __future__ import annotations
from datetime import datetime
import pandas as pd
from typing import Tuple
from typing import List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from .jobs import JobType, Location
from pymongo import MongoClient, UpdateOne
from .jobs import JobPost, JobType, Location
from .scrapers.utils import set_logger_level, extract_salary, create_logger
from .scrapers.indeed import IndeedScraper
from .scrapers.ziprecruiter import ZipRecruiterScraper
@@ -19,7 +22,14 @@ from .scrapers.exceptions import (
GlassdoorException,
GoogleJobsException,
)
# Connect to MongoDB server
client = MongoClient("mongodb://localhost:27017/")
# Access a database (it will be created automatically if it doesn't exist)
db = client["jobs_database"]
# Access a collection
jobs_collection = db["jobs"]
def scrape_jobs(
site_name: str | list[str] | Site | list[Site] | None = None,
@@ -103,10 +113,49 @@ def scrape_jobs(
        hours_old=hours_old,
    )
    # def insert_jobs(jobs: List[JobPost], collection):
    #     # Convert JobPost objects to dictionaries
    #     # job_dicts = [job.model_dump() for job in jobs]
    #     job_dicts = [job.model_dump(exclude={"date_posted"}) for job in jobs]
    #     collection.insert_many(job_dicts)
    #     print(f"Inserted {len(job_dicts)} jobs into MongoDB.")
    def insert_jobs(jobs: List[JobPost], collection):
        """
        Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
        Only insert new jobs and return the list of newly inserted jobs.
        """
        operations = []
        new_jobs = []  # List to store the new jobs inserted into MongoDB
        for job in jobs:
            job_dict = job.model_dump(exclude={"date_posted"})
            operations.append(
                UpdateOne(
                    {"id": job.id},  # Match by `id`
                    {"$setOnInsert": job_dict},  # Only set fields if the job is being inserted (not updated)
                    upsert=True  # Insert if not found, but do not update if already exists
                )
            )

        if operations:
            # Execute all operations in bulk
            result = collection.bulk_write(operations)
            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}")

            # `upserted_ids` maps the index of each operation that inserted a new
            # document to that document's `_id`, so it identifies exactly which
            # jobs are new (one operation was queued per job, in order)
            for i in result.upserted_ids:
                new_jobs.append(jobs[i])
                print(f"New Job ID: {jobs[i].id}, Title: {jobs[i].title}")

        return new_jobs
    def scrape_site(site: Site) -> Tuple[str, JobResponse]:
        scraper_class = SCRAPER_MAPPING[site]
        scraper = scraper_class(proxies=proxies, ca_cert=ca_cert)
        scraped_data: JobResponse = scraper.scrape(scraper_input)
        insert_jobs(scraped_data.jobs, jobs_collection)
        cap_name = site.value.capitalize()
        site_name = "ZipRecruiter" if cap_name == "Zip_recruiter" else cap_name
        create_logger(site_name).info(f"finished scraping")
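
For reference, a quick way to see what bulk_write reports for the upsert pattern used in insert_jobs above; the collection name and documents below are illustrative, and a local MongoDB is assumed:

from pymongo import MongoClient, UpdateOne

collection = MongoClient("mongodb://localhost:27017/")["jobs_database"]["jobs_demo"]

docs = [{"id": "a1", "title": "Backend Dev"}, {"id": "a2", "title": "Data Engineer"}]
ops = [UpdateOne({"id": d["id"]}, {"$setOnInsert": d}, upsert=True) for d in docs]

result = collection.bulk_write(ops)
# upserted_ids maps operation index -> _id of the newly created document;
# re-running the same ops yields an empty dict (nothing new to insert)
print(result.upserted_count, result.upserted_ids)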

View File

@@ -1,7 +1,8 @@
from __future__ import annotations
from dataclasses import Field
from typing import Optional
from datetime import date
from datetime import date, datetime
from enum import Enum
from pydantic import BaseModel
@@ -240,7 +241,8 @@ class JobPost(BaseModel):
    job_type: list[JobType] | None = None
    compensation: Compensation | None = None
    date_posted: date | None = None
    date_posted: date | None
    datetime_posted: datetime | None = None
    emails: list[str] | None = None
    is_remote: bool | None = None
    listing_type: str | None = None
@@ -262,6 +264,10 @@ class JobPost(BaseModel):
    # linkedin only atm
    job_function: str | None = None

    class Config:
        # Exclude `date_posted` in model dumps
        exclude = {"date_posted"}

class JobResponse(BaseModel):
    jobs: list[JobPost] = []
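
How the exclusion is meant to behave: with pydantic v2 (which the use of model_dump elsewhere in the commit suggests), per-field exclusion can be declared with Field(exclude=True), and the excluded field then disappears from model_dump output. A minimal sketch with an illustrative model name, not part of jobspy:

from datetime import date, datetime
from pydantic import BaseModel, Field

class PostedStamp(BaseModel):  # illustrative model
    date_posted: date | None = Field(default=None, exclude=True)
    datetime_posted: datetime | None = None

stamp = PostedStamp(date_posted=date.today(), datetime_posted=datetime.now())
# date_posted is dropped from the dump, so only BSON-friendly values remain
print(stamp.model_dump())  # {'datetime_posted': datetime.datetime(...)}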

View File

@@ -73,10 +73,11 @@ class GlassdoorScraper(Scraper):
        self.session.headers.update(headers)
        job_list: list[JobPost] = [];
        for location in scraper_input.locations:
            glassDoorLocatiions = self._get_locations(
            glassDoorLocatiions: List[GlassDoorLocationResponse] = self._get_locations(
                location, scraper_input.is_remote
            )
            for glassDoorLocatiion in glassDoorLocatiions:
                logger.info(f"Location: {glassDoorLocatiion.longName}")
                locationType = get_location_type(glassDoorLocatiion);
                locationId = get_location_id(glassDoorLocatiion);
                jobs_temp = self.get_jobs(scraper_input,locationId,locationType);
@@ -198,7 +199,7 @@ class GlassdoorScraper(Scraper):
        location_type = job["header"].get("locationType", "")
        age_in_days = job["header"].get("ageInDays")
        is_remote, location = False, None
        date_diff = (datetime.now() - timedelta(days=age_in_days)).date()
        date_diff = (datetime.now() - timedelta(days=age_in_days))
        date_posted = date_diff if age_in_days is not None else None

        if location_type == "S":
@@ -226,7 +227,8 @@ class GlassdoorScraper(Scraper):
            title=title,
            company_url=company_url if company_id else None,
            company_name=company_name,
            date_posted=date_posted,
            date_posted=date_posted.date() if date_posted else None,  # guard: date_posted is None when ageInDays is missing
            datetime_posted=date_posted,
            job_url=job_url,
            location=location,
            compensation=compensation,

View File

@@ -17,5 +17,6 @@ jobs = scrape_jobs(
    # proxies=["208.195.175.46:65095", "208.195.175.45:65095", "localhost"],
)
print(f"Found {len(jobs)} jobs")
print(jobs.head())
jobs.to_csv("jobs.csv", quoting=csv.QUOTE_NONNUMERIC, escapechar="\\", index=False) # to_excel
# print(jobs.head())
# jobs.to_csv("jobs.csv", quoting=csv.QUOTE_NONNUMERIC, escapechar="\\", index=False) # to_excel