position state works
pull/231/head
Yariv Menachem 2025-01-02 16:19:05 +02:00
parent fe3ee7c4d8
commit 1f2b1b3892
8 changed files with 409 additions and 52 deletions

5
src/db/Position.py Normal file
View File

@ -0,0 +1,5 @@
from pydantic import BaseModel
class Position(BaseModel):
    """Pydantic model describing a job position by its name."""

    # Name of the position.
    # NOTE(review): the repositories in this change set read `position.id`,
    # but no `id` field is declared here - confirm whether one is missing.
    name: str

13
src/db/User.py Normal file
View File

@ -0,0 +1,13 @@
from datetime import datetime
from typing import Optional, Union
from pydantic import BaseModel
from db.Position import Position
class User(BaseModel):
    """Pydantic model of a bot user.

    NOTE(review): the user repository reads `user.id`, but no `id` field is
    declared here - confirm whether `chat_id` was intended as the key.
    """

    # Display name of the user.
    full_name: str
    # Username of the user (required, must be a string).
    username: str
    # Chat identifier; accepted either as an int or as a str.
    chat_id: Union[int,str]
    # Position attached to the user, if any; defaults to None.
    field: Optional[Position] = None

View File

@ -3,9 +3,9 @@ from typing import Optional
from dotenv import load_dotenv
from pymongo import UpdateOne
from .monogo_db import MongoDB
from jobspy import create_logger
from jobspy.jobs import JobPost
from .monogo_db import mongo_client
load_dotenv()
@ -21,7 +21,6 @@ class JobRepository:
self = super().__new__(cls)
cls._instance = self
self.logger = create_logger("JobRepository")
mongo_client = MongoDB()
self.collection = mongo_client.db["jobs"]
return cls._instance

View File

@ -32,3 +32,5 @@ class MongoDB:
self.db = client[database_name]
logger.info("Succeed connect to MongoDB")
return cls._instance
mongo_client = MongoDB()

View File

@ -0,0 +1,106 @@
from typing import Optional
from dotenv import load_dotenv
from pymongo import UpdateOne
from jobspy import create_logger
from .Position import Position
from .monogo_db import mongo_client
load_dotenv()
class PositionRepository:
    """Singleton repository for ``Position`` documents in the ``field`` collection."""

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating and wiring it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.logger = create_logger("PositionRepository")
            instance.collection = mongo_client.db["field"]
            # Publish the instance only once it is fully initialised, so a
            # failure above cannot leave a half-built singleton behind.
            cls._instance = instance
        return cls._instance

    def find_all(self) -> list[Position]:
        """Return every document of the collection as a ``Position``."""
        return [Position(**document) for document in self.collection.find({})]

    def find_by_id(self, position_id: str) -> Optional[Position]:
        """
        Finds a position document in the collection by its ID.

        Args:
            position_id: The ID of the position to find.

        Returns:
            The matching Position, or None when no document has that ID.
        """
        result = self.collection.find_one({"id": position_id})
        # find_one returns None when nothing matches; unpacking None would raise.
        if result is None:
            return None
        return Position(**result)

    def update(self, position: Position) -> bool:
        """
        Updates a Position in the database.

        Args:
            position: The Position whose stored document should be overwritten.

        Returns:
            True if a document was modified, False otherwise.
        """
        # NOTE(review): the Position model shown in this change set declares
        # only `name`; `position.id` presumably fails until an `id` field
        # exists - confirm the intended match key.
        result = self.collection.update_one(
            {"id": position.id}, {"$set": position.model_dump()}
        )
        return result.modified_count > 0

    def insert_position(self, position: Position):
        """
        Inserts a new position into the database collection.

        Args:
            position (Position): The Position object to be inserted.

        Raises:
            Exception: Whatever the driver raises if the insertion fails.
        """
        self.collection.insert_one(position.model_dump())
        self.logger.info(f"Inserted new position with name {position.name}.")

    def insert_many_if_not_found(self, positions: list[Position]) -> tuple[list[Position], list[Position]]:
        """
        Bulk-upsert positions, inserting only those not already stored.

        Args:
            positions: The positions to insert when missing.

        Returns:
            A tuple ``(old_positions, new_positions)``: positions that already
            existed, followed by positions inserted by this call.
        """
        if not positions:
            return [], []

        # $setOnInsert writes the document only when the upsert creates it,
        # so existing documents are left untouched.
        # NOTE(review): see `update` regarding `position.id`.
        operations = [
            UpdateOne(
                {"id": position.id},
                {"$setOnInsert": position.model_dump()},
                upsert=True,
            )
            for position in positions
        ]
        result = self.collection.bulk_write(operations)
        self.logger.info(
            f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, "
            f"Modified: {result.modified_count}"
        )

        # upserted_ids maps the index of each operation that inserted a
        # document to the new _id; it identifies exactly which inputs are new,
        # which a bare count cannot do.
        upserted_indexes = result.upserted_ids or {}
        new_positions = []
        old_positions = []
        for index, position in enumerate(positions):
            if index in upserted_indexes:
                new_positions.append(position)
            else:
                old_positions.append(position)
        return old_positions, new_positions


# Shared module-level repository instance.
position_repository = PositionRepository()

99
src/db/user_repository.py Normal file
View File

@ -0,0 +1,99 @@
from typing import Optional
from dotenv import load_dotenv
from pymongo import UpdateOne
from jobspy import create_logger
from .User import User
from .monogo_db import mongo_client
load_dotenv()
class UserRepository:
    """Singleton repository for ``User`` documents in the ``user`` collection."""

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating and wiring it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.logger = create_logger("UserRepository")
            instance.collection = mongo_client.db["user"]
            # Publish the instance only once it is fully initialised, so a
            # failure above cannot leave a half-built singleton behind.
            cls._instance = instance
        return cls._instance

    def find_by_id(self, user_id: str) -> Optional[User]:
        """
        Finds a user document in the collection by its ID.

        Args:
            user_id: The ID of the user to find.

        Returns:
            The matching User, or None when no document has that ID.
        """
        result = self.collection.find_one({"id": user_id})
        # find_one returns None when nothing matches; unpacking None would raise.
        if result is None:
            return None
        return User(**result)

    def update(self, user: User) -> bool:
        """
        Updates a User in the database.

        Args:
            user: The User whose stored document should be overwritten.

        Returns:
            True if a document was modified, False otherwise.
        """
        # NOTE(review): the User model shown in this change set declares no
        # `id` field; `user.id` presumably fails until one exists - confirm
        # the intended match key (perhaps `chat_id`).
        result = self.collection.update_one(
            {"id": user.id}, {"$set": user.model_dump()}
        )
        return result.modified_count > 0

    def insert_user(self, user: User):
        """
        Inserts a new user into the database collection.

        Args:
            user (User): The User object to be inserted.

        Raises:
            Exception: Whatever the driver raises if the insertion fails.
        """
        self.collection.insert_one(user.model_dump())
        self.logger.info(f"Inserted new user with username {user.username}.")

    def insert_many_if_not_found(self, users: list[User]) -> tuple[list[User], list[User]]:
        """
        Bulk-upsert users, inserting only those not already stored.

        Args:
            users: The users to insert when missing.

        Returns:
            A tuple ``(old_users, new_users)``: users that already existed,
            followed by users inserted by this call.
        """
        if not users:
            return [], []

        # $setOnInsert writes the document only when the upsert creates it,
        # so existing documents are left untouched.
        # NOTE(review): see `update` regarding `user.id`.
        operations = [
            UpdateOne(
                {"id": user.id},
                {"$setOnInsert": user.model_dump()},
                upsert=True,
            )
            for user in users
        ]
        result = self.collection.bulk_write(operations)
        self.logger.info(
            f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, "
            f"Modified: {result.modified_count}"
        )

        # upserted_ids maps the index of each operation that inserted a
        # document to the new _id; it identifies exactly which inputs are new,
        # which a bare count cannot do.
        upserted_indexes = result.upserted_ids or {}
        new_users = []
        old_users = []
        for index, user in enumerate(users):
            if index in upserted_indexes:
                new_users.append(user)
            else:
                old_users.append(user)
        return old_users, new_users

View File

@ -1,67 +1,26 @@
import os
from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove
from telegram.ext import Application, CommandHandler, ConversationHandler, \
MessageHandler, filters, ContextTypes
from telegram import Update
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, Updater
from jobspy.scrapers.site import Site
from config.settings import settings
from jobspy.scrapers.utils import create_logger
from telegram_handler import TelegramDefaultHandler
from telegram_handler.button_callback.telegram_callback_handler import TelegramCallHandler
from telegram_handler.telegram_start_handler import start_conv_handler
logger = create_logger("Main")
_api_token = os.getenv("TELEGRAM_API_TOKEN")
_api_token = settings.telegram_api_token
application = Application.builder().token(_api_token).build()
title_filters: list[str] = ["test", "qa", "Lead", "Full-Stack", "Full Stack", "Fullstack", "Frontend", "Front-end",
"Front End", "DevOps", "Physical", "Staff",
"automation", "BI ", "Principal", "Architect", "Android", "Machine Learning", "Student",
"Data Engineer", "DevSecOps"]
async def stop(update, context):
    """Log the request and ask the module-level application to stop running.

    Takes the (update, context) pair passed to command callbacks; neither
    argument is used.
    """
    logger.info("Stop polling from telegram")
    application.stop_running()
# Script entry point: builds the handlers, registers them on the application
# and starts polling.
# NOTE(review): this span comes from a diff hunk (header "-1,67 +1,26") and
# appears to interleave removed and added lines; confirm which registrations
# belong to the current version of the file before relying on it.
if __name__ == "__main__":
    logger.info("Starting initialize ")
    # Search parameters shared by every TelegramDefaultHandler below.
    search_term = "software engineer"
    locations = ["Tel Aviv, Israel", "Ramat Gan, Israel",
                 "Central, Israel", "Rehovot ,Israel"]
    tg_callback_handler = TelegramCallHandler()
    # /find: handler configured with all four sites.
    tg_handler_all = TelegramDefaultHandler(sites=[Site.LINKEDIN, Site.GLASSDOOR, Site.INDEED, Site.GOOZALI],
                                            locations=locations,
                                            title_filters=title_filters,
                                            search_term=search_term)
    application.add_handler(CommandHandler("find", tg_handler_all.handle))
    # Goozali: one command per site, named after the site's enum value.
    tg_handler_goozali = TelegramDefaultHandler(sites=[Site.GOOZALI],
                                                locations=locations,
                                                title_filters=title_filters,
                                                search_term=search_term)
    application.add_handler(CommandHandler(
        Site.GOOZALI.value, tg_handler_goozali.handle))
    # Glassdoor
    tg_handler_glassdoor = TelegramDefaultHandler(sites=[Site.GLASSDOOR],
                                                  locations=locations,
                                                  title_filters=title_filters,
                                                  search_term=search_term)
    application.add_handler(CommandHandler(
        Site.GLASSDOOR.value, tg_handler_glassdoor.handle))
    # LinkedIn
    tg_handler_linkedin = TelegramDefaultHandler(sites=[Site.LINKEDIN],
                                                 locations=locations,
                                                 title_filters=title_filters,
                                                 search_term=search_term)
    application.add_handler(CommandHandler(
        Site.LINKEDIN.value, tg_handler_linkedin.handle))
    # Indeed
    tg_handler_indeed = TelegramDefaultHandler(sites=[Site.INDEED],
                                               locations=locations,
                                               title_filters=title_filters,
                                               search_term=search_term)
    application.add_handler(CommandHandler(
        Site.INDEED.value, tg_handler_indeed.handle))
    # Inline-button callbacks, the /stop command and the /start conversation.
    application.add_handler(CallbackQueryHandler(
        tg_callback_handler.button_callback))
    application.add_handler(CommandHandler('stop', stop))
    application.add_handler(start_conv_handler)
    # application.add_handler(CommandHandler('start', start_handler.handle))
    logger.info("Run polling from telegram")
    # Runs until the application is stopped; all update types are requested.
    application.run_polling(allowed_updates=Update.ALL_TYPES)

View File

@ -0,0 +1,174 @@
from enum import Enum
from telegram import Update, Chat, KeyboardButton, ReplyKeyboardMarkup, ReplyKeyboardRemove
from telegram.ext import (
ContextTypes, ConversationHandler, CommandHandler, MessageHandler, filters,
)
from db.User import User
from db.position_repository import position_repository
from db.user_repository import UserRepository
from jobspy.scrapers.utils import create_logger
from telegram_bot import TelegramBot
from telegram_handler.telegram_handler import TelegramHandler
class Flow(Enum):
    """Conversation states for the /start flow.

    Members are used as keys of the ConversationHandler ``states`` mapping
    and as return values of the conversation callbacks.
    """

    # Waiting for the user to choose a position.
    POSITION = 0
    # The remaining members are not registered as states in this module yet.
    ADDRESS = 1
    FILTERS = 2
    EXPERIENCE = 3
    RETRY = 4
class TelegramStartHandler(TelegramHandler):
    """Callbacks for the /start conversation.

    Every callback that continues the conversation returns a ``Flow`` member
    (never its integer value) so the result matches the keys used when the
    states are registered on the ConversationHandler.
    """

    def __init__(self):
        self.telegram_bot = TelegramBot()
        self.user_repository = UserRepository()
        self.logger = create_logger("TelegramStartHandler")
        # Loaded once at construction; later database changes are not seen.
        self.positions = position_repository.find_all()
        # NOTE(review): these live on the handler instance, so they are shared
        # by every chat served by it - confirm that is intended.
        self.temp_user = None
        self.last_state = None

    def _position_keyboard(self) -> ReplyKeyboardMarkup:
        """Build the one-time reply keyboard with one button per known position."""
        buttons = [[KeyboardButton(position.name)] for position in self.positions]
        return ReplyKeyboardMarkup(
            buttons,
            one_time_keyboard=True,
            input_field_placeholder=Flow.POSITION.name,
        )

    async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow:
        """Store the user who started the conversation and ask for a position."""
        chat: Chat = update.message.chat
        # Telegram usernames are optional while User.username is a required
        # string; fall back to an empty string instead of failing validation.
        user = User(
            full_name=chat.full_name,
            username=chat.username or "",
            chat_id=chat.id,
        )
        # NOTE(review): this inserts a new document on every /start.
        self.user_repository.insert_user(user)
        await update.message.reply_text(
            "Hi! My name is Professor Bot. I will hold a conversation with you. "
            "Send /cancel to stop talking to me.\n\n"
            "What Position are you looking for?",
            reply_markup=self._position_keyboard(),
        )
        return Flow.POSITION

    async def position(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow:
        """Validate the chosen position; ask again when it is not a known one.

        The matched position is only looked up here; it is not stored yet.
        """
        self.last_state = Flow.POSITION
        user = update.message.from_user
        chosen_name = update.message.text
        self.logger.info("Position of %s: %s", user.first_name, chosen_name)

        position = next((p for p in self.positions if p.name == chosen_name), None)
        if position is None:
            await update.message.reply_text("Position not found")
            await update.message.reply_text(
                "What Position are you looking for?",
                reply_markup=self._position_keyboard(),
            )
            # Stay in the same state until a known position is chosen.
            return Flow.POSITION

        await update.message.reply_text(
            "I see! Please send me a photo of yourself, "
            "so I know what you look like, or send /skip if you don't want to.",
            reply_markup=ReplyKeyboardRemove(),
        )
        return Flow.ADDRESS

    async def address(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow:
        """Download the largest photo of the message and move to the next state.

        Expects a message that carries at least one photo; the file is always
        written to ``user_photo.jpg``.
        """
        user = update.message.from_user
        photo_file = await update.message.photo[-1].get_file()
        await photo_file.download_to_drive("user_photo.jpg")
        self.logger.info("Photo of %s: %s", user.first_name, "user_photo.jpg")
        await update.message.reply_text(
            "Gorgeous! Now, send me your location please, or send /skip if you don't want to."
        )
        return Flow.FILTERS

    async def filter(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow:
        """Log the location sent by the user and move to the next state.

        Expects a message that carries a location.
        """
        user = update.message.from_user
        user_location = update.message.location
        self.logger.info(
            "Location of %s: %f / %f", user.first_name, user_location.latitude, user_location.longitude
        )
        await update.message.reply_text(
            "Maybe I can visit you sometime! At last, tell me something about yourself."
        )
        return Flow.EXPERIENCE

    async def skip_filter(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow:
        """Log that no location was sent and move to the next state."""
        user = update.message.from_user
        self.logger.info("User %s did not send a location.", user.first_name)
        await update.message.reply_text(
            "You seem a bit paranoid! At last, tell me something about yourself."
        )
        return Flow.EXPERIENCE

    async def experience(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
        """Log the user's free-text answer and end the conversation."""
        user = update.message.from_user
        self.logger.info("Bio of %s: %s", user.first_name, update.message.text)
        await update.message.reply_text("Thank you! I hope we can talk again some day.")
        return ConversationHandler.END

    async def cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
        """Cancels and ends the conversation."""
        user = update.message.from_user
        self.logger.info("User %s canceled the conversation.", user.first_name)
        await update.message.reply_text(
            "Bye! I hope we can talk again some day.", reply_markup=ReplyKeyboardRemove()
        )
        return ConversationHandler.END

    async def handle(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Placeholder entry point: only logs that handling started and finished."""
        self.logger.info("start handling")
        self.logger.info("finished handling")
# Handler instance whose bound methods serve as the conversation callbacks.
start_handler = TelegramStartHandler()

# Conversation started by /start and cancelled by /cancel.
start_conv_handler = ConversationHandler(
    entry_points=[CommandHandler("start", start_handler.start)],
    states={
        # Commands are excluded: state handlers are consulted before the
        # fallbacks, so a bare TEXT filter would treat "/cancel" as a
        # position name and the fallback would never run.
        Flow.POSITION: [
            MessageHandler(filters.TEXT & ~filters.COMMAND, start_handler.position),
        ],
        # TODO: register handlers for Flow.ADDRESS, Flow.FILTERS and
        # Flow.EXPERIENCE. Until then, once `position` returns Flow.ADDRESS
        # only the fallbacks can handle further messages.
    },
    fallbacks=[CommandHandler("cancel", start_handler.cancel)],
)