feat: queue de téléchargement séquentielle (un à la fois)
Docker / docker (push) Successful in 1m38s
Docker / docker (push) Successful in 1m38s
asyncio.Queue dans DownloadManager + worker unique démarré dans le lifespan. Les téléchargements s'exécutent un par un dans l'ordre d'arrivée. Suppression de BackgroundTasks (plus nécessaire). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+9
-20
@@ -8,7 +8,6 @@ from datetime import datetime
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import yt_dlp
|
import yt_dlp
|
||||||
from fastapi import BackgroundTasks
|
|
||||||
|
|
||||||
OUTPUT_DIR = "/data/Arte"
|
OUTPUT_DIR = "/data/Arte"
|
||||||
DB_PATH = "data/arte_dl.db"
|
DB_PATH = "data/arte_dl.db"
|
||||||
@@ -87,6 +86,7 @@ class DownloadManager:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._active: dict[str, dict] = {}
|
self._active: dict[str, dict] = {}
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
self._queue: asyncio.Queue = asyncio.Queue()
|
||||||
self._init_db()
|
self._init_db()
|
||||||
|
|
||||||
def _init_db(self):
|
def _init_db(self):
|
||||||
@@ -147,7 +147,8 @@ class DownloadManager:
|
|||||||
).fetchone()
|
).fetchone()
|
||||||
return row is not None
|
return row is not None
|
||||||
|
|
||||||
def _insert_queued(self, url: str, title: str) -> str:
|
async def enqueue(self, url: str, title: str, subtitle: str,
|
||||||
|
year: int | None, category: str) -> str:
|
||||||
dl_id = str(uuid.uuid4())
|
dl_id = str(uuid.uuid4())
|
||||||
now = datetime.now().isoformat()
|
now = datetime.now().isoformat()
|
||||||
with _db() as conn:
|
with _db() as conn:
|
||||||
@@ -157,20 +158,15 @@ class DownloadManager:
|
|||||||
)
|
)
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._active[dl_id] = {"state": "queued", "progress": 0, "title": title}
|
self._active[dl_id] = {"state": "queued", "progress": 0, "title": title}
|
||||||
|
await self._queue.put((dl_id, url, title, subtitle, year, category))
|
||||||
return dl_id
|
return dl_id
|
||||||
|
|
||||||
def enqueue(self, url: str, title: str, subtitle: str, year: int | None,
|
async def start_worker(self):
|
||||||
category: str, bg: BackgroundTasks) -> str:
|
|
||||||
dl_id = self._insert_queued(url, title)
|
|
||||||
bg.add_task(self._run, dl_id, url, title, subtitle, year, category)
|
|
||||||
return dl_id
|
|
||||||
|
|
||||||
async def enqueue_direct(self, url: str, title: str, subtitle: str,
|
|
||||||
year: int | None, category: str) -> str:
|
|
||||||
dl_id = self._insert_queued(url, title)
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
loop.run_in_executor(None, self._run, dl_id, url, title, subtitle, year, category)
|
while True:
|
||||||
return dl_id
|
job = await self._queue.get()
|
||||||
|
dl_id, url, title, subtitle, year, category = job
|
||||||
|
await loop.run_in_executor(None, self._run, dl_id, url, title, subtitle, year, category)
|
||||||
|
|
||||||
def status(self, dl_id: str) -> dict:
|
def status(self, dl_id: str) -> dict:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
@@ -183,13 +179,6 @@ class DownloadManager:
|
|||||||
).fetchall()
|
).fetchall()
|
||||||
return [dict(r) for r in rows]
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
def already_downloaded(self, url: str) -> bool:
|
|
||||||
with _db() as conn:
|
|
||||||
row = conn.execute(
|
|
||||||
"SELECT id FROM downloads WHERE url=? AND state='done' LIMIT 1", (url,)
|
|
||||||
).fetchone()
|
|
||||||
return row is not None
|
|
||||||
|
|
||||||
# ----------------------------------------------------------------- private
|
# ----------------------------------------------------------------- private
|
||||||
|
|
||||||
def _set(self, dl_id: str, **kw):
|
def _set(self, dl_id: str, **kw):
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import os
|
|||||||
import re
|
import re
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
|
from fastapi import FastAPI, HTTPException, Request
|
||||||
from fastapi.responses import HTMLResponse, StreamingResponse
|
from fastapi.responses import HTMLResponse, StreamingResponse
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
@@ -52,13 +52,14 @@ async def _auto_dl_loop():
|
|||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
task = asyncio.create_task(_auto_dl_loop())
|
tasks = [
|
||||||
|
asyncio.create_task(dm.start_worker()),
|
||||||
|
asyncio.create_task(_auto_dl_loop()),
|
||||||
|
]
|
||||||
yield
|
yield
|
||||||
task.cancel()
|
for t in tasks:
|
||||||
try:
|
t.cancel()
|
||||||
await task
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
app = FastAPI(title="Arte-dl", lifespan=lifespan)
|
app = FastAPI(title="Arte-dl", lifespan=lifespan)
|
||||||
@@ -133,10 +134,10 @@ class DownloadRequest(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
@app.post("/api/download")
|
@app.post("/api/download")
|
||||||
async def api_download(req: DownloadRequest, bg: BackgroundTasks):
|
async def api_download(req: DownloadRequest):
|
||||||
if not req.url:
|
if not req.url:
|
||||||
raise HTTPException(status_code=400, detail="url required")
|
raise HTTPException(status_code=400, detail="url required")
|
||||||
dl_id = dm.enqueue(req.url, req.title, req.subtitle, req.year, req.category, bg)
|
dl_id = await dm.enqueue(req.url, req.title, req.subtitle, req.year, req.category)
|
||||||
return {"id": dl_id}
|
return {"id": dl_id}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user