A Python content management system designed for kirsle.net featuring a blog, comments and photo albums. https://rophako.kirsle.net/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

jsondb.py 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # -*- coding: utf-8 -*-
  2. from __future__ import unicode_literals, print_function, absolute_import
  3. """JSON flat file database system."""
  4. import codecs
  5. import os
  6. import os.path
  7. import re
  8. from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
  9. import redis
  10. import json
  11. import time
  12. from rophako.settings import Config
  13. from rophako.utils import handle_exception
  14. from rophako.log import logger
# Lazily-initialized Redis connection, shared by the cache helpers below
# (see get_redis()). None until the first successful connection.
redis_client = None
# Default TTL for cached documents, in seconds.
cache_lifetime = 60*60 # 1 hour
  17. def get(document, cache=True):
  18. """Get a specific document from the DB."""
  19. logger.debug("JsonDB: GET {}".format(document))
  20. # Exists?
  21. if not exists(document):
  22. logger.debug("Requested document doesn't exist")
  23. return None
  24. path = mkpath(document)
  25. stat = os.stat(path)
  26. # Do we have it cached?
  27. data = get_cache(document) if cache else None
  28. if data:
  29. # Check if the cache is fresh.
  30. if stat.st_mtime > get_cache(document+"_mtime"):
  31. del_cache(document)
  32. del_cache(document+"_mtime")
  33. else:
  34. return data
  35. # Get a lock for reading.
  36. lock = lock_cache(document)
  37. # Get the JSON data.
  38. data = read_json(path)
  39. # Unlock!
  40. unlock_cache(lock)
  41. # Cache and return it.
  42. if cache:
  43. set_cache(document, data, expires=cache_lifetime)
  44. set_cache(document+"_mtime", stat.st_mtime, expires=cache_lifetime)
  45. return data
  46. def commit(document, data, cache=True):
  47. """Insert/update a document in the DB."""
  48. # Only allow one commit at a time.
  49. lock = lock_cache(document)
  50. # Need to create the file?
  51. path = mkpath(document)
  52. if not os.path.isfile(path):
  53. parts = path.split("/")
  54. parts.pop() # Remove the file part
  55. directory = list()
  56. # Create all the folders.
  57. for part in parts:
  58. directory.append(part)
  59. segment = "/".join(directory)
  60. if len(segment) > 0 and not os.path.isdir(segment):
  61. logger.debug("JsonDB: mkdir {}".format(segment))
  62. os.mkdir(segment, 0o755)
  63. # Write the JSON.
  64. write_json(path, data)
  65. # Update the cached document.
  66. if cache:
  67. set_cache(document, data, expires=cache_lifetime)
  68. set_cache(document+"_mtime", time.time(), expires=cache_lifetime)
  69. # Release the lock.
  70. unlock_cache(lock)
  71. def delete(document):
  72. """Delete a document from the DB."""
  73. path = mkpath(document)
  74. if os.path.isfile(path):
  75. logger.info("Delete DB document: {}".format(path))
  76. os.unlink(path)
  77. del_cache(document)
  78. def exists(document):
  79. """Query whether a document exists."""
  80. path = mkpath(document)
  81. return os.path.isfile(path)
  82. def list_docs(path, recursive=False):
  83. """List all the documents at the path."""
  84. root = os.path.join(Config.db.db_root, path)
  85. docs = list()
  86. for item in sorted(os.listdir(root)):
  87. target = os.path.join(root, item)
  88. db_path = os.path.join(path, item)
  89. # Descend into subdirectories?
  90. if os.path.isdir(target):
  91. if recursive:
  92. docs += [
  93. os.path.join(item, name) for name in list_docs(db_path)
  94. ]
  95. else:
  96. continue
  97. if target.endswith(".json"):
  98. name = re.sub(r'\.json$', '', item)
  99. docs.append(name)
  100. return docs
  101. def mkpath(document):
  102. """Turn a DB path into a JSON file path."""
  103. if document.endswith(".json"):
  104. # Let's not do that.
  105. raise Exception("mkpath: document path already includes .json extension!")
  106. return "{}/{}.json".format(Config.db.db_root, str(document))
  107. def read_json(path):
  108. """Slurp, decode and return the data from a JSON document."""
  109. path = str(path)
  110. if not os.path.isfile(path):
  111. raise Exception("Can't read JSON file {}: file not found!".format(path))
  112. # Don't allow any fishy looking paths.
  113. if ".." in path:
  114. logger.error("ERROR: JsonDB tried to read a path with two dots: {}".format(path))
  115. raise Exception()
  116. # Open and lock the file.
  117. fh = codecs.open(path, 'r', 'utf-8')
  118. flock(fh, LOCK_SH)
  119. text = fh.read()
  120. flock(fh, LOCK_UN)
  121. fh.close()
  122. # Decode.
  123. try:
  124. data = json.loads(text)
  125. except:
  126. logger.error("Couldn't decode JSON data from {}".format(path))
  127. handle_exception(Exception("Couldn't decode JSON from {}\n{}".format(
  128. path,
  129. text,
  130. )))
  131. data = None
  132. return data
  133. def write_json(path, data):
  134. """Write a JSON document."""
  135. path = str(path)
  136. # Don't allow any fishy looking paths.
  137. if ".." in path:
  138. logger.error("ERROR: JsonDB tried to write a path with two dots: {}".format(path))
  139. raise Exception()
  140. logger.debug("JsonDB: WRITE > {}".format(path))
  141. # Open and lock the file.
  142. fh = codecs.open(path, 'w', 'utf-8')
  143. flock(fh, LOCK_EX)
  144. # Write it.
  145. fh.write(json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')))
  146. # Unlock and close.
  147. flock(fh, LOCK_UN)
  148. fh.close()
############################################################################
# Redis Caching Functions                                                  #
############################################################################

# Set to True after a failed connection attempt so every subsequent call
# doesn't retry (and re-log) the Redis connection. See get_redis().
disable_redis = False
  153. def get_redis():
  154. """Connect to Redis or return the existing connection."""
  155. global redis_client
  156. global disable_redis
  157. if not redis_client and not disable_redis:
  158. try:
  159. redis_client = redis.StrictRedis(
  160. host = Config.db.redis_host,
  161. port = Config.db.redis_port,
  162. db = Config.db.redis_db,
  163. )
  164. redis_client.ping()
  165. except Exception as e:
  166. logger.error("Couldn't connect to Redis; memory caching will be disabled! {}".format(e))
  167. redis_client = None
  168. disable_redis = True
  169. return redis_client
  170. def set_cache(key, value, expires=None):
  171. """Set a key in the Redis cache."""
  172. key = Config.db.redis_prefix + key
  173. client = get_redis()
  174. if not client:
  175. return
  176. try:
  177. client.set(key, json.dumps(value))
  178. # Expiration date?
  179. if expires:
  180. client.expire(key, expires)
  181. except:
  182. logger.error("Redis exception: couldn't set_cache {}".format(key))
  183. def get_cache(key):
  184. """Get a cached item."""
  185. key = Config.db.redis_prefix + key
  186. value = None
  187. client = get_redis()
  188. if not client:
  189. return
  190. try:
  191. value = client.get(key)
  192. if value:
  193. value = json.loads(value)
  194. except:
  195. logger.warning("Redis exception: couldn't get_cache {}".format(key))
  196. value = None
  197. return value
  198. def del_cache(key):
  199. """Delete a cached item."""
  200. key = Config.db.redis_prefix + key
  201. client = get_redis()
  202. if not client:
  203. return
  204. client.delete(key)
  205. def lock_cache(key, timeout=5, expire=20):
  206. """Cache level 'file locking' implementation.
  207. The `key` will be automatically suffixed with `_lock`.
  208. The `timeout` is the max amount of time to wait for a lock.
  209. The `expire` is how long a lock may exist before it's considered stale.
  210. Returns True on success, None on failure to acquire lock."""
  211. client = get_redis()
  212. if not client:
  213. return
  214. # Take the lock.
  215. lock = client.lock(key, timeout=expire)
  216. lock.acquire()
  217. logger.debug("Cache lock acquired: {}, expires in {}s".format(key, expire))
  218. return lock
  219. def unlock_cache(lock):
  220. """Release the lock on a cache key."""
  221. if lock:
  222. lock.release()
  223. logger.debug("Cache lock released")