enforce single-job blocking

This commit is contained in:
2026-03-31 13:45:17 -04:00
parent 033d9dd167
commit 667b06fe4a
3 changed files with 278 additions and 76 deletions

209
youdis.py
View File

@@ -48,32 +48,69 @@ def load_authorized_users():
return authorized_users
authorized_users = load_authorized_users()
title = ''
async def send_message(ctx, message):
await ctx.author.send(message)
def download_video(url, options):
with yt_dlp.YoutubeDL(options) as ydl:
ydl.download(url)
def create_hook(ctx,loop):
def hook(d):
global title
status = d.get('status')
if status == 'error':
msg = f'error; video probably already exists, have you checked archive.txt'
asyncio.run_coroutine_threadsafe(send_message(ctx,msg),loop)
elif d.get('info_dict').get('title') != title:
title = d.get('info_dict').get('title')
playlist_index = d.get('info_dict').get('playlist_index')
playlist_count = d.get('info_dict').get('playlist_count')
filename = d.get('filename')
url = d.get('info_dict').get('webpage_url')
msg = f'{status} {playlist_index} of {playlist_count}: {filename} <{url}>'
asyncio.run_coroutine_threadsafe(send_message(ctx,msg),loop)
return hook
active_job_lock = threading.Lock()
active_job = None
async def send_message(ctx, message):
await ctx.author.send(message)
def claim_active_job(job):
global active_job
with active_job_lock:
if active_job is not None:
return active_job
active_job = job
return None
def get_active_job():
with active_job_lock:
return active_job
def clear_active_job(job):
global active_job
with active_job_lock:
if active_job is job:
active_job = None
def download_video(url, options):
with yt_dlp.YoutubeDL(options) as ydl:
ydl.download(url)
def create_hook(ctx, loop, cancel_event):
seen_updates = set()
def hook(d):
if cancel_event.is_set():
raise yt_dlp.utils.DownloadCancelled('download canceled by /interrupt')
status = d.get('status')
info = d.get('info_dict') or {}
if status not in {'downloading', 'finished'}:
return
filename = d.get('filename') or info.get('_filename') or info.get('title')
update_key = (status, filename)
if update_key in seen_updates:
return
seen_updates.add(update_key)
playlist_index = info.get('playlist_index')
playlist_count = info.get('playlist_count')
url = info.get('webpage_url')
prefix = status
if playlist_index and playlist_count:
prefix = f'{status} {playlist_index} of {playlist_count}'
msg = f'{prefix}: {filename}'
if url:
msg = f'{msg} <{url}>'
asyncio.run_coroutine_threadsafe(send_message(ctx, msg), loop)
return hook
@interactions.slash_command(name="youtube",description="download video from youtube to server")
@interactions.slash_option(
@@ -84,52 +121,77 @@ def create_hook(ctx,loop):
)
async def youtube(ctx: interactions.SlashContext, url:str):
print(f'{ctx.author.id} requested {url}')
loop = asyncio.get_running_loop()
hook = create_hook(ctx,loop)
# use api_to_cli and paste cli options to get the output you need
yoptions = {
'format':'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'fragment_tries': 10,
'restrictfilenames':True,
'paths': {'home':'/downloads'},
'retries':10,
'writeinfojson':False,
'allow_playlist_files':True,
'noplaylist':True,
'download_archive':'/config/archive.txt',
'progress_hooks':[hook],
'outtmpl': '%(uploader)s/%(playlist_title)s/%(playlist_index)s%(playlist_index& - )s%(title)s.%(ext)s',
'outtmpl_na_placeholder':'',
}
# check that user is authorized
if str(ctx.author.id) not in authorized_users:
if ctx.author.id == 127831327012683776:
await ctx.author.send('potato stop')
await ctx.author.send('you are not authorized to use this command. message my owner to be added.')
return
else:
await ctx.channel.send(f'Downloading from <{url}>. Status updates via DM.')
#await ctx.defer() #if you need up to 15m to respond
# 1/2 - download in separate thread, else progress_hook blocks downstream async ctx.send
download_thread = threading.Thread(target=download_video, args=(url,yoptions))
download_thread.start()
await asyncio.to_thread(download_thread.join)
# 2/2 - replace the above with this next try:
#try:
# await asyncio.to_thread(download_video, url, yoptions)
#except Exception as e:
# print(f"download failed: {e}")
# await ctx.author.send(f"download failed: {str(e)}")
@interactions.slash_command(name="interrupt",description="cancel current job")
@interactions.check(interactions.is_owner())
async def _interrupt(ctx):
# interrupt here
print('interrupting current job - not implemented')
await ctx.author.send('interrupting current job - not implemented')
return
loop = asyncio.get_running_loop()
cancel_event = threading.Event()
hook = create_hook(ctx, loop, cancel_event)
job = {
'requester_id': str(ctx.author.id),
'request_url': url,
'cancel_event': cancel_event,
}
existing_job = claim_active_job(job)
if existing_job:
await ctx.author.send(
f'already downloading for <@{existing_job["requester_id"]}>. '
'single-job mode is enabled right now; try again after it finishes.'
)
return
# use api_to_cli and paste cli options to get the output you need
yoptions = {
'format':'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'fragment_tries': 10,
'restrictfilenames':True,
'paths': {'home':'/downloads'},
'retries':10,
'writeinfojson':False,
'allow_playlist_files':True,
'noplaylist':True,
'download_archive':'/config/archive.txt',
'progress_hooks':[hook],
'outtmpl': '%(uploader)s/%(playlist_title)s/%(playlist_index)s%(playlist_index& - )s%(title)s.%(ext)s',
'outtmpl_na_placeholder':'',
}
await ctx.channel.send(f'Downloading from <{url}>. Status updates via DM. Single-job mode is enabled.')
try:
await asyncio.to_thread(download_video, url, yoptions)
except yt_dlp.utils.DownloadCancelled as exc:
print(f'download canceled: {exc}')
await ctx.author.send(f'download canceled: {exc}')
except yt_dlp.utils.DownloadError as exc:
print(f'download failed: {exc}')
await ctx.author.send(f'download failed: {exc}')
except Exception as exc:
print(f'unexpected download failure: {exc}')
await ctx.author.send(f'unexpected download failure: {exc}')
else:
await ctx.author.send(f'download complete for <{url}>')
finally:
clear_active_job(job)
@interactions.slash_command(name="interrupt",description="cancel current job")
@interactions.check(interactions.is_owner())
async def _interrupt(ctx):
job = get_active_job()
if not job:
await ctx.author.send('no active download to interrupt')
return
job['cancel_event'].set()
print(f'interrupt requested for {job["request_url"]}')
await ctx.author.send(
f'interrupt requested for <{job["request_url"]}>; '
'cancellation is coarse and will stop on the next yt-dlp progress update'
)
@interactions.slash_command(name="adduser",description="authorize target user")
@interactions.slash_option(
@@ -166,13 +228,8 @@ async def _removeuser(ctx: interactions.SlashContext, user:interactions.OptionTy
await ctx.author.send(f'deauthorized {user.mention}')
else:
await ctx.author.send(f'{user.mention} is not currently authorized')
async def dl_hook(d):
msg = f'{d["status"]} {d["filename"]}'
print(msg)
await ctx.author.send(msg)
api_token = getenv('api_token')
if not api_token:
raise ValueError('API token not set. Retrieve from your Discord bot.')
api_token = getenv('api_token')
if not api_token:
raise ValueError('API token not set. Retrieve from your Discord bot.')
bot.start(api_token)