""" api """ import os import time from datetime import datetime from flask import Blueprint, g, current_app from lin import DocResponse, login_required, NotFound from app.api import api, AuthorizationBearerSecurity from app.exception import APIParaError, HandleError from app.api.video_cut.model.video_cut import VideoCut from app.api.video_cut.schema.video_cut import VideoCutInSchema from app.schema import MySuccess from autocut import wmdigit_cut, wmdigit_transcribe, utils video_cut_api = Blueprint("video_cut", __name__) @video_cut_api.route("/test", methods=["POST"]) @api.validate( resp=DocResponse(r=MySuccess), tags=["video_cut"], ) def test(json: VideoCutInSchema): return MySuccess( data={"result": ''} ) @video_cut_api.route("/video_cut", methods=["POST"]) # @login_required @api.validate( resp=DocResponse(r=MySuccess), # security=[AuthorizationBearerSecurity], tags=["video_cut"], ) def video_cut(json: VideoCutInSchema): if not g.source_video_url or not g.lang: raise APIParaError source_video_url = g.source_video_url.strip() lang = g.lang.strip() # 创建记录 rec = VideoCut.create( **{'source_video_url': source_video_url, 'lang': lang, 'status': 'created', 'process_info': '待处理|'}, commit=True) # 同步立刻处理 try: p = handle_one_record(rec) except Exception as e: raise HandleError(str(e)) return MySuccess( data=[p] ) def handle_one_record(record): try: all_start_time = time.time() process_info = '' class Args: pass args = Args() media_file = record['source_video_url'] lang = record['lang'] record.update(**{'status': 'processing', 'process_info': process_info}, commit=True) # 1、视频生成srt和md start_time = time.time() srt_fn = utils.change_ext(media_file, "srt") md_fn = utils.change_ext(media_file, "md") # 如果目标语言不是中文,则提示whisper翻译全部字幕 if lang != "zh": prompt = f"Subtitles must be fully translated into {lang}" else: prompt = "" current_app.logger.debug(f"Transcribe {media_file} lang={lang} promt={prompt}") args.inputs = [media_file] args.lang = lang args.wmdigit = True args.force = True args.vad = 0 wmdigit_transcribe.Transcribe(args).run() time_cost = f"{time.time() - start_time:.2f}" process_info = process_info + f"视频生成srt和md:{time_cost}s|" # record.update(**{'src_url': src_url, 'md_url': md_url, 'process_info': process_info}, commit=True) # # # 2、从字幕生成cut视频 # start_time = time.time() # final_video_url = wmdigit_cut(media_file, md_fn, srt_fn) # time_cost = f"{time.time() - start_time:.2f}" # process_info = process_info + f'从字幕生成cut视频:{time_cost}s|' # record.update(**{'final_video_url': final_video_url, 'process_info': process_info, 'status': 'done'}, commit=True) # # all_end_time = time.time() # process_info = process_info + f"所有步骤合计:{all_end_time - all_start_time:.2f}s" # record.update(**{'process_info': process_info}, commit=True) # current_app.logger.debug(process_info) # 返回更新后的记录 return record except Exception as e: str_e = str(e)[:200] process_info = process_info + f'处理失败:{str_e}' record.update(**{'status': 'fail', 'process_info': process_info}, commit=True) raise e