Commit 40830af4 authored by Administrator

将流程改为两步

parent 598326eb
...@@ -56,6 +56,5 @@ gunicorn start:app -c ./gunicorn.conf.py ...@@ -56,6 +56,5 @@ gunicorn start:app -c ./gunicorn.conf.py
mac调试 mac调试
source /Users/zhouchengbo/Projects/active311 source /Users/zhouchengbo/Projects/active311
<!-- python test.py --wmdigit ./inputs/20231103/3068_1698978622.mp4 --force --lang=en --vad=0 -->
直接flask起来后调
flask --app start run --debug flask --app start run --debug
<!-- python test.py --wmdigit ./inputs/20231103/3068_1698978622.mp4 --force --lang=en --vad=0 -->
...@@ -3,6 +3,7 @@ import os ...@@ -3,6 +3,7 @@ import os
import re import re
import datetime import datetime
import time import time
import json
import ffmpeg import ffmpeg
import numpy as np import numpy as np
...@@ -496,10 +497,11 @@ def combine_video_with_subtitle(video_file, subtitle_file): ...@@ -496,10 +497,11 @@ def combine_video_with_subtitle(video_file, subtitle_file):
print(f"开始给视频加字幕(合并)") print(f"开始给视频加字幕(合并)")
result_file = add_anything(video_file, 'sub') result_file = add_anything(video_file, 'sub')
font_name = 'STHeitiSC-Light' font_name = 'STHeitiSC-Light'
font_size = 8 font_size = 9
font_color = '000000' font_color = '000000'
back_color = '00FFFF' # back_color = '00FFFF' # 黄色
border_style = 3 back_color = 'FFFFFF' # 白色
border_style = 1 # 1=描边,3=整个字体背景
outline = 1 outline = 1
shadow = 0 shadow = 0
marginv= 50 marginv= 50
...@@ -526,3 +528,36 @@ def combine_video_with_subtitle(video_file, subtitle_file): ...@@ -526,3 +528,36 @@ def combine_video_with_subtitle(video_file, subtitle_file):
return result_file return result_file
else: else:
raise "combine_video_with_subtitle 给视频加字幕异常" raise "combine_video_with_subtitle 给视频加字幕异常"
def gen_subjson_from_subs(subs):
    """Turn a sequence of subtitle objects into a list of plain dicts.

    Each element of ``subs`` must expose ``start``, ``end`` and ``content``
    attributes. One dict is produced per subtitle, in iteration order, with:

    * ``'index'``   - the 1-based position in ``subs``, as a string (the
      subtitle's own index attribute, if any, is not consulted);
    * ``'start'``   - ``str(subtitle.start)``;
    * ``'end'``     - ``str(subtitle.end)``;
    * ``'content'`` - ``subtitle.content`` unchanged.

    NOTE(review): callers appear to pass ``srt.Subtitle`` objects whose
    start/end are ``datetime.timedelta`` values, so the strings would look
    like ``"0:00:01.500000"`` - confirm against the caller.

    Returns:
        A new list of dicts; an empty list when ``subs`` is empty.
    """
    return [
        {
            'index': str(position),
            'start': str(entry.start),
            'end': str(entry.end),
            'content': entry.content,
        }
        for position, entry in enumerate(subs, start=1)
    ]
def gen_srt_from_subjson(sub_json, srt_file):
    """Write subtitle dicts to ``srt_file`` as SRT-style text (UTF-8).

    Args:
        sub_json: iterable of dicts, each with the keys ``'index'``,
            ``'start'``, ``'end'`` and ``'content'``. ``'start'`` and
            ``'end'`` must be strings.
        srt_file: path of the file to create or overwrite.

    Every entry is written as::

        <index>
        <start> --> <end>
        <content>
        <blank line>

    with each ``.`` in the start/end strings replaced by ``,``. No other
    normalisation of the timestamps is performed.

    Raises:
        KeyError: if an entry lacks one of the four required keys.
    """
    def _as_srt_block(entry):
        # Keys are read in the order index, start, end, content.
        number = entry['index']
        begin = entry['start'].replace('.', ',')
        finish = entry['end'].replace('.', ',')
        return "{}\n{} --> {}\n{}\n\n".format(
            number, begin, finish, entry['content']
        )

    with open(srt_file, "w", encoding="utf-8") as out:
        # The generator is consumed lazily, so blocks are written one by one.
        out.writelines(_as_srt_block(entry) for entry in sub_json)
...@@ -32,22 +32,22 @@ class Cutter: ...@@ -32,22 +32,22 @@ class Cutter:
with open(fns["srt"], encoding=self.args.encoding) as f: with open(fns["srt"], encoding=self.args.encoding) as f:
subs = list(srt.parse(f.read())) subs = list(srt.parse(f.read()))
if fns["md"]: # if fns["md"]:
md = utils.MD(fns["md"], self.args.encoding) # md = utils.MD(fns["md"], self.args.encoding)
# if not md.done_editing(): # # if not md.done_editing():
# return # # return
index = [] # index = []
for mark, sent in md.tasks(): # for mark, sent in md.tasks():
# print(mark, sent) # # print(mark, sent)
# if not mark: # # if not mark:
# continue # # continue
m = re.match(r"\[(\d+)", sent.strip()) # m = re.match(r"\[(\d+)", sent.strip())
if m: # if m:
index.append(int(m.groups()[0])) # index.append(int(m.groups()[0]))
subs = [s for s in subs if s.index in index] # subs = [s for s in subs if s.index in index]
logging.info(f'Cut {fns["media"]} based on {fns["srt"]} and {fns["md"]}') # logging.info(f'Cut {fns["media"]} based on {fns["srt"]} and {fns["md"]}')
else: # else:
logging.info(f'Cut {fns["media"]} based on {fns["srt"]}') # logging.info(f'Cut {fns["media"]} based on {fns["srt"]}')
segments = [] segments = []
# Avoid disordered subtitles # Avoid disordered subtitles
......
...@@ -49,14 +49,16 @@ class Transcribe: ...@@ -49,14 +49,16 @@ class Transcribe:
speech_array_indices = self._detect_voice_activity(audio) speech_array_indices = self._detect_voice_activity(audio)
transcribe_results = self._transcribe(input, audio, speech_array_indices) transcribe_results = self._transcribe(input, audio, speech_array_indices)
output = name + ".srt" srt_fn = name + ".srt"
md_fn = name + ".md"
# print(transcribe_results) # print(transcribe_results)
self._save_srt(output, transcribe_results) srt_json = self._save_srt(srt_fn, transcribe_results)
logging.info(f"Transcribed {input} to {output}") logging.info(f"Transcribed {input} to {srt_fn}")
self._save_md(name + ".md", output, input, bool(self.args.wmdigit)) self._save_md(md_fn, srt_fn, input, bool(self.args.wmdigit))
logging.info(f'Saved texts to {name + ".md"} to mark sentences') logging.info(f'Saved texts to {md_fn} to mark sentences')
return md_fn, srt_fn, srt_json
except Exception as e: except Exception as e:
if retry == 3: if retry == 1:
raise RuntimeError(f"Failed to Transcribing {e}") raise RuntimeError(f"Failed to Transcribing {e}")
else: else:
time.sleep(1) time.sleep(1)
...@@ -122,13 +124,17 @@ class Transcribe: ...@@ -122,13 +124,17 @@ class Transcribe:
def _save_srt(self, output, transcribe_results): def _save_srt(self, output, transcribe_results):
subs = self.whisper_model.gen_srt(transcribe_results) subs = self.whisper_model.gen_srt(transcribe_results)
# print(subs) # print(subs)
# 把字幕中的中文去掉 # 把翻译后的字幕中的中文去掉,有的翻译的不好
if self.args.lang not in ("zh","Japanese"): if self.args.lang not in ("zh","Japanese"):
for s in subs: for s in subs:
s.content = utils.remove_chinese(s.content) s.content = utils.remove_chinese(s.content)
# 生成字幕文件
with open(output, "wb") as f: with open(output, "wb") as f:
f.write(srt.compose(subs).encode(self.args.encoding, "replace")) f.write(srt.compose(subs).encode(self.args.encoding, "replace"))
# 生成字幕json
sub_json = utils.gen_subjson_from_subs(subs)
# print(sub_json)
return sub_json
def _save_md(self, md_fn, srt_fn, video_fn, is_auto_edit=False): def _save_md(self, md_fn, srt_fn, video_fn, is_auto_edit=False):
with open(srt_fn, encoding=self.args.encoding) as f: with open(srt_fn, encoding=self.args.encoding) as f:
......
...@@ -13,63 +13,83 @@ def validate_request(): ...@@ -13,63 +13,83 @@ def validate_request():
error("参数错误: 缺少instances参数") error("参数错误: 缺少instances参数")
instance = request.json['instances'] instance = request.json['instances']
if len(instance) <= 0 or 'video' not in instance[0] or 'lang' not in instance[0]: if len(instance) <= 0 or 'video' not in instance[0] or 'steps' not in instance[0] or 'lang' not in instance[0]:
error('参数错误: instances缺少:video,lang') error('参数错误: instances需要: video, steps, lang')
video = instance[0]['video'] video = instance[0]['video']
steps = instance[0]['steps']
lang = instance[0]['lang'] lang = instance[0]['lang']
with_sub = True if instance[0]['with_sub'] else False
if len(video) <= 0: if len(video) <= 0:
error('参数错误: video 参数不可为空') error('参数错误: video 参数不可为空')
if len(steps) <= 0 or ('step1' not in steps and 'step2' not in steps):
error('参数错误: steps 参数不可为空,取值为:step1、step2')
if len(lang) <= 0: if len(lang) <= 0:
error('参数错误: lang 参数不可为空') error('参数错误: lang 参数不可为空')
with_sub = instance[0]['with_sub'] if 'with_sub' in instance[0] and instance[0]['with_sub'] else False
# if not video.startswith('http'): srt_json_in = ""
# error('video 必须是网络路径') if len(steps) == 1 and 'step2' in steps:
if 'srt_json_in' not in instance[0]:
error('参数错误: instances需要: srt_json_in')
srt_json_in = instance[0]['srt_json_in']
if len(srt_json_in) <= 0:
error('参数错误: srt_json_in 参数不可为空')
return video, lang, with_sub return video, steps, lang, with_sub, srt_json_in
# 主线 # 主线
def video_cut_pipeline(logger, args, whispermodel): def video_cut_pipeline(logger, args, whispermodel):
# print(args) # print(args)
time_record = [] time_record = []
media_file, lang, with_sub = validate_request() media_file, steps, lang, with_sub, srt_json_in = validate_request()
all_start_time = time.time() all_start_time = time.time()
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
args.lang = lang args.lang = lang
srt_json_out = ""
final_video_fn = ""
if 'step1' in steps:
# 1、从视频生成字幕
time_record, srt_json_out = step1(time_record, logger, args, whispermodel, media_file, lang)
if 'step2' in steps:
srt_json = srt_json_out if 'step1' in steps else srt_json_in
# 2、从字幕生成cut视频
time_record, final_video_fn = step2(time_record, logger, args, media_file, with_sub, srt_json)
time_record.append(f"所有步骤处理完毕。耗时: {time.time() - all_start_time:.4f} 秒")
for i in time_record:
print(i)
# 如果只有step1则返回字幕,如果只有step2或者1和2都有,则返回最终结果
if len(steps) == 1 and steps[0] == 'step1':
return srt_json_out
else:
return final_video_fn
# 1、视频生成srt和md
def step1(time_record, logger, args, whispermodel, media_file, lang):
start_time = time.time() start_time = time.time()
if lang != "zh": if lang != "zh":
prompt = f"Subtitles must be fully translated into {lang}" prompt = f"Subtitles must be fully translated into {lang}"
else: else:
prompt = "" prompt = ""
logger.info(f"Transcribe {media_file} lang={lang} promt={prompt}") logger.info(f"Transcribe {media_file} lang={lang} promt={prompt}")
args.inputs = [media_file] args.inputs = [media_file]
wmdigit_transcribe.Transcribe(args, whispermodel).run() md_fn, srt_fn, srt_json_out = wmdigit_transcribe.Transcribe(args, whispermodel).run()
time_record.append(f"视频生成srt和md。耗时: {time.time() - start_time:.4f} 秒") time_record.append(f"视频生成srt和md。耗时: {time.time() - start_time:.4f} 秒")
return time_record, srt_json_out
# 2、从字幕生成cut视频
def step2(time_record, logger, args, media_file, with_sub, srt_json_in):
start_time = time.time() start_time = time.time()
srt_fn = utils.change_ext(media_file, "srt")
md_fn = utils.change_ext(media_file, "md")
# 根据 srt_json_in 重新生成 srt 文件
utils.gen_srt_from_subjson(srt_json_in, srt_fn)
logger.info(f"Cut {media_file} srt={srt_fn} sub={with_sub}")
args.inputs = [media_file, md_fn, srt_fn] args.inputs = [media_file, md_fn, srt_fn]
final_video_fn, new_srt_fn = wmdigit_cut.Cutter(args).run() final_video_fn, new_srt_fn = wmdigit_cut.Cutter(args).run()
time_record.append(f"从字幕生成cut视频。耗时: {time.time() - start_time:.4f} 秒") time_record.append(f"从字幕生成cut视频。耗时: {time.time() - start_time:.4f} 秒")
# 加字幕
# 3、加字幕
if with_sub: if with_sub:
start_time = time.time() start_time = time.time()
final_video_fn = utils.combine_video_with_subtitle(final_video_fn, new_srt_fn) final_video_fn = utils.combine_video_with_subtitle(final_video_fn, new_srt_fn)
time_record.append(f"加字幕耗时: {time.time() - start_time:.4f} 秒") time_record.append(f"加字幕耗时: {time.time() - start_time:.4f} 秒")
return time_record, final_video_fn
time_record.append(f"所有步骤处理完毕。耗时: {time.time() - all_start_time:.4f} 秒") \ No newline at end of file
for i in time_record:
print(i)
return final_video_fn, srt_fn
...@@ -19,20 +19,14 @@ app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:WMdigit.2018@rm-2z ...@@ -19,20 +19,14 @@ app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:WMdigit.2018@rm-2z
db.init_app(app) db.init_app(app)
logger.info("start services") logger.info("start services")
# 全局路径
root = './'
app_root = os.path.join(root, 'app')
input_root = os.path.join(root, 'inputs')
output_root = os.path.join(root, 'outputs')
# 预加载模型 # 预加载模型
args, whispermodel = main_args(logger, app.config['DEBUG']) args, whispermodel = main_args(logger, app.config['DEBUG'])
# 对外接口 # 对外接口
@app.route('/wm_video_cut', methods=['POST']) @app.route('/wm_video_cut', methods=['POST'])
def wm_video_cut(): def wm_video_cut():
final_video_url, srt_url = video_cut_pipeline(logger, args, whispermodel) result = video_cut_pipeline(logger, args, whispermodel)
return jsonify({"result": {"final_video_url": final_video_url, "srt_url": srt_url}}) return jsonify({"result": result})
@app.route('/upload_file', methods=['POST']) @app.route('/upload_file', methods=['POST'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment