基于Sanic的微服务基础架构

技术

picture.image

介绍

使用python做web开发面临的一个最大的问题就是性能,在解决C10K问题上显的有点吃力。有些异步框架Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提升,但是也出现了各种古怪的问题难以解决。

在python3.6中,官方的异步协程库asyncio正式成为标准。在保留便捷性的同时对性能有了很大的提升,已经出现许多的异步框架使用asyncio。

使用较早的异步框架是aiohttp,它提供了server端和client端,对asyncio做了很好的封装。但是开发方式和最流行的微框架flask不同,flask开发简单,轻量,高效。

微服务是最近最火开发模式,它解决了复杂性问题,提高开发效率,便于部署等优点。

正是结合这些优点, 以Sanic为基础,集成多个流行的库来搭建微服务。 Sanic框架是和Flask相似的异步协程框架,简单轻量,并且性能很高。

本项目就是以Sanic为基础搭建的微服务框架。

特点

  • 使用sanic异步框架,简单,轻量,高效。
  • 使用uvloop为核心引擎,使sanic在很多情况下单机并发甚至不亚于Golang。
  • 使用asyncpg为数据库驱动,进行数据库连接,执行sql语句执行。
  • 使用aiohttp为Client,对其他微服务进行访问。
  • 使用peewee为ORM,但是只是用来做模型设计和migration。
  • 使用opentracing为分布式追踪系统。
  • 使用unittest做单元测试,并且使用mock来避免访问其他微服务。
  • 使用swagger做API标准,能自动生成API文档。

使用

项目地址


      1. `sanic-ms:`https://github.com/songcser/sanic-ms``
2. 
3. `Example:`https://github.com/songcser/sanic-ms/tree/master/examples``


    

Swagger API

picture.image

Zipkin Server

picture.image

picture.image

服务端

使用sanic异步框架,有较高的性能,但是使用不当会造成blocking, 对于有IO请求的都要选用异步库。添加库要慎重。 sanic使用uvloop异步驱动,uvloop基于libuv使用Cython编写,性能比nodejs还要高。

功能说明

启动前


      1. `@app.listener('before_server_start')`
2. `async def before_srver_start(app, loop):`
3. `queue = asyncio.Queue()`
4. `app.queue = queue`
5. `loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))`
6. `reporter = AioReporter(queue=queue)`
7. `tracer = BasicTracer(recorder=reporter)`
8. `tracer.register_required_propagators()`
9. `opentracing.tracer = tracer`
10. `app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)`


    
  • 创建DB连接池
  • 创建Client连接
  • 创建queue, 消耗span,用于日志追踪
  • 创建opentracing.tracer进行日志追踪

中间件


      1. `@app.middleware('request')`
2. `async def cros(request):`
3. `if request.method == 'POST' or request.method == 'PUT':`
4. `request['data'] = request.json`
5. `span = before_request(request)`
6. `request['span'] = span`
7. `@app.middleware('response')`
8. `async def cors_res(request, response):`
9. `span = request['span'] if 'span' in request else None`
10. `if response is None:`
11. `return response`
12. `result = {'code': 0}`
13. `if not isinstance(response, HTTPResponse):`
14. `if isinstance(response, tuple) and len(response) == 2:`
15. `result.update({`
16. `'data': response[0],`
17. `'pagination': response[1]`
18. `})`
19. `else:`
20. `result.update({'data': response})`
21. `response = json(result)`
22. `if span:`
23. `span.set_tag('http.status_code', "200")`
24. `if span:`
25. `span.set_tag('component', request.app.name)`
26. `span.finish()`
27. `return response`


    
  • 创建span, 用于日志追踪
  • 对response进行封装,统一格式

异常处理

对抛出的异常进行处理,返回统一格式

任务

创建task消费queue中对span,用于日志追踪

异步处理

由于使用的是异步框架,可以将一些IO请求并行处理

Example:


      1. `async def async_request(datas):`
2. `# async handler request`
3. `results = await asyncio.gather(*[data[2] for data in datas])`
4. `for index, obj in enumerate(results):`
5. `data = datas[index]`
6. `data[0][data[1]] = results[index]`
7. `@user_bp.get('/<id:int>')`
8. `@doc.summary("get user info")`
9. `@doc.description("get user info by id")`
10. `@doc.produces(Users)`
11. `async def get_users_list(request, id):`
12. `async with request.app.db.acquire(request) as cur:`
13. `record = await cur.fetch(`
14. `""" SELECT * FROM users WHERE id = $1 """, id)`
15. `datas = [`
16. `[record, 'city_id', get_city_by_id(request, record['city_id'])]`
17. `[record, 'role_id', get_role_by_id(request, record['role_id'])]`
18. `]`
19. `await async_request(datas)`
20. `return record`


    

getcitybyid, getrolebyid是并行处理。

相关连接

sanic: https://github.com/channelcat/sanic

模型设计 & ORM


      1. `Peewee
  
 is
  a simple 
 and
  small ORM. 
 It
  has few (but expressive) concepts, making it easy to learn 
 and
  intuitive to 
 use
 。`


    

ORM使用peewee, 只是用来做模型设计和migration, 数据库操作使用asyncpg。

Example:


      1. `# models.py`
2. `class Users(Model):`
3. `id = PrimaryKeyField()`
4. `create_time = DateTimeField(verbose_name='create time',`
5. `default=datetime.datetime.utcnow)`
6. `name = CharField(max_length=128, verbose_name="user's name")`
7. `age = IntegerField(null=False, verbose_name="user's age")`
8. `sex = CharField(max_length=32, verbose_name="user's sex")`
9. `city_id = IntegerField(verbose_name='city for user', help_text=CityApi)`
10. `role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)`
11. `class Meta:`
12. `db_table = 'users'`
13. `# migrations.py`
14. `from sanic_ms.migrations import MigrationModel, info, db`
15. `class UserMigration(MigrationModel):`
16. `_model = Users`
17. `# @info(version="v1")`
18. `# def migrate_v1(self):`
19. `#     migrate(self.add_column('sex'))`
20. `def migrations():`
21. `try:`
22. `um = UserMigration()`
23. `with db.transaction():`
24. `um.auto_migrate()`
25. `print("Success Migration")`
26. `except Exception as e:`
27. `raise e`
28. `if __name__ == '__main__':`
29. `migrations()`


    
  • 运行命令 python migrations.py
  • migrate_v1函数添加字段sex, 在BaseModel中要先添加name字段
  • info装饰器会创建表migrate_record来记录migrate,version每个model中必须唯一,使用version来记录是否执行过,还可以记录author,datetime
  • migrate函数必须以migrate_开头

相关连接


      1. `peewee:http:
 //docs.peewee-orm.com/en/latest/`


    

数据库操作


      1. `asyncpg 
 is
  the fastest driver among common 
 Python
 , 
 NodeJS
  
 and
  
 Go
  implementations`


    

使用asyncpg为数据库驱动, 对数据库连接进行封装, 执行数据库操作。

不使用ORM做数据库操作,一个原因是性能,ORM会有性能的损耗,并且无法使用asyncpg高性能库。另一个是单个微服务是很简单的,表结构不会很复杂,简单的SQL语句就可以处理来,没必要引入ORM。使用peewee只是做模型设计

Example:


      1. `sql = "SELECT * FROM users WHERE name=$1"`
2. `name = "test"`
3. `async with request.app.db.acquire(request) as cur:`
4. `data = await cur.fetchrow(sql, name)`
5. `async with request.app.db.transaction(request) as cur:`
6. `data = await cur.fetchrow(sql, name)`


    
  • acquire() 函数为非事务,对于只涉及到查询的使用非事务,可以提高查询效率
  • tansaction() 函数为事务操作,对于增删改必须使用事务操作
  • 传入request参数是为了获取到span,用于日志追踪
  • TODO 数据库读写分离

相关连接


      1. `asyncpg:https://github.com/MagicStack/asyncpg`
2. 
3. `benchmarks:https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/`


    

客户端

使用aiohttp中的client,对客户端进行了简单的封装,用于微服务之间访问。


      1. `Don’t create a session per request. Most likely you need a session per application which performs all requests altogether.`
2. `A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.`


    

Example:


      1. `@app.listener('before_server_start')`
2. `async def before_srver_start(app, loop):`
3. `app.client =  Client(loop, url='http://host:port')`
4. `async def get_role_by_id(request, id):`
5. `cli = request.app.client.cli(request)`
6. `async with cli.get('/cities/{}'.format(id)) as res:`
7. `return await res.json()`
8. `@app.listener('before_server_stop')`
9. `async def before_server_stop(app, loop):`
10. `app.client.close()`


    

对于访问不同的微服务可以创建多个不同的client,这样每个client都会keep-alives

相关连接


      1. `aiohttp:http:
 //aiohttp.readthedocs.io/en/stable/client.html`


    

日志 & 分布式追踪系统

使用官方logging, 配置文件为logging.yml, sanic版本要0.6.0及以上。JsonFormatter将日志转成json格式,用于输入到ES


      1. `Enter
  
 OpenTracing
 : 
 by
  offering consistent, expressive, vendor-neutral 
 APIs
  
 for
  popular platforms, 
 OpenTracing
  makes it easy 
 for
  developers to add (
 or
  
 switch
 ) tracing implementations 
 with
  an O(
 1
 ) configuration change. 
 OpenTracing
  also offers a lingua franca 
 for
  OSS instrumentation 
 and
  platform-specific tracing helper libraries. 
 Please
  refer to the 
 Semantic
  
 Specification
 .`


    

装饰器logger


      1. `@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)`
2. `async def get_city_by_id(request, id):`
3. `cli = request.app.client.cli(request)`


    
  • type: 日志类型,如 method, route
  • category: 日志类别,默认为app的name
  • detail: 日志详细信息
  • description: 日志描述,默认为函数的注释
  • tracing: 日志追踪,默认为True
  • level: 日志级别,默认为INFO

分布式追踪系统

  • OpenTracing是以Dapper,Zipkin等分布式追踪系统为依据, 建立了统一的标准。
  • Opentracing跟踪每一个请求,记录请求所经过的每一个微服务,以链条的方式串联起来,对分析微服务的性能瓶颈至关重要。
  • 使用opentracing框架,但是在输出时转换成zipkin格式。 因为大多数分布式追踪系统考虑到性能问题,都是使用的thrift进行通信的,本着简单,Restful风格的精神,没有使用RPC通信。以日志的方式输出, 可以使用fluentd, logstash等日志收集再输入到Zipkin。Zipkin是支持HTTP输入的。
  • 生成的span先无阻塞的放入queue中,在task中消费队列的span。后期可以添加上采样频率。
  • 对于DB,Client都加上了tracing

相关连接


      1. `opentracing:https://github.com/opentracing/opentracing-python`
2. 
3. `zipkin:https://github.com/openzipkin/zipkin`
4. 
5. `jaeger:https://uber.github.io/jaeger/`


    

API接口

api文档使用swagger标准。

Example:


      1. `from sanic_ms import doc`
2. `@user_bp.post('/')`
3. `@doc.summary('create user')`
4. `@doc.description('create user info')`
5. `@doc.consumes(Users)`
6. `@doc.produces({'id': int})`
7. `async def create_user(request):`
8. `data = request['data']`
9. `async with request.app.db.transaction(request) as cur:`
10. `record = await cur.fetchrow(`
11. `""" INSERT INTO users(name, age, city_id, role_id)`
12. `VALUES($1, $2, $3, $4, $5)`
13. `RETURNING id`
14. `""", data['name'], data['age'], data['city_id'], data['role_id']`
15. `)`
16. `return {'id': record['id']}`


    
  • summary: api概要
  • description: 详细描述
  • consumes: request的body数据
  • produces: response的返回数据
  • tag: API标签
  • 在consumes和produces中传入的参数可以是peewee的model,会解析model生成API数据, 在field字段的help_text参数来表示引用对象
  • http://host:ip/openapi/spec.json 获取生成的json数据

相关连接


      1. `swagger:https:
 //swagger.io/`


    

Response 数据

在返回时,不要返回sanic的response,直接返回原始数据,会在Middleware中对返回的数据进行处理,返回统一的格式,具体的格式可以[查看]

单元测试

单元测试使用unittest。 mock是自己创建了MockClient,因为unittest还没有asyncio的mock,并且sanic的测试接口也是发送request请求,所以比较麻烦. 后期可以使用pytest。

Example:


      1. `from sanic_ms.tests import APITestCase`
2. `from server import app`
3. `class TestCase(APITestCase):`
4. `_app = app`
5. `_blueprint = 'visit'`
6. `def setUp(self):`
7. `super(TestCase, self).setUp()`
8. `self._mock.get('/cities/1',`
9. `payload={'id': 1, 'name': 'shanghai'})`
10. `self._mock.get('/roles/1',`
11. `payload={'id': 1, 'name': 'shanghai'})`
12. `def test_create_user(self):`
13. `data = {`
14. `'name': 'test',`
15. `'age': 2,`
16. `'city_id': 1,`
17. `'role_id': 1,`
18. `}`
19. `res = self.client.create_user(data=data)`
20. `body = ujson.loads(res.text)`
21. `self.assertEqual(res.status, 200)`


    
  • 其中_blueprint为blueprint名称
  • 在setUp函数中,使用_mock来注册mock信息, 这样就不会访问真实的服务器, payload为返回的body信息
  • 使用client变量调用各个函数, data为body信息,params为路径的参数信息,其他参数是route的参数

代码覆盖


      1. `coverage erase`
2. `coverage run --source . -m sanic_ms tests`
3. `coverage xml -o reports/coverage.xml`
4. `coverage2clover -i reports/coverage.xml -o reports/clover.xml`
5. `coverage html -d reports`


    
  • coverage2colver 是将coverage.xml 转换成 clover.xml,bamboo需要的格式是clover的。

相关连接


      1. `unittest:https://docs.python.org/3/library/unittest.html`
2. `coverage:https://coverage.readthedocs.io/en/coverage-4.4.1/`


    

异常处理

使用 app.error_handler = CustomHander() 对抛出的异常进行处理

Example:


      1. `from sanic_ms.exception import ServerError`
2. `@visit_bp.delete('/users/<id:int>')`
3. `async def del_user(request, id):`
4. `raise ServerError(error='内部错误',code=10500, message="msg")`
5. `code: 错误码,无异常时为0,其余值都为异常`
6. `message: 状态码信息`
7. `error: 自定义错误信息`
8. `status_code: http状态码,使用标准的http状态码`


    

本文作者:宋吉义 1月29日来稿

GitHub ID:songcser

picture.image

picture.image

点击阅读原文,加入CodingGo编程社区 更多阅读请点击:

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
DataSail CDC 数据整库实时入仓入湖实践
在线数据库数据导入到数仓分析的链路已经存在多年,随着近年来实时计算的发展,业务希望有延迟更低、运维更便捷、效率更高的CDC同步通道。本次分享主要介绍DataSail实现CDC整库实时同步的技术方案和业务实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论