
Listening for new data

Note: In Superduper, AI models may be configured to listen for newly inserted data. Outputs are computed over that data and saved back to the data-backend.

In this example we show how to configure 3 models to interact when new data is added.

  1. A featurizing computer vision model (images -> vectors).
  2. Two models evaluating image-to-text similarity against a set of keywords.

We use the open-source model "CLIP", which we install via pip directly from GitHub. You can read more about installing requirements in our docs.

!pip install git+https://github.com/openai/CLIP.git

We apply our setup to images from the cats and dogs dataset. We've prepared a subset especially for quick experimentation.

# !curl -O https://superduperdb-public-demo.s3.amazonaws.com/images.zip && unzip images.zip
from PIL import Image
import os

# Load every PNG in the unzipped folder as a record containing a PIL image
data = [f'images/{x}' for x in os.listdir('./images') if x.endswith('png')]
data = [{'img': Image.open(path)} for path in data]

Now that we've prepared these records, we can insert them directly into the database with a standard insert statement. The same pattern may be applied to other database types.

from superduper import superduper, Document, Table

db = superduper('mongomock://')

table = Table('images', fields={'img': 'superduper_pillow.pil_image'})

db.apply(table, force=True)

# Hold back the last record so we can demonstrate listening on new data later
_ = db['images'].insert(data[:-1])

We can verify that the images are correctly saved by retrieving a single record:

r = db['images'].get()
r['img']

We now build a torch model which featurizes images for text-2-image similarity using the clip library. In order to save the outputs correctly in the system, we specify the tensor datatype on the model:

import clip
import hashlib
import torch
from superduper_torch import TorchModel, Tensor


model_name = "ViT-B/32"
model, preprocess = clip.load(model_name, "cpu")


class ImageModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.model = model

    def forward(self, image_tensors):
        # Encode a batch of preprocessed images into 512-dimensional vectors
        return self.model.encode_image(image_tensors)

    def __hash__(self):
        # Stable hash so superduper can fingerprint the model object
        return int(hashlib.sha256(model_name.encode()).hexdigest(), 16)


image_model = TorchModel(
    identifier='clip_image',
    object=ImageModel(),
    preprocess=preprocess,
    datatype='superduper_torch.Tensor[float32:512]',
    loader_kwargs={'batch_size': 5},
)

We can verify that this model gives us the correct outputs on the added data with the .predict method:
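Here is a minimal sketch, assuming TorchModel.predict accepts a single raw image and applies the configured preprocess function:

# Hedged sketch: run the model on one raw image record
vector = image_model.predict(data[0]['img'])
print(vector.shape)  # expect a 512-dimensional vector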

Now we'd like to set up this model to compute outputs on the 'img' key of each record. To do that we create a Listener (see here for more information) which "listens" for incoming and existing data, and computes outputs on that data.

When new data is inserted, the model will automatically create outputs on that data. This is a very handy feature for productionizing AI and ML, since a data deployment needs to be kept as up-to-date as possible.

from superduper import Listener

listener = Listener(
    'image_listener',
    model=image_model,
    select=db['images'],  # the query defining which data to listen to
    key='img',            # the field of each record passed to the model
)
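Each listener writes its results under a dedicated outputs key, which we use below to chain the downstream models. As a quick orientation you can inspect it (the exact naming scheme is an internal detail of superduper):

# The outputs key identifies the destination of this listener's results
print(listener.outputs)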

Downstream of this first model, we can now add smaller models to classify images with configurable terms. Since the dataset is concerned with cats and dogs, we create 2 downstream models classifying the images in 2 different ways.

from superduper import ObjectModel
from superduper.misc.utils import hash_item


class Comparer:
    def __init__(self, words, text_features):
        self.targets = {w: text_features[i] for i, w in enumerate(words)}
        self.lookup = list(self.targets.keys())
        self.matrix = torch.stack(list(self.targets.values()))

    def __call__(self, vector):
        # Pick the keyword whose CLIP text vector is most similar to the image vector
        best = (self.matrix @ vector).topk(1)[1].item()
        return self.lookup[best]

    def __hash__(self):
        # Stable hash so superduper can fingerprint the object
        return int(hash_item(self.matrix.detach().numpy().tolist()), 16)


cats_vs_dogs = Listener(
    'cats_vs_dogs',
    model=ObjectModel(
        'cats_vs_dogs',
        object=Comparer(['cat', 'dog'], model.encode_text(clip.tokenize(['cat', 'dog']))),
    ),
    select=db[listener.outputs],  # read from the upstream listener's outputs
    key=listener.outputs,
    upstream=[listener],
)


felines_vs_canines = Listener(
    'felines_vs_canines',
    model=ObjectModel(
        'felines_vs_canines',
        object=Comparer(['feline', 'canine'], model.encode_text(clip.tokenize(['feline', 'canine']))),
    ),
    select=db[listener.outputs],
    key=listener.outputs,
    upstream=[listener],
)
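Before deploying these listeners, it can be reassuring to sanity-check the Comparer logic locally. This is a minimal sketch, not part of the deployment itself, which encodes one image with the raw CLIP model and classifies it:

# Hedged sketch: encode a single image and run a cat-vs-dog comparer on it
with torch.no_grad():
    sample = model.encode_image(preprocess(data[0]['img']).unsqueeze(0))[0]
comparer = Comparer(['cat', 'dog'], model.encode_text(clip.tokenize(['cat', 'dog'])))
print(comparer(sample))  # 'cat' or 'dog'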
We now group the three listeners into a single Application and apply it to the database:

from superduper import Application

application = Application(
    'animal_image_analysis',
    components=[
        listener,
        cats_vs_dogs,
        felines_vs_canines,
    ],
)

db.apply(application)
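To confirm that the application and its listeners were registered, you can list the components known to the system with db.show() (the exact output format may vary between versions):

# List registered components
db.show()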

We can verify that both downstream models have written their outputs to the database by querying a document:

r = db['images'].outputs(cats_vs_dogs.predict_id, felines_vs_canines.predict_id).get()

print(r[cats_vs_dogs.outputs])
print(r[felines_vs_canines.outputs])
r['img']
Finally we insert the record that we held back earlier. The listeners pick up the new data and compute their outputs automatically:

inserted_id = db['images'].insert([data[-1]])[0]

We can verify that outputs were computed for the new record by querying the data again:

r = db['images'].outputs(cats_vs_dogs.predict_id).get(_id=inserted_id)
r
r['img']