Data Types
This section of the documentation shows how to work with special data types from the CrateDB SQLAlchemy dialect. Currently, these are:
Container types: ObjectType and ObjectArray.
Geospatial types: Geopoint and Geoshape.
Vector data type: FloatVector.
Introduction
Import the relevant symbols:
>>> import sqlalchemy as sa
>>> from datetime import datetime
>>> from geojson import Point, Polygon
>>> from sqlalchemy import delete, func, text
>>> from sqlalchemy.orm import sessionmaker
>>> from sqlalchemy.sql import operators
>>> try:
...     from sqlalchemy.orm import declarative_base
... except ImportError:
...     from sqlalchemy.ext.declarative import declarative_base
>>> from uuid import uuid4
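Establish a connection to the database (see also Engine Configuration and Connect / Drivers). The crate_host variable is not defined in this section; it is assumed to hold the address of a running CrateDB instance: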
>>> engine = sa.create_engine(f"crate://{crate_host}")
>>> connection = engine.connect()
Create an SQLAlchemy Session:
>>> session = sessionmaker(bind=engine)()
>>> Base = declarative_base()
Introduction to container types
In a document-oriented database, it is a common pattern to store objects within
a single field. For such cases, the CrateDB SQLAlchemy dialect provides the
ObjectType and ObjectArray types.
The ObjectType type effectively implements a dictionary- or map-like type. The
ObjectArray type maps to a Python list of dictionaries.
To exercise these features, let’s define a schema using SQLAlchemy’s Declarative Mapping:
>>> from sqlalchemy_cratedb import ObjectType, ObjectArray
>>> def gen_key():
...     return str(uuid4())
>>> class Character(Base):
...     __tablename__ = 'characters'
...     id = sa.Column(sa.String, primary_key=True, default=gen_key)
...     name = sa.Column(sa.String)
...     quote = sa.Column(sa.String)
...     details = sa.Column(ObjectType)
...     more_details = sa.Column(ObjectArray)
In CrateDB’s SQL dialect, ObjectType maps to OBJECT and ObjectArray maps to ARRAY(OBJECT).
ObjectType
Let’s add two records which have additional items within the details field.
Note that item keys have not been defined in the DDL schema, effectively
demonstrating the DYNAMIC column policy.
>>> arthur = Character(name='Arthur Dent')
>>> arthur.details = {}
>>> arthur.details['gender'] = 'male'
>>> arthur.details['species'] = 'human'
>>> session.add(arthur)
>>> trillian = Character(name='Tricia McMillan')
>>> trillian.details = {}
>>> trillian.quote = "We're on a space ship Arthur. In space."
>>> trillian.details['gender'] = 'female'
>>> trillian.details['species'] = 'human'
>>> trillian.details['female_only_attribute'] = 1
>>> session.add(trillian)
>>> session.commit()
After INSERT statements are submitted to the database, the newly inserted
records are not immediately available for retrieval, because the index is only
updated periodically (default: each second). To make them visible right away,
refresh the table:
>>> _ = connection.execute(text("REFRESH TABLE characters"))
A subsequent select query will see all the records:
>>> query = session.query(Character).order_by(Character.name)
>>> [(c.name, c.details['gender']) for c in query]
[('Arthur Dent', 'male'), ('Tricia McMillan', 'female')]
It is also possible to select only a part of the document, such as a single
key inside an ObjectType column:
>>> sorted(session.query(Character.details['gender']).all())
[('female',), ('male',)]
In addition, filtering on the attributes inside the details column is also
possible:
>>> query = session.query(Character.name)
>>> query.filter(Character.details['gender'] == 'male').all()
[('Arthur Dent',)]
Update dictionary
The SQLAlchemy CrateDB dialect supports change tracking down into the nested
levels of an ObjectType field. For example, the following update will only
change the gender key. The species key, which is on the same level, will
be left untouched.
>>> char = session.query(Character).filter_by(name='Arthur Dent').one()
>>> char.details['gender'] = 'manly man'
>>> session.commit()
>>> session.refresh(char)
>>> char.details['gender']
'manly man'
>>> char.details['species']
'human'
Update nested dictionary
>>> char_nested = Character(id='1234id')
>>> char_nested.details = {"name": {"first": "Arthur", "last": "Dent"}}
>>> session.add(char_nested)
>>> session.commit()
>>> char_nested = session.query(Character).filter_by(id='1234id').one()
>>> char_nested.details['name']['first'] = 'Trillian'
>>> char_nested.details['size'] = 45
>>> session.commit()
Refresh the characters table and query it again:
>>> _ = connection.execute(text("REFRESH TABLE characters"))
>>> session.refresh(char_nested)
>>> char_nested = session.query(Character).filter_by(id='1234id').one()
>>> from pprint import pprint
>>> pprint(char_nested.details)
{'name': {'first': 'Trillian', 'last': 'Dent'}, 'size': 45}
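To illustrate what change tracking across nested levels means, here is a minimal, standalone sketch in plain Python. It is not the dialect's actual implementation: the TrackedDict class and its changed_paths attribute are hypothetical names invented for this illustration. The idea is that a nested dictionary reports each assignment to its parent, so the top-level object knows which keys were touched.

```python
class TrackedDict(dict):
    """Hypothetical dict that records the paths of keys assigned after creation."""

    def __init__(self, data=None, parent=None, key=None):
        super().__init__()
        self._parent = parent
        self._key = key
        self.changed_paths = []
        for k, v in (data or {}).items():
            # Initial values are stored without being recorded as changes.
            super().__setitem__(k, self._wrap(k, v))

    def _wrap(self, key, value):
        # Wrap plain nested dicts so they can report changes upwards.
        if isinstance(value, dict) and not isinstance(value, TrackedDict):
            return TrackedDict(value, parent=self, key=key)
        return value

    def _record(self, path):
        if self._parent is not None:
            # Prefix the path with this dict's own key and pass it to the parent.
            self._parent._record((self._key,) + path)
        else:
            self.changed_paths.append(path)

    def __setitem__(self, key, value):
        super().__setitem__(key, self._wrap(key, value))
        self._record((key,))


details = TrackedDict({"name": {"first": "Arthur", "last": "Dent"}})
details["name"]["first"] = "Trillian"
details["size"] = 45
print(details.changed_paths)  # [('name', 'first'), ('size',)]
```

Only the two assigned paths are recorded; the untouched key name.last does not appear, which mirrors the behaviour described for ObjectType above.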
ObjectArray
Unlike ObjectType, the ObjectArray type does not track changes to individual
elements. Therefore, the generated UPDATE statement will rewrite the whole
list:
>>> char.more_details = [{'foo': 1, 'bar': 10}, {'foo': 2}]
>>> session.commit()
>>> char.more_details.append({'foo': 3})
>>> session.commit()
This will generate an UPDATE statement which looks roughly like this:
"UPDATE characters SET more_details = ? ...", ([{'foo': 1, 'bar': 10}, {'foo': 2}, {'foo': 3}],)
To run queries against fields of ObjectArray types, use the
.any(value, operator=operators.eq) method on a subscript, because accessing
fields of object arrays (e.g. Character.more_details['foo']) returns an
array of the field type.
Only one of the objects inside the array has to match in order for the result to be returned:
>>> query = session.query(Character.name)
>>> query.filter(Character.more_details['foo'].any(1, operator=operators.eq)).all()
[('Arthur Dent',)]
Selecting a field of an object array returns an array that holds the value of that field from every object in the array:
>>> query = session.query(Character.more_details['foo']).order_by(Character.name)
>>> query.all()
[([1, 2, 3],), (None,), (None,)]
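The two behaviours above can be mimicked in plain Python to make the semantics concrete. This sketch runs without a database; project_field and any_equals are hypothetical helper names, and the sample rows mirror the data used in this section.

```python
def project_field(objects, field):
    """Collect `field` from every object in the array; None if there is no array."""
    if objects is None:
        return None
    return [obj.get(field) for obj in objects]


def any_equals(objects, field, value):
    """True if at least one object in the array has `field` equal to `value`."""
    values = project_field(objects, field)
    return values is not None and any(v == value for v in values)


rows = {
    "Arthur Dent": [{"foo": 1, "bar": 10}, {"foo": 2}, {"foo": 3}],
    "Tricia McMillan": None,  # no more_details stored for this record
}

matches = [name for name, objs in rows.items() if any_equals(objs, "foo", 1)]
print(matches)                                     # ['Arthur Dent']
print(project_field(rows["Arthur Dent"], "foo"))   # [1, 2, 3]
```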
Geospatial types
CrateDB’s geospatial types, such as GEO_POINT and GEO_SHAPE, can also be used within an SQLAlchemy declarative schema:
>>> from sqlalchemy_cratedb import Geopoint, Geoshape
>>> class City(Base):
...     __tablename__ = 'cities'
...     name = sa.Column(sa.String, primary_key=True)
...     coordinate = sa.Column(Geopoint)
...     area = sa.Column(Geoshape)
One way of inserting values of these types is to create points or shapes with the geojson library:
>>> area = Polygon(
...     [
...         [
...             (139.806, 35.515),
...             (139.919, 35.703),
...             (139.768, 35.817),
...             (139.575, 35.760),
...             (139.584, 35.619),
...             (139.806, 35.515),
...         ]
...     ]
... )
>>> point = Point(coordinates=(139.76, 35.68))
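The shape of these values can also be sketched with plain dictionaries, since a GeoJSON object is a JSON mapping with type and coordinates members. The sketch below only shows the data layout and does not touch the database; the helper name is_closed_ring and the variables area_dict and point_dict are hypothetical. Two GeoJSON conventions are visible in the data above: positions are given in longitude, latitude order, and a polygon ring ends at the position where it starts.

```python
import json

ring = [
    (139.806, 35.515),
    (139.919, 35.703),
    (139.768, 35.817),
    (139.575, 35.760),
    (139.584, 35.619),
    (139.806, 35.515),
]


def is_closed_ring(positions):
    """A linear ring needs at least four positions, and the first must equal the last."""
    return len(positions) >= 4 and tuple(positions[0]) == tuple(positions[-1])


# A polygon holds a list of rings; the first ring is the exterior boundary.
area_dict = {"type": "Polygon", "coordinates": [[list(p) for p in ring]]}
point_dict = {"type": "Point", "coordinates": [139.76, 35.68]}  # longitude, latitude

print(is_closed_ring(ring))                     # True
print(json.dumps(point_dict, sort_keys=True))   # {"coordinates": [139.76, 35.68], "type": "Point"}
```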
These two objects can then be assigned to an SQLAlchemy model instance, which is added to the session:
>>> tokyo = City(coordinate=point, area=area, name='Tokyo')
>>> session.add(tokyo)
>>> session.commit()
>>> _ = connection.execute(text("REFRESH TABLE cities"))
When reading them back, they are retrieved as the corresponding geojson objects:
>>> query = session.query(City.name, City.coordinate, City.area)
>>> query.all()
[('Tokyo', (139.75999999791384, 35.67999996710569), {"coordinates": [[[139.806, 35.515], [139.919, 35.703], [139.768, 35.817], [139.575, 35.76], [139.584, 35.619], [139.806, 35.515]]], "type": "Polygon"})]
Vector type
CrateDB’s vector data type, FLOAT_VECTOR, stores dense vectors of float values with a fixed length.
>>> from sqlalchemy_cratedb import FloatVector, knn_match
>>> class SearchIndex(Base):
...     __tablename__ = 'search'
...     name = sa.Column(sa.String, primary_key=True)
...     embedding = sa.Column(FloatVector(3))
Create an entity and store it in the database. FLOAT_VECTOR values
can be defined as lists of floating point numbers.
>>> foo_item = SearchIndex(name="foo", embedding=[42.42, 43.43, 44.44])
>>> session.add(foo_item)
>>> session.commit()
>>> _ = connection.execute(sa.text("REFRESH TABLE search"))
When reading it back, the FLOAT_VECTOR value will be returned as a NumPy array.
>>> query = session.query(SearchIndex.name, SearchIndex.embedding)
>>> query.all()
[('foo', array([42.42, 43.43, 44.44], dtype=float32))]
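The dtype=float32 in the output matters when comparing results with the original Python values, which are 64-bit floats. The following sketch uses only the standard library to show the rounding effect; to_float32 is a hypothetical helper, and a tolerance-based comparison such as math.isclose is one possible way to deal with it.

```python
import math
import struct


def to_float32(value):
    """Round-trip a Python float through a 32-bit IEEE 754 representation."""
    return struct.unpack("f", struct.pack("f", value))[0]


stored = [42.42, 43.43, 44.44]
returned = [to_float32(v) for v in stored]

# Exact comparison fails because the values were rounded to 32 bits.
print(returned == stored)  # False

# Comparing with a tolerance succeeds.
print(all(math.isclose(a, b, rel_tol=1e-6) for a, b in zip(returned, stored)))  # True
```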
To search, i.e. to match embeddings against each other, use the knn_match(float_vector, float_vector, int) function:
>>> query = session.query(SearchIndex.name) \
...     .filter(knn_match(SearchIndex.embedding, [42.42, 43.43, 41.41], 3))
>>> query.all()
[('foo',)]
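Conceptually, a k-nearest-neighbour match ranks stored vectors by their distance to the query vector and keeps the closest ones. The brute-force sketch below illustrates that idea in plain Python, assuming Euclidean distance. It is not how CrateDB executes knn_match; the names nearest and index, and the sample vectors other than foo, are made up for illustration.

```python
import math


def nearest(index, query, k):
    """Return the names of the k vectors closest to `query` (Euclidean distance)."""
    ranked = sorted(index, key=lambda name: math.dist(index[name], query))
    return ranked[:k]


index = {
    "foo": [42.42, 43.43, 44.44],
    "bar": [1.0, 2.0, 3.0],       # hypothetical extra row
    "baz": [40.0, 40.0, 40.0],    # hypothetical extra row
}

print(nearest(index, [42.42, 43.43, 41.41], 2))  # ['foo', 'baz']
```

A real vector index avoids comparing the query against every stored vector, which is why the brute-force loop here should be read as an explanation of the result, not of the execution strategy.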