Listening for new data
In Superduper, AI models may be configured to listen for newly inserted data. Outputs will be computed over that data and saved back to the data-backend.
In this example we show how to configure 3 models to interact when new data is added.
- A featurizing computer vision model (images -> vectors).
- 2 models evaluating image-to-text similarity against a set of keywords.
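The resulting pipeline looks roughly as follows (a sketch only; each component is built step by step below, and the names match the model identifiers used later in this example):
# images collection -> 'clip_image' listener (img -> 512-d vector)
#                      -> 'cats_vs_dogs' listener (vector -> 'cat' / 'dog')
#                      -> 'felines_vs_canines' listener (vector -> 'feline' / 'canine')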
We use the open-source model "CLIP", which we install via pip directly from GitHub. You can read more about installing requirements in our docs here.
!pip install git+https://github.com/openai/CLIP.git
We apply our setup to images from the cats and dogs dataset. We've prepared a subset especially for quick experimentation.
!curl -O https://superduperdb-public-demo.s3.amazonaws.com/images.zip && unzip images.zip
from PIL import Image
import os
data = [f'images/{x}' for x in os.listdir('./images') if x.endswith('png')]
data = [{'img': Image.open(path)} for path in data]
Now that we've prepared these records, we can insert the data directly into the database with a standard insert statement. (Notice, however, the difference from pymongo: the .execute() call.) The same pattern may be applied to other database types.
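For comparison, the equivalent plain pymongo call would run immediately (pymongo_db and records are illustrative names here):
# pymongo runs the insert immediately:
#     pymongo_db['images'].insert_many(records)
# Superduper builds the query first and only runs it when .execute() is called, as below.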
from superduper import superduper, Document
db = superduper('mongomock://')
db['images'].insert_many([Document(r) for r in data[:-1]]).execute()
We can verify that the images are correctly saved by retrieving a single record:
r = db['images'].find_one().execute()
r
The contents of the Document may be accessed by calling .unpack(). You can see that the images were saved and retrieved correctly.
r.unpack()['img']
We now build a torch model for text-to-image similarity using the clip library. In order to save the outputs correctly in the system, we add the tensor datatype to the model:
import clip
import torch
from superduper.ext.torch import TorchModel, tensor
model, preprocess = clip.load("ViT-B/32", "cpu")
class ImageModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.model = model

    def forward(self, image_tensors):
        return self.model.encode_image(image_tensors)
dt = tensor(dtype='float', shape=(512,))
image_model = TorchModel(
    identifier='clip_image',
    object=ImageModel(),
    preprocess=preprocess,
    datatype=dt,
    loader_kwargs={'batch_size': 5},
)
We can verify that this model gives us the correct outputs on the added data with the .predict method:
image_model.predict(data[0]['img'])
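As a quick sanity check, the output should be a single 512-dimensional vector, matching the tensor datatype declared above (the exact return type may vary between versions):
output = image_model.predict(data[0]['img'])
print(getattr(output, 'shape', None))  # expect a 512-dimensional vector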
Now we'd like to set up this model to compute outputs on the 'img' key of each record. To do that, we create a Listener (see here for more information) which "listens" for incoming and existing data, and computes outputs on that data.
When new data is inserted, the model will automatically compute outputs on that data. This is a very handy feature for productionizing AI and ML, since a data deployment needs to be kept as up-to-date as possible.
listener = image_model.to_listener(
    select=db['images'].find(),
    key='img',
    identifier='image_predictions',
)
_ = db.apply(listener)
We can verify that the outputs are correctly inserted into the documents with this query. The outputs are saved in the listener.outputs field:
list(listener.outputs_select.limit(1).execute())[0][listener.outputs].unpack()
Downstream of this first model, we can now add another, smaller model to classify images with configurable terms. Since the dataset is concerned with cats and dogs, we create 2 downstream models classifying the images in 2 different ways.
from superduper import ObjectModel
class Comparer:
    def __init__(self, words, text_features):
        self.targets = {w: text_features[i] for i, w in enumerate(words)}
        self.lookup = list(self.targets.keys())
        self.matrix = torch.stack(list(self.targets.values()))

    def __call__(self, vector):
        best = (self.matrix @ vector).topk(1)[1].item()
        return self.lookup[best]
cats_vs_dogs = ObjectModel(
    'cats_vs_dogs',
    object=Comparer(['cat', 'dog'], model.encode_text(clip.tokenize(['cat', 'dog']))),
).to_listener(
    select=db['images'].find(),
    key=listener.outputs,
)

felines_vs_canines = ObjectModel(
    'felines_vs_canines',
    object=Comparer(['feline', 'canine'], model.encode_text(clip.tokenize(['feline', 'canine']))),
).to_listener(
    select=db['images'].find(),
    key=listener.outputs,
)
db.apply(cats_vs_dogs)
db.apply(felines_vs_canines)
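To confirm what has been installed, db.show should list the registered listeners (note: the two downstream listeners were created without explicit identifiers, so their names are derived automatically and may differ from the model identifiers shown above):
db.show('listener')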
We can verify that both downstream models have written their outputs to the database by querying a document:
r = db['images'].find_one().execute()
print(r[cats_vs_dogs.outputs])
print(r[felines_vs_canines.outputs])
Let's check that the predictions make sense for the inserted images:
db['images'].find_one({cats_vs_dogs.outputs: 'cat'}).execute()['img']
db['images'].find_one({felines_vs_canines.outputs: 'feline'}).execute()['img']
db['images'].find_one({cats_vs_dogs.outputs: 'dog'}).execute()['img']
db['images'].find_one({felines_vs_canines.outputs: 'canine'}).execute()['img']
Now that we have installed the models using Listener, we can insert new data. This data should be automatically processed by the installed models:
db['images'].insert_one(Document({**data[-1], 'new': True})).execute()
We can verify this by querying the data again:
r = db['images'].find_one({'new': True}).execute().unpack()
r['img']
You see here that the models have been called in the correct order on the newly added data and the outputs saved to the new record:
r['_outputs']
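Equivalently, the downstream predictions for this new record can be read back through the same listener-output keys used earlier:
r_new = db['images'].find_one({'new': True}).execute()
print(r_new[cats_vs_dogs.outputs])
print(r_new[felines_vs_canines.outputs])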