# -*- coding: utf-8 -*-
"""JSON flat file database system."""

from __future__ import unicode_literals, print_function, absolute_import

import codecs
import json
import os
import os.path
import re
import time
from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN

import redis

from rophako.settings import Config
from rophako.utils import handle_exception
from rophako.log import logger

redis_client = None
cache_lifetime = 60 * 60  # one hour


def get(document, cache=True):
    """Get a specific document from the DB."""
    logger.debug("JsonDB: GET {}".format(document))

    # Exists?
    if not exists(document):
        logger.debug("Requested document doesn't exist")
        return None

    path = mkpath(document)
    stat = os.stat(path)

    # Do we have it cached?
    data = get_cache(document) if cache else None
    if data:
        # Check if the cache is fresh. The cached mtime may itself be
        # missing or expired; treat that as 0 so the document is re-read.
        if stat.st_mtime > (get_cache(document + "_mtime") or 0):
            del_cache(document)
            del_cache(document + "_mtime")
        else:
            return data

    # Get a lock for reading.
    lock = lock_cache(document)

    # Get the JSON data.
    data = read_json(path)

    # Unlock!
    unlock_cache(lock)

    # Cache and return it.
    if cache:
        set_cache(document, data, expires=cache_lifetime)
        set_cache(document + "_mtime", stat.st_mtime, expires=cache_lifetime)

    return data
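
# Example usage for get(), as a sketch (assumes Config.db.db_root points at a
# writable directory and that a "users/alice" document was committed earlier;
# the document name is hypothetical):
#
#     user = get("users/alice")                # decoded dict, or None if missing
#     user = get("users/alice", cache=False)   # bypass the Redis cache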


def commit(document, data, cache=True):
    """Insert/update a document in the DB."""
    # Only allow one commit at a time.
    lock = lock_cache(document)

    # Need to create the file?
    path = mkpath(document)
    if not os.path.isfile(path):
        parts = path.split("/")
        parts.pop()  # Remove the file part
        directory = list()

        # Create all the folders.
        for part in parts:
            directory.append(part)
            segment = "/".join(directory)
            if len(segment) > 0 and not os.path.isdir(segment):
                logger.debug("JsonDB: mkdir {}".format(segment))
                os.mkdir(segment, 0o755)

    # Write the JSON.
    write_json(path, data)

    # Update the cached document.
    if cache:
        set_cache(document, data, expires=cache_lifetime)
        set_cache(document + "_mtime", time.time(), expires=cache_lifetime)

    # Release the lock.
    unlock_cache(lock)
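
# Example usage for commit(), as a sketch (document name and payload are
# hypothetical; intermediate folders such as "users/" are created on demand):
#
#     commit("users/alice", {"name": "Alice", "admin": False})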


def delete(document):
    """Delete a document from the DB."""
    path = mkpath(document)
    if os.path.isfile(path):
        logger.debug("Delete DB document: {}".format(path))
        os.unlink(path)
        del_cache(document)


def exists(document):
    """Query whether a document exists."""
    path = mkpath(document)
    return os.path.isfile(path)


def list_docs(path, recursive=False):
    """List all the documents at the path."""
    root = os.path.join(Config.db.db_root, path)
    docs = list()
    if not os.path.isdir(root):
        return []

    for item in sorted(os.listdir(root)):
        target = os.path.join(root, item)
        db_path = os.path.join(path, item)

        # Descend into subdirectories?
        if os.path.isdir(target):
            if recursive:
                # Carry the recursive flag down so nesting deeper than one
                # level is also picked up.
                docs += [
                    os.path.join(item, name)
                    for name in list_docs(db_path, recursive=True)
                ]
            else:
                continue

        if target.endswith(".json"):
            name = re.sub(r'\.json$', '', item)
            docs.append(name)

    return docs
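
# Example usage for list_docs(), as a sketch (document names shown are
# hypothetical):
#
#     list_docs("users")                  # -> ["alice", "bob"]
#     list_docs("blog", recursive=True)   # -> ["entries/2015/my-post", ...]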


def mkpath(document):
    """Turn a DB path into a JSON file path."""
    if document.endswith(".json"):
        # Let's not do that.
        raise Exception("mkpath: document path already includes .json extension!")
    return "{}/{}.json".format(Config.db.db_root, str(document))


def read_json(path):
    """Slurp, decode and return the data from a JSON document."""
    path = str(path)
    if not os.path.isfile(path):
        raise Exception("Can't read JSON file {}: file not found!".format(path))

    # Don't allow any fishy looking paths.
    if ".." in path:
        logger.error("ERROR: JsonDB tried to read a path with two dots: {}".format(path))
        raise Exception("Refusing to read a path that contains '..'")

    # Open and lock the file.
    fh = codecs.open(path, 'r', 'utf-8')
    flock(fh, LOCK_SH)
    text = fh.read()
    flock(fh, LOCK_UN)
    fh.close()

    # Decode. json.loads raises ValueError on malformed data.
    try:
        data = json.loads(text)
    except ValueError:
        logger.error("Couldn't decode JSON data from {}".format(path))
        handle_exception(Exception("Couldn't decode JSON from {}\n{}".format(
            path,
            text,
        )))
        data = None

    return data


def write_json(path, data):
    """Write a JSON document."""
    path = str(path)

    # Don't allow any fishy looking paths.
    if ".." in path:
        logger.error("ERROR: JsonDB tried to write a path with two dots: {}".format(path))
        raise Exception("Refusing to write a path that contains '..'")

    logger.debug("JsonDB: WRITE > {}".format(path))

    # Open and lock the file. Note that mode 'w' truncates the file before
    # the lock is taken; commit() serializes writers via lock_cache() to
    # compensate for that.
    fh = codecs.open(path, 'w', 'utf-8')
    flock(fh, LOCK_EX)

    # Write it.
    fh.write(json.dumps(data, indent=4, separators=(',', ': ')))

    # Unlock and close.
    flock(fh, LOCK_UN)
    fh.close()
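
# Round trip through the raw helpers, as a sketch (the path is hypothetical
# and must not contain ".."):
#
#     write_json("/tmp/example.json", {"hello": "world"})
#     read_json("/tmp/example.json")   # -> {"hello": "world"}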


############################################################################
# Redis Caching Functions                                                  #
############################################################################

disable_redis = False


def get_redis():
    """Connect to Redis or return the existing connection."""
    global redis_client
    global disable_redis
    if not redis_client and not disable_redis:
        try:
            redis_client = redis.StrictRedis(
                host=Config.db.redis_host,
                port=Config.db.redis_port,
                db=Config.db.redis_db,
            )
            redis_client.ping()
        except Exception as e:
            logger.error("Couldn't connect to Redis; memory caching will be disabled! {}".format(e))
            redis_client = None
            disable_redis = True
    return redis_client


def set_cache(key, value, expires=None):
    """Set a key in the Redis cache."""
    key = Config.db.redis_prefix + key
    client = get_redis()
    if not client:
        return

    try:
        client.set(key, json.dumps(value))

        # Expiration date?
        if expires:
            client.expire(key, expires)
    except Exception:
        logger.error("Redis exception: couldn't set_cache {}".format(key))


def get_cache(key):
    """Get a cached item."""
    key = Config.db.redis_prefix + key
    value = None
    client = get_redis()
    if not client:
        return None

    try:
        value = client.get(key)
        if value:
            value = json.loads(value)
    except Exception:
        logger.debug("Redis exception: couldn't get_cache {}".format(key))
        value = None

    return value
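
# Cache round trip, as a sketch (key and lifetime are hypothetical; values go
# through json.dumps/json.loads, so only JSON-safe types survive):
#
#     set_cache("stats", {"hits": 42}, expires=60)
#     get_cache("stats")   # -> {"hits": 42}, or None on a miss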


def del_cache(key):
    """Delete a cached item."""
    key = Config.db.redis_prefix + key
    client = get_redis()
    if not client:
        return
    client.delete(key)


def lock_cache(key, timeout=5, expire=20):
    """Cache level 'file locking' implementation.

    The `key` will be automatically suffixed with `_lock`.
    The `timeout` is the max amount of time to wait for a lock.
    The `expire` is how long a lock may exist before it's considered stale.

    Returns the lock object on success, or None if the lock couldn't be
    acquired (or Redis is unavailable)."""
    client = get_redis()
    if not client:
        return None

    # Take the lock. Suffixing with "_lock" (as the docstring promises) keeps
    # the lock key from colliding with the cached document itself. The
    # blocking_timeout argument needs redis-py 2.10 or newer.
    lock = client.lock(key + "_lock", timeout=expire, blocking_timeout=timeout)
    if not lock.acquire():
        logger.error("Couldn't acquire cache lock: {}".format(key))
        return None

    logger.debug("Cache lock acquired: {}, expires in {}s".format(key, expire))
    return lock


def unlock_cache(lock):
    """Release the lock on a cache key."""
    if lock:
        lock.release()
        logger.debug("Cache lock released")
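
# Lock usage pattern, as a sketch (mirrors what get() and commit() do above;
# the document name is hypothetical):
#
#     lock = lock_cache("users/alice")
#     try:
#         write_json(mkpath("users/alice"), {"name": "Alice"})
#     finally:
#         unlock_cache(lock)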