Created a new component to be able to test it properly

pull/231/head
Yariv Menachem 2024-12-17 19:00:47 +02:00
parent 7926356233
commit 7971d0fab7
2 changed files with 59 additions and 45 deletions

View File

@ -0,0 +1,49 @@
from datetime import datetime
import json
from jobspy.scrapers.goozali.model import GoozaliRow
from jobspy.scrapers.goozali.model.GoozaliColumn import GoozaliColumn
from jobspy.scrapers.goozali.model.GoozaliColumnChoice import GoozaliColumnChoice
from jobspy.scrapers.utils import create_logger
# Module-level logger used by GoozaliScrapperComponent to report lookup failures
logger = create_logger("GoozaliScrapperComponent")
class GoozaliScrapperComponent:
    """Stateless helpers for filtering Goozali rows and looking up columns/choices.

    The logic lives in its own component so it can be exercised without
    going through the scraper itself.
    """

    def __init__(self):
        pass

    def filter_rows_by_hours(self, rows: list[GoozaliRow], hours: int) -> list[GoozaliRow]:
        """Return the rows whose ``createdTime`` is at most ``hours`` hours old.

        Args:
            rows: Rows to filter; each must expose a ``createdTime`` datetime.
            hours: Maximum accepted age in hours. ``None`` disables the age
                filter and every row is returned.

        Returns:
            A new list with the matching rows, in their original order.
        """
        # Imported locally: the module-level ``from datetime import datetime``
        # binds the *class*, which has no ``timedelta`` attribute.
        from datetime import timedelta, timezone

        # NOTE(review): callers pass ``scraper_input.hours_old``, which may be
        # unset -- confirm that "no limit" is the desired meaning of None.
        if hours is None:
            return list(rows)

        max_age = timedelta(hours=hours)
        naive_now = datetime.now()
        aware_now = datetime.now(timezone.utc)

        def age_of(row):
            created = row.createdTime
            # Naive and aware datetimes cannot be subtracted from each other,
            # so pick the "now" that matches the row's timestamp.
            now = aware_now if created.utcoffset() is not None else naive_now
            return now - created

        return [row for row in rows if age_of(row) <= max_age]

    def find_column(self, columns: list[GoozaliColumn], column_name: str) -> GoozaliColumn:
        """Return the first column whose ``name`` equals ``column_name``.

        Returns ``None`` when no column matches; callers must check for that
        before using the result.
        """
        return next(
            (column for column in columns if column.name == column_name),
            None,
        )

    def find_choice_from_column(self, column: GoozaliColumn, choice_name: str) -> GoozaliColumnChoice:
        """Return the choice of ``column`` whose ``name`` equals ``choice_name``.

        Args:
            column: Column whose ``typeOptions.choices`` mapping is searched.
            choice_name: Name of the choice to look for.

        Raises:
            Exception: If the column has no choices, or none of them is named
                ``choice_name``.
        """
        choices = column.typeOptions.choices
        if not choices:
            message = f"Choices for column {column.name} doesn't exist"
            # ``logger.exception`` is only meaningful inside an ``except``
            # block; there is no active exception here, so log a plain error.
            logger.error(message)
            raise Exception(message)

        for choice in choices.values():
            if choice.name == choice_name:
                return choice

        message = f"Can't find {choice_name} for column {column.name}"
        logger.error(message)
        raise Exception(message)

View File

@ -12,7 +12,9 @@ import json
from jobspy.scrapers import Scraper, ScraperInput
from jobspy.scrapers.goozali.GoozaliMapper import GoozaliMapper
from jobspy.scrapers.goozali.GoozaliScrapperComponent import GoozaliScrapperComponent
from jobspy.scrapers.goozali.model import GoozaliRow, GoozaliColumn, GoozaliResponse, GoozaliPartRequest, GoozaliFullRequest
from jobspy.scrapers.goozali.model.GoozaliColumnChoice import GoozaliColumnChoice
from jobspy.scrapers.site import Site
from ..utils import create_session, create_logger
@ -47,47 +49,7 @@ class GoozaliScraper(Scraper):
self.mapper = GoozaliMapper()
self.base_url = "https://airtable.com/v0.3/view/{view_id}/readSharedViewData"
self.view_ids = ["viwIOzPYaUGxlA0Jd"]
def map_respone_to_goozali_response(self, response) -> GoozaliResponse:
    """Build a GoozaliResponse from the JSON body of ``response``.

    ``response.content`` is decoded as UTF-8, parsed as JSON, and the
    resulting mapping is expanded into GoozaliResponse keyword arguments.
    """
    # The raw body is bytes, so turn it into text before JSON parsing.
    body_text = response.content.decode('utf-8')
    payload = json.loads(body_text)
    return GoozaliResponse(**payload)
# Keep only the GoozaliRows created within the last `hours` hours
def filter_rows_by_hours(self, rows: list[GoozaliRow], hours: int) -> list[GoozaliRow]:
    """Return the rows whose ``createdTime`` is at most ``hours`` hours before now.

    The age is measured against the current local time; order is preserved.
    """
    current_time = datetime.datetime.now()
    max_age = datetime.timedelta(hours=hours)

    recent_rows = []
    for candidate in rows:
        age = current_time - candidate.createdTime
        if age <= max_age:
            recent_rows.append(candidate)
    return recent_rows
def find_column(self, columns: list[GoozaliColumn], column_name: str) -> GoozaliColumn:
    """Return the first column named ``column_name``, or ``None`` if there is none."""
    matching = (candidate for candidate in columns if candidate.name == column_name)
    return next(matching, None)
# def filter_rows_by_column(rows: list[GoozaliRow], goozali_column: GoozaliColumn) -> list[GoozaliRow]:
# # Filter rows
# filtered_rows = [
# row for row in rows
# if row.cellValuesByColumnId[goozali_column.id] == goozali_column.
# ]
# return filtered_rows
self.component = GoozaliScrapperComponent()
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
@ -118,12 +80,15 @@ class GoozaliScraper(Scraper):
# model the response with models
goozali_response = self.mapper.map_response_to_goozali_response(
response=response)
# filter by date
filtered_rows_by_age = self.filter_rows_by_hours(
goozali_response.data.rows, scraper_input.hours_old)
# suggestion: create a group-by field and then filter by hours
# filter result by Field like the web
field_cloumn = self.find_column(
field_cloumn = self.component.find_column(
goozali_response.data.columns, "Field")
software_engineering_choice = self.component.find_choice_from_column(
field_cloumn, "Software Engineering")
# filter by date
filtered_rows_by_age = self.component.filter_rows_by_hours(
goozali_response.data.rows, scraper_input.hours_old)
# map to JobResponse Object
return JobResponse(jobs=job_list)