added model dump method to fix insert problem to mongo

pull/231/head
Yariv Menachem 2024-12-11 17:22:53 +02:00
parent 175acfb918
commit bfc83bdc03
5 changed files with 50 additions and 32 deletions

View File

@ -16,23 +16,12 @@ class JobRepository:
# Access a collection # Access a collection
self.collection = self.db["jobs"] self.collection = self.db["jobs"]
def insert_or_update_job(self, job: JobPost): def insert_job(self, job: JobPost):
# Convert JobPost to dictionary # Convert JobPost to dictionary
job_dict = job.model_dump(exclude={"date_posted"}) job_dict = job.model_dump(exclude={"date_posted"})
# If it doesn't exist, insert a new job with the current `created_at` and `updated_at`
# Check if the job already exists by its ID self.collection.insert_one(job_dict)
if job.id: print(f"Inserted new job with title {job.title}.")
# If it exists, update the `updated_at` field and other fields
# job_dict['updated_at'] = datetime.utcnow() # Set updated time to current time
self.collection.update_one(
{'_id': job.id},
{'$set': job_dict}
)
print(f"Updated job with ID {job.id}.")
else:
# If it doesn't exist, insert a new job with the current `created_at` and `updated_at`
self.collection.insert_one(job_dict)
print(f"Inserted new job with title {job.title}.")
def insertManyIfNotFound(self, jobs: List[JobPost]) -> List[JobPost]: def insertManyIfNotFound(self, jobs: List[JobPost]) -> List[JobPost]:
""" """

View File

@ -88,7 +88,7 @@ class Country(Enum):
INDIA = ("india", "in", "co.in") INDIA = ("india", "in", "co.in")
INDONESIA = ("indonesia", "id") INDONESIA = ("indonesia", "id")
IRELAND = ("ireland", "ie", "ie") IRELAND = ("ireland", "ie", "ie")
ISRAEL = ("israel", "il","com") ISRAEL = ("israel", "il", "com")
ITALY = ("italy", "it", "it") ITALY = ("italy", "it", "it")
JAPAN = ("japan", "jp") JAPAN = ("japan", "jp")
KUWAIT = ("kuwait", "kw") KUWAIT = ("kuwait", "kw")
@ -165,9 +165,21 @@ class Country(Enum):
return country return country
valid_countries = [country.value for country in cls] valid_countries = [country.value for country in cls]
raise ValueError( raise ValueError(
f"Invalid country string: '{country_str}'. Valid countries are: {', '.join([country[0] for country in valid_countries])}" f"Invalid country string: '{country_str}'. Valid countries are: {
', '.join([country[0] for country in valid_countries])}"
) )
def to_dict(self):
return {
"name": self.value[0].split(",")
}
@staticmethod
def from_dict(data):
if not data:
return None
return Country[data["name"]]
class Location(BaseModel): class Location(BaseModel):
country: Country | str | None = None country: Country | str | None = None
@ -195,6 +207,19 @@ class Location(BaseModel):
location_parts.append(country_name.title()) location_parts.append(country_name.title())
return ", ".join(location_parts) return ", ".join(location_parts)
def model_dump(self):
# Convert the model into a dictionary and serialize the country enum
data = super().model_dump()
if self.country:
data['country'] = self.country.to_dict()
return data
@staticmethod
def model_load(data):
if 'country' in data:
data['country'] = Country.from_dict(data['country'])
return Location(**data)
class CompensationInterval(Enum): class CompensationInterval(Enum):
YEARLY = "yearly" YEARLY = "yearly"
@ -264,9 +289,18 @@ class JobPost(BaseModel):
# linkedin only atm # linkedin only atm
job_function: str | None = None job_function: str | None = None
class Config: def model_dump(self, exclude: set = None):
# Exclude `date_posted` in model dumps # Use `Location`'s custom serialization logic
exclude = {"date_posted"} data = super().model_dump(exclude=exclude)
if self.location:
data['location'] = self.location.model_dump()
return data
@staticmethod
def model_load(data):
if 'location' in data:
data['location'] = Location.model_load(data['location'])
return JobPost(**data)
class JobResponse(BaseModel): class JobResponse(BaseModel):

View File

@ -22,13 +22,11 @@ async def main():
) )
print(f"Found {len(jobs)} jobs") print(f"Found {len(jobs)} jobs")
for job in jobs: new_jobs = jobRepository.insertManyIfNotFound(jobs)
jobRepository.insert_or_update_job(job)
# new_jobs = jobRepository.insertManyIfNotFound(jobs, jobs_collection) for new_job in new_jobs:
await telegramBot.send_job(new_job)
# for new_job in new_jobs: # Run the async main function
# await telegramBot.send_job(new_job)
# Run the async main function
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

View File

@ -16,8 +16,8 @@ class DatabaseTests:
Inserts a mock job into the database using JobRepository. Inserts a mock job into the database using JobRepository.
""" """
job = createMockJob() job = createMockJob()
self.jobRepository.insert_or_update_job(job) self.jobRepository.insert_job(job)
print("Job inserted successfully.") print(f"Job inserted successfully.{job.id}")
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,9 +1,6 @@
from datetime import datetime, date from datetime import datetime, date
from typing import List from typing import List
from jobspy.jobs import Country, JobPost, Location
from telegram import Location
from jobspy.jobs import Country, JobPost
# Creating some test job posts # Creating some test job posts