#!/usr/bin/env python3 # # ntfy-bell - run a command, forwarding OSC 9 notifications and BEL to ntfy # # Usage: # ntfy-bell [args...] import fcntl import os import pty import select import shlex import shutil import signal import sys import termios import time import tty import urllib.request from typing import Optional BEL = 0x07 OSC9_PREFIX = b"\x1b]9;" # ESC ] 9 ; ST = b"\x1b\\" # ESC \\ (string terminator) def notify_ntfy(topic: str, title: str, message: str) -> None: url = f"https://ntfy.sh/{topic}" data = message.encode("utf-8", errors="replace") req = urllib.request.Request(url, data=data, method="POST") req.add_header("Title", title) with urllib.request.urlopen(req, timeout=5): pass def _extract_osc9_from_buffer(buf: bytearray) -> Optional[str]: start = buf.find(OSC9_PREFIX) if start == -1: if len(buf) > 65536: del buf[:-4096] return None if start > 0: del buf[:start] payload_start = len(OSC9_PREFIX) bel_pos = buf.find(bytes([BEL]), payload_start) st_pos = buf.find(ST, payload_start) end_pos: Optional[int] = None term_len = 0 if bel_pos != -1 and (st_pos == -1 or bel_pos < st_pos): end_pos = bel_pos term_len = 1 elif st_pos != -1: end_pos = st_pos term_len = len(ST) if end_pos is None: if len(buf) > 65536: del buf[:-4096] return None payload = bytes(buf[payload_start:end_pos]) del buf[: end_pos + term_len] return payload.decode("utf-8", errors="replace").strip("\r\n") def _build_child_argv(cmd: list[str]) -> list[str]: cmd0 = cmd[0] if "/" in cmd0: return cmd if shutil.which(cmd0): return cmd shell = os.environ.get("SHELL") or "/bin/sh" shell_name = os.path.basename(shell) joined = shlex.join(cmd) if shell_name == "zsh": shell_cmd = ( 'source "${ZDOTDIR:-$HOME}/.zshrc" >/dev/null 2>&1; ' f"eval {shlex.quote(joined)}" ) return [shell, "-c", shell_cmd] return [shell, "-ic", joined] def _copy_winsize(src_fd: int, dst_fd: int) -> None: try: winsize = fcntl.ioctl(src_fd, termios.TIOCGWINSZ, b"\0" * 8) fcntl.ioctl(dst_fd, termios.TIOCSWINSZ, winsize) except OSError: pass def main() -> int: topic = os.environ.get("BELL2NTFY_TOPIC") if not topic: print("Error: environment variable BELL2NTFY_TOPIC not set", file=sys.stderr) return 2 if len(sys.argv) < 2: print(f"usage: {sys.argv[0]} [args...]", file=sys.stderr) return 2 cmd = sys.argv[1:] child_argv = _build_child_argv(cmd) pid, master_fd = pty.fork() if pid == 0: try: os.execvp(child_argv[0], child_argv) except FileNotFoundError: print(f"{cmd[0]}: command not found", file=sys.stderr) os._exit(127) except PermissionError: print(f"{cmd[0]}: permission denied", file=sys.stderr) os._exit(126) stdin_fd = None stdin_is_tty = sys.stdin.isatty() old_tty_attrs = None old_winch_handler = None try: stdin_fd = sys.stdin.fileno() except OSError: stdin_fd = None if stdin_fd is not None and stdin_is_tty: try: old_tty_attrs = termios.tcgetattr(stdin_fd) tty.setraw(stdin_fd) except termios.error: old_tty_attrs = None _copy_winsize(stdin_fd, master_fd) def _on_winch(_signum: int, _frame: object) -> None: _copy_winsize(stdin_fd, master_fd) try: os.kill(pid, signal.SIGWINCH) except OSError: pass old_winch_handler = signal.getsignal(signal.SIGWINCH) signal.signal(signal.SIGWINCH, _on_winch) osc_last_sent = 0.0 bel_last_sent = 0.0 cooldown_s = 1.5 parse_buf = bytearray() stdin_eof_sent = False try: while True: read_fds = [master_fd] if stdin_fd is not None: read_fds.append(stdin_fd) try: rlist, _, _ = select.select(read_fds, [], []) except InterruptedError: continue if stdin_fd is not None and stdin_fd in rlist: try: user_data = os.read(stdin_fd, 4096) except OSError: user_data = b"" if user_data: try: os.write(master_fd, user_data) except OSError: break else: if not stdin_is_tty and not stdin_eof_sent: try: os.write(master_fd, b"\x04") except OSError: pass stdin_eof_sent = True stdin_fd = None if master_fd in rlist: try: data = os.read(master_fd, 4096) except OSError: break if not data: break try: os.write(sys.stdout.fileno(), data) except OSError: break parse_buf.extend(data) while True: msg = _extract_osc9_from_buffer(parse_buf) if msg is None: break now = time.time() if now - osc_last_sent >= cooldown_s: osc_last_sent = now try: notify_ntfy( topic=topic, title="Terminal notification", message=msg if msg else f"OSC 9 notification from: {' '.join(cmd)}", ) except Exception: pass if bytes([BEL]) in data and parse_buf.find(OSC9_PREFIX) == -1: now = time.time() if now - bel_last_sent >= cooldown_s: bel_last_sent = now try: notify_ntfy( topic=topic, title="Terminal bell", message=f"BEL from: {' '.join(cmd)}", ) except Exception: pass finally: if stdin_fd is not None and stdin_is_tty and old_tty_attrs is not None: try: termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_tty_attrs) except termios.error: pass if old_winch_handler is not None: signal.signal(signal.SIGWINCH, old_winch_handler) try: os.close(master_fd) except OSError: pass _, status = os.waitpid(pid, 0) if os.WIFEXITED(status): return os.WEXITSTATUS(status) if os.WIFSIGNALED(status): return 128 + os.WTERMSIG(status) return 1 if __name__ == "__main__": raise SystemExit(main())