aligned and added the Telegram API token and chat id

pull/231/head
Yariv Menachem 2024-12-10 19:29:34 +02:00
parent b147ac7f85
commit cc782ddfc1
2 changed files with 94 additions and 70 deletions

View File

@@ -2,11 +2,9 @@ from __future__ import annotations
 from datetime import datetime
 import pandas as pd
-from typing import List, Tuple
+from typing import Tuple
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from pymongo import MongoClient, UpdateOne
 from .jobs import JobPost, JobType, Location
 from .scrapers.utils import set_logger_level, extract_salary, create_logger
 from .scrapers.indeed import IndeedScraper
@@ -22,14 +20,6 @@ from .scrapers.exceptions import (
     GlassdoorException,
     GoogleJobsException,
 )
-
-# Connect to MongoDB server
-client = MongoClient("mongodb://localhost:27017/")
-# Access a database (it will be created automatically if it doesn't exist)
-db = client["jobs_database"]
-# Access a collection
-jobs_collection = db["jobs"]
 def scrape_jobs(
     site_name: str | list[str] | Site | list[Site] | None = None,
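Dropping the module-level client means `import jobspy` no longer opens a MongoDB connection as a side effect. If the library ever needs storage again, a lazy accessor keeps imports cheap (a sketch, not part of this commit; the helper name is hypothetical):

    from functools import lru_cache
    from pymongo import MongoClient

    @lru_cache(maxsize=1)
    def get_jobs_collection():
        # the client is constructed on first call, not at import time
        return MongoClient("mongodb://localhost:27017/")["jobs_database"]["jobs"]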
@@ -113,58 +103,22 @@ def scrape_jobs(
         hours_old=hours_old,
     )
-    # def insert_jobs(jobs: List[JobPost], collection):
-    #     # Convert JobPost objects to dictionaries
-    #     # job_dicts = [job.model_dump() for job in jobs]
-    #     job_dicts = [job.model_dump(exclude={"date_posted"}) for job in jobs]
-    #     collection.insert_many(job_dicts)
-    #     print(f"Inserted {len(job_dicts)} jobs into MongoDB.")
-
-    def insert_jobs(jobs: List[JobPost], collection):
-        """
-        Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
-        Only insert new jobs and return the list of newly inserted jobs.
-        """
-        operations = []
-        new_jobs = []  # List to store the new jobs inserted into MongoDB
-        for job in jobs:
-            job_dict = job.model_dump(exclude={"date_posted"})
-            operations.append(
-                UpdateOne(
-                    {"id": job.id},  # Match by `id`
-                    {"$setOnInsert": job_dict},  # Only set fields if the job is being inserted (not updated)
-                    upsert=True,  # Insert if not found, but do not update if already exists
-                )
-            )
-
-        if operations:
-            # Execute all operations in bulk
-            result = collection.bulk_write(operations)
-            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}")
-            # Get the newly inserted jobs (those that were upserted)
-            # The `upserted_count` corresponds to how many new documents were inserted
-            for i, job in enumerate(jobs):
-                if result.upserted_count > 0 and i < result.upserted_count:
-                    new_jobs.append(job)
-                    print(f"New Job ID: {job.id}, Label: {job.label}")
-
-        return new_jobs
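Reviewer note: the index-based check in the function deleted above assumes the first `upserted_count` entries of `jobs` are the newly inserted ones, which `bulk_write` does not guarantee. PyMongo's BulkWriteResult carries `upserted_ids`, a dict mapping the index of each upserting operation to the generated `_id`; a sketch of the corrected loop (applied to the re-added copy of this function in the second file below):

    result = collection.bulk_write(operations)
    for i in result.upserted_ids:  # i indexes the UpdateOne that actually inserted
        new_jobs.append(jobs[i])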
     def scrape_site(site: Site) -> Tuple[str, JobResponse]:
         scraper_class = SCRAPER_MAPPING[site]
         scraper = scraper_class(proxies=proxies, ca_cert=ca_cert)
         scraped_data: JobResponse = scraper.scrape(scraper_input)
-        insert_jobs(scraped_data.jobs, jobs_collection)
         cap_name = site.value.capitalize()
         site_name = "ZipRecruiter" if cap_name == "Zip_recruiter" else cap_name
         create_logger(site_name).info(f"finished scraping")
         return site.value, scraped_data
merged_jobs:list[JobPost] = []
def worker(site): def worker(site):
site_val, scraped_info = scrape_site(site) site_val, scraped_info = scrape_site(site)
# Add the scraped jobs to the merged list
merged_jobs.extend(scraped_info.jobs) # Assuming scraped_info has 'jobs' as a list
return site_val, scraped_info return site_val, scraped_info
with ThreadPoolExecutor() as executor: with ThreadPoolExecutor() as executor:
@@ -176,6 +130,7 @@ def scrape_jobs(
             site_value, scraped_data = future.result()
             site_to_jobs_dict[site_value] = scraped_data

+    return merged_jobs
     def convert_to_annual(job_data: dict):
         if job_data["interval"] == "hourly":
             job_data["min_amount"] *= 2080

View File

@@ -1,22 +1,91 @@
+import asyncio
 import csv
+from typing import List
+from pymongo import MongoClient, UpdateOne
+from telegram import Bot
 from jobspy import scrape_jobs
+from jobspy.jobs import JobPost
+
+TELEGRAM_API_TOKEN =
+CHAT_ID =
+
+# Connect to MongoDB server
+client = MongoClient("mongodb://localhost:27017/")
+# Access a database (it will be created automatically if it doesn't exist)
+db = client["jobs_database"]
+# Access a collection
+jobs_collection = db["jobs"]
+
+# Initialize the Telegram bot
+bot = Bot(token=TELEGRAM_API_TOKEN)
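The upserts in `insert_jobs` below match on the JobPost `id` field; a unique index makes that lookup fast and prevents duplicate documents if two runs race (a one-time setup sketch, not part of this commit):

    # run once; creating an identical existing index is a no-op in pymongo
    jobs_collection.create_index("id", unique=True)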
-jobs = scrape_jobs(
+async def send_job_to_telegram(job: JobPost):
+    """
+    Send job details to the Telegram chat.
+    """
+    message = f"New Job Posted:\n\n" \
+              f"Job ID: {job.id}\n" \
+              f"Job Title: {job.title}\n" \
+              f"Company: {job.company_name}\n" \
+              f"Location: {job.location}\n" \
+              f"Link: {job.job_url}\n"
+    try:
+        await bot.send_message(chat_id=CHAT_ID, text=message)
+        print(f"Sent job to Telegram: {job.id}")
+    except Exception as e:
+        print(f"Failed to send job to Telegram: {e}")
+def insert_jobs(jobs: List[JobPost], collection):
+    """
+    Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
+    Only insert new jobs and return the list of newly inserted jobs.
+    """
+    operations = []
+    new_jobs = []  # Jobs that were newly inserted into MongoDB
+    for job in jobs:
+        job_dict = job.model_dump(exclude={"date_posted"})
+        operations.append(
+            UpdateOne(
+                {"id": job.id},  # Match by `id`
+                {"$setOnInsert": job_dict},  # Set fields only when inserting, never on update
+                upsert=True,  # Insert if not found; leave existing documents untouched
+            )
+        )
+
+    if operations:
+        # Execute all operations in one bulk round trip
+        result = collection.bulk_write(operations)
+        print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}")
+        # `upserted_ids` maps the index of each UpdateOne that inserted a new
+        # document to its generated _id, so it identifies exactly which jobs are new
+        for i in result.upserted_ids:
+            new_jobs.append(jobs[i])
+            print(f"New Job ID: {jobs[i].id}, Title: {jobs[i].title}")
+
+    return new_jobs
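Running `insert_jobs` twice over the same list is a quick way to confirm the `$setOnInsert` semantics: the second pass matches every document and upserts none, so nothing is re-sent to Telegram (a hypothetical smoke test, assuming `sample_jobs` is any list of JobPost):

    first = insert_jobs(sample_jobs, jobs_collection)   # all new: upserted and returned
    second = insert_jobs(sample_jobs, jobs_collection)  # all matched: upserted_count == 0
    assert second == []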
+async def main():
+    jobs = scrape_jobs(
         # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"],
         site_name=["glassdoor"],
         search_term="software engineer",
         google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
         location="Central, Israel",
-        locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"],
-        results_wanted=200,
+        locations=["Ramat Gan, Israel"],
+        # locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"],
+        results_wanted=50,
         hours_old=200,
         country_indeed='israel',
-        # linkedin_fetch_description=True  # gets more info such as description, direct job url (slower)
-        # proxies=["208.195.175.46:65095", "208.195.175.45:65095", "localhost"],
     )
-print(f"Found {len(jobs)} jobs")
-# print(jobs.head())
-# jobs.to_csv("jobs.csv", quoting=csv.QUOTE_NONNUMERIC, escapechar="\\", index=False)  # to_excel
+    print(f"Found {len(jobs)} jobs")
+    new_jobs = insert_jobs(jobs, jobs_collection)
+
+    for new_job in new_jobs:
+        await send_job_to_telegram(new_job)
+
+
+# Run the async main function
+if __name__ == "__main__":
+    asyncio.run(main())
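The token and chat id values are redacted in this diff; keeping them out of source control entirely is safer. A common pattern is to read them from the environment (a sketch with hypothetical variable names, not part of this commit):

    import os

    TELEGRAM_API_TOKEN = os.environ["TELEGRAM_API_TOKEN"]  # hypothetical env var names
    CHAT_ID = os.environ["TELEGRAM_CHAT_ID"]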