Three different ML architectures for real-time inference in Python

Paddy Green
7 min readSep 14, 2024

--

Many MLOps frameworks can encourage architectures that won’t offer the best performance and cost when it comes to deploying your model. In this article I will describe three different ways you can deploy a model yourself that will better suit how it’s used.

#1 Monolithic Architecture

For high frequency and timely requirements hosting you model in a multi threaded framework like Spark, that is streaming your requests through kafka, can be cheaper and faster for services running 24/7.

Fight tracking model

Here’s how to host an MLFlow model with EMR on AWS

Using MLFlow in Spark

First initialise the model outside of the spark session so that it’s available as a method for all the executors to run predictions.

client = mlflow.tracking.MlflowClient()
model_version = client.get_latest_versions(
config.model, stages=["production"]
)
model_uri = client.get_model_version_download_uri(
name=config.model, version=model_version
)

After writing a very simple streaming job that reads and writes to kafka you’ll need to create a udf to run your model in the spark executors and call it from .withColumn. To reduce the need for model changes in spark you can save your model as a pyfunc which will allow you to add additional transformations to the model and save them to MLFlow.

model = mlflow.pyfunc.load_model(model_uri=model_uri) 

Spark streaming pandas UDF is optimised to pass batches of json input rather than rows, utilising any vectorisation you may have coded into the model and improving performance. Below is an example of how to set it up.

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def model_caller(df : pd.Series) -> pd.Series:
df = df.to_frame()
df = json_normalize(df['_0'].apply(json.loads), max_level = 1)
df = model.predict(df)
df = df.apply(lambda row: json.dumps(row.to_dict()).encode("utf-8"), axis=1)
return df
...
df.withColumn("output",model_caller("input").cast("string"))

Spark Optimisations

Add the below config to benefit from the arror optimised udf released in spark 3.5.

"spark.sql.execution.pythonUDF.arrow.enabled": True

Also with a stateless spark streaming job without aggregations you can avoid checkpointing and get further speed improvements by tracking where you are up to in kafka yourself. By writing a custom StreamingQueryListener and adding the listener to your spark session you can disable the spark managed checkpointing in your pipeline and speed the app up by a second.

from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.listener import *

class kafka_offset_committer(StreamingQueryListener):
def __init__(self, offsets_path):
self.offsets_path = offsets_path

def onQueryStarted(self, event: QueryStartedEvent) -> None:
pass

def onQueryProgress(self, event: QueryProgressEvent) -> None:
offsets = event.progress.sources[0].endOffset
f = open(self.offsets_path, 'w+')
f.write(offsets)
f.close()

def onQueryTerminated(self, event: QueryTerminatedEvent) -> None:
pass

def onQueryIdle(self, event: QueryIdleEvent) -> None:
pass
...

spark_session.streams.addListener(kafka_offset_committer)

Managing your Spark Service with EMR

As you release new versions of your model your spark streaming job will also need restarting which can be done easily when you use EMR. To achieve this without downtime you can orchestrate 2 deployments running in parallel during the upgrade. This will ensure predictions are still produced as each EMR redeployment can cause 5 minutes of downtime.

You can deploy with AWS CLI and may want to use a stateful orchestration tool like Airflow or Kubernetes so you can remember the current jobid.state and can cancel the correct job afterwards. The bash code I used to start and stop the job is below:

if [ -f storage/jobid.state ];
then
/usr/local/bin/aws emr-containers cancel-job-run --virtual-cluster-id $VIRTUAL_CLUSTER_ID --id $(cat storage/jobid.state);
fi
/usr/local/bin/aws emr-containers start-job-run --cli-input-json file://./conf/job-spec.json | jq -r '.id' > storage/jobid.state

You also will need to define a job spec for your spark streaming app, see this EMR doc for details on using EMR:

For more tips on Spark Streaming and also how to add stateful feature engineering upstream of your model see this link

#2 Microservice Architecture

For easy model management and large numbers of requests, hosting your model in a microservice may offer the best resilience and price vs latency trade off.

Green camera ball tracking model

Here’s how to run an MLFlow Server in Kubernetes

Saving your model

Assuming you’ve setup a MLFlow Registry, to be able to serve you model when you initially publish it to the registry you’ll first need to define it’s environment in this case I’ve used a conda.yaml. You’ll also need to publish your model with an appropriate “predict” function so it knows the entry point your requests should reach. You should check your MLFlow UI for these configurations before moving forward.

Click on an Experiment and then on Artifacts to see this MLmodel config

Deploying from a model registry

To test this locally you can use these three steps below in a conda terminal with the python mlflow[extras] and flask packages installed:

Set envs

export MLFLOW_TRACKING_URI=https://mlflow.....
export MLFLOW_TRACKING_USERNAME=...
export MLFLOW_TRACKING_PASSWORD=....

Deploy

mlflow models serve --env-manager conda -m "models:/<name>/latest" -p 5000 -h 0.0.0.0

Test

curl http://0.0.0.0:5000/invocations -H 'Content-Type: application/json' -d '{"dataframe_split":{"index":[...],"columns":[...],"data":[...]}}'

If you’re happy with the results then deploy it to you cluster using a Kubernetes Deployment along with a Kubernetes Service to make traffic available on the correct port. You can also deploy multiple replicas with a RollingUpdate to prevent downtime when releasing new models.

apiVersion: apps/v1
kind: Deployment
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
template:
containers:
- name: serve
image: <YOUR TRAINING CODES IMAGE>
ports:
- containerPort: 5000
imagePullPolicy: Always
securityContext:
allowPrivilegeEscalation: false
tty: false
env:
- name: MLFLOW_TRACKING_URI
...
- name: MLFLOW_TRACKING_USERNAME
...
- name: MLFLOW_TRACKING_PASSWORD
...
- name: MODELS_SERVED
value: #####
- name: MODELS_VERSION
value: #####
- name: MODELS_PORT
value: 5000
command:
- /bin/bash
- "-vx"
- "-c"
- |
/bin/bash <<'EOF'
mlflow models serve --env-manager conda -m "models:/$MODELS_SERVED/$MODELS_VERSION" -p $MODELS_PORT -h 0.0.0.0
EOF

Autoscaling with NGINX

Deploying a nginx load balancer in front of the mlflow servers ensures balanced traffic amongst the model deployments and also allows you to set up connection driven horizontal autoscaling. An example of how to configure this using Keda is below:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: #########
namespace: #########
spec:
pollingInterval: 30
minReplicaCount: <MIN_REPLICAS>
maxReplicaCount: <MAX_REPLICAS>
scaleTargetRef:
name: #########
apiVersion: apps/v1
kind: Deployment
triggers:
- type: prometheus
serverAddress: http://prometheus-server.monitoring:80
metricName: nginx_ingress_nginx_connections_active
threshold: '100'
query: sum(nginx_ingress_nginx_connections_active{class="#####"})

For more information on MLFlow serve see this link

#3 Managed Architecture

For low frequency and less time dependent requests you may want to host your model in the cloud and make requests with cloud function. Here it can be easily scaled to zero when not in use and can be dramatically cheaper when it’s not required 24/7.

Snooker ball detection model

Here’s how to run a TensorFlow/Keras model in AI Platform on GCP

Deploying the model

To deploy a Tensorflow model in AI Platform the model first needs to be saved in a saved model format. The greatest challenge here is ensuring that you can pass whatever you have defined as input as a json object. As I am passing an image I’ve serialised the image as a string.

inputdata = tf.placeholder(
dtype=tf.string, shape=[None], name="input_1"
)
flatinputdata = tf.reshape(inputdata, [])
reform = tf.io.decode_jpeg(tf.io.decode_base64(flatinputdata))
image_data = tf.cast(
tf.reshape(reform, (1280, 1280, 3)),
tf.float32)
readydata = tf.expand_dims(tf.divide(image_data, 255), 0)
inputkeras = Input(tensor=readydata)

When saving the model you must include signature definitions like the input, output and method_name as these are required for serving the model.

graphdef_inf = tf.graph_util.remove_training_nodes(
graph.as_graph_def())
graphdef_frozen = tf.graph_util.convert_variables_to_constants(
session,
graphdef_inf,
[output0.op.name, output1.op.name, output2.op.name]
)
builder = tf.saved_model.builder.SavedModelBuilder(export_path)

prediction_signature = (
tf.saved_model.signature_def_utils.build_signature_def(
inputs={
'images':tf.saved_model.utils.build_tensor_info(input0)
},
outputs={
'scores':tf.saved_model.utils.build_tensor_info(output1),
'boxes': tf.saved_model.utils.build_tensor_info(output0),
'classes': tf.saved_model.utils.build_tensor_info(output2)
},
method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME
)
)

builder.add_meta_graph_and_variables(
session,
[tf.saved_model.tag_constants.SERVING],
signature_def_map = {
'predict_images': prediction_signature,
},
main_op=tf.tables_initializer()
)
builder.save()

Once the model is in this saved format upload it to Google Cloud Storage and follow the steps in the AI Platform UI to create the model.

Testing

Finally to do a prediction you need to match the format that the model requires and wrap that input as a list under "instances”.

jpeg_bytes = base64.b64encode(
open(file='input.jpg', mode="rb").read()
).decode('utf-8')

credentials = GoogleCredentials.get_application_default()
service = discovery.build('ml', 'v1', credentials=credentials)
name = 'projects/{}/models/{}'.format(PROJECT_NAME, MODEL_NAME)
name += '/versions/{}'.format('initial')
body = {
"signature_name": "predict_images",
"instances": [
{"images": re.sub("\+", "-", re.sub("/", "_", "%s" % jpeg_bytes))}
]}
response = service.projects().predict(
name=name,
body=body
).execute()

For more tips on AI Platform and how to scale to zero and schedule predictions in a serverless way see this link

Conclusion

For ultra low latency host the model in the same application you plan to make requests from using Spark or an alternative parallel processing framework. To run a handful of requests cheaply use Google Cloud and utilise their managed services. And for serving requests at scale with relaxed speed requirements host a cluster of MLFlow serve or similar model server in Kubernetes.

--

--

No responses yet