- Logesh Krishnan
- July 6, 2022
TensorFlow Serving of AI models with Celery Workers
Introduction:
As AI applications proliferate across industries, it is important to develop strategies to deploy these applications with high performance and at scale. With the help of Python, Celery and TensorFlow Serving, it is possible to easily deploy Machine Learning models as well as send tasks asynchronously to multiple workers using the Celery task queueing mechanism.
For two other articles related to Celery, please refer to Use of Celery in Serverless systems for scaling AI workloads and A Hands-on Guide to Backend Performance Optimization of a SaaS platform for scalable AI workloads.
What is Celery?
Celery is an open source, asynchronous task queue built on distributed message passing. It is a simple, flexible and reliable system which can process large quantities of messages in real time. Its task queuing mechanism lets us distribute work across multiple workers running simultaneously.
What is TensorFlow Serving?
TensorFlow Serving (or TF Serving) is a flexible, high-performance serving system for machine learning models, designed for production environments. It allows you to safely deploy new models and run experiments while maintaining the same server architecture and APIs. TF Serving is well suited to running multiple models at large scale, and it makes deploying these models to on-premise or cloud-based servers easier and faster.
Prerequisites
In this article, we describe the process of creating a basic Celery-based task class which sends prediction requests to a TF Serving Docker container (with an example). For that, we’ll start by creating a Python virtual environment and installing the following packages with pip:
- Celery
- TensorFlow
- Redis
To deploy TF Serving and Redis as Docker containers, Docker must be installed on the system.
- The procedure to install Docker Engine in Ubuntu is available here.
- The source code for this example is available here.
Docker based Redis deployment
In this example, we’re going to use the Redis data store as both the message broker and the result backend for Celery. As the broker, it stores task messages and delivers them to workers when required; as the backend, it stores the results the workers produce. Redis can be easily deployed with Docker using the following command.
```
docker run -d -p 6379:6379 redis
```
Docker based TF serving deployment
Before we see how to deploy TF Serving using Docker, we have to train a model and save it in the SavedModel format. In this example, we will make use of the MNIST dataset from tensorflow.keras, load the train and test images, and then scale and reshape them to fit the model.
Let’s create a simple model with a few layers, compile it, fit it on the training data and, once the training is done, evaluate it. Finally, let’s save the model inside the models/simple_model/1 directory. Refer to train.py in the given repo. The ‘1’ in the directory path refers to the version of the model.
As the next step, we’ll create a simple config file with model name, path and platform. This file will be given as a parameter to TF Serving for it to load the models. The following is the format required:
model_config_list {
  config {
    name: 'simple_model'
    base_path: '/models/simple_model'
    model_platform: 'tensorflow'
  }
}
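When several models are served, the same file simply gets one config entry per model. As a rough sketch, the file could be generated from a mapping of model names to base paths. The helper name `render_model_config` and the second model, `another_model`, are hypothetical and only there for illustration.

```python
def render_model_config(models):
    """Render a model_config_list with one config entry per model.

    `models` maps a model name to its base path inside the container.
    """
    entries = []
    for name, base_path in models.items():
        entries.append(
            "  config {\n"
            f"    name: '{name}'\n"
            f"    base_path: '{base_path}'\n"
            "    model_platform: 'tensorflow'\n"
            "  }"
        )
    return "model_config_list {\n" + "\n".join(entries) + "\n}\n"


config_text = render_model_config({
    "simple_model": "/models/simple_model",
    "another_model": "/models/another_model",  # hypothetical second model
})
print(config_text)
```

The rendered text can then be written to the config file that is passed to TF Serving.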
Finally, let’s deploy TF Serving. One of the easiest ways to start TF Serving is to run it as a Docker container. If you’re using the TF Serving image for the first time, use the docker pull command to fetch the latest available image.
```
docker pull tensorflow/serving
```
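Next, we can run the TF Serving container with the docker run command, specifying the port number and the model location. The command mounts the local models directory into the container, so the config file created above has to be saved as models/models.config.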
```
docker run -t --rm -p 8501:8501 -v "$(pwd)/models/:/models/" tensorflow/serving --model_config_file=/models/models.config
```
The model is now deployed in a TF Serving container.
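Before sending predictions, it can be useful to confirm that the model has actually loaded. TF Serving's REST API reports the state of each model version at /v1/models/<model name>. The sketch below assumes the container from the command above is reachable on localhost:8501; the helper names are hypothetical, and the sample dictionary only mirrors the shape of a status response.

```python
import json
from urllib.request import urlopen


def status_url(model, host="localhost", port=8501):
    """URL of the TF Serving model status endpoint (host and port are assumptions)."""
    return f"http://{host}:{port}/v1/models/{model}"


def is_available(status):
    """Return True if any model version in a status response is AVAILABLE."""
    versions = status.get("model_version_status", [])
    return any(v.get("state") == "AVAILABLE" for v in versions)


def fetch_status(model, timeout=5):
    """Query the running TF Serving container for the model's status."""
    with urlopen(status_url(model), timeout=timeout) as resp:
        return json.load(resp)


# Shape of a typical status response, used here instead of a live call
sample = {
    "model_version_status": [
        {"version": "1", "state": "AVAILABLE",
         "status": {"error_code": "OK", "error_message": ""}}
    ]
}
print(is_available(sample))  # True
```

With the container running, `is_available(fetch_status("simple_model"))` performs the same check against the live server.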
How to send data and receive a response from TF Serving
Now let’s focus on creating a Celery based Task class which will contain a predict function to post a request to TF Serving Docker and receive a response containing predictions. This whole process will be taken care of by Celery.
import json

import requests
from celery import Task


class PredTask(Task):
    """Celery task class for TF Serving predictions."""

    def __init__(self):
        super().__init__()

    def predict(self, image, model):
        # REST endpoint of the TF Serving container for the given model
        prediction_url = f"http://localhost:8501/v1/models/{model}:predict"
        data = json.dumps({"signature_name": "serving_default", "instances": image})
        headers = {"content-type": "application/json"}
        json_response = requests.post(prediction_url, data=data, headers=headers)
        json_response.raise_for_status()  # fail loudly on HTTP errors
        predictions = json.loads(json_response.text)["predictions"]
        return predictions

    def run(self, data, model):
        # Entry point that Celery invokes when the task is executed
        preds = self.predict(data, model)
        return preds
In the code above, we create a class called “PredTask” which inherits from the Task class imported from the Celery package; this way we’ll be able to make use of Celery’s functionality. Inside the predict method, we build the URL where TF Serving is hosted, then create a request body in JSON format with the signature name and the instances as parameters. We also set the content-type header to application/json.
Once the prediction is done, we get the response back; it is parsed with json.loads and indexed by “predictions”, which holds the actual results. Finally, we return the predictions. The method takes the image to be predicted and the model name as inputs, so we can request predictions from other models if they are deployed.
An important part of this class is the run method; it tells Celery what to do when the task is called. Inside run, we call the predict method, get the predictions and return them. The run method takes the same image and model name as inputs, so they can be supplied when the task is called.
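The request and response handling in the predict method can also be looked at in isolation, without a running server. The following is a minimal sketch with hypothetical helper names (`build_predict_body`, `parse_predict_response`). It assumes the instances are already plain nested lists, since NumPy arrays are not JSON serializable and need `.tolist()` first, and that a failed request carries an “error” key in its JSON body.

```python
import json


def build_predict_body(instances, signature_name="serving_default"):
    """Serialize a batch of instances into a TF Serving predict request body."""
    return json.dumps({"signature_name": signature_name, "instances": instances})


def parse_predict_response(text):
    """Extract predictions from a response body, raising if an error is reported."""
    payload = json.loads(text)
    if "error" in payload:
        raise RuntimeError(f"TF Serving error: {payload['error']}")
    return payload["predictions"]


# Two tiny made-up instances; real MNIST inputs would be 28x28 images
body = build_predict_body([[0.0, 0.5], [1.0, 0.25]])
print(body)

# A made-up response body in the shape the predict method expects
predictions = parse_predict_response('{"predictions": [[0.1, 0.9]]}')
print(predictions)  # [[0.1, 0.9]]
```

Checking for the error key before indexing “predictions” gives a readable message instead of a KeyError when the request is rejected.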
We’ll add this script inside the “celery_task” directory and name it tasks.py. It is important to follow a consistent directory structure, because the module paths need to be specified while initializing the Celery application. This is one of the ways to create a Celery task; we can also create one simply by adding the @app.task decorator to any function. The class-based approach used here lets us add more functionality if needed.
Initialize and register a Celery task
In this part, we’ll understand how we can initialize a Celery app and register the task class we created in the previous step.
from __future__ import absolute_import

from celery import Celery

from celery_task.tasks import PredTask

app = Celery('celery_task',
             broker="redis://",
             backend="redis://",
             include=['celery_task.tasks'])

predict_task = app.register_task(PredTask)

if __name__ == '__main__':
    app.start()
We start by importing the required packages. Next, we initialize a Celery app with a few parameters: the name of the package that holds the app, the broker URL, the backend URL and an include list with the location of the task module. Passing “celery_task” as the first parameter gives the app its name, which matches the package we point Celery at when starting a worker from the terminal. “redis://” is the URL for accessing the Redis container (with no host or port given, Celery falls back to localhost:6379); we’re using Redis as both backend and broker for Celery.
This script is also added inside the “celery_task” directory, as “celery_app.py”. Let’s also create an “__init__.py” file containing the import “from .celery_app import app as celery_app” to avoid import issues while starting Celery. Finally, let’s start a Celery worker with the following command.
```
celery -A celery_task worker --loglevel info --pool threads
```
Calling tasks
Now that we’re ready to execute tasks with Celery, let’s create a simple inference script. As in training, we can load the MNIST dataset from TensorFlow and create some test images. To call the task, we import the registered task from “celery_app.py” and use its delay method with the required inputs (data and model name). The delay method queues the task for a Celery worker, which in turn sends a prediction request to TF Serving. Once TF Serving returns the prediction to the worker, we can retrieve it by calling .get() on the result object returned by delay.
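The steps just described could look roughly like the sketch below. It is not the script from the repo: the helper names are hypothetical, and it assumes the model expects 28x28x1 inputs scaled to [0, 1] and returns one score per digit class. The import of the registered task is deferred into the function so that the helpers can be tried without a running broker.

```python
def to_instances(images, scale=255.0):
    """Scale pixels to [0, 1] and add a channel axis, returning nested lists.

    Plain lists (rather than NumPy arrays) keep the task arguments JSON serializable.
    """
    return [[[[float(px) / scale] for px in row] for row in img] for img in images]


def top_class(scores):
    """Index of the highest score, i.e. the predicted digit."""
    return max(range(len(scores)), key=scores.__getitem__)


def run_inference(images, model="simple_model", timeout=30):
    """Queue a prediction task and wait for its result.

    Assumes Redis, TF Serving and a Celery worker are all running.
    """
    # Deferred import: the registered task from celery_app.py
    from celery_task.celery_app import predict_task

    result = predict_task.delay(to_instances(images), model)
    predictions = result.get(timeout=timeout)  # blocks until the worker finishes
    return [top_class(scores) for scores in predictions]


# The helpers work on their own, here with a made-up 1x2 "image"
print(to_instances([[[0, 255]]]))  # [[[[0.0], [1.0]]]]
print(top_class([0.1, 0.7, 0.2]))  # 1
```

With all three services running, calling `run_inference` on a handful of MNIST test images would return one predicted digit per image.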
Conclusion
In this example, we created a simple Celery application and registered a task which requests predictions from a model loaded via TF Serving. Celery and TF Serving can be combined in this way to execute multiple tasks asynchronously, with multiple models loaded simultaneously, to build high-performance, scalable AI inference systems.