Convenience functions in Python for saving Keras models directly to S3

Keras is a very popular framework for training and using machine learning models, now developed as part of Google’s TensorFlow, and it has become somewhat ubiquitous in the field. Now, I’m no data scientist or machine learning expert, but in my work I’m presented with problems around building things that make machine learning and its related applications easy for data scientists to use.

Serialized machine learning models are essentially large binary files, which makes them a poor fit for conventional version control systems such as git. The solution is to put them in an object store such as AWS S3, where they can be stored, updated and used by different data scientists on the same team. However, Keras by default stores its models as a folder structure rather than a single file. Take this simple model:

import numpy as np
from tensorflow import keras
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam", loss="mean_squared_error")
model.save("my_model")

which generates the following folder structure:

Easy folder structure generated using https://tree.nathanfriend.io/
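
For reference, the tree for this model typically looks roughly like the sketch below (exact contents vary by TensorFlow version):

my_model/
├── assets/
├── saved_model.pb
└── variables/
    ├── variables.data-00000-of-00001
    └── variables.index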

This is only a very simple example of the folder structures Keras can generate, depending on the type of model you are creating. By passing the top-level folder name, as in:

model = keras.models.load_model("my_model")

you would be able to load the model for use later.

Problem: Enable easy export of Keras models to S3 without having to traverse the generated folder structure in code, and enable easy fetching of a model exported in this manner so that it can be loaded immediately by Keras.

Solution: Zip up the folder structure generated by Keras in a temporary folder and upload the zipped file to S3. When loading a model, download the corresponding zip file from S3 into a temporary folder, unzip it, and load the model from there.

Gist for the complete code.

Let’s say we have a simple Keras model like what was outlined above:

inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam", loss="mean_squared_error")

We’re going to use Python’s tempfile library to save this model in a temporary location:

import tempfile

with tempfile.TemporaryDirectory() as tempdir:
    model.save(f"{tempdir}/{model_name}")  # model_name, e.g. "my_model"

By using the temporary directory as a context manager, with tempfile.TemporaryDirectory(), we ensure that the temporary directory and everything in it are deleted as soon as we leave that context block.

Next, we zip it up:

import zipfile

# ZIP_STORED writes the archive without compression
zipf = zipfile.ZipFile(f"{tempdir}/{model_name}.zip", "w", zipfile.ZIP_STORED)
zipdir(f"{tempdir}/{model_name}", zipf)
zipf.close()

This uses a zipdir function, which walks the folder containing the Keras model and adds its contents to the given zip file:

import os

def zipdir(path, ziph):
    # Zipfile hook to zip up model folders
    length = len(path)
    for root, dirs, files in os.walk(path):
        folder = root[length:]  # Strip the parent path so archive paths are relative
        for file in files:
            ziph.write(os.path.join(root, file), os.path.join(folder, file))

Now, we can use an s3fs object to write the zipped file to the S3 bucket we need:

import s3fs

s3 = s3fs.S3FileSystem(key=AWS_ACCESS_KEY, secret=AWS_SECRET_KEY)
s3.put(f"{tempdir}/{model_name}.zip", f"{BUCKET_NAME}/{model_name}.zip")

To get this file back and use it in Keras, we have a simple function that uses all the above libraries to reverse the process:

def s3_get_keras_model(model_name: str) -> keras.Model:
    with tempfile.TemporaryDirectory() as tempdir:
        s3 = get_s3fs()
        # Fetch and save the zip file to the temporary directory
        s3.get(f"{BUCKET_NAME}/{model_name}.zip", f"{tempdir}/{model_name}.zip")
        # Extract the model zip file within the temporary directory
        with zipfile.ZipFile(f"{tempdir}/{model_name}.zip") as zip_ref:
            zip_ref.extractall(f"{tempdir}/{model_name}")
        # Load the keras model from the temporary directory
        return keras.models.load_model(f"{tempdir}/{model_name}")

Putting everything together, we have a simple implementation for saving Keras models in their entirety to S3 and getting them back, without having to think about the nested folder structure Keras creates when saving a model.
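
For example, a full round trip with these helpers might look like this (s3_save_keras_model being the hypothetical save helper sketched above):

inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam", loss="mean_squared_error")

# Push the model to S3, then pull it back down and use it
s3_save_keras_model(model, "my_model")
restored_model = s3_get_keras_model("my_model")
print(restored_model.predict(np.zeros((1, 32))))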

Generate and track metrics for Flask API applications using Prometheus and Grafana

The code for this entire implementation can be found here: https://github.com/ramdesh/flask-prometheus-grafana-example

Flask is a very popular lightweight framework for writing web and web service applications in Python. In this blog post, I’m going to talk about how to monitor metrics on a Flask RESTful web service API application using Prometheus and Grafana. We’ll be tying it all together using docker-compose so that we can run everything using a single command, in an isolated Docker network.

Prometheus is a time-series database that is extremely popular for metrics and monitoring, especially with Kubernetes. Prometheus is really cool because it is designed to scrape metrics from your application, instead of your application having to actively push metrics to it. Coupled with Grafana, this stack turns into a powerful metrics tracking/monitoring tool, used in applications the world over.

To couple Flask with Prometheus and Grafana, we’re going to use the invaluable prometheus_flask_exporter library. This library creates a /metrics endpoint for Prometheus to scrape, exposing useful metrics about endpoint access, such as the time taken to generate each response, CPU metrics, and so on.

The first thing we need to do to set this up is create our Flask app. Here’s a really simple server.py with the exporter library included:

import logging

from flask import Flask
from flask import jsonify
from prometheus_flask_exporter import PrometheusMetrics

logging.basicConfig(level=logging.INFO)
logging.info("Setting LOGLEVEL to INFO")

api = Flask(__name__)
metrics = PrometheusMetrics(api)

metrics.info("app_info", "App Info, this can be anything you want", version="1.0.0")


@api.route("/flask-prometheus-grafana-example/")
def hello():
    return jsonify(say_hello())


def say_hello():
    return {"message": "hello"}

This code just returns a “hello” message when you access the flask-prometheus-grafana-example endpoint. The important part here is the integration of the prometheus_flask_exporter library: all you have to do is initialize a metrics object with metrics = PrometheusMetrics(app), where app is your Flask application (named api in the code above). After that, it automatically starts exporting metrics for your endpoints at your application’s /metrics endpoint. If you go to your app’s /metrics endpoint after running it, you’ll be greeted with something like this:
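
The output below is an abbreviated, illustrative sample only; the exact metric names and labels depend on your prometheus_flask_exporter and prometheus_client versions:

flask_http_request_total{method="GET",status="200"} 2.0
flask_http_request_duration_seconds_count{method="GET",path="/flask-prometheus-grafana-example/",status="200"} 2.0
app_info{version="1.0.0"} 1.0
process_cpu_seconds_total 0.97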

Now to set up Prometheus and Grafana. For Prometheus, you need a prometheus.yml file, which would look something like this:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['example-prometheus:9090']

  - job_name: 'flask-api'
    scrape_interval: 5s
    static_configs:
    - targets: ['flask-api:5000']

In this example, Prometheus is watching two targets: itself (example-prometheus:9090) and the Flask API (flask-api:5000). These hostnames are set by the container names in the docker-compose config file, which we will get to later.

For Grafana, we need a datasource.yml file:

# config file version
apiVersion: 1

# list of datasources that should be deleted from the database
deleteDatasources:
  - name: Prometheus
    orgId: 1

# list of datasources to insert/update depending
# whats available in the database
datasources:
  # <string, required> name of the datasource. Required
- name: Prometheus
  # <string, required> datasource type. Required
  type: prometheus
  # <string, required> access mode. direct or proxy. Required
  access: proxy
  # <int> org id. will default to orgId 1 if not specified
  orgId: 1
  # <string> url
  url: http://example-prometheus:9090
  # <string> database password, if used
  password:
  # <string> database user, if used
  user:
  # <string> database name, if used
  database:
  # <bool> enable/disable basic auth
  basicAuth: false
  # <string> basic auth username, if used
  basicAuthUser:
  # <string> basic auth password, if used
  basicAuthPassword:
  # <bool> enable/disable with credentials headers
  withCredentials:
  # <bool> mark as default datasource. Max one per org
  isDefault: true
  # <map> fields that will be converted to json and stored in json_data
  jsonData:
     graphiteVersion: "1.1"
     tlsAuth: false
     tlsAuthWithCACert: false
  # <string> json object of data that will be encrypted.
  secureJsonData:
    tlsCACert: "..."
    tlsClientCert: "..."
    tlsClientKey: "..."
  version: 1
  # <bool> allow users to edit datasources from the UI.
  editable: true

In this file, we define the datasource url, which is again derived from the name of the Prometheus container on the Docker network, as set in the docker-compose file.

Finally, we have a config.monitoring file:

GF_SECURITY_ADMIN_PASSWORD=pass@123
GF_USERS_ALLOW_SIGN_UP=false

This basically means we’ll be logging in to our Grafana dashboard with username admin and password pass@123, and that users won’t be able to sign themselves up.

Next, we’re going to load this all up using a Docker Compose file:

version: "3.5"

services:
  flask-api:
    build:
      context: ./api
    restart: unless-stopped
    container_name: flask-api
    image: example-flask-api
    ports:
      - "5000:5000"
    networks:
      example-network:
        ipv4_address: 172.16.238.10

  example-prometheus:
    image: prom/prometheus:latest
    restart: unless-stopped
    container_name: example-prometheus
    ports:
      - 9090:9090
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
    networks:
      example-network:
        ipv4_address: 172.16.238.11

  example-grafana:
    image: grafana/grafana:latest
    restart: unless-stopped
    user: "472"
    container_name: example-grafana
    depends_on:
      - example-prometheus
    ports:
      - 3000:3000
    volumes:
      - ./monitoring/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml
    env_file:
      - ./monitoring/config.monitoring
    networks:
      example-network:
        ipv4_address: 172.16.238.12

networks:
  example-network:
    name: example-network
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.16.238.0/24

Note that we’re creating our own Docker network and putting all our applications on it, which allows them to reach each other by container name. (This would work the same way even without a custom network, since docker-compose puts all services on a shared default network.) Also important to note is that I am using a WSGI server, rather than the Flask development server, to run the Flask application.
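
Assuming there is a Dockerfile under ./api that builds the Flask image (see the linked repository), the whole stack can then be brought up with a single command:

# Build the Flask API image and start all three containers in the background
docker-compose up --build -d

# Flask API:  http://localhost:5000/flask-prometheus-grafana-example/
# Prometheus: http://localhost:9090
# Grafana:    http://localhost:3000  (log in with admin / pass@123)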

Once Grafana is up, you should be able to log in and configure Prometheus as a datasource:

Add Prometheus as a datasource for Grafana

Once that’s done, you can use the example dashboard from the creator of the prometheus_flask_exporter library (use Import->JSON) which can be found here: https://github.com/rycus86/prometheus_flask_exporter/blob/master/examples/sample-signals/grafana/dashboards/example.json

This gives you a cool dashboard like this:

Grafana dashboard for Flask metrics

As you can see, this gives us a killer implementation of Prometheus + Grafana to monitor a Flask web service application with minimum effort.

I wish everything in software development was this easy.