| Author: | Sean J Taylor |
|---|---|
| Date: | 2008.03.15 |
| Venue: | PyCon 2008 |
The slides have tons of clickable links; you can view them here:
http://blog.codejams.net/pycon/
Document-oriented databases are simple key-value stores.
Here is the Invoice as a JSON document:
{ "payments": [
    {"type": "VISA", "amount": 14.00}],
  "lineItems": [
    { "description": "Dive into Python",
      "amount": 17.00,
      "allocation": {
        "code1": {"amount": 5.00, "paid": 5.00},
        "code2": {"amount": 12.00, "paid": 9.00}}
    }]
}
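As a rough illustration, the invoice above loads into plain nested dicts and lists. This sketch assumes Python 3 and the standard-library `json` module, which is not necessarily what the talk used.

```python
import json

invoice_json = """
{ "payments": [
    {"type": "VISA", "amount": 14.00}],
  "lineItems": [
    { "description": "Dive into Python",
      "amount": 17.00,
      "allocation": {
        "code1": {"amount": 5.00, "paid": 5.00},
        "code2": {"amount": 12.00, "paid": 9.00}}
    }]
}
"""

# The document becomes ordinary Python data: dicts, lists, strings, floats.
invoice = json.loads(invoice_json)
total_paid = sum(p["amount"] for p in invoice["payments"])
total_billed = sum(li["amount"] for li in invoice["lineItems"])
```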
We may be giving up some of the ACID properties that are so desirable in databases.
DictMixin is useful for constructing persistent dictionaries.
class DictMixin(object):
    def keys(self): pass
    def __getitem__(self, key): pass
    def __setitem__(self, key, value): pass
    def __delitem__(self, key): pass
All other mapping-interface methods are derived from these four operations. See Ian Bicking's Blog Entry for more info.
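A minimal sketch of the same idea in Python 3, assuming `collections.abc.MutableMapping` as the modern counterpart of `DictMixin` (it asks for `__iter__` and `__len__` instead of `keys`). `MemoryStore` is a hypothetical name used only for illustration.

```python
from collections.abc import MutableMapping

class MemoryStore(MutableMapping):
    """Hypothetical in-memory store; only the core operations are hand-written."""
    def __init__(self):
        self._items = {}
    def __getitem__(self, key):
        return self._items[key]
    def __setitem__(self, key, value):
        self._items[key] = value
    def __delitem__(self, key):
        del self._items[key]
    def __iter__(self):
        return iter(self._items)
    def __len__(self):
        return len(self._items)

store = MemoryStore()
store["a"] = 1
store.update(b=2)        # derived from __setitem__
popped = store.pop("a")  # derived from __getitem__ and __delitem__
```

Methods such as `get`, `items`, `update`, `pop` and `in` come for free from the base class.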
Example: FileStore
import os
class FileStore(DictMixin):
    def __init__(self, path):
        self.path = path
    def keys(self):
        return os.listdir(self.path)
    def __getitem__(self, key):
        p = os.path.join(self.path, key)
        if not os.path.exists(p):
            raise KeyError(key)
        return open(p).read()
FileStore continued
class FileStore(DictMixin):
    def __setitem__(self, key, value):
        # open for writing; the default mode is read-only
        f = open(os.path.join(self.path, key), 'w')
        f.write(value)
        f.close()
    def __delitem__(self, key):
        os.remove(os.path.join(self.path, key))
Add file locking and it's safe for multiple read/writes.
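One way the locking might look, as a sketch only: it assumes Python 3 on a Unix system, uses advisory locks from `fcntl.flock`, and the class name `LockedFileStore` is hypothetical. The talk does not say which locking scheme was intended.

```python
import fcntl
import os
import tempfile
from collections.abc import MutableMapping

class LockedFileStore(MutableMapping):
    """Hypothetical file-per-key store guarded by advisory locks (Unix only)."""
    def __init__(self, path):
        self.path = path
    def _path(self, key):
        return os.path.join(self.path, key)
    def __getitem__(self, key):
        try:
            f = open(self._path(key))
        except FileNotFoundError:
            raise KeyError(key)
        with f:
            fcntl.flock(f, fcntl.LOCK_SH)  # shared lock: many readers at once
            return f.read()
    def __setitem__(self, key, value):
        # Append mode avoids truncating the file before the lock is held.
        with open(self._path(key), "a") as f:
            fcntl.flock(f, fcntl.LOCK_EX)  # exclusive lock: one writer
            f.truncate(0)
            f.write(value)
    def __delitem__(self, key):
        try:
            os.remove(self._path(key))
        except FileNotFoundError:
            raise KeyError(key)
    def __iter__(self):
        return iter(os.listdir(self.path))
    def __len__(self):
        return len(os.listdir(self.path))

store = LockedFileStore(tempfile.mkdtemp())
store["invoice1"] = "hello"
```

The locks are released when each file is closed. Keys are used as file names directly, so this sketch assumes they contain no path separators.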
The standard library comes with plenty of options.
>>> import dumbdbm as dbm  # or dbm, gdbm, dbhash
>>> db = dbm.open(filename, mode)
- dumbdbm:
- written in Python, a flat file
- fallback option
- dbm:
- uses UNIX dbm library
- removes items/values methods
- gdbm:
- GNU DBM library
- removes items/values methods
- adds firstkey/nextkey methods
- dbhash:
- BSD DB library
- adds first/last/next/previous methods
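For comparison, a small sketch with the pure-Python fallback. It assumes Python 3, where `dumbdbm` lives at `dbm.dumb`; the file name is hypothetical.

```python
import dbm.dumb
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "invoices")
db = dbm.dumb.open(path, "c")  # "c": create the database if it does not exist
db["invoice1"] = '{"payments": []}'
stored = db["invoice1"]        # keys and values come back as bytes
keys = list(db.keys())
db.close()
```

Because values are stored as bytes, any structured document has to be serialized before it goes in.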
Choice of storage will depend on your application. You may need:
- to be able to edit the data with a text editor
- to make the store accessible from other languages
- to scale to a large number of documents
- to partition the data or achieve some redundancy
| backend | writetime | listkeys | readtime |
|---|---|---|---|
| bsddb | 11.40 | 0.80 | 2.65 |
| sqlite | 33.76 | 0.23 | 4.32 |
| dbm | 14.27 | 0.74 | 2.62 |
| file | 2.15 | 0.03 | 9.90 |
payments:
  - amount: 14.00
    type: VISA
lineItems:
  - description: Dive Into Python
    amount: 17.00
    allocation:
      code1:
        amount: 5.00
        paid: 5.00
      code2:
        amount: 12.00
        paid: 9.00
- interoperability
- speed
- human editability/readability
To add features, we subclass a UserDict to make middleware:
class UserDict(object):
    def __init__(self, data):
        self.data = data
    def __getitem__(self, key):
        return self.data[key]
    def __setitem__(self, key, value):
        self.data[key] = value
Values are converted to/from JSON as they are set/get.
class JSONSerializer(UserDict):
    def __getitem__(self, key):
        return simplejson.loads(self.data[key])
    def __setitem__(self, key, value):
        self.data[key] = simplejson.dumps(value)
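A runnable sketch of this layer, assuming Python 3 and the standard-library `json` module in place of `simplejson`. `Middleware` is a hypothetical stand-in for the `UserDict` base class above, and a plain dict plays the backing store.

```python
import json

class Middleware:
    """Hypothetical stand-in for the UserDict base class above."""
    def __init__(self, data):
        self.data = data
    def __getitem__(self, key):
        return self.data[key]
    def __setitem__(self, key, value):
        self.data[key] = value

class JSONSerializer(Middleware):
    def __getitem__(self, key):
        return json.loads(self.data[key])       # string -> document
    def __setitem__(self, key, value):
        self.data[key] = json.dumps(value)      # document -> string

raw = {}  # the backing store only ever sees strings
db = JSONSerializer(raw)
db["invoice1"] = {"payments": [{"type": "VISA", "amount": 14.0}]}
```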
Using FormEncode, we can easily validate a document has the correct schema:
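# The nested schemas are defined first so InvoiceSchema can refer to them.
class PaymentSchema(Schema):
    type = UnicodeString()
    amount = Number()
class LineItemSchema(Schema):
    allocation = AllocationSchema()  # AllocationSchema is not shown on the slide
class InvoiceSchema(Schema):
    payments = ForEach(PaymentSchema)
    lineItems = ForEach(LineItemSchema)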
We'll pass our document through a validation test when it is saved.
class ValidatingDict(UserDict):
    def __init__(self, data, validator=None):
        self.validator = validator
        ...
    def __setitem__(self, key, value):
        self.validator(value)
        self.data[key] = value
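A self-contained sketch of the validating layer, assuming Python 3. To avoid depending on FormEncode, the validator here is a hypothetical hand-written function, `validate_invoice`, that raises on a bad document; a schema's `to_python` method could be passed in the same way.

```python
class ValidatingDict:
    def __init__(self, data, validator=None):
        self.data = data
        self.validator = validator
    def __getitem__(self, key):
        return self.data[key]
    def __setitem__(self, key, value):
        # The validator raises on bad input, so nothing invalid is stored.
        if self.validator is not None:
            self.validator(value)
        self.data[key] = value

def validate_invoice(doc):
    """Hypothetical check standing in for a FormEncode schema."""
    for field in ("payments", "lineItems"):
        if not isinstance(doc.get(field), list):
            raise ValueError("%s must be a list" % field)

db = ValidatingDict({}, validator=validate_invoice)
db["ok"] = {"payments": [], "lineItems": []}
try:
    db["bad"] = {"payments": []}  # missing lineItems
    rejected = False
except ValueError:
    rejected = True
```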
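class VersionedDict(UserDict):
    def __getitem__(self, key):
        # unpack (key, version) before the lookup; the store is keyed by name only
        version = None
        if isinstance(key, tuple):
            key, version = key
        versions = self.data[key]
        if version is None:
            version = versions['latest']
        return versions[version]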
>>> db['docname']
>>> db['docname', 'revision number']
class VersionedDict(UserDict):
    def __setitem__(self, key, value):
        try:
            versions = self.data[key]
        except KeyError:
            versions = {}
        if (versions and value['rev'] != versions['latest']):
            raise Conflict()
        rev = uuid.uuid1()
        value['rev'] = rev
        versions[rev] = value
        versions['latest'] = rev
        self.data[key] = versions
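Putting the read and write halves together, a sketch assuming Python 3 and a plain dict as the backing store. Two details are my own choices rather than the slide's: revisions are stored as strings, and each write copies the document instead of mutating the caller's dict.

```python
import uuid

class Conflict(Exception):
    pass

class VersionedDict:
    def __init__(self, data):
        self.data = data
    def __getitem__(self, key):
        version = None
        if isinstance(key, tuple):
            key, version = key
        versions = self.data[key]
        if version is None:
            version = versions["latest"]
        return versions[version]
    def __setitem__(self, key, value):
        versions = self.data.get(key, {})
        # A write must be based on the latest revision, or it is rejected.
        if versions and value.get("rev") != versions["latest"]:
            raise Conflict(key)
        rev = str(uuid.uuid1())
        versions[rev] = dict(value, rev=rev)
        versions["latest"] = rev
        self.data[key] = versions

db = VersionedDict({})
db["doc"] = {"total": 1}
first = db["doc"]
db["doc"] = dict(first, total=2)      # based on the latest revision: accepted
try:
    db["doc"] = dict(first, total=3)  # based on a stale revision
    conflict = False
except Conflict:
    conflict = True
```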
class TimeStampDict(UserDict):
    def __setitem__(self, key, value):
        now = datetime.now()
        value['timestamp'] = now
        self.data[key] = value
class ReplicationDict(UserDict):
    def __init__(self, data, backup=None):
        self.backup = backup
        ...
    def __setitem__(self, key, value):
        t = Thread(target=self.backup.__setitem__,
                   args=(key, value))
        t.start()
        ...
class ZipDict(UserDict):
    def __getitem__(self, key):
        value = self.data[key]
        return zlib.decompress(value)
    def __setitem__(self, key, value):
        value = zlib.compress(value)
        self.data[key] = value
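The compression layer as a runnable sketch, assuming Python 3, where `zlib` works on bytes, so this version encodes text as UTF-8 on the way in and decodes it on the way out. That encoding step is an assumption added for the example.

```python
import zlib

class ZipDict:
    def __init__(self, data):
        self.data = data
    def __getitem__(self, key):
        return zlib.decompress(self.data[key]).decode("utf-8")
    def __setitem__(self, key, value):
        self.data[key] = zlib.compress(value.encode("utf-8"))

raw = {}
db = ZipDict(raw)
text = "Dive into Python " * 100  # repetitive text compresses well
db["doc"] = text
```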
import Crypto.Cipher.AES as AES
class CipherDict(UserDict):
    def __init__(self, data, key=None):
        # AES is a module; cipher objects come from AES.new()
        self.crypter = AES.new(key)
        ....
You get the idea.
from functools import partial
def compose(outer, inner):
    def composition(*args, **kwd):
        return outer(inner(*args, **kwd))
    return composition
compose_all = partial(reduce, compose)
store = FileStore('/var/database')
backup = SimpleDBStore()
db = compose_all([
    partial(ValidatingDict,
            validator=InvoiceSchema.to_python),
    TimeStampDict,
    VersionedDict,
    JSONSerializer,
    partial(ReplicationDict, backup=backup)
])(store)
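To see the order in which the stack is applied, here is a small sketch assuming Python 3 (where `reduce` is imported from `functools`). `Tag` is a hypothetical layer that appends its name on every write, so the stored value records the path a write takes.

```python
from functools import partial, reduce

def compose(outer, inner):
    def composition(*args, **kwd):
        return outer(inner(*args, **kwd))
    return composition

def compose_all(layers):
    return reduce(compose, layers)

class Tag:
    """Hypothetical layer that records its name on every write."""
    def __init__(self, data, name):
        self.data = data
        self.name = name
    def __getitem__(self, key):
        return self.data[key]
    def __setitem__(self, key, value):
        self.data[key] = value + [self.name]

store = {}
db = compose_all([
    partial(Tag, name="outer"),
    partial(Tag, name="middle"),
    partial(Tag, name="inner"),
])(store)
db["k"] = []
```

The first entry in the list is the outermost layer: a write passes through it first and reaches the raw store last.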
Unsolved problem: how to share state across the stack.
def has_high_balance(doc):
    return calculate_balance(doc) >= 50.00
invoices = (inv for inv in db.itervalues()
            if has_high_balance(inv))
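A runnable version of this scan, assuming Python 3. The slide does not define `calculate_balance`, so the one below (billed minus paid) is a hypothetical helper, and the two sample invoices are made up for the example.

```python
def calculate_balance(doc):
    """Hypothetical helper: amount billed minus amount paid."""
    billed = sum(li["amount"] for li in doc["lineItems"])
    paid = sum(p["amount"] for p in doc["payments"])
    return billed - paid

def has_high_balance(doc):
    return calculate_balance(doc) >= 50.00

db = {
    "inv1": {"lineItems": [{"amount": 17.00}], "payments": [{"amount": 14.00}]},
    "inv2": {"lineItems": [{"amount": 90.00}], "payments": [{"amount": 20.00}]},
}
# Every query visits every document: simple, but linear in the size of the store.
invoices = [inv for inv in db.values() if has_high_balance(inv)]
```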
So maybe this isn't the most efficient approach, but it's simple.
MapReduce was invented by Google to query incredibly large datasets.
It can be run incrementally if we're willing to store redundant information.
The Map Function takes a document and may yield 0 or more key-value pairs.
def map_func(doc):
    for li in doc['lineItems']:
        for c, a in li['allocation'].iteritems():
            yield c, a['amount'] - a['paid']
The Map function is run on each document as it is set:
map_output = {'invoiceId': [
    ('code1', 13.00),
    ('code2', 18.00),
    ('code1', 0.00),
    ('code3', 1.00)]}
This is cached for each key for each view in the database.
from collections import defaultdict
reduce_input = defaultdict(list)
for values in map_output.itervalues():
    for key, val in values:
        reduce_input[key].append(val)
reduce_input should be persistent for incremental updates.
The Reduce function takes a key and a list of values to reduce. In our case, it will be a simple sum.
def reduce_func(accounting_code, amounts):
    return sum(amounts)
The resulting view on our data looks like this:
{'code1': 13.00,
 'code2': 18.00,
 'code3': 1.00}
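The three steps can be run end to end as below. This is a sketch assuming Python 3; the sample invoice is hypothetical, with amounts chosen so that the intermediate and final values match the numbers shown above.

```python
from collections import defaultdict

def map_func(doc):
    for li in doc["lineItems"]:
        for code, alloc in li["allocation"].items():
            yield code, alloc["amount"] - alloc["paid"]

def reduce_func(accounting_code, amounts):
    return sum(amounts)

# Hypothetical document with two line items.
db = {"invoiceId": {"lineItems": [
    {"allocation": {"code1": {"amount": 20.0, "paid": 7.0},
                    "code2": {"amount": 20.0, "paid": 2.0}}},
    {"allocation": {"code1": {"amount": 5.0, "paid": 5.0},
                    "code3": {"amount": 1.0, "paid": 0.0}}},
]}}

# Map: one list of (key, value) pairs per document.
map_output = {key: list(map_func(doc)) for key, doc in db.items()}

# Group: collect the mapped values under each key.
reduce_input = defaultdict(list)
for pairs in map_output.values():
    for code, amount in pairs:
        reduce_input[code].append(amount)

# Reduce: one value per key.
view = {code: reduce_func(code, amounts)
        for code, amounts in reduce_input.items()}
```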
class MapReduceView(UserDict):
    def __init__(self, data, store1, store2):
        self.map_output = store1
        self.reduce_input = store2
        ....
    def map(self, key, value):
        raise NotImplementedError  # abstract
    def reduce(self, key, values):
        raise NotImplementedError  # abstract
    def update_view(self, key, value):
        # update map_output
        # update reduce_input
        # update data
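One possible way to fill in `update_view`, as a sketch only and assuming Python 3. The class name `SumByCodeView`, the in-memory dicts, and the layout of `reduce_input` (grouped by key and then by document) are all assumptions made so that one document's old contribution can be removed before the new one is added.

```python
class SumByCodeView:
    """Hypothetical incremental view: outstanding amount per accounting code."""
    def __init__(self):
        self.map_output = {}    # doc key -> list of (code, amount) pairs
        self.reduce_input = {}  # code -> {doc key: [amounts]}
        self.data = {}          # code -> reduced value
    def map(self, key, doc):
        for li in doc["lineItems"]:
            for code, alloc in li["allocation"].items():
                yield code, alloc["amount"] - alloc["paid"]
    def reduce(self, code, amounts):
        return sum(amounts)
    def update_view(self, key, doc):
        touched = set()
        # Remove what this document contributed last time.
        for code, _ in self.map_output.get(key, []):
            self.reduce_input[code].pop(key, None)
            touched.add(code)
        # Update map_output and reduce_input with the new mapping.
        pairs = list(self.map(key, doc))
        self.map_output[key] = pairs
        for code, amount in pairs:
            self.reduce_input.setdefault(code, {}).setdefault(key, []).append(amount)
            touched.add(code)
        # Update data: re-reduce only the codes that changed.
        for code in touched:
            amounts = [a for vals in self.reduce_input[code].values() for a in vals]
            if amounts:
                self.data[code] = self.reduce(code, amounts)
            else:
                self.data.pop(code, None)

def invoice(code, amount, paid):
    """Hypothetical helper building a one-line invoice."""
    return {"lineItems": [{"allocation": {code: {"amount": amount, "paid": paid}}}]}

view = SumByCodeView()
view.update_view("inv1", invoice("code1", 5.0, 2.0))
view.update_view("inv2", invoice("code1", 4.0, 0.0))
view.update_view("inv1", invoice("code2", 1.0, 0.0))  # inv1 is rewritten
```

Only the codes touched by a write are reduced again, which is what makes the redundant storage worthwhile.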
To keep our interface from being polluted, we'll make the views addressable.
class MapReduceDict(UserDict):
    views = {'ar_by_accounting_code': my_view_instance}
    def __setitem__(self, key, value):
        for view in self.views.values():
            view.update_view(key, value)
        self.data[key] = value
    def __getitem__(self, key):
        if key.startswith('_view/'):
            return self.views[key[6:]]
        return self.data[key]
To create a database server, we'll use WSGI to serve over HTTP. REST maps neatly onto the mapping interface.
class HTTPDict(UserDict):
    def GET(self, uri, environ):
        try:
            data = self.data[uri]
        except KeyError:
            raise HTTPNotFound()
        return '200 OK', data
    def POST(self, uri, environ):
        resource_id = str(uuid.uuid1())
        self._set_item(resource_id, environ)
        return '201 Created', ''
    def PUT(self, uri, environ):
        new = uri not in self.data
        self._set_item(uri, environ)
        return ('201 Created' if new else '204 No Content'), ''
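A minimal WSGI sketch of the same mapping, assuming Python 3 and only the standard library. The dispatch inside `__call__`, the `DELETE` branch, the `Location` header and the `call` helper are assumptions added to make the example self-contained; they are not taken from the talk.

```python
import io
import uuid
from wsgiref.util import setup_testing_defaults

class HTTPDict:
    def __init__(self, data):
        self.data = data
    def __call__(self, environ, start_response):
        method = environ["REQUEST_METHOD"]
        uri = environ.get("PATH_INFO", "/").lstrip("/")
        length = int(environ.get("CONTENT_LENGTH") or 0)
        body = environ["wsgi.input"].read(length)
        headers, out = [], b""
        if method == "GET":                 # __getitem__
            if uri in self.data:
                status, out = "200 OK", self.data[uri]
            else:
                status = "404 Not Found"
        elif method == "PUT":               # __setitem__ with a known key
            status = "204 No Content" if uri in self.data else "201 Created"
            self.data[uri] = body
        elif method == "POST":              # __setitem__ with a generated key
            uri = str(uuid.uuid1())
            self.data[uri] = body
            status = "201 Created"
            headers.append(("Location", "/" + uri))
        elif method == "DELETE":            # __delitem__
            if uri in self.data:
                del self.data[uri]
                status = "204 No Content"
            else:
                status = "404 Not Found"
        else:
            status = "405 Method Not Allowed"
        headers.append(("Content-Length", str(len(out))))
        start_response(status, headers)
        return [out]

def call(app, method, path, body=b""):
    """Hypothetical helper: invoke the WSGI app in-process, without a server."""
    environ = {}
    setup_testing_defaults(environ)
    environ.update(REQUEST_METHOD=method, PATH_INFO=path,
                   CONTENT_LENGTH=str(len(body)))
    environ["wsgi.input"] = io.BytesIO(body)
    captured = {}
    def start_response(status, headers):
        captured["status"] = status
    out = b"".join(app(environ, start_response))
    return captured["status"], out

app = HTTPDict({})
created = call(app, "PUT", "/inv1", b"{}")
updated = call(app, "PUT", "/inv1", b'{"a": 1}')
fetched = call(app, "GET", "/inv1")
missing = call(app, "GET", "/nope")
```

The same `app` object could be handed to `wsgiref.simple_server.make_server` to serve it over a real socket.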