"""Discord bot that lets a user pick video/audio formats and download a video.

Format discovery and the download itself are delegated to the ``ytdlp``
module; this file only contains the Discord UI and command wiring.
"""
import asyncio
import os
from typing import Optional

from dotenv import load_dotenv
from discord import Intents, Client, Interaction, SelectOption, Message, app_commands, ui

from misc import extract_url_from_string, render_progress_bar
import ytdlp

load_dotenv()
discord_token = os.getenv("DISCORD_TOKEN")
out_path = os.getenv("OUT_PATH")
temp_path = os.getenv("TEMP_PATH")

# Discord accepts at most 25 options per select menu and 100 characters per
# option label.
_MAX_SELECT_OPTIONS = 25
_MAX_LABEL_LENGTH = 100
# Value carried by the placeholder option shown when no real format exists.
_PLACEHOLDER_VALUE = "none"


def _placeholder_option(label: str, description: str) -> SelectOption:
    """Return the single pre-selected option shown in an unusable select.

    ``SelectOption`` has no ``disabled`` parameter; the owning ``ui.Select``
    is disabled instead so the placeholder can never be submitted.
    """
    return SelectOption(
        label=label,
        value=_PLACEHOLDER_VALUE,
        default=True,
        description=description,
    )


def _video_label(v: dict) -> str:
    """Build the select label for one video format entry."""
    label = f"{v['resolution']} {v['height']}p ({v['ext']}, {int(v.get('tbr') or 0)}kbps)"
    return label[:_MAX_LABEL_LENGTH]


def _audio_label(a: dict) -> str:
    """Build the select label for one audio format entry."""
    language = f", {a['language']}" if a.get('language') else ""
    label = f"{a['format']} ({a['ext']}, {int(a.get('tbr') or 0)}kbps{language})"
    return label[:_MAX_LABEL_LENGTH]


class DownloadVideo(ui.View):
    """Two-step format picker: choose a video format, then an audio format.

    Choosing the audio format starts the download and reports progress by
    editing the message the view is attached to.
    """

    def __init__(self, url: str, formats: Optional[dict] = None):
        """Create the view for ``url``.

        Args:
            url: The video URL handed to ``ytdlp``.
            formats: Pre-fetched result of ``ytdlp.get_formats(url)``. When
                omitted, the formats are fetched synchronously here, which
                blocks the caller until ``ytdlp.get_formats`` returns.
        """
        super().__init__()
        self.url = url
        self.video_options = []
        self.audio_options = []
        self.selected_video = None
        self.selected_audio = None
        # Whether the option lists hold real formats (not the placeholder).
        self._has_video_formats = False
        self._has_audio_formats = False

        self._populate_format_options(formats)

        self.video_select = ui.Select(
            placeholder="Choose a video format...",
            options=self.video_options,
            min_values=1,
            max_values=1,
            custom_id="video_select",
            # Nothing to choose when only the placeholder is present.
            disabled=not self._has_video_formats,
        )
        self.video_select.callback = self.on_video_select
        self.add_item(self.video_select)

        self.audio_select = ui.Select(
            placeholder="Choose an audio format...",
            options=self.audio_options,
            min_values=1,
            max_values=1,
            custom_id="audio_select",
            disabled=True,  # Enabled once a video format has been chosen.
        )
        self.audio_select.callback = self.on_audio_select
        self.add_item(self.audio_select)

    def _populate_format_options(self, formats: Optional[dict] = None) -> None:
        """Fill ``video_options`` / ``audio_options`` from the format data.

        Args:
            formats: Mapping with ``'video_options'`` and ``'audio_options'``
                lists; fetched via ``ytdlp.get_formats`` when ``None``.

        Any error while fetching or reading the formats is logged and both
        lists fall back to a single "Error loading formats" placeholder.
        """
        try:
            if formats is None:
                formats = ytdlp.get_formats(self.url)

            # Best quality first. ``or 0`` also covers keys that are present
            # but set to None, which would otherwise break the comparison.
            sorted_videos = sorted(
                formats['video_options'],
                key=lambda v: (v.get('height') or 0, v.get('tbr') or 0),
                reverse=True,
            )
            video_options = [
                SelectOption(label=_video_label(v), value=v['format_id'])
                for v in sorted_videos[:_MAX_SELECT_OPTIONS]
            ]
            audio_options = [
                SelectOption(label=_audio_label(a), value=a['format_id'])
                for a in formats['audio_options'][:_MAX_SELECT_OPTIONS]
            ]
        except Exception as e:
            print(f"Error fetching formats: {e}")
            self._has_video_formats = False
            self._has_audio_formats = False
            self.video_options = [
                _placeholder_option("Error loading formats", "Error loading formats")
            ]
            self.audio_options = [
                _placeholder_option("Error loading formats", "Error loading formats")
            ]
            return

        self._has_video_formats = bool(video_options)
        self._has_audio_formats = bool(audio_options)
        # A select needs at least one option, so substitute a placeholder.
        self.video_options = video_options or [
            _placeholder_option("No video formats found", "No video formats available")
        ]
        self.audio_options = audio_options or [
            _placeholder_option("No audio formats found", "No audio formats available")
        ]

    async def on_video_select(self, interaction: Interaction) -> None:
        """Record the chosen video format and move on to the audio step."""
        self.selected_video = self.video_select.values[0]

        # Lock the video choice; only unlock audio if there is a real choice.
        self.video_select.disabled = True
        self.audio_select.disabled = not self._has_audio_formats

        if self._has_audio_formats:
            content = "Now select an audio format:"
        else:
            content = "No audio formats are available for this video."
        await interaction.response.edit_message(content=content, view=self)

    async def on_audio_select(self, interaction: Interaction) -> None:
        """Record the chosen audio format, run the download and report it.

        Progress is shown by editing the original message; on success a
        follow-up summary message is sent as well.
        """
        self.selected_audio = self.audio_select.values[0]
        format_string = f"{self.selected_video}+{self.selected_audio}"

        # Both selections are final from here on.
        self.audio_select.disabled = True

        # Human-readable labels for the summary, falling back to the raw ids.
        video_label = next(
            (o.label for o in self.video_options if o.value == self.selected_video),
            self.selected_video,
        )
        audio_label = next(
            (o.label for o in self.audio_options if o.value == self.selected_audio),
            self.selected_audio,
        )

        await interaction.response.edit_message(
            content=f"Downloading video: {render_progress_bar(0)}",
            view=self,
        )

        progress_queue = asyncio.Queue()
        stop_event = asyncio.Event()

        async def progress_worker(msg):
            """Edit ``msg`` with queued progress until ``stop_event`` is set."""
            last_percent = 0
            while not stop_event.is_set():
                try:
                    # The timeout lets the loop re-check ``stop_event``.
                    percent = await asyncio.wait_for(progress_queue.get(), timeout=2)
                except asyncio.TimeoutError:
                    continue
                # Only edit when crossing a 10% step, or at 100%.
                if percent // 10 > last_percent // 10 or percent == 100:
                    await msg.edit(
                        content=f"Downloading video: {render_progress_bar(percent)}"
                    )
                    last_percent = percent

        msg = await interaction.original_response()
        worker_task = asyncio.create_task(progress_worker(msg))

        async def progress_callback(percent):
            await progress_queue.put(percent)

        download_error = None
        try:
            # The download runs in a worker thread; the callback and loop are
            # handed over so it can report progress back to this event loop.
            await asyncio.to_thread(
                ytdlp.download_video,
                self.url,
                format_string,
                out_path,
                temp_path,
                progress_callback,
                asyncio.get_running_loop(),
            )
        except Exception as e:
            print(f"Error downloading video: {e}")
            download_error = e
        finally:
            # Always stop the worker, even when the download failed.
            stop_event.set()
            await worker_task

        if download_error is not None:
            await msg.edit(content="❌ Download failed.")
            return

        await msg.edit(content="✅ Download complete!\n")

        # Follow-up summary message.
        await interaction.followup.send(
            f"## Video Downloaded\n"
            f"**URL**: {self.url}\n"
            f"**Quality**: {video_label} + {audio_label}\n"
            f"**Requested By**: <@{interaction.user.id}>"
        )


# Bot setup
intents = Intents.default()
intents.message_content = True
client = Client(intents=intents)
tree = app_commands.CommandTree(client)


@client.event
async def on_ready() -> None:
    """Called when the bot is ready; logs the identity and syncs commands."""
    print(f"Logged in as {client.user}")
    await tree.sync()


async def _send_format_picker(interaction: Interaction, url: str) -> None:
    """Fetch the formats for ``url`` and send the format picker as a follow-up.

    The fetch runs in a worker thread so the event loop stays responsive.
    The interaction must already have been deferred by the caller.
    """
    try:
        formats = await asyncio.to_thread(ytdlp.get_formats, url)
    except Exception as e:
        print(f"Error fetching formats: {e}")
        await interaction.followup.send(
            content="Could not load the available formats for that URL, sorry :-(",
            ephemeral=True,
        )
        return

    view = DownloadVideo(url, formats)
    await interaction.followup.send(
        content="Select a video format:",
        view=view,
        ephemeral=True,
    )


# /download command
@app_commands.allowed_contexts(dms=True, private_channels=True)
@app_commands.allowed_installs(users=True)
@tree.command(name="download", description="Download video and save it to 'youtube-vids'")
async def download_command(interaction: Interaction, url: str) -> None:
    """Handles the /download command."""
    await interaction.response.defer(ephemeral=True, thinking=True)
    await _send_format_picker(interaction, url)


# direct URL download from message context
@app_commands.allowed_contexts(dms=True, private_channels=True)
@app_commands.allowed_installs(users=True)
@tree.context_menu(name="download video")
async def download_context(interaction: Interaction, message: Message) -> None:
    """Handles requests to download a URL from an already posted message."""
    await interaction.response.defer(ephemeral=True, thinking=True)

    url = extract_url_from_string(message.clean_content)
    if url is None:
        await interaction.followup.send(
            content="No URL was found in message, sorry :-(",
            ephemeral=True,
        )
        return

    await _send_format_picker(interaction, url)


if __name__ == "__main__":
    if not discord_token:
        raise SystemExit("DISCORD_TOKEN is not set; cannot start the bot.")
    client.run(discord_token)