A Batch Job ML Model Deployment

Deploying an ML Model In an ETL Job

python code


In previous blog posts I showed how to develop an ML model in such a way that makes it easy to deploy, and I showed how to create a web app that is able to deploy any model that followed the same design pattern. However, not all deployments of ML model are deployed within web apps. In this blog post I deploy the same model used in the previous blog posts as an ETL job.

Bonobo for ETL Jobs

The bonobo package is a python package for writing ETL jobs, offering a simple pythonic interface for writing code that loads, transforms, and saves data. The package works well for small datasets that can be processed in single processes, but not as useful for larger datasets. Nevertheless, the package is perfect for small scale data processing. The package has a strong object-oriented bend to it and it encourages good software engineering best practices through a well-designed API.

ETL Application

To develop the ETL application with the Bonobo package I first set up the project structure

- data ( folder for test data )
- model_etl ( folder for application code)
- __init__.py
- etl_job.py
- graph.py
- model_node.py
- s3_etl_job.py
- tests ( folder for unit tests )
- .gitignore
- Makefile
- requirements.txt
- setup.py
- test_requirements.txt

MLModelTransformer Class

Running a machine learning model prediction step inside an ETL DAG requires many of the same things as running a model inside a web application. In the previous blog post we managed instances of MLModel classes inside a ModelManager singleton object. The ModelManager object was used by the web application to maintain a list of MLModel objects, and returned information about them on request.

class MLModelTransformer(object):
def __init__(self, module_name, class_name):
model_module = importlib.import_module(module_name)
model_class = getattr(model_module, class_name)
model_object = model_class()
if isinstance(model_object, MLModel) is False:
raise ValueError(“The MLModelNode can only hold references to objects of type MLModel.”)
self.model = model_object
def __call__(self, data):
yield self.model.predict(data=data)
except MLModelSchemaValidationException as e:
raise e
pip install git+https://github.com/schmidtbri/ml-model-abc-improvements
>>> from model_etl.model_node import MLModelTransformer
>>> model_transformer = MLModelTransformer(module_name=”iris_model.iris_predict”, class_name=”IrisModel”)
>>> generator = model_transformer(data={“sepal_length”: 4.4, “sepal_width”: 2.9, “petal_length”: 1.4, “petal_width”: 0.2})
>>> result = list(generator)
>>> result
[{‘species’: ‘setosa’}]

Creating a Graph

A bonobo application runs an ETL from a Graph object that is defined at application startup. Any number of transformations can be used, and they can be arranged into complex DAGs. Every Graph object must contain at least one extractor to get data from an outside source, and one loader to save data to an outside destination. The bonobo package provides several options for accessing data files, we’ll use the JSONLD extractor and loader transformations to define a simple Graph inside a function:

def get_graph(**options):
graph = bonobo.Graph()
LdjsonReader(options[“input_file”], mode=’r’),
MLModelTransformer(module_name=”iris_model.iris_predict”, class_name=”IrisModel”),
LdjsonWriter(options[“output_file”], mode=’w’))
return graph
>>> from model_etl.etl_job import get_graph
>>> graph = get_graph(“data/input.json”, “data/output.json”)
>>> graph
<bonobo.structs.graphs.Graph object at 0x10a52ffd0>

Running the ETL Process Locally

The graph defined in the previous section works well when running it from an interactive Python session, but it would be better to run in from the command line. Before writing the code to create simple command line interface, we need to create some parameters for the input and output file names:

def get_argument_parser(parser=None):
parser = bonobo.get_argument_parser(parser=parser)

parser.add_argument(“--input_file”, “-i”, type=str, default=None, help=”Path of the input file.”)

parser.add_argument(“--output_file”, “-o”, type=str, default=None, help=”Path of the output file.”)
return parser
if __name__ == ‘__main__’:
parser = get_argument_parser()
with bonobo.parse_args(parser) as options:
python model_etl/etl_job.py -—input_file=data/input.json --utput_file=data/output.json

Accessing Data from a Service

When testing an ETL job locally, it is easiest to load data from and save data to the local hard drive. When running the ETL in a production environment, the ETL code will most likely be accessing data from remote storage systems. We can easily write an implementation of the LdjsonReader and LdjsonWriter classes to access files from a remote system, but this is not a best practice.

pip install fs-s3fs
def get_services(**options):
return {
‘fs’: S3FS(options[“bucket”],
def get_argument_parser(parser=None):
parser = bonobo.get_argument_parser(parser=parser)

parser.add_argument("--input_file”, “-i”, type=str, default=None, help=”Path of the input file.”)
parser.add_argument(“--output_file”, “-o”, type=str, default=None, help=”Path of the output file.”)
# these parameters are added for accessing different S3 services
parser.add_argument(“--bucket”, “-b”, type=str, default=None, help=”Bucket name in S3 service.”)
parser.add_argument("--key”, “-k”, type=str, default=None, help=”Key to access S3 service.”)
parser.add_argument(“--secret_key”, “-sk”, type=str, default=None, help=”Secret key to access the S3 service.”)
parser.add_argument(“--endpoint_url”, “-ep”, type=str, default=None, help=”Endpoint URL for S3 service.”)

return parser
if __name__ == ‘__main__’:
parser = get_argument_parser()
with bonobo.parse_args(parser) as options:
docker run -p 9000:9000 — name minio -e “MINIO_ACCESS_KEY=TEST” -e “MINIO_SECRET_KEY=ASDFGHJKL” -v /Users/brian/Code/etl-job-ml-model-deployment:/data minio/minio server /data
Screenshot of the minio service UI
Screenshot of the minio service UI
The minio web UI.
python model_etl/s3_etl_job.py — input_file=input.json — output_file=output.json — bucket=data — key=TEST — secret_key=ASDFGHJKL — endpoint_url=


In this blog post, I showed how to deploy the iris model developed in a previous blog post inside of an ETL application. By splitting the deployment code and the model code into separate packages, I’m able to reuse the model in many different types of deployments. By structuring the codebases in this way, I’m able to keep the machine learning code separate from the deployment code very effectively.

Coder and machine learning enthusiast

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store