All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 2m53s
216 lines
7.9 KiB
Python
216 lines
7.9 KiB
Python
import asyncio
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from discord import Intents, Client, Interaction, SelectOption, Message, app_commands, ui
|
|
|
|
from misc import extract_url_from_string, render_progress_bar
|
|
import ytdlp
|
|
|
|
load_dotenv()
|
|
|
|
discord_token = os.getenv("DISCORD_TOKEN")
|
|
out_path = os.getenv("OUT_PATH")
|
|
temp_path = os.getenv("TEMP_PATH")
|
|
|
|
class DownloadVideo(ui.View):
|
|
def __init__(self, url: str):
|
|
super().__init__()
|
|
self.url = url
|
|
self.video_options = []
|
|
self.audio_options = []
|
|
self.selected_video = None
|
|
self.selected_audio = None
|
|
self._populate_format_options()
|
|
|
|
self.video_select = ui.Select(
|
|
placeholder="Choose a video format...",
|
|
options=self.video_options,
|
|
min_values=1,
|
|
max_values=1,
|
|
custom_id="video_select"
|
|
)
|
|
self.video_select.callback = self.on_video_select
|
|
self.add_item(self.video_select)
|
|
|
|
self.audio_select = ui.Select(
|
|
placeholder="Choose an audio format...",
|
|
options=self.audio_options,
|
|
min_values=1,
|
|
max_values=1,
|
|
custom_id="audio_select",
|
|
disabled=True # Start disabled
|
|
)
|
|
self.audio_select.callback = self.on_audio_select
|
|
self.add_item(self.audio_select)
|
|
|
|
def _populate_format_options(self):
|
|
try:
|
|
formats = ytdlp.get_formats(self.url)
|
|
|
|
# Sort video options by height and bitrate
|
|
sorted_videos = sorted(
|
|
formats['video_options'],
|
|
key=lambda v: (v.get('height', 0), v.get('tbr', 0)),
|
|
reverse=True
|
|
)
|
|
|
|
self.video_options = [
|
|
SelectOption(
|
|
label=f"{v['resolution']} {v['height']}p ({v['ext']}, {int(v['tbr'] or 0)}kbps)",
|
|
value=v['format_id']
|
|
)
|
|
for v in sorted_videos[:25] # limit
|
|
]
|
|
self.audio_options = [
|
|
SelectOption(
|
|
label=f"{a['format']} ({a['ext']}, {int(a['tbr'] or 0)}kbps"
|
|
+ (f", {a['language']}" if a.get('language') else "") + ")",
|
|
value=a['format_id']
|
|
)
|
|
for a in formats['audio_options'][:25] # limit
|
|
]
|
|
# Ensure at least one option for each select
|
|
if not self.video_options:
|
|
self.video_options = [SelectOption(label="No video formats found", value="none", default=True, description="No video formats available", disabled=True)]
|
|
if not self.audio_options:
|
|
self.audio_options = [SelectOption(label="No audio formats found", value="none", default=True, description="No audio formats available", disabled=True)]
|
|
except Exception as e:
|
|
print(f"Error fetching formats: {e}")
|
|
self.video_options = [SelectOption(label="Error loading formats", value="none", default=True, description="Error loading formats", disabled=True)]
|
|
self.audio_options = [SelectOption(label="Error loading formats", value="none", default=True, description="Error loading formats", disabled=True)]
|
|
|
|
async def on_video_select(self, interaction: Interaction):
|
|
self.selected_video = self.video_select.values[0]
|
|
# Disable video select, enable audio select
|
|
self.video_select.disabled = True
|
|
self.audio_select.disabled = False
|
|
await interaction.response.edit_message(
|
|
content="Now select an audio format:",
|
|
view=self
|
|
)
|
|
|
|
async def on_audio_select(self, interaction: Interaction):
|
|
self.selected_audio = self.audio_select.values[0]
|
|
format_string = f"{self.selected_video}+{self.selected_audio}"
|
|
# Optionally, disable both selects after selection
|
|
self.audio_select.disabled = True
|
|
|
|
# Get labels for summary
|
|
video_label = next((o.label for o in self.video_options if o.value == self.selected_video), self.selected_video)
|
|
audio_label = next((o.label for o in self.audio_options if o.value == self.selected_audio), self.selected_audio)
|
|
format_string = f"{self.selected_video}+{self.selected_audio}"
|
|
|
|
# Show progress bar
|
|
await interaction.response.edit_message(
|
|
content=f"Downloading video: {render_progress_bar(0)}",
|
|
view=self
|
|
)
|
|
|
|
progress_queue = asyncio.Queue()
|
|
stop_event = asyncio.Event()
|
|
|
|
async def progress_worker(msg):
|
|
last_percent = 0
|
|
while not stop_event.is_set():
|
|
try:
|
|
percent = await asyncio.wait_for(progress_queue.get(), timeout=2)
|
|
# Only update every 10% or when at 100%
|
|
if percent // 10 > last_percent // 10 or percent == 100:
|
|
await msg.edit(content=f"Downloading video: {render_progress_bar(percent)}")
|
|
last_percent = percent
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
|
|
# Get message to edit
|
|
msg = await interaction.original_response()
|
|
|
|
worker_task = asyncio.create_task(progress_worker(msg))
|
|
|
|
error = None
|
|
|
|
async def progress_callback(percent):
|
|
await progress_queue.put(percent)
|
|
|
|
loop = asyncio.get_running_loop()
|
|
def threaded_download():
|
|
nonlocal error
|
|
try:
|
|
ytdlp.download_video(
|
|
self.url, format_string, out_path, temp_path, progress_callback, loop
|
|
)
|
|
except Exception as e:
|
|
error = e
|
|
await asyncio.to_thread(threaded_download)
|
|
|
|
stop_event.set()
|
|
await worker_task
|
|
|
|
if error:
|
|
await msg.edit(content=f"❌ Download failed: {error}")
|
|
return
|
|
|
|
# Show completion and summary
|
|
await msg.edit(content="✅ Download complete!\n"
|
|
)
|
|
|
|
# Send public message
|
|
await interaction.followup.send(
|
|
f"## Video Downloaded\n"
|
|
f"**URL**: {self.url}\n"
|
|
f"**Quality**: {video_label} + {audio_label}\n"
|
|
f"**Requested By**: <@{interaction.user.id}>"
|
|
)
|
|
|
|
|
|
# Bot setup
|
|
intents = Intents.default()
|
|
intents.message_content = True
|
|
client = Client(intents=intents)
|
|
tree = app_commands.CommandTree(client)
|
|
|
|
@client.event
|
|
async def on_ready() -> None:
|
|
"""Called when the bot is ready."""
|
|
print(f"Logged in as {client.user}")
|
|
await tree.sync()
|
|
|
|
# /download command
|
|
@app_commands.allowed_contexts(dms=True, private_channels=True)
|
|
@app_commands.allowed_installs(users=True)
|
|
@tree.command(name="download", description="Download video and save it to 'youtube-vids'")
|
|
async def download_command(interaction: Interaction, url: str) -> None:
|
|
"""Handles the /download command."""
|
|
await interaction.response.defer(ephemeral=True, thinking=True)
|
|
|
|
view = DownloadVideo(url)
|
|
await interaction.followup.send(
|
|
content="Select a video format:",
|
|
view=view,
|
|
ephemeral=True
|
|
)
|
|
|
|
# direct URL download from message context
|
|
@app_commands.allowed_contexts(dms=True, private_channels=True)
|
|
@app_commands.allowed_installs(users=True)
|
|
@tree.context_menu(name="download video")
|
|
async def download_context(interaction: Interaction, message: Message) -> None:
|
|
"""Handles requests to download a URL from an already posted message"""
|
|
await interaction.response.defer(ephemeral=True, thinking=True)
|
|
|
|
url = extract_url_from_string(message.clean_content)
|
|
if url is not None:
|
|
view = DownloadVideo(url)
|
|
await interaction.followup.send(
|
|
content="Select a video format:",
|
|
view=view,
|
|
ephemeral=True
|
|
)
|
|
else:
|
|
await interaction.followup.send(
|
|
content="No URL was found in message, sorry :-(",
|
|
ephemeral=True
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
client.run(discord_token) |