From 1021f0447f34de3e1ed61b241ac66d5b1fc1e0c9 Mon Sep 17 00:00:00 2001 From: turtlebasket <32886427+turtlebasket@users.noreply.github.com> Date: Wed, 4 Mar 2026 14:48:39 -0800 Subject: [PATCH] ntfy bell interceptor --- scripts/ntfy-bell | 267 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100755 scripts/ntfy-bell diff --git a/scripts/ntfy-bell b/scripts/ntfy-bell new file mode 100755 index 0000000..17af7e3 --- /dev/null +++ b/scripts/ntfy-bell @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +# +# ntfy-bell - run a command, forwarding OSC 9 notifications and BEL to ntfy +# +# Usage: +# ntfy-bell [args...] + +import fcntl +import os +import pty +import select +import shlex +import shutil +import signal +import sys +import termios +import time +import tty +import urllib.request +from typing import Optional + +BEL = 0x07 +OSC9_PREFIX = b"\x1b]9;" # ESC ] 9 ; +ST = b"\x1b\\" # ESC \\ (string terminator) + + +def notify_ntfy(topic: str, title: str, message: str) -> None: + url = f"https://ntfy.sh/{topic}" + data = message.encode("utf-8", errors="replace") + + req = urllib.request.Request(url, data=data, method="POST") + req.add_header("Title", title) + + with urllib.request.urlopen(req, timeout=5): + pass + + +def _extract_osc9_from_buffer(buf: bytearray) -> Optional[str]: + start = buf.find(OSC9_PREFIX) + if start == -1: + if len(buf) > 65536: + del buf[:-4096] + return None + + if start > 0: + del buf[:start] + + payload_start = len(OSC9_PREFIX) + bel_pos = buf.find(bytes([BEL]), payload_start) + st_pos = buf.find(ST, payload_start) + + end_pos: Optional[int] = None + term_len = 0 + + if bel_pos != -1 and (st_pos == -1 or bel_pos < st_pos): + end_pos = bel_pos + term_len = 1 + elif st_pos != -1: + end_pos = st_pos + term_len = len(ST) + + if end_pos is None: + if len(buf) > 65536: + del buf[:-4096] + return None + + payload = bytes(buf[payload_start:end_pos]) + del buf[: end_pos + term_len] + + return payload.decode("utf-8", errors="replace").strip("\r\n") + + +def _build_child_argv(cmd: list[str]) -> list[str]: + cmd0 = cmd[0] + + if "/" in cmd0: + return cmd + + if shutil.which(cmd0): + return cmd + + shell = os.environ.get("SHELL") or "/bin/sh" + shell_name = os.path.basename(shell) + joined = shlex.join(cmd) + + if shell_name == "zsh": + shell_cmd = ( + 'source "${ZDOTDIR:-$HOME}/.zshrc" >/dev/null 2>&1; ' + f"eval {shlex.quote(joined)}" + ) + return [shell, "-c", shell_cmd] + + return [shell, "-ic", joined] + + +def _copy_winsize(src_fd: int, dst_fd: int) -> None: + try: + winsize = fcntl.ioctl(src_fd, termios.TIOCGWINSZ, b"\0" * 8) + fcntl.ioctl(dst_fd, termios.TIOCSWINSZ, winsize) + except OSError: + pass + + +def main() -> int: + topic = os.environ.get("BELL2NTFY_TOPIC") + if not topic: + print("Error: environment variable BELL2NTFY_TOPIC not set", file=sys.stderr) + return 2 + + if len(sys.argv) < 2: + print(f"usage: {sys.argv[0]} [args...]", file=sys.stderr) + return 2 + + cmd = sys.argv[1:] + child_argv = _build_child_argv(cmd) + + pid, master_fd = pty.fork() + if pid == 0: + try: + os.execvp(child_argv[0], child_argv) + except FileNotFoundError: + print(f"{cmd[0]}: command not found", file=sys.stderr) + os._exit(127) + except PermissionError: + print(f"{cmd[0]}: permission denied", file=sys.stderr) + os._exit(126) + + stdin_fd = None + stdin_is_tty = sys.stdin.isatty() + old_tty_attrs = None + old_winch_handler = None + + try: + stdin_fd = sys.stdin.fileno() + except OSError: + stdin_fd = None + + if stdin_fd is not None and stdin_is_tty: + try: + old_tty_attrs = termios.tcgetattr(stdin_fd) + tty.setraw(stdin_fd) + except termios.error: + old_tty_attrs = None + + _copy_winsize(stdin_fd, master_fd) + + def _on_winch(_signum: int, _frame: object) -> None: + _copy_winsize(stdin_fd, master_fd) + try: + os.kill(pid, signal.SIGWINCH) + except OSError: + pass + + old_winch_handler = signal.getsignal(signal.SIGWINCH) + signal.signal(signal.SIGWINCH, _on_winch) + + osc_last_sent = 0.0 + bel_last_sent = 0.0 + cooldown_s = 1.5 + parse_buf = bytearray() + stdin_eof_sent = False + + try: + while True: + read_fds = [master_fd] + if stdin_fd is not None: + read_fds.append(stdin_fd) + + try: + rlist, _, _ = select.select(read_fds, [], []) + except InterruptedError: + continue + + if stdin_fd is not None and stdin_fd in rlist: + try: + user_data = os.read(stdin_fd, 4096) + except OSError: + user_data = b"" + + if user_data: + try: + os.write(master_fd, user_data) + except OSError: + break + else: + if not stdin_is_tty and not stdin_eof_sent: + try: + os.write(master_fd, b"\x04") + except OSError: + pass + stdin_eof_sent = True + stdin_fd = None + + if master_fd in rlist: + try: + data = os.read(master_fd, 4096) + except OSError: + break + + if not data: + break + + try: + os.write(sys.stdout.fileno(), data) + except OSError: + break + + parse_buf.extend(data) + + while True: + msg = _extract_osc9_from_buffer(parse_buf) + if msg is None: + break + + now = time.time() + if now - osc_last_sent >= cooldown_s: + osc_last_sent = now + try: + notify_ntfy( + topic=topic, + title="Terminal notification", + message=msg if msg else f"OSC 9 notification from: {' '.join(cmd)}", + ) + except Exception: + pass + + if bytes([BEL]) in data and parse_buf.find(OSC9_PREFIX) == -1: + now = time.time() + if now - bel_last_sent >= cooldown_s: + bel_last_sent = now + try: + notify_ntfy( + topic=topic, + title="Terminal bell", + message=f"BEL from: {' '.join(cmd)}", + ) + except Exception: + pass + + finally: + if stdin_fd is not None and stdin_is_tty and old_tty_attrs is not None: + try: + termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_tty_attrs) + except termios.error: + pass + + if old_winch_handler is not None: + signal.signal(signal.SIGWINCH, old_winch_handler) + + try: + os.close(master_fd) + except OSError: + pass + + _, status = os.waitpid(pid, 0) + + if os.WIFEXITED(status): + return os.WEXITSTATUS(status) + + if os.WIFSIGNALED(status): + return 128 + os.WTERMSIG(status) + + return 1 + + +if __name__ == "__main__": + raise SystemExit(main())