From 1f2b1b3892505c0e1002d2f0d7a7d540cb9035a0 Mon Sep 17 00:00:00 2001 From: Yariv Menachem Date: Thu, 2 Jan 2025 16:19:05 +0200 Subject: [PATCH] init position state works --- src/db/Position.py | 5 + src/db/User.py | 13 ++ src/db/job_repository.py | 3 +- src/db/monogo_db.py | 2 + src/db/position_repository.py | 106 +++++++++++ src/db/user_repository.py | 99 ++++++++++ src/main.py | 59 +----- .../telegram_start_handler.py | 174 ++++++++++++++++++ 8 files changed, 409 insertions(+), 52 deletions(-) create mode 100644 src/db/Position.py create mode 100644 src/db/User.py create mode 100644 src/db/position_repository.py create mode 100644 src/db/user_repository.py create mode 100644 src/telegram_handler/telegram_start_handler.py diff --git a/src/db/Position.py b/src/db/Position.py new file mode 100644 index 0000000..a7a98a7 --- /dev/null +++ b/src/db/Position.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class Position(BaseModel): + name: str diff --git a/src/db/User.py b/src/db/User.py new file mode 100644 index 0000000..490bb52 --- /dev/null +++ b/src/db/User.py @@ -0,0 +1,13 @@ +from datetime import datetime +from typing import Optional, Union + +from pydantic import BaseModel + +from db.Position import Position + + +class User(BaseModel): + full_name: str + username: str + chat_id: Union[int,str] + field: Optional[Position] = None diff --git a/src/db/job_repository.py b/src/db/job_repository.py index 290031f..53607fd 100644 --- a/src/db/job_repository.py +++ b/src/db/job_repository.py @@ -3,9 +3,9 @@ from typing import Optional from dotenv import load_dotenv from pymongo import UpdateOne -from .monogo_db import MongoDB from jobspy import create_logger from jobspy.jobs import JobPost +from .monogo_db import mongo_client load_dotenv() @@ -21,7 +21,6 @@ class JobRepository: self = super().__new__(cls) cls._instance = self self.logger = create_logger("JobRepository") - mongo_client = MongoDB() self.collection = mongo_client.db["jobs"] return cls._instance diff --git a/src/db/monogo_db.py b/src/db/monogo_db.py index bcac938..eea7d84 100644 --- a/src/db/monogo_db.py +++ b/src/db/monogo_db.py @@ -32,3 +32,5 @@ class MongoDB: self.db = client[database_name] logger.info("Succeed connect to MongoDB") return cls._instance + +mongo_client = MongoDB() diff --git a/src/db/position_repository.py b/src/db/position_repository.py new file mode 100644 index 0000000..af068b8 --- /dev/null +++ b/src/db/position_repository.py @@ -0,0 +1,106 @@ +from typing import Optional + +from dotenv import load_dotenv +from pymongo import UpdateOne + +from jobspy import create_logger +from .Position import Position +from .monogo_db import mongo_client + +load_dotenv() + + +class PositionRepository: + _instance = None + + def __new__(cls): + + if cls._instance is not None: + return cls._instance + + self = super().__new__(cls) + cls._instance = self + self.logger = create_logger("PositionRepository") + self.collection = mongo_client.db["field"] + return cls._instance + + def find_all(self) -> list[Position]: + positions = list(self.collection.find({})) + return [Position(**position) for position in positions] + + def find_by_id(self, position_id: str) -> Optional[Position]: + """ + Finds a position document in the collection by its ID. + + Args: + position_id: The ID of the position to find. + + Returns: + The position document if found, otherwise None. + """ + result = self.collection.find_one({"id": position_id}) + return Position(**result) + + def update(self, position: Position) -> bool: + """ + Updates a Position in the database. + + Args: + position: A dictionary representing the Position data. + + Returns: + True if the update was successful, False otherwise. + """ + result = self.collection.update_one({"id": position.id}, {"$set": position.model_dump()}) + return result.modified_count > 0 + + def insert_position(self, position: Position): + """ + Inserts a new position posting into the database collection. + + Args: + position (Position): The Position object to be inserted. + + Raises: + Exception: If an error occurs during insertion. + """ + self.collection.insert_one(position.model_dump()) + self.logger.info(f"Inserted new position with name {position.name}.") + + def insert_many_if_not_found(self, positions: list[Position]) -> tuple[list[Position], list[Position]]: + """ + Perform bulk upserts for a list of Position objects into a MongoDB collection. + Only insert new positions and return the list of newly inserted positions. + """ + operations = [] + new_positions = [] # List to store the new positions inserted into MongoDB + old_positions = [] # List to store the new positions inserted into MongoDB + for position in positions: + position_dict = position.model_dump() + operations.append( + UpdateOne( + {"id": position.id}, # Match by `id` + # Only set positions if the position is being inserted (not updated) + {"$setOnInsert": position_dict}, + upsert=True # Insert if not found, but do not update if already exists + ) + ) + + if operations: + # Execute all operations in bulk + result = self.collection.bulk_write(operations) + self.logger.info(f"Matched: {result.matched_count}, Upserts: { + result.upserted_count}, Modified: {result.modified_count}") + + # Get the newly inserted positions (those that were upserted) + # The `upserted_count` corresponds to how many new documents were inserted + for i, position in enumerate(positions): + if result.upserted_count > 0 and i < result.upserted_count: + new_positions.append(position) + else: + old_positions.append(position) + + return old_positions, new_positions + + +position_repository = PositionRepository() diff --git a/src/db/user_repository.py b/src/db/user_repository.py new file mode 100644 index 0000000..70c592a --- /dev/null +++ b/src/db/user_repository.py @@ -0,0 +1,99 @@ +from typing import Optional + +from dotenv import load_dotenv +from pymongo import UpdateOne + +from jobspy import create_logger +from .User import User +from .monogo_db import mongo_client + +load_dotenv() + + +class UserRepository: + _instance = None + + def __new__(cls): + + if cls._instance is not None: + return cls._instance + + self = super().__new__(cls) + cls._instance = self + self.logger = create_logger("UserRepository") + self.collection = mongo_client.db["user"] + return cls._instance + + def find_by_id(self, user_id: str) -> Optional[User]: + """ + Finds a user document in the collection by its ID. + + Args: + user_id: The ID of the user to find. + + Returns: + The user document if found, otherwise None. + """ + result = self.collection.find_one({"id": user_id}) + return User(**result) + + def update(self, user: User) -> bool: + """ + Updates a User in the database. + + Args: + user: A dictionary representing the User data. + + Returns: + True if the update was successful, False otherwise. + """ + result = self.collection.update_one({"id": user.id}, {"$set": user.model_dump()}) + return result.modified_count > 0 + + def insert_user(self, user: User): + """ + Inserts a new user posting into the database collection. + + Args: + user (User): The User object to be inserted. + + Raises: + Exception: If an error occurs during insertion. + """ + self.collection.insert_one(user.model_dump()) + self.logger.info(f"Inserted new user with username {user.username}.") + + def insert_many_if_not_found(self, users: list[User]) -> tuple[list[User], list[User]]: + """ + Perform bulk upserts for a list of User objects into a MongoDB collection. + Only insert new users and return the list of newly inserted users. + """ + operations = [] + new_users = [] # List to store the new users inserted into MongoDB + old_users = [] # List to store the new users inserted into MongoDB + for user in users: + user_dict = user.model_dump() + operations.append( + UpdateOne( + {"id": user.id}, # Match by `id` + # Only set fields if the user is being inserted (not updated) + {"$setOnInsert": user_dict}, + upsert=True # Insert if not found, but do not update if already exists + ) + ) + + if operations: + # Execute all operations in bulk + result = self.collection.bulk_write(operations) + self.logger.info(f"Matched: {result.matched_count}, Upserts: { + result.upserted_count}, Modified: {result.modified_count}") + + # Get the newly inserted users (those that were upserted) + # The `upserted_count` corresponds to how many new documents were inserted + for i, user in enumerate(users): + if result.upserted_count > 0 and i < result.upserted_count: + new_users.append(user) + else: + old_users.append(user) + + return old_users, new_users diff --git a/src/main.py b/src/main.py index 200c277..c52ef24 100644 --- a/src/main.py +++ b/src/main.py @@ -1,67 +1,26 @@ -import os +from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove +from telegram.ext import Application, CommandHandler, ConversationHandler, \ + MessageHandler, filters, ContextTypes -from telegram import Update -from telegram.ext import Application, CommandHandler, CallbackQueryHandler, Updater - -from jobspy.scrapers.site import Site +from config.settings import settings from jobspy.scrapers.utils import create_logger -from telegram_handler import TelegramDefaultHandler -from telegram_handler.button_callback.telegram_callback_handler import TelegramCallHandler +from telegram_handler.telegram_start_handler import start_conv_handler logger = create_logger("Main") -_api_token = os.getenv("TELEGRAM_API_TOKEN") +_api_token = settings.telegram_api_token application = Application.builder().token(_api_token).build() title_filters: list[str] = ["test", "qa", "Lead", "Full-Stack", "Full Stack", "Fullstack", "Frontend", "Front-end", "Front End", "DevOps", "Physical", "Staff", "automation", "BI ", "Principal", "Architect", "Android", "Machine Learning", "Student", "Data Engineer", "DevSecOps"] - -async def stop(update, context): - logger.info("Stop polling from telegram") - application.stop_running() - if __name__ == "__main__": logger.info("Starting initialize ") search_term = "software engineer" locations = ["Tel Aviv, Israel", "Ramat Gan, Israel", "Central, Israel", "Rehovot ,Israel"] - tg_callback_handler = TelegramCallHandler() - tg_handler_all = TelegramDefaultHandler(sites=[Site.LINKEDIN, Site.GLASSDOOR, Site.INDEED, Site.GOOZALI], - locations=locations, - title_filters=title_filters, - search_term=search_term) - application.add_handler(CommandHandler("find", tg_handler_all.handle)) - # Goozali - tg_handler_goozali = TelegramDefaultHandler(sites=[Site.GOOZALI], - locations=locations, - title_filters=title_filters, - search_term=search_term) - application.add_handler(CommandHandler( - Site.GOOZALI.value, tg_handler_goozali.handle)) - # GlassDoor - tg_handler_glassdoor = TelegramDefaultHandler(sites=[Site.GLASSDOOR], - locations=locations, - title_filters=title_filters, - search_term=search_term) - application.add_handler(CommandHandler( - Site.GLASSDOOR.value, tg_handler_glassdoor.handle)) - # LinkeDin - tg_handler_linkedin = TelegramDefaultHandler(sites=[Site.LINKEDIN], - locations=locations, - title_filters=title_filters, - search_term=search_term) - application.add_handler(CommandHandler( - Site.LINKEDIN.value, tg_handler_linkedin.handle)) - # Indeed - tg_handler_indeed = TelegramDefaultHandler(sites=[Site.INDEED], - locations=locations, - title_filters=title_filters, - search_term=search_term) - application.add_handler(CommandHandler( - Site.INDEED.value, tg_handler_indeed.handle)) - application.add_handler(CallbackQueryHandler( - tg_callback_handler.button_callback)) - application.add_handler(CommandHandler('stop', stop)) + application.add_handler(start_conv_handler) + + # application.add_handler(CommandHandler('start', start_handler.handle)) logger.info("Run polling from telegram") application.run_polling(allowed_updates=Update.ALL_TYPES) diff --git a/src/telegram_handler/telegram_start_handler.py b/src/telegram_handler/telegram_start_handler.py new file mode 100644 index 0000000..67f3de6 --- /dev/null +++ b/src/telegram_handler/telegram_start_handler.py @@ -0,0 +1,174 @@ +from enum import Enum + +from telegram import Update, Chat, KeyboardButton, ReplyKeyboardMarkup, ReplyKeyboardRemove +from telegram.ext import ( + ContextTypes, ConversationHandler, CommandHandler, MessageHandler, filters, +) + +from db.User import User +from db.position_repository import position_repository +from db.user_repository import UserRepository +from jobspy.scrapers.utils import create_logger +from telegram_bot import TelegramBot +from telegram_handler.telegram_handler import TelegramHandler + + +class Flow(Enum): + POSITION = 0 + ADDRESS = 1 + FILTERS = 2 + EXPERIENCE = 3 + RETRY = 4 + + +class TelegramStartHandler(TelegramHandler): + + def __init__(self): + self.telegram_bot = TelegramBot() + self.user_repository = UserRepository() + self.logger = create_logger("TelegramStartHandler") + self.positions = position_repository.find_all() + self.temp_user = None + self.last_state = None + + async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow: + """Starts the conversation and asks the user about their position.""" + chat: Chat = update.message.chat + user = User(full_name=chat.full_name, username=chat.username, chat_id=chat.id) + self.user_repository.insert_user(user) + + buttons = [[KeyboardButton(position.name)] for position in self.positions] + reply_markup = ReplyKeyboardMarkup(buttons, one_time_keyboard=True, + input_field_placeholder=Flow.POSITION.name) + await update.message.reply_text( + "Hi! My name is Professor Bot. I will hold a conversation with you. " + "Send /cancel to stop talking to me.\n\n" + "What Position are you looking for?", + reply_markup=reply_markup, + ) + + return Flow.POSITION + + async def position(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Flow: + """Stores the selected position and asks for a photo.""" + self.last_state = Flow.POSITION + user = update.message.from_user + self.logger.info("Position of %s: %s", user.first_name, update.message.text) + position = next((p for p in self.positions if p.name == update.message.text), None) + if not position: + await update.message.reply_text("Position not found") + buttons2 = [[KeyboardButton(position.name)] for position in self.positions] + reply_markup = ReplyKeyboardMarkup(buttons2, one_time_keyboard=True, + input_field_placeholder=Flow.POSITION.name) + await update.message.reply_text( + "What Position are you looking for?", + reply_markup=reply_markup, + ) + return Flow.POSITION + + await update.message.reply_text( + "I see! Please send me a photo of yourself, " + "so I know what you look like, or send /skip if you don't want to.", + reply_markup=ReplyKeyboardRemove(), + ) + + return Flow.ADDRESS + + async def address(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Stores the photo and asks for a location.""" + user = update.message.from_user + photo_file = await update.message.photo[-1].get_file() + await photo_file.download_to_drive("user_photo.jpg") + self.logger.info("Photo of %s: %s", user.first_name, "user_photo.jpg") + await update.message.reply_text( + "Gorgeous! Now, send me your location please, or send /skip if you don't want to." + ) + + return Flow.FILTERS.value + + async def filter(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Stores the location and asks for some info about the user.""" + user = update.message.from_user + user_location = update.message.location + self.logger.info( + "Location of %s: %f / %f", user.first_name, user_location.latitude, user_location.longitude + ) + await update.message.reply_text( + "Maybe I can visit you sometime! At last, tell me something about yourself." + ) + + return Flow.EXPERIENCE.value + + async def skip_filter(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Skips the location and asks for info about the user.""" + user = update.message.from_user + self.logger.info("User %s did not send a location.", user.first_name) + await update.message.reply_text( + "You seem a bit paranoid! At last, tell me something about yourself." + ) + + return Flow.EXPERIENCE.value + + async def experience(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Stores the info about the user and ends the conversation.""" + user = update.message.from_user + self.logger.info("Bio of %s: %s", user.first_name, update.message.text) + await update.message.reply_text("Thank you! I hope we can talk again some day.") + + return ConversationHandler.END + + async def cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Cancels and ends the conversation.""" + user = update.message.from_user + self.logger.info("User %s canceled the conversation.", user.first_name) + await update.message.reply_text( + "Bye! I hope we can talk again some day.", reply_markup=ReplyKeyboardRemove() + ) + + return ConversationHandler.END + + async def handle(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + self.logger.info("start handling") + # chat: Chat = update.message.chat + # chat.id - 368620919 + # chat.username - 'Qw1zeR' + # chat.full_name - 'Qw1zeR' + # user = User(full_name=chat.full_name, username=chat.username, chat_id=chat.id) + # self.user_repository.insert_user(user) + # fields = field_repository.find_all() # Get all fields from the database + # buttons = [[KeyboardButton(field.name)] for field in fields] + # reply_markup = ReplyKeyboardMarkup(buttons, one_time_keyboard=True) + # + # await update.message.reply_text("Please select your field:", reply_markup=reply_markup) + # await self.telegram_bot.set_message_reaction( + # update.message.message_id, ReactionEmoji.FIRE) + # site_names = [site.name for site in self.sites_to_scrap] + # site_names_print = ", ".join(site_names) + # await self.telegram_bot.send_text( + # f"Start scarping: {site_names_print}") + # self.logger.info(f"Found {len(jobs)} jobs") + # self.jobRepository.insert_many_if_not_found(filtered_out_jobs) + # old_jobs, new_jobs = self.jobRepository.insert_many_if_not_found(jobs) + # for newJob in new_jobs: + # await self.telegram_bot.send_job(newJob) + # self.logger.info(f"Found {len(old_jobs)} old jobs") + # await self.telegram_bot.send_text( + # f"Finished scarping: {site_names_print}") + self.logger.info("finished handling") + + +start_handler = TelegramStartHandler() +start_conv_handler = ConversationHandler( + entry_points=[CommandHandler("start", start_handler.start)], + states={ + Flow.POSITION: [MessageHandler(filters.TEXT, start_handler.position)] + # Flow.SAVE_POSITION: [MessageHandler(filters.TEXT, start_handler.position)] + # Flow.ADDRESS: [MessageHandler(filters.PHOTO, photo), CommandHandler("skip", skip_photo)], + # Flow.FILTERS: [ + # MessageHandler(filters.LOCATION, location), + # CommandHandler("skip", skip_location), + # ], + # Flow.EXPERIENCE: [MessageHandler(filters.TEXT & ~filters.COMMAND, bio)], + }, + fallbacks=[CommandHandler("cancel", start_handler.cancel)], +)