Algorithm Service Deployment in Practice: The Code


In the previous article on deploying an algorithm service, we designed the overall architecture and explained the reasoning behind it. In this one we work through the code of the fire/smoke detection service to put that design into practice.

Preface

Hello everyone, I'm 张大刀.

In the previous article we analyzed the overall framework of the algorithm service and the thinking behind its design (worth a read if you're interested). In this article we look at how that design is implemented on the code side.

1

Multi-threaded implementation

When real-time performance is not a hard requirement, Flask alone is generally enough to serve the model with multiple threads. Following the code architecture from the previous article:

(figure: overall code architecture from the previous article)
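Before diving in, here is a self-contained sketch of the thread-per-request model that Flask's development server relies on (its `threaded=True` mode), shown with the standard library only so it runs without Flask installed; the route and handler names are purely illustrative:

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class PingHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # reply with the name of the thread serving this request
        body = threading.current_thread().name.encode()
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # keep the demo quiet
        pass


# port 0 lets the OS pick a free port
server = ThreadingHTTPServer(("127.0.0.1", 0), PingHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

port = server.server_address[1]
# three sequential requests are each handled by a freshly spawned thread
names = {urllib.request.urlopen(f"http://127.0.0.1:{port}/").read() for _ in range(3)}
server.shutdown()
print(names)
```

Flask's `app.run(threaded=True)` gives the same behavior at the web-framework level: one worker thread per incoming request, so a slow inference call does not block other clients.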

Let's develop it step by step, starting with the model layer.

1. core/fire_detection:

This is the model-level fire/smoke detection module. It mainly implements the forward inference pass and defines clear input and output interfaces, which keeps it consistent with the pipeline of a generic detection module.

For example, fire_detection.py:

```python
import sys
import os

import cv2
sys.path.insert(0, os.path.split(os.path.realpath(__file__))[0])
import numpy as np
import torch

from .utils import non_max_suppression, scale_coords, letterbox


class FireDetector:
    def __init__(self, model_path, inference="onnx", devices="cpu", conf_thres=0.4,
                 iou_thres=0.5, image_size=640,
                 anchor=[[10,13, 16,30, 33,23], [30,61, 62,45, 59,119], [116,90, 156,198, 373,326]]):
        self.model_path = model_path
        self.inference = inference
        self.devices = devices
        self.iou_thres = iou_thres
        self.conf_thres = conf_thres
        self.img_size = image_size
        self.anchor = anchor
        self.class_name = {0: "fire", 1: "fog"}
        if self.inference == 'onnx':
            import onnxruntime as ort
            self.sess = ort.InferenceSession(self.model_path, providers=['CUDAExecutionProvider'])
        else:
            from .models.experimental import attempt_load
            self.model = attempt_load(self.model_path, map_location=self.devices)  # load FP32 model
            if self.devices != 'cpu':
                # warm up with one dummy forward pass
                self.model(torch.zeros(1, 3, self.img_size, self.img_size)
                           .to(self.devices).type_as(next(self.model.parameters())))

    def sigmoid(self, x):
        return 1.0 / (np.exp(-x) + 1)

    def make_grid(self, nx=20, ny=20):
        yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
        return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float()

    def preprocess(self, image_ori, imgsz):
        # preprocess the image for detection
        image = cv2.cvtColor(image_ori, cv2.COLOR_BGR2RGB)
        image = letterbox(image, imgsz, stride=32)[0]
        image = image.astype(np.float32)
        image = image / 255.0  # 0 - 255 to 0.0 - 1.0
        image = np.transpose(image, [2, 0, 1])  # HWC to CHW
        image = np.expand_dims(image, axis=0)
        image = np.ascontiguousarray(image)
        return image

    def postprocess(self, outputs, imgsz):
        # decode the raw YOLO heads into boxes in input-image coordinates
        stride = [imgsz / output.shape[-2] for output in outputs]
        anchor_new = np.array(self.anchor).reshape(len(self.anchor), 1, -1, 1, 1, 2)
        z = []
        for i, output in enumerate(outputs):
            output = self.sigmoid(output)
            _, _, width, height, _ = output.shape
            grid = np.array(self.make_grid(width, height))
            output[..., 0:2] = (output[..., 0:2] * 2. - 0.5 + grid) * stride[i]  # x, y
            output[..., 2:4] = (output[..., 2:4] * 2) ** 2 * anchor_new[i]  # w, h
            z.append(output.reshape(1, -1, 7))
        pred = np.concatenate((z[0], z[1], z[2]), axis=1)
        return pred

    def __call__(self, image_ori):
        image = self.preprocess(image_ori, self.img_size)
        if self.inference == "onnx":
            outputs = []
            for i in range(len(self.anchor)):
                output = self.sess.run([self.sess.get_outputs()[i].name],
                                       input_feed={'images': image})[0]
                outputs.append(output)
            pred = self.postprocess(outputs, self.img_size)
        else:
            pred = self.model(image)[0]
        pred = non_max_suppression(pred, self.conf_thres, self.iou_thres)
        pred_reformat = []
        for det in pred:  # detections per image
            if len(det):
                det[:, :4] = scale_coords(image.shape[2:], det[:, :4], image_ori.shape).round()
                for *xyxy, conf, cls in reversed(det):
                    xyxy_ = [int(v) for v in np.reshape(xyxy, (1, 4)).tolist()[0]]
                    pred_reformat.append({
                        "name": self.class_name[cls.item()],
                        "score": conf.item(),
                        "bbox": {"x_min": xyxy_[0],
                                 "y_min": xyxy_[1],
                                 "x_max": xyxy_[2],
                                 "y_max": xyxy_[3]},
                    })
        return pred_reformat
```

2. core/cfg.py:

With model inference done, cfg.py defines the parameters the model needs:

```python
import os

# variables
repo_dir = os.path.dirname(os.path.dirname(__file__))
model_path = os.path.join(repo_dir, 'models_encrypted')
if not os.path.exists(model_path):
    model_path = os.path.join(repo_dir, 'models')

# log level: DEBUG, INFO, WARNING, ERROR, CRITICAL
log_level = "DEBUG"
interface_version = "v1.1"

det_cfg = dict(
    model_path=os.path.join(model_path, 'FireDetection_v1.0.0.onnx'),  # onnx or torchscript
    conf_thres=0.5,
    iou_thres=0.5,
    devices="0",
    inference='onnx',
    image_size=640,
)
```

Because only a single model is involved here, with no chaining of models and no intermediate logic post-processing, there is no functions folder for such logic, and the configuration file stays simple. If logic post-processing were needed, its parameters would also go into cfg.py, while the post-processing code itself would live in the functions folder.
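For illustration, a hedged sketch of what such a functions-folder helper might look like; the helper name `filter_detections` and its thresholds are hypothetical, but the detection format matches the `FireDetector` output above:

```python
# hypothetical functions/postprocess.py: drop boxes that are too
# low-confidence or too small before returning them to the caller
def filter_detections(detections, min_score=0.5, min_area=100):
    kept = []
    for det in detections:
        bbox = det["bbox"]
        area = (bbox["x_max"] - bbox["x_min"]) * (bbox["y_max"] - bbox["y_min"])
        if det["score"] >= min_score and area >= min_area:
            kept.append(det)
    return kept


dets = [
    {"name": "fire", "score": 0.9, "bbox": {"x_min": 0, "y_min": 0, "x_max": 50, "y_max": 50}},
    {"name": "fog", "score": 0.3, "bbox": {"x_min": 0, "y_min": 0, "x_max": 5, "y_max": 5}},
]
print(filter_detections(dets))  # keeps only the high-score, large fire box
```

The thresholds (`min_score`, `min_area`) would then be declared in cfg.py next to `det_cfg`, so the service layer never hard-codes them.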

3. service.py:

With the fire/smoke algorithm in place, the outermost layer is the service itself. Flask provides the multi-threaded server here; in service.py:

```python
import dataclasses

from flask import Flask, request, jsonify
from werkzeug.exceptions import HTTPException

from functions import cfg
from functions.core import FireDetector
from webapi import *

app = Flask(__name__)

# instantiate the model object once at startup
detector = FireDetector(**cfg.det_cfg)


@app.route('/fire_detector', methods=["POST"])
def fire_detector():
    try:
        # validate the HTTP headers
        content_type = request.headers.get('Content-Type')
        if content_type is None:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_NOT_JSON, 'content-type error')
        if content_type.lower() not in ("application/json", "application/x-www-form-urlencoded"):
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_NOT_JSON, 'content-type error')
        # read the input data
        post_data = request.get_json(silent=True)
        # parse the input data into the algorithm's input format
        request_arg = RequestArgument.from_post_data(post_data)
        algo_arg = AlgorithmArgument.from_request_arg(request_arg)
        # forward inference
        algo_result = detector(algo_arg.data)
    except (ServiceError, AlgoError) as e:
        response_arg = ResponseArgument(req_id="", err_no=e.code, err_msg=e.message, result=None)
        return jsonify(dataclasses.asdict(response_arg))
    except HTTPException as e:
        response_arg = ResponseArgument(req_id="", err_no=e.code,
                                        err_msg=f'{e.name}: {e.description}', result=None)
        return jsonify(dataclasses.asdict(response_arg))
    except Exception:
        response_arg = ResponseArgument(req_id="", err_no=AlgoErrorCodeEnum.ERROR_UNKNOWN.code,
                                        err_msg=AlgoErrorCodeEnum.ERROR_UNKNOWN.message, result=None)
        return jsonify(dataclasses.asdict(response_arg))
    # success response
    response_arg = ResponseArgument(err_no=ServiceErrorCodeEnum.SUCCESS.code,
                                    err_msg=ServiceErrorCodeEnum.SUCCESS.message,
                                    req_id=algo_arg.request_id,
                                    result=algo_result)
    return jsonify(dataclasses.asdict(response_arg))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=2222, debug=True)
```

4. webapi/arguments.py:

Inside webapi, arguments.py parses the received request data into the model's input data:

```python
import base64
from typing import Union, Optional
from dataclasses import dataclass

import cv2
import numpy as np
from pydantic import BaseModel

from .error_code import *

__all__ = ['RequestArgument', 'AlgorithmArgument', 'ResponseArgument']


def normalize_image_shape(image):
    """Convert any supported input image to 3-channel BGR."""
    if image.ndim == 2:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    elif image.ndim == 3:
        num_channels = image.shape[-1]
        if num_channels == 1:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        elif num_channels == 3:
            pass
        elif num_channels == 4:
            image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
        else:
            raise ValueError('Unsupported!')
    else:
        raise ValueError('Unsupported!')
    return image


class RequestArgument(BaseModel):
    """A direct translation of the interface document."""
    request_id: Optional[str] = None   # request id
    data: Union[str, bytes] = None     # image file data, base64 encoded
    data_path: Optional[str] = None    # image path
    data_fmt: Optional[str] = None     # data format
    params: Optional[str] = None       # extra parameters

    @staticmethod
    def from_post_data(post_data):
        if post_data is None:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_JSON_PARSE)
        try:
            request_id = post_data.get('request_id')
            data = post_data.get('data')
            data_path = post_data.get('data_path')
            data_fmt = post_data.get('data_fmt')
            params = post_data.get('params')
        except Exception:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_NOT_JSON)
        if request_id is None:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_MISSING_ARGS)
        # exactly one of data / data_path must be given
        if (data is None) == (data_path is None):
            raise AlgoError(AlgoErrorCodeEnum.ERROR_IVALID_ARG_VAL)
        try:
            return RequestArgument(request_id=request_id, data=data, data_path=data_path,
                                   data_fmt=data_fmt, params=params)
        except Exception:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_JSON_PARSE)


@dataclass
class AlgorithmArgument:
    """Convert a RequestArgument into the algorithm's input arguments."""
    request_id: Optional[str] = None                   # request id
    data: Union[str, bytes, np.ndarray, None] = None   # decoded image
    data_path: Optional[str] = None                    # image path
    data_fmt: Optional[str] = None                     # data format
    params: Optional[str] = None                       # extra parameters

    # class attributes: input constraints
    IMAGE_MAX_WIDTH = 1920
    IMAGE_MAX_HEIGHT = 1920
    IMAGE_FORMAT = ["JPG", "jpg", "png", "jpeg"]

    @classmethod
    def convert_to_image(cls, data, is_data=True):
        if is_data:
            try:
                if isinstance(data, str):
                    data = data.encode()
                raw = base64.b64decode(data)
            except Exception:
                raise AlgoError(AlgoErrorCodeEnum.ERROR_INPUT_IMAGE_BASE64)
            try:
                image = cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), -1)
            except Exception:
                raise AlgoError(AlgoErrorCodeEnum.ERROR_INPUT_IMAGE_READ)
        else:
            try:
                image = cv2.imdecode(np.fromfile(data, dtype=np.uint8), -1)  # BGR
            except Exception:
                raise AlgoError(AlgoErrorCodeEnum.ERROR_INPUT_IMAGE_READ)
        if image.shape[0] > cls.IMAGE_MAX_HEIGHT or image.shape[1] > cls.IMAGE_MAX_WIDTH:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_INPUT_IMAGE_SIZE)
        try:
            image = normalize_image_shape(image)
        except Exception:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_INPUT_IMAGE_CN)
        return image

    @classmethod
    def from_request_arg(cls, request_arg: RequestArgument):
        algo_dict = {'request_id': request_arg.request_id}
        if request_arg.data is not None:
            algo_dict['data'] = cls.convert_to_image(request_arg.data, is_data=True)
        if request_arg.data_path is not None:
            algo_dict['data'] = cls.convert_to_image(request_arg.data_path, is_data=False)
        if request_arg.data_fmt is not None:
            if request_arg.data_fmt not in cls.IMAGE_FORMAT:
                raise AlgoError(AlgoErrorCodeEnum.ERROR_INPUT_IMAGE_FORMAT)
            algo_dict['data_fmt'] = request_arg.data_fmt
        if request_arg.params is not None:
            algo_dict['params'] = request_arg.params
        return AlgorithmArgument(**algo_dict)

    def to_request_arg(self):
        raise NotImplementedError


@dataclass
class ResponseArgument:
    """The service's response interface."""
    err_no: int                    # error code
    err_msg: str                   # error message
    req_id: str                    # request id
    result: Optional[list] = None  # algorithm result
```

5. webapi/error_code.py:

Error codes, and the messages fed back with them, are defined in error_code.py:

```python
from enum import Enum

__all__ = ['ServiceErrorCodeEnum', 'AlgoErrorCodeEnum',
           'ServiceError', 'AlgoError']


class ServiceErrorCodeEnum(Enum):
    """Service error code enum."""
    SUCCESS = (0, 'Service SUCCESS')
    ERROR = (-1, 'Service ERROR')

    @property
    def code(self):
        """Return the error code."""
        return self.value[0]

    @property
    def message(self):
        """Return the error message."""
        return self.value[1]


class AlgoErrorCodeEnum(Enum):
    """Algorithm error code enum."""
    SUCCESS = (0, 'SUCCESS')
    ERROR_SERVICE_AVAILABLE = (-1, 'service temporarily unavailable')
    ERROR_REQUEST_NOT_JSON = (-1000, 'request body should be json format')
    ERROR_REQUEST_JSON_PARSE = (-1001, 'request json parse error')
    ERROR_MISSING_ARGS = (-1002, 'missing required arguments')
    ERROR_IVALID_ARG_VAL = (-1003, 'invalid argument value')
    ERROR_ARGUMENT_FORMAT = (-1004, 'argument format error')
    ERROR_INPUT_IMAGE_EMPTY = (-1100, 'input image is empty')
    ERROR_INPUT_IMAGE_BASE64 = (-1101, 'input image base64 error')
    ERROR_INPUT_IMAGE_READ = (-1102, 'input image read error')
    ERROR_INPUT_IMAGE_CHECKSUM = (-1103, 'input image checksum error')
    ERROR_INPUT_IMAGE = (-1104, 'input image error')
    ERROR_INPUT_IMAGE_HEADER = (-1105, 'input image header error')
    ERROR_INPUT_IMAGE_SIZE = (-1106, 'input image size is too large')
    ERROR_INPUT_IMAGE_CN = (-1107, 'input image channel number error, only support 1,3,4')
    ERROR_INPUT_IMAGE_FORMAT = (-1108, 'input image format error, only support "jpg,jpeg,png" format')
    ERROR_PREDICT = (-1200, 'predict error')
    ERROR_BATCH_PREDICT = (-1201, 'batch predict error')
    ERROR_UNKNOWN = (9999, 'unknown error')

    @property
    def code(self):
        """Return the error code."""
        return self.value[0]

    @property
    def message(self):
        """Return the error message."""
        return self.value[1]


class ServiceError(Exception):
    """Exception type for service-level errors."""
    def __init__(self, error_code: ServiceErrorCodeEnum, extra_str: str = None):
        self.name = error_code.name
        self.code = error_code.code
        if extra_str is None:
            self.message = error_code.message
        else:
            self.message = f'{error_code.message}: {extra_str}'
        Exception.__init__(self)

    def __repr__(self):
        return f'[{self.__class__.__name__} {self.code}] {self.message}'

    __str__ = __repr__


class AlgoError(Exception):
    """Exception type for algorithm-level errors."""
    def __init__(self, error_code: AlgoErrorCodeEnum, extra_str: str = None):
        self.name = error_code.name
        self.code = error_code.code
        if extra_str is None:
            self.message = error_code.message
        else:
            self.message = f'{error_code.message}: {extra_str}'
        Exception.__init__(self)

    def __repr__(self):
        return f'[{self.__class__.__name__} {self.code}] {self.message}'

    __str__ = __repr__
```

That completes the code walk-through; in summary:

(figure: summary of the code structure)
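Pieced together from the files above (module paths as they appear in the imports), the project layout is roughly:

```
.
├── service.py              # Flask entry point (multi-threaded)
├── service_fastapi.py      # FastAPI entry point (async, multi-process)
├── functions/
│   ├── cfg.py              # model path and inference parameters
│   ├── core/
│   │   └── fire_detection.py   # FireDetector: preprocess / inference / postprocess
│   └── ...                 # logic post-processing helpers, when needed
├── webapi/
│   ├── arguments.py        # request -> algorithm input, response formatting
│   └── error_code.py       # error codes and exception classes
└── models/                 # FireDetection_v1.0.0.onnx, ...
```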

To deploy the code on a server, simply run:

```shell
python service.py
```

and the model is served on port 2222.

2

Multi-process implementation

Flask's built-in server does not give us true multi-process serving, so here we turn to FastAPI instead. FastAPI is built on asyncio's asynchronous programming model: when a task issues an I/O request, the program automatically switches to other tasks, and switches back to the original one once the I/O result returns. This raises the application's throughput and performance.
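To see why the async model helps, here is a minimal stdlib sketch; `asyncio.to_thread` (Python 3.9+) plays the same role as asgiref's `sync_to_async`, and `blocking_inference` is a stand-in for the real detector:

```python
import asyncio
import time


def blocking_inference(x):
    # stand-in for a blocking model forward pass (e.g. detector(image))
    time.sleep(0.1)
    return x * 2


async def handle(x):
    # offload the blocking call to a worker thread so the event loop stays free
    return await asyncio.to_thread(blocking_inference, x)


async def main():
    t0 = time.perf_counter()
    # five "requests" arrive at once; their blocking work overlaps in threads
    results = await asyncio.gather(*(handle(i) for i in range(5)))
    return results, time.perf_counter() - t0


results, elapsed = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
print(elapsed)  # well under the 5 * 0.1 s a serial handler would need
```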

None of the other modules need to change; we only rewrite the entry point to deploy with FastAPI, using sync_to_async from asgiref.sync for the blocking inference call. In service_fastapi.py:

```python
import dataclasses
import time

from fastapi import FastAPI, Request
from asgiref.sync import sync_to_async
import uvicorn
from werkzeug.exceptions import HTTPException

from functions import cfg
from functions.core import FireDetector
from functions.utils import *
from webapi import *

app = FastAPI()

# instantiate the model object once at startup
detector = FireDetector(**cfg.det_cfg)

# configure the logger
logger_message(cfg.log_level, "services start")


@app.post('/fire_detector')
async def main(request: Request):
    try:
        # validate the HTTP headers
        content_type = request.headers.get('Content-Type')
        if content_type is None:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_NOT_JSON, 'content-type error')
        if content_type.lower() not in ("application/json", "application/x-www-form-urlencoded"):
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_NOT_JSON, 'content-type error')
        # read the input data
        try:
            post_data = await request.json()
        except Exception:
            raise AlgoError(AlgoErrorCodeEnum.ERROR_REQUEST_JSON_PARSE, 'request.json')
        # parse the input data
        request_arg = RequestArgument.from_post_data(post_data)
        algo_arg = AlgorithmArgument.from_request_arg(request_arg)
        logger_message(cfg.log_level, '{},{} start'.format(
            post_data.get('request_id'), time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
        # forward inference: wrap the blocking call so the event loop is not blocked
        algo_result = await sync_to_async(detector)(algo_arg.data)
        logger_message(cfg.log_level, '{},{} end'.format(
            post_data.get('request_id'), time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
    except (ServiceError, AlgoError) as e:
        response_arg = ResponseArgument(req_id="", err_no=e.code, err_msg=e.message, result=None)
        return dataclasses.asdict(response_arg)
    except HTTPException as e:
        response_arg = ResponseArgument(req_id="", err_no=e.code,
                                        err_msg=f'{e.name}: {e.description}', result=None)
        return dataclasses.asdict(response_arg)
    except Exception:
        response_arg = ResponseArgument(req_id="", err_no=AlgoErrorCodeEnum.ERROR_UNKNOWN.code,
                                        err_msg=AlgoErrorCodeEnum.ERROR_UNKNOWN.message, result=None)
        return dataclasses.asdict(response_arg)
    response_arg = ResponseArgument(err_no=ServiceErrorCodeEnum.SUCCESS.code,
                                    err_msg=ServiceErrorCodeEnum.SUCCESS.message,
                                    req_id=algo_arg.request_id,
                                    result=algo_result)
    return dataclasses.asdict(response_arg)


if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=2222)
```

For deployment there are two web servers to consider, gunicorn and uvicorn. FastAPI speaks the newer ASGI standard while gunicorn speaks WSGI, so gunicorn cannot serve the app directly. Gunicorn can, however, act as a process manager with a configurable worker class, and Uvicorn provides exactly such a worker class. In this combination, Gunicorn manages the processes and listens on the IP and port, hands incoming connections to worker processes running the Uvicorn worker class, and Uvicorn in turn passes the requests to FastAPI.

(figure: Gunicorn as process manager with Uvicorn worker processes)

With that in place, we can start the FastAPI service with:

```shell
# --workers: number of worker processes; --bind: listen address and port
gunicorn service_fastapi:app --workers 2 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
```

A deployed service looks something like this:

(figure: console output of the running service)

And that about covers the model-serving part.

3

Testing the algorithm service

Once the service is deployed, first self-test that it responds at all: either write a test script yourself or use the Postman tool. Once the service works end to end, and if the multi-process deployment has concurrency requirements, you can load-test the throughput with JMeter or Locust.

01 Script test

Call the interface as defined by the service, pass in the interface parameters, and check whether a result comes back; see client.py:

```python
import base64

import requests

HEADERS = {"Content-Type": "application/json"}
url = "http://0.0.0.0:2222/fire_detector"
image_path = "data/20220518112259.png"

with open(image_path, 'rb') as f:
    img_data = f.read()
    base64_data = base64.b64encode(img_data).decode()

data = {
    "request_id": "1243445sdfge",
    "data_path": image_path,
    # "data": base64_data,  # alternatively, send the image inline as base64
    "data_fmt": "png",
    "params": "{}",
}

response = requests.post(url, headers=HEADERS, json=data)
print("response:", response.text)
```

If the correct result is returned, the service is fine. HTTP-level errors usually mean the url or headers are set incorrectly, while a wrong result points at the model side.
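For example, a test script can tell the two failure modes apart by checking err_no in the returned JSON; the response values below are made up, but the field names follow ResponseArgument:

```python
import json

# a hypothetical success response from the service
resp_text = json.dumps({
    "err_no": 0,
    "err_msg": "Service SUCCESS",
    "req_id": "1243445sdfge",
    "result": [{"name": "fire", "score": 0.92,
                "bbox": {"x_min": 10, "y_min": 20, "x_max": 110, "y_max": 220}}],
})

resp = json.loads(resp_text)
if resp["err_no"] == 0:
    # service-level OK: inspect the detections themselves
    boxes = [(d["name"], round(d["score"], 2)) for d in resp["result"]]
    print(boxes)  # [('fire', 0.92)]
else:
    # non-zero err_no: err_msg pinpoints parsing / input problems
    print(resp["err_no"], resp["err_msg"])
```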

02 Postman test

Postman testing requires installing the Postman tool, which is easy to find online. Once installed, create a new HTTP request and set the url path and the request parameters:

(figure: Postman request setup)

After clicking Send, the response appears below.

(figure: Postman response)

A response like this one, for instance, means the request parameters need to be sent as JSON.

03 JMeter

Once the service passes, use JMeter or Locust to load-test the concurrency of the multi-process deployment. JMeter first: installing it requires Java (installation tutorials are easy to find online). On Windows you can drive JMeter through its GUI; here we focus on Linux, where you need to write a .jmx test plan (the script is available in the account's backend). With the jmx script ready, run from the command line:

```shell
jmeter -n -t fire_detector.jmx -l test.jtl
```

The results are saved to test.jtl, and on the Linux console you can also watch:

(figure: JMeter console output)

to get a rough view of the concurrency. Test engineers generally dig much deeper into JMeter; for an algorithm engineer it is enough to know the tool exists.

04 Locust

Compared with JMeter, Locust feels friendlier to me, both in environment setup and in writing test scripts, which suits algorithm engineers well. Installation guides are easy to find online; after that, write a locustfile.py:

```python
import base64

from locust import HttpUser, task, between


def load_bytes(filename, use_base64: bool = False) -> bytes:
    """Open the file in bytes mode, read it, and close the file."""
    with open(filename, 'rb') as f:
        data = f.read()
    if use_base64:
        data = base64.b64encode(data)
    return data


image_name = "0124.jpg"
image_base64 = load_bytes(image_name, use_base64=True)

post_json = {
    "request_id": "012345qwertyuiopasdfghjklzxcvbnm",
    "data": image_base64.decode(),
    "data_fmt": "jpg",
    "params": "{\"nc_thresh\": 0.7, \"zasi_thresh\": 0.0001, \"zzj_thresh\": 0.0001}",
}


class QuickstartUser(HttpUser):
    wait_time = between(0, 0)

    @task(1)
    def demo(self):
        header = {"Content-Type": "application/json"}
        req = self.client.post("/fire_detector", json=post_json, headers=header, verify=False)
        if req.status_code == 200:
            print("success")
        else:
            print("fails")
```

Then launch it from the command line:

```shell
# -u: number of users, -r: spawn rate per second, -t: run time, --csv: result file prefix
locust -f locustfile.py --host=http://xx.x.x.xx:xxxx --headless -u 50 -r 5 -t 120s --csv 10_
```

and watch the Linux console:

(figure: Locust console output)

The throughput here sits at around 9 requests per second. That covers deploying the algorithm service and the algorithm side's own self-testing; full stability testing and the like calls for dedicated test engineers.
