#video.py from app.libs.redprint import Redprint from flask import jsonify, request from app.app import mongo from app.app import jwt from bson.json_util import dumps from bson.objectid import ObjectId from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity import bcrypt import re from pymongo.errors import DuplicateKeyError from werkzeug.exceptions import BadRequest # 初始化红图 api = Redprint('video') @api.route('/get') def get_video(): return "I am video get method" @api.route('/getall') def get_all_video(): video_infos = list(mongo.db.videos.find()) # 将ObjectId转换为字符串,以便于JSON序列化 # 使用bson的json_util来序列化包含ObjectId的对象 # json_string = dumps(user_infos) for video_info in video_infos: video_info["_id"] = str(video_info["_id"]) return jsonify(video_infos) @api.route('/getBlobUrl', methods=['GET']) def get_blob_url_video(): try: # 从查询字符串中获取参数 query = ObjectId(request.args.get('query')) print(query) if not query: raise BadRequest("查询参数为空") video = mongo.db.videos.find_one({"_id": query}) if video: bolb_url = video["video_url"] print(bolb_url) if bolb_url: return jsonify({'queryRes': bolb_url}), 200 else: return jsonify({'queryRes': "None Result"}), 404 except BadRequest as e: return jsonify({"error": str(e)}), 400 except Exception as e: return jsonify({"error": "内部服务器错误"}), 500 @api.route('/search', methods=['GET']) def search_specify_video(): try: # 从查询字符串中获取参数 query = request.args.get('query') print(query) if not query: raise BadRequest("查询参数为空") # 搜索视频 如果由#开头则搜索id,否则搜索视频名称 video = {} videos = [] if query[0] == '#' : if (query == "#1000001#") : videos = mongo.db.videos.find() else: video = mongo.db.videos.find_one({"video_id": query}) else : paQuery = {"video_name": {"$regex": query, "$options": "i"}} videos = mongo.db.videos.find(paQuery) # queryVideo = {} queryRes = [] if video: queryVideo = {} queryVideo["duration"] = video["duration"] queryVideo["cover_url"] = video["cover_url"] queryVideo["video_name"] = video["video_name"] queryVideo["uploader_id"] = video["uploader_id"] # queryVideo["video_url"] = video["video_url"] queryVideo["video_id"] = str(video["_id"]) queryRes.append(queryVideo) for videoItem in videos: queryVideo = {} queryVideo["duration"] = videoItem["duration"] queryVideo["cover_url"] = videoItem["cover_url"] queryVideo["video_name"] = videoItem["video_name"] queryVideo["uploader_id"] = videoItem["uploader_id"] # queryVideo["video_url"] = video["video_url"] queryVideo["video_id"] = str(videoItem["_id"]) queryRes.append(queryVideo) if len(queryRes): return jsonify({'queryRes': queryRes}), 200 else: return jsonify({'queryRes': "None Result"}), 200 except BadRequest as e: return jsonify({"error": str(e)}), 400 except Exception as e: return jsonify({"error": "内部服务器错误"}), 500