Commit b1b24876 authored by Administrator

Add subtitle feature

parent 805ecc9f
......@@ -50,7 +50,12 @@ docker run --gpus all --runtime=nvidia -v ./inputs/:/app/inputs/ -v ./outputs/:/
ffmpeg -hwaccels
<!-- Debug commands -->
python start.py
<!-- python start.py -->
gunicorn start:app -c ./gunicorn.conf.py
flask --app start run --debug
\ No newline at end of file
macOS debugging
source /Users/zhouchengbo/Projects/active311
<!-- python test.py --wmdigit ./inputs/20231103/3068_1698978622.mp4 --force --lang=en --vad=0 -->
Start Flask directly, then debug against it
flask --app start run --debug
import logging
import os
import re
import datetime
import time
import ffmpeg
import numpy as np
......@@ -9,7 +11,8 @@ import srt
from moviepy.editor import VideoFileClip
import asyncio
import edge_tts
import time
from textwrap import wrap
import subprocess
def load_audio(file: str, sr: int = 16000) -> np.ndarray:
......@@ -439,4 +442,87 @@ def remove_chinese(text):
            print(f'Chinese character found: {char}')
            continue
        result += char
    return result
\ No newline at end of file
    return result

def re_gen_subtitle_wrap(srt_file, fontsize, max_width, scaling, wrap_srt_file):
    font_to_char_ratio = 2
    subtitle_width_ratio = 0.9
    average_char_width = float(font_to_char_ratio) * float(fontsize) * scaling  # character width, using a 1024-px-wide video as the baseline
    max_line_length = int(max_width * float(subtitle_width_ratio) / float(average_char_width))  # maximum number of characters per line before wrapping
    with open(srt_file, 'r', encoding='utf-8') as f:
        content = f.read()
    pattern = re.compile(r'(\d+)\n(.*?) --> (.*?)\n(.*?)(?:\n\n|$)', re.DOTALL)
    subtitles = re.findall(pattern, content)
    with open(wrap_srt_file, "w", encoding="utf-8") as f:
        for i, start, end, text in subtitles:
            wrapped_text_lines = wrap(text, width=max_line_length)
            adjusted_text = ""
            if text != "< No Speech >":
                for idx, line in enumerate(wrapped_text_lines):
                    adjusted_text += line
                    if idx < len(wrapped_text_lines) - 1:
                        adjusted_text += "\n"
            # write the index
            f.write(str(i) + '\n')
            # write the timestamps
            f.write(start + ' --> ' + end + '\n')
            # write the subtitle text
            f.write(adjusted_text + '\n')
            # write a blank line
            f.write('\n')
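
# Illustrative numbers, not from the commit: for fontsize=8 and a 1920-px-wide video,
# scaling = 1920 / 1024 = 1.875, average_char_width = 2 * 8 * 1.875 = 30.0, and
# max_line_length = int(1920 * 0.9 / 30.0) = 57 characters per line.
# Hypothetical call: re_gen_subtitle_wrap("demo.srt", 8, 1920, 1920 / 1024, "demo_wrap.srt")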

def gen_srt_from_new_segments(segments):
    cc = opencc.OpenCC("t2s")
    subs = []

    def _add_sub(start, end, text):
        subs.append(
            srt.Subtitle(
                index=0,
                start=datetime.timedelta(seconds=start),
                end=datetime.timedelta(seconds=end),
                content=cc.convert(text.strip()),
            )
        )

    for s in segments:
        _add_sub(s["new_start"], s["new_end"], s["content"])
    return subs
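
# Each segment only needs the keys read above; illustrative example:
#   subs = gen_srt_from_new_segments([{"new_start": 0.0, "new_end": 2.5, "content": "hello"}])
#   srt.compose(subs) then yields standard SRT text ready to be written to disk.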

def combine_video_with_subtitle(video_file, subtitle_file):
    print("Adding subtitles to the video (combine)")
    result_file = add_anything(video_file, 'sub')
    font_name = 'STHeitiSC-Light'
    font_size = 8
    font_color = '000000'
    back_color = '00FFFF'
    border_style = 3
    outline = 1
    shadow = 0
    marginv = 50
    # load the video file
    clip = VideoFileClip(video_file)
    # read the video width
    video_width = clip.w
    # close the video
    clip.close()
    # compute the scaling factor, using a width of 1024 px as the baseline
    scaling = float(video_width) / 1024
    # regenerate the subtitle file with automatic line wrapping based on the font size
    wrap_sub_fn = add_anything(subtitle_file, 'wrap')
    re_gen_subtitle_wrap(subtitle_file, font_size, video_width, scaling, wrap_sub_fn)
    # build the subtitle style
    style = f'FontName={font_name},FontSize={font_size},PrimaryColour=&H{font_color},OutlineColour=&H{back_color},' \
            f'BorderStyle={border_style},Outline={outline},Shadow={shadow},MarginV={marginv}'
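    # Note added for clarity: force_style values are ASS/libass colours in &HBBGGRR (blue-green-red)
    # hex order, so PrimaryColour=&H000000 renders black text and OutlineColour=&H00FFFF renders yellow.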
    # burn the subtitles into the video
    cmdline = f"""ffmpeg -y -i {video_file} -vf "subtitles={wrap_sub_fn}:force_style='{style}'" -c:a copy {result_file}"""
    print(cmdline)
    result = subprocess.call(cmdline, shell=True)
    # return code: 0 = success, non-zero = failure
    if result == 0:
        return result_file
    else:
        raise RuntimeError("combine_video_with_subtitle: failed to add subtitles to the video")
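
# A minimal alternative sketch, not part of this commit: passing an argument list to subprocess.call
# avoids shell quoting issues when the file paths contain spaces or quotes.
#   subprocess.call([
#       "ffmpeg", "-y", "-i", video_file,
#       "-vf", f"subtitles={wrap_sub_fn}:force_style='{style}'",
#       "-c:a", "copy", result_file,
#   ])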
......@@ -144,7 +144,8 @@ class WhisperModel(AbstractWhisperModel):
if start > end:
continue
# mark any gap between speech segments as an empty segment
if start > prev_end + 1.0:
# if start > prev_end + 1.0:
if start > prev_end:
_add_sub(prev_end, start, "< No Speech >")
_add_sub(start, end, s["text"])
prev_end = end
......
......@@ -69,8 +69,14 @@ class Cutter:
new_duration = utils.get_mp3_duration(tts_fn)
else:
new_duration = v_duration
# Record each segment's new start and end times for generating subtitles later.
# new start = max(previous segment's new end time, this segment's original start time)
# new end   = new start + new duration
new_v_start = 0.000 if x.index == 1 else max(new_v_end, v_start)
new_v_end = round(new_v_start + new_duration, 3)
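# Illustrative numbers, not from the commit: if the previous segment's new end is 12.000 s and this
# segment originally starts at 10.500 s, then new_v_start = 12.000; with new_duration = 3.250 s,
# new_v_end = round(12.000 + 3.250, 3) = 15.250.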
segments.append(
{"idx":x.index, "video_start": v_start, "video_end": v_end, "video_duration": v_duration, "content": x.content, "tts_fn": tts_fn, "new_duration": new_duration}
{"idx":x.index, "video_start": v_start, "video_end": v_end, "video_duration": v_duration, "content": x.content, "tts_fn": tts_fn,
"new_duration": new_duration, "new_start": new_v_start, "new_end": new_v_end}
)
media = editor.VideoFileClip(fns["media"])
......@@ -110,6 +116,12 @@ class Cutter:
tts_audio_data.close()
print(segments)
# Generate new subtitles from the new segments, for use by the later steps
new_subs = utils.gen_srt_from_new_segments(segments)
new_subs_fn = utils.add_anything(fns["srt"], 'cut')
with open(new_subs_fn, "wb") as f:
f.write(srt.compose(new_subs).encode(self.args.encoding, "replace"))
# subtitle generation end
final_clips = [editor.VideoFileClip(s["new_fn"]) for s in segments]
......@@ -129,8 +141,5 @@ class Cutter:
# clean up temporary files
utils.delete_files(os.path.dirname(fns['media']), f"{os.path.splitext(os.path.basename(fns['media']))[0]}_temp_")
# utils.delete_files(os.path.dirname(fns['media']), f"._{os.path.splitext(os.path.basename(fns['media']))[0]}")
# utils.delete_files(os.path.dirname(fns['media']), f".DS_Store")
# utils.delete_files(os.path.dirname(fns['media']), f"._.DS_Store")
utils.delete_files(os.path.dirname(fns['media']), f".")
return output_fn
\ No newline at end of file
return output_fn, new_subs_fn
\ No newline at end of file
......@@ -50,6 +50,7 @@ class Transcribe:
transcribe_results = self._transcribe(input, audio, speech_array_indices)
output = name + ".srt"
# print(transcribe_results)
self._save_srt(output, transcribe_results)
logging.info(f"Transcribed {input} to {output}")
self._save_md(name + ".md", output, input, bool(self.args.wmdigit))
......@@ -125,6 +126,7 @@ class Transcribe:
if self.args.lang not in ("zh","Japanese"):
for s in subs:
s.content = utils.remove_chinese(s.content)
with open(output, "wb") as f:
f.write(srt.compose(subs).encode(self.args.encoding, "replace"))
......
import argparse
import logging
import os
from autocut import utils
from autocut.type import WhisperMode, WhisperModel
def main():
parser = argparse.ArgumentParser(
description="Edit videos based on transcribed subtitles",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
logging.basicConfig(
format="[autocut:%(filename)s:L%(lineno)d] %(levelname)-6s %(message)s"
)
logging.getLogger().setLevel(logging.INFO)
parser.add_argument("inputs", type=str, nargs="+", help="Inputs filenames/folders")
parser.add_argument(
"-t",
"--transcribe",
help="Transcribe videos/audio into subtitles",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-c",
"--cut",
help="Cut a video based on subtitles",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-d",
"--daemon",
help="Monitor a folder to transcribe and cut",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-s",
help="Convert .srt to a compact format for easier editing",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-m",
"--to-md",
help="Convert .srt to .md for easier editing",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--lang",
type=str,
default="zh",
choices=[
"zh",
"en",
"Afrikaans",
"Arabic",
"Armenian",
"Azerbaijani",
"Belarusian",
"Bosnian",
"Bulgarian",
"Catalan",
"Croatian",
"Czech",
"Danish",
"Dutch",
"Estonian",
"Finnish",
"French",
"Galician",
"German",
"Greek",
"Hebrew",
"Hindi",
"Hungarian",
"Icelandic",
"Indonesian",
"Italian",
"Japanese",
"Kannada",
"Kazakh",
"Korean",
"Latvian",
"Lithuanian",
"Macedonian",
"Malay",
"Marathi",
"Maori",
"Nepali",
"Norwegian",
"Persian",
"Polish",
"Portuguese",
"Romanian",
"Russian",
"Serbian",
"Slovak",
"Slovenian",
"Spanish",
"Swahili",
"Swedish",
"Tagalog",
"Tamil",
"Thai",
"Turkish",
"Ukrainian",
"Urdu",
"Vietnamese",
"Welsh",
],
help="The output language of transcription",
)
parser.add_argument(
"--prompt", type=str, default="", help="initial prompt feed into whisper"
)
parser.add_argument(
"--whisper-mode",
type=str,
default=WhisperMode.WHISPER.value,
choices=WhisperMode.get_values(),
help="Whisper inference mode: 'whisper' runs Whisper locally; 'openai' uses the OpenAI API.",
)
parser.add_argument(
"--openai-rpm",
type=int,
default=3,
choices=[3, 50],
help="OpenAI Whisper API requests per minute (free users: 3 RPM; paid users: 50 RPM). "
"More info: https://platform.openai.com/docs/guides/rate-limits/overview",
)
parser.add_argument(
"--whisper-model",
type=str,
default=WhisperModel.SMALL.value,
choices=WhisperModel.get_values(),
help="The whisper model used to transcribe.",
)
parser.add_argument(
"--bitrate",
type=str,
default="10m",
help="The bitrate used to export the cut video, such as 10m, 1m, or 500k",
)
parser.add_argument(
"--vad", help="Whether to use VAD", choices=["1", "0", "auto"], default="auto"
)
parser.add_argument(
"--force",
help="Force write even if files exist",
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--encoding", type=str, default="utf-8", help="Document encoding format"
)
parser.add_argument(
"--device",
type=str,
default=None,
choices=["cpu", "cuda"],
help="Force CPU or GPU for transcribing. By default, the GPU is used automatically if available.",
)
parser.add_argument(
"--wmdigit",
help="Convert the video into a different language",
action=argparse.BooleanOptionalAction,
)
args = parser.parse_args()
if args.wmdigit:
from autocut.wmdigit import Wmdigit
Wmdigit(args).run()
else:
logging.warning("No action, use -c, -t or -d")
if __name__ == "__main__":
main()
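
# Example invocation using the flags defined above (script name and input path follow the README
# debug command and are illustrative):
#   python test.py --wmdigit ./inputs/demo.mp4 --force --lang=en --vad=0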
......@@ -5,7 +5,7 @@ from app.video_cut.autocut.type import WhisperMode, WhisperModel
def main_args(logger, debug):
logger.info('load arguments')
logger.info('loading arguments')
parser = argparse.ArgumentParser()
parser.add_argument("--inputs", type=str, help="Inputs filenames/folders")
parser.add_argument(
......@@ -137,8 +137,10 @@ def main_args(logger, debug):
# args.whisper_model = "large-v2"
args.whisper_model = "medium"
args.device = "cuda"
else:
args.whisper_model = "medium"
logger.info(f'load whisper_model: {args.whisper_model}, device: {args.device}')
logger.info(f'loading whisper_model: {args.whisper_model}, device: {args.device}')
whispermodel = whisper_model.WhisperModel(16000)
whispermodel.load(args.whisper_model, args.device)
logger.info(f'done.')
......
......@@ -18,6 +18,7 @@ def validate_request():
video = instance[0]['video']
lang = instance[0]['lang']
with_sub = True if instance[0]['with_sub'] == 'true' else False
if len(video) <= 0:
error('Invalid parameter: video must not be empty')
......@@ -27,21 +28,21 @@ def validate_request():
# if not video.startswith('http'):
# error('video must be a network URL')
return video, lang
return video, lang, with_sub
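# Based on the keys read above, each request item is expected to look roughly like
# (values illustrative): {"video": "<path or url>", "lang": "en", "with_sub": "true"}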
# main pipeline
def video_cut_pipeline(logger, args, whispermodel):
# print(args)
time_record = []
media_file, lang = validate_request()
media_file, lang, with_sub = validate_request()
all_start_time = time.time()
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
args.lang = lang
# 1. Generate srt and md from the video
start_time = time.time()
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
# If the target language is not Chinese, prompt whisper to translate all subtitles
if lang != "zh":
prompt = f"Subtitles must be fully translated into {lang}"
else:
......@@ -49,18 +50,22 @@ def video_cut_pipeline(logger, args, whispermodel):
logger.info(f"Transcribe {media_file} lang={lang} prompt={prompt}")
args.inputs = [media_file]
args.lang = lang
wmdigit_transcribe.Transcribe(args, whispermodel).run()
time_record.append(f"Generated srt and md from the video. Elapsed: {time.time() - start_time:.4f} s")
# 2. Cut the video based on the subtitles
start_time = time.time()
args.inputs = [media_file, md_fn, srt_fn]
final_video_fn = wmdigit_cut.Cutter(args).run()
final_video_fn, new_srt_fn = wmdigit_cut.Cutter(args).run()
time_record.append(f"Cut the video based on the subtitles. Elapsed: {time.time() - start_time:.4f} s")
time_record.append(f"All steps finished. Elapsed: {time.time() - all_start_time:.4f} s")
# 3. Add subtitles
if with_sub:
start_time = time.time()
final_video_fn = utils.combine_video_with_subtitle(final_video_fn, new_srt_fn)
time_record.append(f"Subtitle burn-in. Elapsed: {time.time() - start_time:.4f} s")
time_record.append(f"All steps finished. Elapsed: {time.time() - all_start_time:.4f} s")
for i in time_record:
print(i)
......
"""
api
"""
import os
import time
from datetime import datetime
from flask import Blueprint, g, current_app
from lin import DocResponse, login_required, NotFound
from app.api import api, AuthorizationBearerSecurity
from app.exception import APIParaError, HandleError
from app.api.video_cut.model.video_cut import VideoCut
from app.api.video_cut.schema.video_cut import VideoCutInSchema
from app.schema import MySuccess
from autocut import wmdigit_cut, wmdigit_transcribe, utils
video_cut_api = Blueprint("video_cut", __name__)
@video_cut_api.route("/test", methods=["POST"])
@api.validate(
resp=DocResponse(r=MySuccess),
tags=["video_cut"],
)
def test(json: VideoCutInSchema):
return MySuccess(
data={"result": ''}
)
@video_cut_api.route("/video_cut", methods=["POST"])
# @login_required
@api.validate(
resp=DocResponse(r=MySuccess),
# security=[AuthorizationBearerSecurity],
tags=["video_cut"],
)
def video_cut(json: VideoCutInSchema):
if not g.source_video_url or not g.lang:
raise APIParaError
source_video_url = g.source_video_url.strip()
lang = g.lang.strip()
# create the record
rec = VideoCut.create(
**{'source_video_url': source_video_url, 'lang': lang,
'status': 'created', 'process_info': 'pending|'}, commit=True)
# process it synchronously right away
try:
p = handle_one_record(rec)
except Exception as e:
raise HandleError(str(e))
return MySuccess(
data=[p]
)
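
# Hypothetical request body; the field names follow the g attributes read above, while the exact
# schema and URL prefix are defined elsewhere: {"source_video_url": "./inputs/demo.mp4", "lang": "en"}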
def handle_one_record(record):
try:
all_start_time = time.time()
process_info = ''
class Args:
pass
args = Args()
media_file = record['source_video_url']
lang = record['lang']
record.update(**{'status': 'processing', 'process_info': process_info}, commit=True)
# 1. Generate srt and md from the video
start_time = time.time()
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
# If the target language is not Chinese, prompt whisper to translate all subtitles
if lang != "zh":
prompt = f"Subtitles must be fully translated into {lang}"
else:
prompt = ""
current_app.logger.debug(f"Transcribe {media_file} lang={lang} prompt={prompt}")
args.inputs = [media_file]
args.lang = lang
args.wmdigit = True
args.force = True
args.vad = 0
wmdigit_transcribe.Transcribe(args).run()
time_cost = f"{time.time() - start_time:.2f}"
process_info = process_info + f"generated srt and md: {time_cost}s|"
# record.update(**{'src_url': src_url, 'md_url': md_url, 'process_info': process_info}, commit=True)
#
# # 2. Cut the video based on the subtitles
# start_time = time.time()
# final_video_url = wmdigit_cut(media_file, md_fn, srt_fn)
# time_cost = f"{time.time() - start_time:.2f}"
# process_info = process_info + f'cut video from subtitles: {time_cost}s|'
# record.update(**{'final_video_url': final_video_url, 'process_info': process_info, 'status': 'done'}, commit=True)
#
# all_end_time = time.time()
# process_info = process_info + f"all steps total: {all_end_time - all_start_time:.2f}s"
# record.update(**{'process_info': process_info}, commit=True)
# current_app.logger.debug(process_info)
# return the updated record
return record
except Exception as e:
str_e = str(e)[:200]
process_info = process_info + f'processing failed: {str_e}'
record.update(**{'status': 'fail', 'process_info': process_info}, commit=True)
raise e