Skip to content

Commit

Permalink
Merge pull request #5 from HughWen/dev
Browse files Browse the repository at this point in the history
v0.2.0
  • Loading branch information
markwwen authored Feb 22, 2021
2 parents c7c4fa9 + 19f1710 commit 3e086f5
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 34 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
### 0.2.0

- change the input type of web agent from single item to batch item
- support model collection

### 0.1.0

- serialize the request data
- put data into redis broker
- get and process data from redis broker then put back
- web server gets output
37 changes: 29 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ A middleware for model serving to speedup online inference.

<h2 align="center">What is Serving Agent</h2>


Serving Agent is designed as a middleware for model serving, sitting between the web server and the model server, to help improve GPU utilization
and thus speed up online inference.
For a service backed by a machine learning model, requests from clients usually arrive as a stream.
To exploit the parallel computing capability of GPUs, we typically introduce a message queue/message broker to cache requests from the web server and batch-process them with the model server (the figure below shows the architecture). Serving Agent encapsulates the details, such as serializing the request data, communicating with the message queue (Redis), and deserializing the results. With Serving Agent, it is easy to build a scalable service with just a few lines of code.

![model serving architecture](img/architecture.png)
<!-- ![model serving architecture](img/architecture.png) -->
<div align="center">
<img src="img/architecture.png"/>
</div>

<h2 align="center">Changelog</h2>

[Changelog](./CHANGELOG.md)


<h2 align="center">Installation</h2>

Expand Down Expand Up @@ -46,7 +53,7 @@ from serving_agent import ModelAgent
from example.TestModel import TestModel

if __name__ == "__main__":
model_agent = ModelAgent(redis_broker='localhost:6379', redis_queue='example', model_class=TestModel)
model_agent = ModelAgent(redis_broker='localhost:6379', redis_queue='example', model_class=TestModel, collection=True, collection_limit=24000)
model_agent.run()
```

Expand All @@ -60,25 +67,39 @@ python -m example.run_model_server
from serving_agent import WebAgent
from flask import Flask, jsonify, request


app = Flask(__name__)
web_agent = WebAgent(redis_broker='localhost:6379', redis_queue='example')


@app.route('/api/test', methods=['POST'])
def test():
parmas = request.get_json()
data = parmas['data']
result = web_agent.process(data)
return jsonify({'data': result})
data = parmas['data'] # input batch
results = web_agent.process(data)
return jsonify({'data': results})


if __name__ == '__main__':
app.run(debug=True)

```

```shell
python -m example.run_web_server
```

4. Test the server.

```
curl --location --request POST 'http://127.0.0.1:5000/api/test' \
--header 'Content-Type: application/json' \
--data-raw '{
"data": [
"hello",
"world"
]
}'
```


Congratulations! You have developed a scalable service in just a few minutes!
27 changes: 20 additions & 7 deletions README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,47 +36,60 @@ class TestModel:
return [random.random() for x in inputs]
```

1. 开发一个 model server [run_model_server.py](./example/run_model_server.py) 并运行.
2. 开发一个 model server [run_model_server.py](./example/run_model_server.py) 并运行

```python
from serving_agent import ModelAgent

from example.TestModel import TestModel

if __name__ == "__main__":
model_agent = ModelAgent(redis_broker='localhost:6379', redis_queue='example', model_class=TestModel)
model_agent = ModelAgent(redis_broker='localhost:6379', redis_queue='example', model_class=TestModel, collection=True, collection_limit=24000)
model_agent.run()
```

```shell
python -m example.run_model_server
```

1. 使用 Flask 开发一个 web server (或任何别的 Python web 框架都可以) 并启动.
3. 使用 Flask 开发一个 web server (或任何别的 Python web 框架都可以) 并启动

```python
from serving_agent import WebAgent
from flask import Flask, jsonify, request


app = Flask(__name__)
web_agent = WebAgent(redis_broker='localhost:6379', redis_queue='example')


@app.route('/api/test', methods=['POST'])
def test():
parmas = request.get_json()
data = parmas['data']
result = web_agent.process(data)
return jsonify({'data': result})
data = parmas['data'] # input batch
results = web_agent.process(data)
return jsonify({'data': results})


if __name__ == '__main__':
app.run(debug=True)

```

```shell
python -m example.run_web_server
```

4. 测试服务。

```shell
curl --location --request POST 'http://127.0.0.1:5000/api/test' \
--header 'Content-Type: application/json' \
--data-raw '{
"data": [
"hello",
"world"
]
}'
```

恭喜!你已经在几分钟内就开发了一个高性能的模型服务!
1 change: 1 addition & 0 deletions example/TestModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

class TestModel:
def __init__(self):
# self.a = [i for i in range(100000000)]
pass

def predict(self, inputs):
Expand Down
2 changes: 1 addition & 1 deletion example/run_model_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
from example.TestModel import TestModel

if __name__ == "__main__":
model_agent = ModelAgent(redis_broker='localhost:6379', redis_queue='example', model_class=TestModel)
model_agent = ModelAgent(redis_broker='localhost:6379', redis_queue='example', model_class=TestModel, collection=True, collection_limit=24000)
model_agent.run()
6 changes: 3 additions & 3 deletions example/run_web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
@app.route('/api/test', methods=['POST'])
def test():
parmas = request.get_json()
data = parmas['data']
result = web_agent.process(data)
return jsonify({'data': result})
data = parmas['data'] # input batch
results = web_agent.process(data)
return jsonify({'data': results})


if __name__ == '__main__':
Expand Down
26 changes: 25 additions & 1 deletion serving_agent/model_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@


class ModelAgent:
def __init__(self, redis_broker='localhost:6379', redis_queue='broker', model_class=None, model_config={}, batch_size=64, model_sleep=0.1):
def __init__(
self,
redis_broker='localhost:6379',
redis_queue='broker',
model_class=None,
model_config={},
batch_size=64,
model_sleep=0.1,
collection=False,
collection_limit=6000,
):
parse = lambda x: {'host': x.split(':')[0], 'port': int(x.split(':')[1])}
self.db = redis.StrictRedis(**parse(redis_broker))
self.redis_queue = redis_queue
Expand All @@ -14,19 +24,33 @@ def __init__(self, redis_broker='localhost:6379', redis_queue='broker', model_cl
self.model_config = model_config
self.batch_size = batch_size
self.model_sleep = model_sleep
self.collection = collection
self.collection_limit = collection_limit

def run(self, pre_process=lambda x: x, post_process=lambda x: x):
    """Serve forever: pull request batches from the Redis queue, run the model, write results back.

    Args:
        pre_process: per-item transform applied to each ``model_input`` before prediction.
        post_process: per-item transform applied to each prediction result.

    This loop never returns. Each result is written back to Redis under the
    request's key, where the web agent polls for it.
    """
    model = self.model_class(**self.model_config)
    mq_miss = 0  # consecutive polls that found the queue empty
    print('model init')
    while True:
        # Read up to batch_size items and trim them off the queue in one
        # pipelined round trip, so no other consumer sees the same items.
        with self.db.pipeline() as pipe:
            pipe.lrange(self.redis_queue, 0, self.batch_size - 1)
            pipe.ltrim(self.redis_queue, self.batch_size, -1)
            queue, _ = pipe.execute()
        if queue:
            mq_miss = 0
            if not model:
                # Model was released during an idle period (see below); reload lazily.
                model = self.model_class(**self.model_config)
            # NOTE(review): pickle.loads on queue payloads — safe only if the broker is trusted.
            messages = [pickle.loads(x) for x in queue]
            keys = [message.get('key') for message in messages]
            model_inputs = [pre_process(message.get('model_input')) for message in messages]
            results = [post_process(x) for x in model.predict(model_inputs)]
            # Publish each result under its request key for the web agent to fetch.
            self.db.mset({key: pickle.dumps(result) for key, result in zip(keys, results)})
        else:
            mq_miss += 1
            if mq_miss and mq_miss % self.collection_limit == 0:
                mq_miss = 0
                if self.collection and model:
                    # Drop the model reference after collection_limit consecutive
                    # empty polls so idle workers free their (GPU) memory.
                    model = None
                    print('model is collected')

        time.sleep(self.model_sleep)
32 changes: 21 additions & 11 deletions serving_agent/web_agent.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
import uuid
import time
import pickle
from typing import List

import redis


class WebAgent:
def __init__(self, redis_broker='localhost:6379', redis_queue='broker', web_sleep=0.1, max_tries=6000):
def __init__(self, redis_broker='localhost:6379', redis_queue='broker', web_sleep=0.1, max_tries=6000, unpickling=True):
parse = lambda x: {'host': x.split(':')[0], 'port': int(x.split(':')[1])}
self.db = redis.StrictRedis(**parse(redis_broker))
self.redis_queue = redis_queue
self.web_sleep = web_sleep
self.max_tries = max_tries
self.unpickling = unpickling

def process(self, batch: List) -> List:
"""
input a batch, send items to redis broker and polling
"""
keys = [str(uuid.uuid4()) for _ in range(len(batch))]
with self.db.pipeline() as pipe:
for key, model_input in zip(keys, batch):
message = {'key': key, 'model_input': model_input}
pipe.rpush(self.redis_queue, pickle.dumps(message))
pipe.execute()

def process(self, model_input):
key = str(uuid.uuid4())
message = {'key': key, 'model_input': model_input}
self.db.rpush(self.redis_queue, pickle.dumps(message))
num_tries = 0
# polling the redis broker
num_tries = 0
while num_tries < self.max_tries:
time.sleep(self.web_sleep)
num_tries += 1
result = self.db.get(key)
if result:
model_output = pickle.loads(result)
self.db.delete(key)
return model_output
outputs = self.db.mget(keys)
if None not in outputs:
self.db.delete(*keys)
if self.unpickling:
outputs = [pickle.loads(x) for x in outputs]
return outputs
else:
return None

6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

setup(
name='serving_agent',
version="0.1.0",
version="0.2.0",
description='A middleware for model serving to speedup online inference.',
author="wwen",
author_email="[email protected]",
long_description=long_description,
long_description_content_type="text/markdown",
# long_description=long_description,
# long_description_content_type="text/markdown",
classifiers=[
'Development Status :: 3 - Alpha',
'Programming Language :: Python :: 3.5',
Expand Down

0 comments on commit 3e086f5

Please sign in to comment.