#!/usr/bin/env python3 ''' youdis v1.1 bot for downloading youtube videos using yt-dlp discord-py-interactions 5.11.0 has new option requires python>=3.9 ''' # match_filter: info_dict -> Raise utils.DownloadCancelled(msg) ? interrupt import interactions from os import getenv from pathlib import Path import yt_dlp import json import asyncio import threading userFile = Path('/config/users.json') userFile.parent.mkdir(exist_ok=True, parents=True) bot = interactions.Client(intents=interactions.Intents.DEFAULT,default_scope=2147491904) def save_authorized_users(authorized_users): with open(userFile, 'w') as f: json.dump({'authorized_users': authorized_users}, f) def load_authorized_users(): if not userFile.exists(): save_authorized_users([]) print(f'users.json not found; saving to {userFile}') return [] try: with open(userFile, 'r') as f: data = json.load(f) except (json.JSONDecodeError, OSError): save_authorized_users([]) print(f'users.json invalid; resetting {userFile}') return [] authorized_users = data.get('authorized_users', []) if not isinstance(authorized_users, list): authorized_users = [] authorized_users = [str(user_id) for user_id in authorized_users] save_authorized_users(authorized_users) print(f'authorized_users:{authorized_users}') return authorized_users authorized_users = load_authorized_users() active_job_lock = threading.Lock() active_job = None async def send_message(ctx, message): await ctx.author.send(message) def claim_active_job(job): global active_job with active_job_lock: if active_job is not None: return active_job active_job = job return None def get_active_job(): with active_job_lock: return active_job def clear_active_job(job): global active_job with active_job_lock: if active_job is job: active_job = None def download_video(url, options): with yt_dlp.YoutubeDL(options) as ydl: ydl.download(url) def create_hook(ctx, loop, cancel_event): seen_updates = set() def hook(d): if cancel_event.is_set(): raise yt_dlp.utils.DownloadCancelled('download canceled by /interrupt') status = d.get('status') info = d.get('info_dict') or {} if status not in {'downloading', 'finished'}: return filename = d.get('filename') or info.get('_filename') or info.get('title') update_key = (status, filename) if update_key in seen_updates: return seen_updates.add(update_key) playlist_index = info.get('playlist_index') playlist_count = info.get('playlist_count') url = info.get('webpage_url') prefix = status if playlist_index and playlist_count: prefix = f'{status} {playlist_index} of {playlist_count}' msg = f'{prefix}: {filename}' if url: msg = f'{msg} <{url}>' asyncio.run_coroutine_threadsafe(send_message(ctx, msg), loop) return hook @interactions.slash_command(name="youtube",description="download video from youtube to server") @interactions.slash_option( name='url', opt_type=interactions.OptionType.STRING, required=True, description='url target' ) async def youtube(ctx: interactions.SlashContext, url:str): print(f'{ctx.author.id} requested {url}') # check that user is authorized if str(ctx.author.id) not in authorized_users: if ctx.author.id == 127831327012683776: await ctx.author.send('potato stop') await ctx.author.send('you are not authorized to use this command. message my owner to be added.') return loop = asyncio.get_running_loop() cancel_event = threading.Event() hook = create_hook(ctx, loop, cancel_event) job = { 'requester_id': str(ctx.author.id), 'request_url': url, 'cancel_event': cancel_event, } existing_job = claim_active_job(job) if existing_job: await ctx.author.send( f'already downloading for <@{existing_job["requester_id"]}>. ' 'single-job mode is enabled right now; try again after it finishes.' ) return # use api_to_cli and paste cli options to get the output you need yoptions = { 'format':'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', 'fragment_tries': 10, 'restrictfilenames':True, 'paths': {'home':'/downloads'}, 'retries':10, 'writeinfojson':False, 'allow_playlist_files':True, 'noplaylist':True, 'download_archive':'/config/archive.txt', 'progress_hooks':[hook], 'outtmpl': '%(uploader)s/%(playlist_title)s/%(playlist_index)s%(playlist_index& - )s%(title)s.%(ext)s', 'outtmpl_na_placeholder':'', } await ctx.channel.send(f'Downloading from <{url}>. Status updates via DM. Single-job mode is enabled.') try: await asyncio.to_thread(download_video, url, yoptions) except yt_dlp.utils.DownloadCancelled as exc: print(f'download canceled: {exc}') await ctx.author.send(f'download canceled: {exc}') except yt_dlp.utils.DownloadError as exc: print(f'download failed: {exc}') await ctx.author.send(f'download failed: {exc}') except Exception as exc: print(f'unexpected download failure: {exc}') await ctx.author.send(f'unexpected download failure: {exc}') else: await ctx.author.send(f'download complete for <{url}>') finally: clear_active_job(job) @interactions.slash_command(name="interrupt",description="cancel current job") @interactions.check(interactions.is_owner()) async def _interrupt(ctx): job = get_active_job() if not job: await ctx.author.send('no active download to interrupt') return job['cancel_event'].set() print(f'interrupt requested for {job["request_url"]}') await ctx.author.send( f'interrupt requested for <{job["request_url"]}>; ' 'cancellation is coarse and will stop on the next yt-dlp progress update' ) @interactions.slash_command(name="adduser",description="authorize target user") @interactions.slash_option( name="user", opt_type=interactions.OptionType.USER, required=True, description='enable this bot for target user', ) @interactions.check(interactions.is_owner()) async def _adduser(ctx: interactions.SlashContext, user:interactions.OptionType.USER): user_id = str(user.id) if user_id not in authorized_users: authorized_users.append(user_id) save_authorized_users(authorized_users) print(f'authorized {user_id}') await ctx.author.send(f'authorized {user.mention}') else: await ctx.author.send(f'{user.mention} is already authorized') @interactions.slash_command(name="removeuser",description="deauthorize target user") @interactions.slash_option( name="user", opt_type=interactions.OptionType.USER, required=True, description='disable this bot for target user', ) @interactions.check(interactions.is_owner()) async def _removeuser(ctx: interactions.SlashContext, user:interactions.OptionType.USER): user_id = str(user.id) if user_id in authorized_users: authorized_users.remove(user_id) save_authorized_users(authorized_users) print(f'deauthorized {user_id}') await ctx.author.send(f'deauthorized {user.mention}') else: await ctx.author.send(f'{user.mention} is not currently authorized') api_token = getenv('api_token') if not api_token: raise ValueError('API token not set. Retrieve from your Discord bot.') bot.start(api_token)