Commit 2c8d4c77 authored by Administrator

init

parent 71e0d415
.dockerignore
**/__pycache__
inputs
logs
outputs
tests
**/checkpoints
/app/ai_gen_image/images
inputs/
outputs/
**/__pycache__
**/checkpoints
active
test/
/app/utils/config.py
**/.DS_Store
FROM harbor.5jstore.com:8020/ai/wm_generate_ai:v0.2
LABEL maintainer="zhouchengbo@wmdigit.com"
ARG DEBIAN_FRONTEND=noninteractive
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
# Copy all files into the image
ADD ./ /app
WORKDIR /app
# ENTRYPOINT [ "python", "start.py" ]
CMD ["gunicorn", "start:app", "-c", "./gunicorn.conf.py"]
FROM harbor.5jstore.com:8020/common/nvidia-cuda:nvidia-cuda11.3.0-python38-ubuntu18.04-pytorch1.12.1-v2
LABEL maintainer="zhouchengbo@wmdigit.com"
ARG DEBIAN_FRONTEND=noninteractive
# Install system dependencies
RUN sed -i 's/archive.ubuntu.com/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list
RUN apt-get update && apt-get install -y fonts-wqy-zenhei
# Install Python packages
COPY requirements.txt /app/
WORKDIR /app
RUN pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
# Install GFPGAN
# RUN pip install git+https://github.com/TencentARC/GFPGAN
COPY ./app/ai_gen_video/GFPGAN-master /app/app/ai_gen_video/GFPGAN-master
WORKDIR /app/app/ai_gen_video/GFPGAN-master
RUN python setup.py install
# Install CLIP
# RUN pip install git+https://github.com/openai/CLIP.git
COPY ./app/ai_gen_image/CLIP-main /app/app/ai_gen_image/CLIP-main
WORKDIR /app/app/ai_gen_image/CLIP-main
RUN python setup.py install
ENV PATH="$PATH:/usr/local/python3/bin"
WORKDIR /app
FROM harbor.5jstore.com:8020/ai/wm_generate_ai:v0.1
LABEL maintainer="zhouchengbo@wmdigit.com"
ARG DEBIAN_FRONTEND=noninteractive
# Install system dependencies
RUN apt-get update && apt-get install -y ninja-build
# Install Python packages
COPY requirements.txt /app/
WORKDIR /app
# RUN pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
RUN pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple
ENV PATH="$PATH:/usr/local/python3/bin"
WORKDIR /app
# wmdigit_video_cut
# AI Generate
Automatic video translation - backend
# Build the base image and put all dependencies into it
<!-- v0 -->
docker rmi harbor.5jstore.com:8020/ai/wm_generate_ai:v0
docker build -f Dockerfile_v0 -t harbor.5jstore.com:8020/ai/wm_generate_ai:v0 .
<!-- 0.1 is built manually from v0; it mainly installs the full CUDA toolkit -->
docker rmi harbor.5jstore.com:8020/ai/wm_generate_ai:v0.1
docker run --gpus all --runtime=nvidia -it harbor.5jstore.com:8020/ai/wm_generate_ai:v0 /bin/bash
wget https://developer.download.nvidia.com/compute/cuda/11.3.0/local_installers/cuda_11.3.0_465.19.01_linux.run
sh cuda_11.3.0_465.19.01_linux.run
1. Do not select the driver.
2. When the installer warns that an existing CUDA installation was found and asks whether to update, choose No.
3. Update PATH and related environment variables:
vi ~/.bashrc
export PATH=/usr/local/cuda-11.3/bin:$PATH
export LD_LIBRARY_PATH=/usr/local/cuda-11.3/lib64:$LD_LIBRARY_PATH
export CUDA_HOME=/usr/local/cuda-11.3
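Optionally, confirm the toolkit is on PATH (assuming the installer placed nvcc under /usr/local/cuda-11.3/bin):
source ~/.bashrc && nvcc --version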
Delete the downloaded installer
rm cuda_11.3.0_465.19.01_linux.run
Commit the new image
docker commit c9c2a347491d harbor.5jstore.com:8020/ai/wm_generate_ai:v0.1
<!-- end -->
<!-- 0.2 installs the project dependencies on top of 0.1 -->
docker rmi harbor.5jstore.com:8020/ai/wm_generate_ai:v0.2
docker build -f Dockerfile_v0.2 -t harbor.5jstore.com:8020/ai/wm_generate_ai:v0.2 .
# Build the main image
docker rmi harbor.5jstore.com:8020/ai/wm_generate_ai:v2
docker build -f Dockerfile -t harbor.5jstore.com:8020/ai/wm_generate_ai:v2 .
docker-compose up -d
The command above starts an HTTP service that exposes the following endpoints:
* http://localhost:8181/ai_generate_video
* http://localhost:8181/ai_generate_image
The gateway can route and load-balance requests based on these endpoints.
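A quick smoke test (assuming the service is reachable on port 8181 as listed above; start.py currently answers a GET on /ai_generate_video with a placeholder JSON body):
curl http://localhost:8181/ai_generate_video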
<!-- To debug the Docker image, remove the final CMD line from the Dockerfile -->
docker run -it harbor.5jstore.com:8020/ai/wm_generate_ai:v0 /bin/bash
docker run -it harbor.5jstore.com:8020/ai/wm_generate_ai:v1 /bin/bash
docker run -it harbor.5jstore.com:8020/ai/wm_generate_ai:v2 /bin/bash
docker run --gpus all --runtime=nvidia -v ./inputs/:/app/inputs/ -v ./outputs/:/app/outputs/ -it harbor.5jstore.com:8020/common/ai_generate_video:proc_v4 /bin/bash
ffmpeg -hwaccels
<!-- Debug commands -->
python start.py
gunicorn start:app -c ./gunicorn.conf.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class WMOption(db.Model):
id = db.Column(db.BigInteger, primary_key=True, autoincrement=True)
tenant_id = db.Column(db.BigInteger)
option_key = db.Column(db.String(100))
option_value = db.Column(db.String(500))
note = db.Column(db.String(255))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
from urllib import parse
import oss2
import datetime, os
from tqdm import tqdm
from moviepy.editor import VideoFileClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
import time
from app.models.wm_option import WMOption
def download_file(url, file_dir, file_name):
print(f"开始下载:{url}")
try:
if not os.path.exists(file_dir):
os.makedirs(file_dir)
# Download
local_file_path = os.path.join(file_dir, file_name)
print(local_file_path)
# Send a GET request and read the file size
response = requests.get(url, headers={}, stream=True, timeout=30)
print('GET request sent, reading file size')
file_size = int(response.headers.get('content-length', 0))
print(f'file_size:{file_size}')
# Download the file and show a progress bar
chunk_size = 1024
with open(local_file_path, 'wb') as file, tqdm(
desc='Downloading file', total=file_size, unit='B', unit_scale=True, unit_divisor=1024,
miniters=1, ascii=True) as progress_bar:
for data in response.iter_content(chunk_size=chunk_size):
# Update the progress bar
progress_bar.update(len(data))
# Write to the file
file.write(data)
print(f"下载完成:{local_file_path}")
return local_file_path
except Exception as e:
err_info = "download_file 异常:" + str(e)
print(err_info)
raise err_info
def download_file2(url, file_dir, file_name):
try:
if not os.path.exists(file_dir):
os.makedirs(file_dir)
local_file_path = os.path.join(file_dir, file_name)
response = requests.get(url, stream=True)
response.raise_for_status() # 抛出HTTPError异常
with open(local_file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
print(f"文件下载成功:{local_file_path}")
return local_file_path
except requests.exceptions.RequestException as e:
print(f"下载文件时出现错误:{e}")
raise e
def upload_to_oss(local_file_path, file_name_format):
print(f"上传文件:{local_file_path} 到阿里云OSS")
access_key = "LTAIrA1H432TFke7"
access_secret = "BZDCuyIYd4lAOX5UOh8wK93GuUynkR"
oss_endpoint = "oss-cn-beijing.aliyuncs.com"
oss_bucket = "wm-video-pic"
oss_folder = "wm_gen_video"
try:
# An Aliyun account AccessKey grants access to all APIs and is high risk. It is strongly recommended to create and use a RAM user for API access and daily operations; log in to the RAM console to create one.
auth = oss2.Auth(access_key, access_secret)
# Bucket name.
bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
# Upload the file to OSS.
now = datetime.datetime.now()
datetime_str = now.strftime('%Y%m%d%H%M%S')
oss_file_path = oss_folder + '/' + datetime_str + '/' + file_name_format
# Upload the file
with open(local_file_path, 'rb') as file:
result = bucket.put_object(oss_file_path, file)
if result.status == 200:
oss_url = f"https://{oss_bucket}.{oss_endpoint}/{oss_file_path}"
print(f"阿里云OSS上传成功,链接:{oss_url}")
return oss_url
else:
print(f"上传阿里云OSS失败")
raise "上传阿里云OSS失败"
except Exception as e:
err_info = "上传阿里云OSS异常:" + str(e)
print(err_info)
if 'Connection aborted' in str(e):
print('上传oss被中断,重试')
return upload_to_oss(local_file_path, file_name_format)
else:
raise err_info
def get_video_length(video_file):
try:
# Load the video file
video = VideoFileClip(video_file)
# Video duration in seconds
duration = video.duration
# Release resources
video.close()
return round(duration, 2)
except Exception as e:
err_info = "get_video_length 异常:" + str(e)
raise err_info
def get_audio_length(audio_file):
try:
# Load the audio file
audio = AudioFileClip(audio_file)
# Audio duration in seconds
duration = audio.duration
# Release resources
audio.close()
return round(duration, 2)
except Exception as e:
err_info = "get_audio_length 异常:" + str(e)
raise err_info
def get_wm_option(wm_key):
wm_option = WMOption.query.filter_by(option_key=wm_key).first()
if not wm_option:
return ""
else:
return wm_option.option_value
import json
import time
import redis, requests
from app.utils.config import REDIS_HOST, REDIS_PASWORD, REDIS_PORT, REDIS_DB
# Initialize the redis client
redis_client = redis.Redis(host=REDIS_HOST, password=REDIS_PASWORD, port=REDIS_PORT, db=REDIS_DB)
def translate2en_by_wmcms(content, retry=0):
# On failure, retry every retry_sleep seconds, up to retry_all_times times in total
retry_sleep = 2
retry_all_times = 10
print("从缓存获取 wmcms_token")
wmcms_token = redis_client.get('wmcms_token')
if not wmcms_token:
print(f"login_wmcms 获取 wmcms_token")
wmcms_token = login_wmcms()
redis_client.set('wmcms_token', wmcms_token, ex=3600)  # valid for 1 hour (3600 seconds)
else:
wmcms_token = wmcms_token.decode('utf-8')
req_info = "====== begin request ======>\n" + str(content)
# print(req_info)
print(req_info)
try:
resp = requests.post('https://wmminiportal.wmdigit.com/wmcms/chatgpt',
json={"module": "work", "user": 27, "ask": str(content), "topic": 186},
headers={'content-type': 'application/json;charset=utf8',
'Authorization': 'Bearer ' + wmcms_token},
verify=True)
# print('Status code:', resp.status_code)
# print('Headers:', resp.headers)
# print('Response body:', resp.text)
resp_dic = json.loads(resp.text)
resp_info = '====== response is ======>\n' + str(resp_dic)
# print(resp_info)
print(resp_info)
if 'answer' in resp_dic:
mess = resp_dic['answer']
mess = mess.strip().strip('\n')
return str(mess)
elif 'code' in resp_dic:
if resp_dic['code'] == 10041 or resp_dic['code'] == 10051:
print(f"access token 损坏或过期,重新 login 获取 wmcms_token")
wmcms_token = login_wmcms()
redis_client.set('wmcms_token', wmcms_token, ex=3600)  # valid for 1 hour (3600 seconds)
return translate2en_by_wmcms(content)
elif resp_dic['code'] == 10200:
print(resp_dic['message'])
return translate2en_by_wmcms(content)
else:
print("与wmcms通讯失败:" + str(resp_dic))
raise str(resp_dic)
else:
print("与wmcms通讯失败:" + str(resp_dic))
raise str(resp_dic)
except Exception as e:
retry += 1
if retry == retry_all_times:
raise f"经过{retry_all_times}次重试,chatgpt依然失败。" + str(e)
print(f"请求chatgpt翻译失败:{str(e)},{retry_sleep}秒后第{retry}次重试……")
time.sleep(retry_sleep)
return translate2en_by_wmcms(content, retry)
def login_wmcms():
try:
url = 'https://wmminiportal.wmdigit.com/wmcms/user/login'
para = {"captcha": "", "username": "wm_video", "password": "WMdigit.2018"}
req_info = "====== begin request ======>\n" + str(url) + '\n' + str(para)
# print(req_info)
print(req_info)
req = requests.post(url,
json=para,
headers={'content-type': 'application/json'},
verify=True)
req_dic = json.loads(req.text)
resp_info = '====== response is ======>\n' + str(req_dic)
# print(resp_info)
print(resp_info)
if 'access_token' in req_dic:
return req_dic['access_token']
else:
print("login_wmcms失败:" + str(req_dic))
raise "login_wmcms失败:" + str(req_dic)
except Exception as e:
print("login_wmcms失败:" + str(e))
raise "login_wmcms失败:" + str(e)
if __name__ == "__main__":
translate2en_by_wmcms("请你作为一个翻译器,将”阳光,美女,沙滩“翻译为英文,只返回翻译的内容。")
from enum import Enum
from typing import TypedDict, Literal
SPEECH_ARRAY_INDEX = TypedDict("SPEECH_ARRAY_INDEX", {"start": float, "end": float})
LANG = Literal[
"zh",
"en",
"Afrikaans",
"Arabic",
"Armenian",
"Azerbaijani",
"Belarusian",
"Bosnian",
"Bulgarian",
"Catalan",
"Croatian",
"Czech",
"Danish",
"Dutch",
"Estonian",
"Finnish",
"French",
"Galician",
"German",
"Greek",
"Hebrew",
"Hindi",
"Hungarian",
"Icelandic",
"Indonesian",
"Italian",
"Japanese",
"Kannada",
"Kazakh",
"Korean",
"Latvian",
"Lithuanian",
"Macedonian",
"Malay",
"Marathi",
"Maori",
"Nepali",
"Norwegian",
"Persian",
"Polish",
"Portuguese",
"Romanian",
"Russian",
"Serbian",
"Slovak",
"Slovenian",
"Spanish",
"Swahili",
"Swedish",
"Tagalog",
"Tamil",
"Thai",
"Turkish",
"Ukrainian",
"Urdu",
"Vietnamese",
"Welsh",
]
class WhisperModel(Enum):
TINY = "tiny"
BASE = "base"
SMALL = "small"
MEDIUM = "medium"
LARGE = "large"
LARGE_V2 = "large-v2"
@staticmethod
def get_values():
return [i.value for i in WhisperModel]
class WhisperMode(Enum):
WHISPER = "whisper"
OPENAI = "openai"
FASTER = "faster"
@staticmethod
def get_values():
return [i.value for i in WhisperMode]
import logging
import os
import re
import ffmpeg
import numpy as np
import opencc
import srt
from moviepy.editor import VideoFileClip
import asyncio
import edge_tts
import time
def load_audio(file: str, sr: int = 16000) -> np.ndarray:
try:
out, _ = (
ffmpeg.input(file, threads=0)
.output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
.run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
)
# out, _ = (
# ffmpeg.input(file, threads=0)
# .output("./20231103/temp.wav", format="wav", acodec="pcm_s16le", ac=1, ar=sr)
# .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
# )
except ffmpeg.Error as e:
raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
def is_video(filename):
_, ext = os.path.splitext(filename)
return ext in [".mp4", ".mov", ".mkv", ".avi", ".flv", ".f4v", ".webm"]
def is_audio(filename):
_, ext = os.path.splitext(filename)
return ext in [".ogg", ".wav", ".mp3", ".flac", ".m4a"]
def change_ext(filename, new_ext):
# Change the extension of filename to new_ext
base, _ = os.path.splitext(filename)
if not new_ext.startswith("."):
new_ext = "." + new_ext
return base + new_ext
def add_cut(filename):
# Add cut mark to the filename
base, ext = os.path.splitext(filename)
if base.endswith("_cut"):
base = base[:-4] + "_" + base[-4:]
else:
base += "_cut"
return base + ext
def add_anything(filename, anything):
base, ext = os.path.splitext(filename)
base += f"_{anything}"
return base + ext
# a very simple markdown parser
class MD:
def __init__(self, filename, encoding):
self.lines = []
self.EDIT_DONE_MAKR = "<-- Mark if you are done editing."
self.encoding = encoding
self.filename = filename
if filename:
self.load_file()
def load_file(self):
if os.path.exists(self.filename):
with open(self.filename, encoding=self.encoding) as f:
self.lines = f.readlines()
def clear(self):
self.lines = []
def write(self):
with open(self.filename, "wb") as f:
f.write("\n".join(self.lines).encode(self.encoding, "replace"))
def tasks(self):
# get all tasks with their status
ret = []
for l in self.lines:
mark, task = self._parse_task_status(l)
if mark is not None:
ret.append((mark, task))
return ret
def done_editing(self):
for m, t in self.tasks():
if m and self.EDIT_DONE_MAKR in t:
return True
return False
def add(self, line):
self.lines.append(line)
def add_task(self, mark, contents):
self.add(f'- [{"x" if mark else " "}] {contents.strip()}')
def add_done_editing(self, mark):
self.add_task(mark, self.EDIT_DONE_MAKR)
def add_video(self, video_fn):
ext = os.path.splitext(video_fn)[1][1:]
self.add(
f'\n<video controls="true" allowfullscreen="true"> <source src="{video_fn}" type="video/{ext}"> </video>\n'
)
def _parse_task_status(self, line):
# return (is_marked, rest) or (None, line) if not a task
m = re.match(r"- +\[([ x])\] +(.*)", line)
if not m:
return None, line
return m.groups()[0].lower() == "x", m.groups()[1]
def check_exists(output, force):
if os.path.exists(output):
if force:
logging.info(f"{output} exists. Will overwrite it")
else:
logging.info(
f"{output} exists, skipping... Use the --force flag to overwrite"
)
return True
return False
def expand_segments(segments, expand_head, expand_tail, total_length):
# Pad head and tail for each time segment
results = []
for i in range(len(segments)):
t = segments[i]
start = max(t["start"] - expand_head, segments[i - 1]["end"] if i > 0 else 0)
end = min(
t["end"] + expand_tail,
segments[i + 1]["start"] if i < len(segments) - 1 else total_length,
)
results.append({"start": start, "end": end})
return results
def remove_short_segments(segments, threshold):
# Remove segments whose length < threshold
return [s for s in segments if s["end"] - s["start"] > threshold]
def merge_adjacent_segments(segments, threshold):
# Merge two adjacent segments if their distance < threshold
results = []
i = 0
while i < len(segments):
s = segments[i]
for j in range(i + 1, len(segments)):
if segments[j]["start"] < s["end"] + threshold:
s["end"] = segments[j]["end"]
i = j
else:
break
i += 1
results.append(s)
return results
def compact_rst(sub_fn, encoding):
cc = opencc.OpenCC("t2s")
base, ext = os.path.splitext(sub_fn)
COMPACT = "_compact"
if ext != ".srt":
logging.fatal("only .srt file is supported")
if base.endswith(COMPACT):
# to original rst
with open(sub_fn, encoding=encoding) as f:
lines = f.readlines()
subs = []
for l in lines:
items = l.split(" ")
if len(items) < 4:
continue
subs.append(
srt.Subtitle(
index=0,
start=srt.srt_timestamp_to_timedelta(items[0]),
end=srt.srt_timestamp_to_timedelta(items[2]),
content=" ".join(items[3:]).strip(),
)
)
with open(base[: -len(COMPACT)] + ext, "wb") as f:
f.write(srt.compose(subs).encode(encoding, "replace"))
else:
# to a compact version
with open(sub_fn, encoding=encoding) as f:
subs = srt.parse(f.read())
with open(base + COMPACT + ext, "wb") as f:
for s in subs:
f.write(
f"{srt.timedelta_to_srt_timestamp(s.start)} --> {srt.timedelta_to_srt_timestamp(s.end)} "
f"{cc.convert(s.content.strip())}\n".encode(encoding, "replace")
)
def trans_srt_to_md(encoding, force, srt_fn, video_fn=None):
base, ext = os.path.splitext(srt_fn)
if ext != ".srt":
logging.fatal("only .srt file is supported")
md_fn = base + ext.split(".")[0] + ".md"
check_exists(md_fn, force)
with open(srt_fn, encoding=encoding) as f:
subs = srt.parse(f.read())
md = MD(md_fn, encoding)
md.clear()
md.add_done_editing(False)
if video_fn:
if not is_video(video_fn):
logging.fatal(f"{video_fn} may not be a video")
md.add_video(os.path.basename(video_fn))
md.add(
f"\nTexts generated from [{os.path.basename(srt_fn)}]({os.path.basename(srt_fn)})."
"Mark the sentences to keep for autocut.\n"
"The format is [subtitle_index,duration_in_second] subtitle context.\n\n"
)
for s in subs:
sec = s.start.seconds
pre = f"[{s.index},{sec // 60:02d}:{sec % 60:02d}]"
md.add_task(False, f"{pre:11} {s.content.strip()}")
md.write()
def change_video_duration(input_video_path, output_video_path, target_duration):
# Load the original video
video = VideoFileClip(input_video_path)
# Compute the speed factor required to hit the target duration
original_duration = video.duration
speed = original_duration / target_duration
# Change the playback speed and write the new video file
video = video.speedx(speed)
video.write_videofile(output_video_path)
def text_to_audio_by_edge_tts(text, local_voice_file, lang, retry=1):
logging.info(f"开始edge-tts生成语音:{text}")
voice = "zh-CN-XiaoxiaoNeural"
if lang == "en":
voice = "en-US-JennyNeural"
if lang == "Afrikaans":
voice = "af-ZA-AdriNeural"
if lang == "Arabic":
voice = "ar-AE-FatimaNeural"
if lang == "Armenian":
voice = "ar-AE-HamdanNeural"
if lang == "Azerbaijani":
voice = "az-AZ-BanuNeural"
if lang == "Belarusian":
voice = "bs-BA-VesnaNeural"
if lang == "Bosnian":
voice = "bn-IN-TanishaaNeural"
if lang == "Bulgarian":
voice = "bg-BG-KalinaNeural"
if lang == "Catalan":
voice = "ca-ES-JoanaNeural"
if lang == "Croatian":
voice = "cy-GB-NiaNeural"
if lang == "Czech":
voice = "cs-CZ-VlastaNeural"
if lang == "Danish":
voice = "da-DK-ChristelNeural"
if lang == "Dutch":
voice = "de-AT-IngridNeural"
if lang == "Estonian":
voice = "es-ES-ElviraNeural"
if lang == "Finnish":
voice = "fi-FI-NooraNeural"
if lang == "French":
voice = "fr-FR-DeniseNeural"
if lang == "Galician":
voice = "ga-IE-OrlaNeural"
if lang == "German":
voice = "gl-ES-SabelaNeural"
if lang == "Greek":
voice = "gu-IN-DhwaniNeural"
if lang == "Hebrew":
voice = "he-IL-HilaNeural"
if lang == "Hindi":
voice = "hi-IN-SwaraNeural"
if lang == "Hungarian":
voice = "hu-HU-NoemiNeural"
if lang == "Icelandic":
voice = "is-IS-GudrunNeural"
if lang == "Indonesian":
voice = "id-ID-GadisNeural"
if lang == "Italian":
voice = "it-IT-ElsaNeural"
if lang == "Japanese":
voice = "ja-JP-NanamiNeural"
if lang == "Kannada":
voice = "ka-GE-EkaNeural"
if lang == "Kazakh":
voice = "kk-KZ-AigulNeural"
if lang == "Korean":
voice = "ko-KR-SunHiNeural"
if lang == "Latvian":
voice = "lo-LA-KeomanyNeural"
if lang == "Lithuanian":
voice = "lt-LT-OnaNeural"
if lang == "Macedonian":
voice = "mk-MK-MarijaNeural"
if lang == "Malay":
voice = "ml-IN-SobhanaNeural"
if lang == "Marathi":
voice = "mr-IN-AarohiNeural"
if lang == "Maori":
voice = "mr-IN-AarohiNeural"
if lang == "Nepali":
voice = "ne-NP-HemkalaNeural"
if lang == "Norwegian":
voice = "nl-BE-DenaNeural"
if lang == "Persian":
voice = "ps-AF-LatifaNeural"
if lang == "Polish":
voice = "pl-PL-ZofiaNeural"
if lang == "Portuguese":
voice = "pt-BR-FranciscaNeural"
if lang == "Romanian":
voice = "ro-RO-AlinaNeural"
if lang == "Russian":
voice = "ru-RU-DmitryNeural"
if lang == "Serbian":
voice = "sr-RS-SophieNeural"
if lang == "Slovak":
voice = "sl-SI-PetraNeural"
if lang == "Slovenian":
voice = "sl-SI-PetraNeural"
if lang == "Spanish":
voice = "sw-KE-ZuriNeural"
if lang == "Swahili":
voice = "sw-KE-ZuriNeural"
if lang == "Swedish":
voice = "sw-KE-ZuriNeural"
if lang == "Tagalog":
voice = "ta-LK-SaranyaNeural"
if lang == "Tamil":
voice = "ta-SG-VenbaNeural"
if lang == "Thai":
voice = "th-TH-PremwadeeNeural"
if lang == "Turkish":
voice = "tr-TR-EmelNeural"
if lang == "Ukrainian":
voice = "uk-UA-PolinaNeural"
if lang == "Urdu":
voice = "ur-IN-GulNeural"
if lang == "Vietnamese":
voice = "vi-VN-HoaiMyNeural"
if lang == "Welsh":
voice = "en-US-JennyNeural"
rate = '+10%'
volume = '+0%'
async def _main() -> None:
communicate = edge_tts.Communicate(text=text, voice=voice, rate=rate, volume=volume)
with open(local_voice_file, "wb") as file:
async for chunk in communicate.stream():
if chunk["type"] == "audio":
file.write(chunk["data"])
# logging.info(f"提交edge-tts")
try:
asyncio.run(_main())
except Exception:
if retry == 10:
raise RuntimeError(f"Failed to generate voice from edge-tts: {text}")
else:
time.sleep(1)
logging.info(f"Retry {retry} to generate voice from edge-tts: {text}")
retry += 1
text_to_audio_by_edge_tts(text, local_voice_file, lang, retry)
i = 1
while True:
if os.path.exists(local_voice_file):
return True
else:
logging.info(f"轮询等待edge-tts异步生成结果:第{i}次")
time.sleep(1)
i += 1
if i > 60:
logging.error("轮询等待edge-tts生成语音和字幕异常")
raise RuntimeError(f"Failed to generate voice from edge-tts")
else:
continue
def get_mp3_duration(file_path):
from mutagen.mp3 import MP3
audio = MP3(file_path)
duration_in_seconds = audio.info.length
return duration_in_seconds
def delete_files(current_dir, prefix):
files = os.listdir(current_dir)  # list all files and folders in the current directory
for file in files:
if file.startswith(prefix):
file_path = os.path.join(current_dir, file)
try:
os.remove(file_path)
print(f"文件 {file_path} 删除成功")
except OSError as e:
print(f"删除文件 {file_path} 失败: {e}")
def remove_chinese(text):
result = ''
for char in text:
if '\u4e00' <= char <= '\u9fff':
print(f'Chinese character found: {char}')
continue
result += char
return result
import datetime
import logging
import os
from abc import ABC, abstractmethod
from typing import Literal, Union, List, Any, TypedDict
import numpy as np
import opencc
import srt
from pydub import AudioSegment
from tqdm import tqdm
from .type import SPEECH_ARRAY_INDEX, LANG
# whisper sometimes generates traditional Chinese; explicitly convert to simplified
cc = opencc.OpenCC("t2s")
class AbstractWhisperModel(ABC):
def __init__(self, mode, sample_rate=16000):
self.mode = mode
self.whisper_model = None
self.sample_rate = sample_rate
@abstractmethod
def load(self, *args, **kwargs):
pass
@abstractmethod
def transcribe(self, *args, **kwargs):
pass
@abstractmethod
def _transcribe(self, *args, **kwargs):
pass
@abstractmethod
def gen_srt(self, transcribe_results: List[Any]) -> List[srt.Subtitle]:
pass
class WhisperModel(AbstractWhisperModel):
def __init__(self, sample_rate=16000):
super().__init__("whisper", sample_rate)
self.device = None
def load(
self,
model_name: Literal[
"tiny", "base", "small", "medium", "large", "large-v2"
] = "small",
device: Union[Literal["cpu", "cuda"], None] = None,
):
self.device = device
import whisper
self.whisper_model = whisper.load_model(model_name, device)
def _transcribe(self, audio, seg, lang, prompt):
r = self.whisper_model.transcribe(
audio[int(seg["start"]) : int(seg["end"])],
task="transcribe",
language=lang,
initial_prompt=prompt,
)
r["origin_timestamp"] = seg
return r
def transcribe(
self,
audio: np.ndarray,
speech_array_indices: List[SPEECH_ARRAY_INDEX],
lang: LANG,
prompt: str,
):
res = []
if self.device == "cpu" and len(speech_array_indices) > 1:
from multiprocessing import Pool
pbar = tqdm(total=len(speech_array_indices))
pool = Pool(processes=4)
sub_res = []
# TODO, a better way is merging these segments into a single one, so whisper can get more context
for seg in speech_array_indices:
sub_res.append(
pool.apply_async(
self._transcribe,
(
audio,
seg,
lang,
prompt,
),
callback=lambda x: pbar.update(),
)
)
pool.close()
pool.join()
pbar.close()
res = [i.get() for i in sub_res]
else:
for seg in (
speech_array_indices
if len(speech_array_indices) == 1
else tqdm(speech_array_indices)
):
r = self.whisper_model.transcribe(
audio[int(seg["start"]) : int(seg["end"])],
task="transcribe",
language=lang,
initial_prompt=prompt,
verbose=False if len(speech_array_indices) == 1 else None,
)
r["origin_timestamp"] = seg
res.append(r)
return res
def gen_srt(self, transcribe_results):
subs = []
def _add_sub(start, end, text):
subs.append(
srt.Subtitle(
index=0,
start=datetime.timedelta(seconds=start),
end=datetime.timedelta(seconds=end),
content=cc.convert(text.strip()),
)
)
prev_end = 0
for r in transcribe_results:
origin = r["origin_timestamp"]
for s in r["segments"]:
start = s["start"] + origin["start"] / self.sample_rate
end = min(
s["end"] + origin["start"] / self.sample_rate,
origin["end"] / self.sample_rate,
)
if start > end:
continue
# mark any empty segment that is not very short
if start > prev_end + 1.0:
_add_sub(prev_end, start, "< No Speech >")
_add_sub(start, end, s["text"])
prev_end = end
return subs
class OpenAIModel(AbstractWhisperModel):
max_single_audio_bytes = 25 * 2**20 # 25MB
split_audio_bytes = 23 * 2**20 # 23MB, 2MB for safety(header, etc.)
rpm = 3
def __init__(self, rpm: int, sample_rate=16000):
super().__init__("openai_whisper-1", sample_rate)
self.rpm = rpm
if (
os.environ.get("OPENAI_API_KEY") is None
and os.environ.get("OPENAI_API_KEY_PATH") is None
):
raise Exception("OPENAI_API_KEY is not set")
def load(self, model_name: Literal["whisper-1"] = "whisper-1"):
try:
import openai
except ImportError:
raise Exception(
"Please use openai mode(pip install '.[openai]') or all mode(pip install '.[all]')"
)
from functools import partial
self.whisper_model = partial(openai.Audio.transcribe, model=model_name)
def transcribe(
self,
input: str,
audio: np.ndarray,
speech_array_indices: List[SPEECH_ARRAY_INDEX],
lang: LANG,
prompt: str,
) -> List[srt.Subtitle]:
res = []
name, _ = os.path.splitext(input)
raw_audio = AudioSegment.from_file(input)
ms_bytes = len(raw_audio[:1].raw_data)
audios: List[
TypedDict(
"AudioInfo", {"input": str, "audio": AudioSegment, "start_ms": float}
)
] = []
i = 0
for index in speech_array_indices:
start = int(index["start"]) / self.sample_rate * 1000
end = int(index["end"]) / self.sample_rate * 1000
audio_seg = raw_audio[start:end]
if len(audio_seg.raw_data) < self.split_audio_bytes:
temp_file = f"{name}_temp_{i}.wav"
audios.append(
{"input": temp_file, "audio": audio_seg, "start_ms": start}
)
else:
logging.info(
f"Long audio with a size({len(audio_seg.raw_data)} bytes) greater than 25M({25 * 2 ** 20} bytes) "
"will be segmented"
"due to Openai's API restrictions on files smaller than 25M"
)
split_num = len(audio_seg.raw_data) // self.split_audio_bytes + 1
for j in range(split_num):
temp_file = f"{name}_{i}_temp_{j}.wav"
split_audio = audio_seg[
j
* self.split_audio_bytes
// ms_bytes : (j + 1)
* self.split_audio_bytes
// ms_bytes
]
audios.append(
{
"input": temp_file,
"audio": split_audio,
"start_ms": start + j * self.split_audio_bytes // ms_bytes,
}
)
i += 1
if len(audios) > 1:
from multiprocessing import Pool
pbar = tqdm(total=len(audios))
pool = Pool(processes=min(8, self.rpm))
sub_res = []
for audio in audios:
sub_res.append(
pool.apply_async(
self._transcribe,
(
audio["input"],
audio["audio"],
prompt,
lang,
audio["start_ms"],
),
callback=lambda x: pbar.update(),
)
)
pool.close()
pool.join()
pbar.close()
for subs in sub_res:
subtitles = subs.get()
res.extend(subtitles)
else:
res = self._transcribe(
audios[0]["input"],
audios[0]["audio"],
prompt,
lang,
audios[0]["start_ms"],
)
return res
def _transcribe(
self, input: str, audio: AudioSegment, prompt: str, lang: LANG, start_ms: float
):
audio.export(input, "wav")
subtitles = self.whisper_model(
file=open(input, "rb"), prompt=prompt, language=lang, response_format="srt"
)
os.remove(input)
return list(
map(
lambda x: (
setattr(
x, "start", x.start + datetime.timedelta(milliseconds=start_ms)
),
setattr(
x, "end", x.end + datetime.timedelta(milliseconds=start_ms)
),
x,
)[-1],
list(srt.parse(subtitles)),
)
)
def gen_srt(self, transcribe_results: List[srt.Subtitle]):
if len(transcribe_results) == 0:
return []
if len(transcribe_results) == 1:
return transcribe_results
subs = [transcribe_results[0]]
for subtitle in transcribe_results[1:]:
if subtitle.start - subs[-1].end > datetime.timedelta(seconds=1):
subs.append(
srt.Subtitle(
index=0,
start=subs[-1].end,
end=subtitle.start,
content="< No Speech >",
)
)
subs.append(subtitle)
return subs
class FasterWhisperModel(AbstractWhisperModel):
def __init__(self, sample_rate=16000):
super().__init__("faster-whisper", sample_rate)
self.device = None
def load(
self,
model_name: Literal[
"tiny", "base", "small", "medium", "large", "large-v2"
] = "small",
device: Union[Literal["cpu", "cuda"], None] = None,
):
try:
from faster_whisper import WhisperModel
except ImportError:
raise Exception(
"Please use faster mode(pip install '.[faster]') or all mode(pip install '.[all]')"
)
self.device = device if device else "cpu"
self.whisper_model = WhisperModel(model_name, self.device)
def _transcribe(self):
raise Exception("Not implemented")
def transcribe(
self,
audio: np.ndarray,
speech_array_indices: List[SPEECH_ARRAY_INDEX],
lang: LANG,
prompt: str,
):
res = []
for seg in speech_array_indices:
segments, info = self.whisper_model.transcribe(
audio[int(seg["start"]) : int(seg["end"])],
task="transcribe",
language=lang,
initial_prompt=prompt,
vad_filter=False,
)
segments = list(segments) # The transcription will actually run here.
r = {"origin_timestamp": seg, "segments": segments, "info": info}
res.append(r)
return res
def gen_srt(self, transcribe_results):
subs = []
def _add_sub(start, end, text):
subs.append(
srt.Subtitle(
index=0,
start=datetime.timedelta(seconds=start),
end=datetime.timedelta(seconds=end),
content=cc.convert(text.strip()),
)
)
prev_end = 0
for r in transcribe_results:
origin = r["origin_timestamp"]
for seg in r["segments"]:
s = dict(start=seg.start, end=seg.end, text=seg.text)
start = s["start"] + origin["start"] / self.sample_rate
end = min(
s["end"] + origin["start"] / self.sample_rate,
origin["end"] / self.sample_rate,
)
if start > end:
continue
# mark any empty segment that is not very short
if start > prev_end + 1.0:
_add_sub(prev_end, start, "< No Speech >")
_add_sub(start, end, s["text"])
prev_end = end
return subs
import copy
import glob
import logging
import os
import time
from . import wmdigit_cut, wmdigit_transcribe, utils
class Wmdigit:
def __init__(self, args):
self.args = args
def run(self):
assert len(self.args.inputs) == 1, "Must provide a single file"
self._pipeline()
def _pipeline(self):
media_file = self.args.inputs[0]
assert utils.is_video(media_file), "Must provide a video file"
args = copy.deepcopy(self.args)
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
# 1. Generate srt and md from the video
args.inputs = [media_file]
# If the target language is not Chinese, prompt whisper to translate all subtitles
if args.lang and args.lang != "zh":
args.prompt = f"Subtitles must be fully translated into {args.lang}"
logging.info(f"Transcribe {media_file} lang={args.lang} prompt={args.prompt}")
wmdigit_transcribe.Transcribe(args).run()
# 2. Generate the cut video from the md
args.inputs = [media_file, md_fn, srt_fn]
wmdigit_cut.Cutter(args).run()
import logging
import os
import re
import srt
from moviepy import editor
from . import utils
# Cut media
class Cutter:
def __init__(self, args):
self.args = args
def run(self):
fns = {"srt": None, "media": None, "md": None}
for fn in self.args.inputs:
ext = os.path.splitext(fn)[1][1:]
fns[ext if ext in fns else "media"] = fn
assert fns["media"], "must provide a media filename"
assert fns["srt"], "must provide a srt filename"
output_fn = utils.change_ext(utils.add_cut(fns['media']), "mp4")
output_fn = utils.add_anything(output_fn, self.args.lang)
print(output_fn)
if utils.check_exists(output_fn, self.args.force):
return
with open(fns["srt"], encoding=self.args.encoding) as f:
subs = list(srt.parse(f.read()))
if fns["md"]:
md = utils.MD(fns["md"], self.args.encoding)
# if not md.done_editing():
# return
index = []
for mark, sent in md.tasks():
# print(mark, sent)
# if not mark:
# continue
m = re.match(r"\[(\d+)", sent.strip())
if m:
index.append(int(m.groups()[0]))
subs = [s for s in subs if s.index in index]
logging.info(f'Cut {fns["media"]} based on {fns["srt"]} and {fns["md"]}')
else:
logging.info(f'Cut {fns["media"]} based on {fns["srt"]}')
segments = []
# Avoid disordered subtitles
subs.sort(key=lambda x: x.start)
# print(subs)
base, _ = os.path.splitext(fns['media'])
for x in subs:
v_start = 0.000 if x.index == 1 else x.start.total_seconds()
v_end = x.end.total_seconds()
v_duration = round(v_end - v_start, 3)
if x.content == "< No Speech >":
tts_fn = ""
new_duration = v_duration
else:
# Generate new audio with edge-tts
tts_fn = f"{base}_temp_{x.index}_tts.mp3"
res = utils.text_to_audio_by_edge_tts(x.content, tts_fn, self.args.lang)
if res:
new_duration = utils.get_mp3_duration(tts_fn)
else:
new_duration = v_duration
segments.append(
{"idx":x.index, "video_start": v_start, "video_end": v_end, "video_duration": v_duration, "content": x.content, "tts_fn": tts_fn, "new_duration": new_duration}
)
media = editor.VideoFileClip(fns["media"])
# The last segment sometimes ends right after the last speech; trim the end slightly to avoid errors when cutting. Adjusted here as a workaround. by zcb
segments[-1]["video_end"] = media.duration - 0.5
# Cut the video, stretch each clip to its new duration, then merge in the new audio
clips = [media.subclip(s["video_start"], s["video_end"]) for s in segments]
for i, clip in enumerate(clips, start=0):
# First save the original clip
old_clip_fn = f"{base}_temp_{i+1}_old.mp4"
if segments[i]["content"] == "< No Speech >":
# Clips with no speech keep their original audio
clip.write_videofile(
old_clip_fn, audio_codec="aac", bitrate=self.args.bitrate
)
segments[i]["new_fn"] = old_clip_fn
else:
# Clips with speech have their audio removed
clip.write_videofile(
old_clip_fn, bitrate=self.args.bitrate
)
# Then change its duration to produce the new clip
new_clip_fn = f"{base}_temp_{i+1}_new_no_audio.mp4"
utils.change_video_duration(old_clip_fn, new_clip_fn, segments[i]["new_duration"])
# Merge the new clip with the tts audio
new_clip_data = editor.VideoFileClip(new_clip_fn)
tts_audio_data = editor.AudioFileClip(segments[i]["tts_fn"])
new_clip_with_audio = new_clip_data.without_audio().set_audio(tts_audio_data)
new_clip_with_audio_fn = f"{base}_temp_{i+1}_new_with_audio.mp4"
new_clip_with_audio.write_videofile(
new_clip_with_audio_fn, audio_codec="aac", bitrate=self.args.bitrate
)
segments[i]["new_fn"] = new_clip_with_audio_fn
new_clip_data.close()
tts_audio_data.close()
print(segments)
final_clips = [editor.VideoFileClip(s["new_fn"]) for s in segments]
final_clip: editor.VideoClip = editor.concatenate_videoclips(final_clips)
logging.info(
f"Reduced duration from {media.duration:.1f} to {final_clip.duration:.1f}"
)
# final_clip = final_clip.fx(editor.afx.audio_normalize)
# an alternative to birate is use crf, e.g. ffmpeg_params=['-crf', '18']
final_clip.write_videofile(
output_fn, audio_codec="aac", bitrate=self.args.bitrate
)
media.close()
logging.info(f"Saved media to {output_fn}")
# Clean up temporary files
utils.delete_files(os.path.dirname(fns['media']), f"{os.path.splitext(os.path.basename(fns['media']))[0]}_temp_")
# utils.delete_files(os.path.dirname(fns['media']), f"._{os.path.splitext(os.path.basename(fns['media']))[0]}")
# utils.delete_files(os.path.dirname(fns['media']), f".DS_Store")
# utils.delete_files(os.path.dirname(fns['media']), f"._.DS_Store")
utils.delete_files(os.path.dirname(fns['media']), f".")
import logging
import os
import time
from typing import List, Any
import numpy as np
import srt
import torch
from . import utils, whisper_model
from .type import WhisperMode, SPEECH_ARRAY_INDEX
class Transcribe:
def __init__(self, args):
self.args = args
self.sampling_rate = 16000
self.whisper_model = None
self.vad_model = None
self.detect_speech = None
tic = time.time()
if self.whisper_model is None:
if self.args.whisper_mode == WhisperMode.WHISPER.value:
self.whisper_model = whisper_model.WhisperModel(self.sampling_rate)
self.whisper_model.load(self.args.whisper_model, self.args.device)
elif self.args.whisper_mode == WhisperMode.OPENAI.value:
self.whisper_model = whisper_model.OpenAIModel(
self.args.openai_rpm, self.sampling_rate
)
self.whisper_model.load()
elif self.args.whisper_mode == WhisperMode.FASTER.value:
self.whisper_model = whisper_model.FasterWhisperModel(
self.sampling_rate
)
self.whisper_model.load(self.args.whisper_model, self.args.device)
logging.info(f"Done Init model in {time.time() - tic:.1f} sec")
def run(self, retry=1):
for input in self.args.inputs:
logging.info(f"Transcribing {input}")
name, _ = os.path.splitext(input)
if utils.check_exists(name + ".md", self.args.force):
continue
try:
audio = utils.load_audio(input, sr=self.sampling_rate)
speech_array_indices = self._detect_voice_activity(audio)
transcribe_results = self._transcribe(input, audio, speech_array_indices)
output = name + ".srt"
self._save_srt(output, transcribe_results)
logging.info(f"Transcribed {input} to {output}")
self._save_md(name + ".md", output, input, bool(self.args.wmdigit))
logging.info(f'Saved texts to {name + ".md"} to mark sentences')
except Exception:
if retry == 10:
raise RuntimeError(f"Failed to transcribe {input}")
else:
time.sleep(1)
logging.info(f"Retry {retry}: transcribing {input}")
retry += 1
self.run(retry)
def _detect_voice_activity(self, audio) -> List[SPEECH_ARRAY_INDEX]:
"""Detect segments that have voice activities"""
if self.args.vad == "0":
return [{"start": 0, "end": len(audio)}]
tic = time.time()
if self.vad_model is None or self.detect_speech is None:
# torch load limit https://github.com/pytorch/vision/issues/4156
torch.hub._validate_not_a_forked_repo = lambda a, b, c: True
self.vad_model, funcs = torch.hub.load(
repo_or_dir="/home/ubuntu/.cache/torch/hub/snakers4_silero-vad_master", model="silero_vad", source='local'
)
self.detect_speech = funcs[0]
speeches = self.detect_speech(
audio, self.vad_model, sampling_rate=self.sampling_rate
)
# Remove too short segments
speeches = utils.remove_short_segments(speeches, 1.0 * self.sampling_rate)
# Expand to avoid a too-tight cut. You can tune the pad length
speeches = utils.expand_segments(
speeches, 0.2 * self.sampling_rate, 0.0 * self.sampling_rate, audio.shape[0]
)
# Merge segments that are very close together
speeches = utils.merge_adjacent_segments(speeches, 0.5 * self.sampling_rate)
logging.info(f"Done voice activity detection in {time.time() - tic:.1f} sec")
return speeches if len(speeches) > 1 else [{"start": 0, "end": len(audio)}]
def _transcribe(
self,
input: str,
audio: np.ndarray,
speech_array_indices: List[SPEECH_ARRAY_INDEX],
) -> List[Any]:
tic = time.time()
print(speech_array_indices)
res = (
self.whisper_model.transcribe(
audio, speech_array_indices, self.args.lang, self.args.prompt
)
if self.args.whisper_mode == WhisperMode.WHISPER.value
or self.args.whisper_mode == WhisperMode.FASTER.value
else self.whisper_model.transcribe(
input, audio, speech_array_indices, self.args.lang, self.args.prompt
)
)
logging.info(f"Done transcription in {time.time() - tic:.1f} sec")
return res
def _save_srt(self, output, transcribe_results):
subs = self.whisper_model.gen_srt(transcribe_results)
# print(subs)
# Strip Chinese characters from the subtitles
if self.args.lang not in ("zh","Japanese"):
for s in subs:
s.content = utils.remove_chinese(s.content)
with open(output, "wb") as f:
f.write(srt.compose(subs).encode(self.args.encoding, "replace"))
def _save_md(self, md_fn, srt_fn, video_fn, is_auto_edit=False):
with open(srt_fn, encoding=self.args.encoding) as f:
subs = srt.parse(f.read())
md = utils.MD(md_fn, self.args.encoding)
md.clear()
md.add_done_editing(is_auto_edit)
md.add_video(os.path.basename(video_fn))
md.add(
f"\nTexts generated from [{os.path.basename(srt_fn)}]({os.path.basename(srt_fn)})."
"Mark the sentences to keep for autocut.\n"
"The format is [subtitle_index,duration_in_second] subtitle context.\n\n"
)
for s in subs:
sec = s.start.seconds
pre = f"[{s.index},{sec // 60:02d}:{sec % 60:02d}]"
md.add_task(is_auto_edit, f"{pre:11} {s.content.strip()}")
md.write()
import argparse
import logging
import os
from autocut import utils
from autocut.type import WhisperMode, WhisperModel
def main():
parser = argparse.ArgumentParser(
description="Edit videos based on transcribed subtitles",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
logging.basicConfig(
format="[autocut:%(filename)s:L%(lineno)d] %(levelname)-6s %(message)s"
)
logging.getLogger().setLevel(logging.INFO)
parser.add_argument("inputs", type=str, nargs="+", help="Inputs filenames/folders")
parser.add_argument(
"-t",
"--transcribe",
help="Transcribe videos/audio into subtitles",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-c",
"--cut",
help="Cut a video based on subtitles",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-d",
"--daemon",
help="Monitor a folder to transcribe and cut",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-s",
help="Convert .srt to a compact format for easier editing",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-m",
"--to-md",
help="Convert .srt to .md for easier editing",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--lang",
type=str,
default="zh",
choices=[
"zh",
"en",
"Afrikaans",
"Arabic",
"Armenian",
"Azerbaijani",
"Belarusian",
"Bosnian",
"Bulgarian",
"Catalan",
"Croatian",
"Czech",
"Danish",
"Dutch",
"Estonian",
"Finnish",
"French",
"Galician",
"German",
"Greek",
"Hebrew",
"Hindi",
"Hungarian",
"Icelandic",
"Indonesian",
"Italian",
"Japanese",
"Kannada",
"Kazakh",
"Korean",
"Latvian",
"Lithuanian",
"Macedonian",
"Malay",
"Marathi",
"Maori",
"Nepali",
"Norwegian",
"Persian",
"Polish",
"Portuguese",
"Romanian",
"Russian",
"Serbian",
"Slovak",
"Slovenian",
"Spanish",
"Swahili",
"Swedish",
"Tagalog",
"Tamil",
"Thai",
"Turkish",
"Ukrainian",
"Urdu",
"Vietnamese",
"Welsh",
],
help="The output language of transcription",
)
parser.add_argument(
"--prompt", type=str, default="", help="initial prompt feed into whisper"
)
parser.add_argument(
"--whisper-mode",
type=str,
default=WhisperMode.WHISPER.value,
choices=WhisperMode.get_values(),
help="Whisper inference mode: whisper: run whisper locally; openai: use openai api.",
)
parser.add_argument(
"--openai-rpm",
type=int,
default=3,
choices=[3, 50],
help="Openai Whisper API REQUESTS PER MINUTE(FREE USERS: 3RPM; PAID USERS: 50RPM). "
"More info: https://platform.openai.com/docs/guides/rate-limits/overview",
)
parser.add_argument(
"--whisper-model",
type=str,
default=WhisperModel.SMALL.value,
choices=WhisperModel.get_values(),
help="The whisper model used to transcribe.",
)
parser.add_argument(
"--bitrate",
type=str,
default="10m",
help="The bitrate to export the cutted video, such as 10m, 1m, or 500k",
)
parser.add_argument(
"--vad", help="If or not use VAD", choices=["1", "0", "auto"], default="auto"
)
parser.add_argument(
"--force",
help="Force write even if files exist",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--encoding", type=str, default="utf-8", help="Document encoding format"
)
parser.add_argument(
"--device",
type=str,
default=None,
choices=["cpu", "cuda"],
help="Force to CPU or GPU for transcribing. In default automatically use GPU if available.",
)
parser.add_argument(
"--wmdigit",
help="Convert video to different language",
action=argparse.BooleanOptionalAction,
)
args = parser.parse_args()
if args.wmdigit:
from autocut.wmdigit import Wmdigit
Wmdigit(args).run()
else:
logging.warning("No action, use -c, -t or -d")
if __name__ == "__main__":
main()
"""
api
"""
import os
import time
from datetime import datetime
from flask import Blueprint, g, current_app
from lin import DocResponse, login_required, NotFound
from app.api import api, AuthorizationBearerSecurity
from app.exception import APIParaError, HandleError
from app.api.video_cut.model.video_cut import VideoCut
from app.api.video_cut.schema.video_cut import VideoCutInSchema
from app.schema import MySuccess
from autocut import wmdigit_cut, wmdigit_transcribe, utils
video_cut_api = Blueprint("video_cut", __name__)
@video_cut_api.route("/test", methods=["POST"])
@api.validate(
resp=DocResponse(r=MySuccess),
tags=["video_cut"],
)
def test(json: VideoCutInSchema):
return MySuccess(
data={"result": ''}
)
@video_cut_api.route("/video_cut", methods=["POST"])
# @login_required
@api.validate(
resp=DocResponse(r=MySuccess),
# security=[AuthorizationBearerSecurity],
tags=["video_cut"],
)
def video_cut(json: VideoCutInSchema):
if not g.source_video_url or not g.lang:
raise APIParaError
source_video_url = g.source_video_url.strip()
lang = g.lang.strip()
# Create the record
rec = VideoCut.create(
**{'source_video_url': source_video_url, 'lang': lang,
'status': 'created', 'process_info': 'pending|'}, commit=True)
# Process synchronously right away
try:
p = handle_one_record(rec)
except Exception as e:
raise HandleError(str(e))
return MySuccess(
data=[p]
)
def handle_one_record(record):
try:
all_start_time = time.time()
process_info = ''
class Args:
pass
args = Args()
media_file = record['source_video_url']
lang = record['lang']
record.update(**{'status': 'processing', 'process_info': process_info}, commit=True)
# 1. Generate srt and md from the video
start_time = time.time()
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
# If the target language is not Chinese, prompt whisper to translate all subtitles
if lang != "zh":
prompt = f"Subtitles must be fully translated into {lang}"
else:
prompt = ""
current_app.logger.debug(f"Transcribe {media_file} lang={lang} promt={prompt}")
args.inputs = [media_file]
args.lang = lang
args.wmdigit = True
args.force = True
args.vad = 0
wmdigit_transcribe.Transcribe(args).run()
time_cost = f"{time.time() - start_time:.2f}"
process_info = process_info + f"视频生成srt和md:{time_cost}s|"
# record.update(**{'src_url': src_url, 'md_url': md_url, 'process_info': process_info}, commit=True)
#
# # 2. Generate the cut video from the subtitles
# start_time = time.time()
# final_video_url = wmdigit_cut(media_file, md_fn, srt_fn)
# time_cost = f"{time.time() - start_time:.2f}"
# process_info = process_info + f'Generate cut video from subtitles: {time_cost}s|'
# record.update(**{'final_video_url': final_video_url, 'process_info': process_info, 'status': 'done'}, commit=True)
#
# all_end_time = time.time()
# process_info = process_info + f"所有步骤合计:{all_end_time - all_start_time:.2f}s"
# record.update(**{'process_info': process_info}, commit=True)
# current_app.logger.debug(process_info)
# Return the updated record
return record
except Exception as e:
str_e = str(e)[:200]
process_info = process_info + f'Processing failed: {str_e}'
record.update(**{'status': 'fail', 'process_info': process_info}, commit=True)
raise e
version: '3'
services:
pytorch:
image: "harbor.5jstore.com:8020/ai/wm_generate_ai:v1"
restart: always
runtime: nvidia
environment:
- TZ=Asia/Shanghai
volumes:
- ./inputs/:/app/inputs/
- ./outputs/:/app/outputs/
- ./app/ai_gen_video/checkpoints/:/app/app/ai_gen_video/checkpoints/
- ./app/ai_gen_image/checkpoints/:/app/app/ai_gen_image/checkpoints/
#- ./gunicorn.conf.py:/app/gunicorn.conf.py
#- ./start.py:/app/start.py
#- ./app/:/app/app/
ports:
- "8383:5000"
version: '3'
services:
pytorch:
image: "harbor.5jstore.com:8020/ai/wm_generate_ai:v2"
restart: always
runtime: nvidia
environment:
- TZ=Asia/Shanghai
volumes:
- ./inputs/:/app/inputs/
- ./outputs/:/app/outputs/
- ./app/ai_gen_video/checkpoints/:/app/app/ai_gen_video/checkpoints/
- ./app/ai_gen_video_v2/checkpoints/:/app/app/ai_gen_video_v2/checkpoints/
- ./app/ai_gen_image/checkpoints/:/app/app/ai_gen_image/checkpoints/
- ./app/ai_gen_image/images/:/app/app/ai_gen_image/images/
#- ./gunicorn.conf.py:/app/gunicorn.conf.py
#- ./start.py:/app/start.py
#- ./app/:/app/app/
ports:
- "8383:5000"
workers = 1  # Number of worker processes handling requests; tune to match site traffic
worker_class = "gevent"  # Use the gevent library for asynchronous request handling to improve throughput
timeout = 600
bind = "0.0.0.0:5000"
accesslog = '-'
errorlog = '-'
loglevel = 'debug'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, abort, request, jsonify
import datetime, os, sys, time
import logging
from app.models.wm_option import db
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['PERMANENT_SESSION_LIFETIME'] = 600 # 10 minutes timeout
app.config['TIMEOUT'] = 600
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:WMdigit.2018@rm-2zex762o7g65303359o.mysql.rds.aliyuncs.com:3306/lincmsprod'
db.init_app(app)
logger.info("start services")
# Global paths
root = './'
app_root = os.path.join(root, 'app')
input_root = os.path.join(root, 'inputs')
output_root = os.path.join(root, 'outputs')
# Preload models
# Public API endpoints
@app.route('/ai_generate_video', methods=['GET'])
def ai_generate_video():
return jsonify({"result": "akakkakaka"})
if __name__ == "__main__":
# Setting host to 0.0.0.0 makes the service reachable from outside the machine
app.run(debug=True, host="0.0.0.0", use_reloader=False)