A Python content management system designed for kirsle.net, featuring a blog, comments, and photo albums. https://rophako.kirsle.net/

jsondb.py 7.5KB

# -*- coding: utf-8 -*-
"""JSON flat file database system."""

from __future__ import unicode_literals

import codecs
import glob
import json
import os
import os.path
import re
import time
from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN

import redis

from rophako.settings import Config
from rophako.utils import handle_exception
from rophako.log import logger

redis_client = None
cache_lifetime = 60 * 60  # 1 hour


def get(document, cache=True):
    """Get a specific document from the DB."""
    logger.debug("JsonDB: GET {}".format(document))

    # Exists?
    if not exists(document):
        logger.debug("Requested document doesn't exist")
        return None

    path = mkpath(document)
    stat = os.stat(path)

    # Do we have it cached?
    data = get_cache(document) if cache else None
    if data:
        # Check if the cache is fresh. The cached mtime may have expired from
        # Redis, so fall back to 0 (treat the cache as stale) rather than
        # compare against None.
        if stat.st_mtime > (get_cache(document + "_mtime") or 0):
            del_cache(document)
            del_cache(document + "_mtime")
        else:
            return data

    # Get a lock for reading.
    lock = lock_cache(document)

    # Get the JSON data.
    data = read_json(path)

    # Unlock!
    unlock_cache(lock)

    # Cache and return it.
    if cache:
        set_cache(document, data, expires=cache_lifetime)
        set_cache(document + "_mtime", stat.st_mtime, expires=cache_lifetime)

    return data
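

# Usage sketch (not from the original file; the document name is
# hypothetical). Documents are named by their DB path relative to
# Config.db.db_root, without the .json extension:
#
#     entry = get("blog/entries/1")               # served from Redis when fresh
#     entry = get("blog/entries/1", cache=False)  # force a read from disk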


def commit(document, data, cache=True):
    """Insert/update a document in the DB."""
    # Only allow one commit at a time.
    lock = lock_cache(document)

    # Need to create the file?
    path = mkpath(document)
    if not os.path.isfile(path):
        parts = path.split("/")
        parts.pop()  # Remove the file part
        directory = list()

        # Create all the folders.
        for part in parts:
            directory.append(part)
            segment = "/".join(directory)
            if len(segment) > 0 and not os.path.isdir(segment):
                logger.debug("JsonDB: mkdir {}".format(segment))
                os.mkdir(segment, 0o755)

    # Write the JSON.
    write_json(path, data)

    # Update the cached document.
    if cache:
        set_cache(document, data, expires=cache_lifetime)
        set_cache(document + "_mtime", time.time(), expires=cache_lifetime)

    # Release the lock.
    unlock_cache(lock)
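

# Usage sketch (hypothetical document and data): commit() creates any missing
# parent folders under the DB root, so a deep path works on the first write:
#
#     entry = get("blog/entries/1") or {}
#     entry["subject"] = "Hello, world!"
#     commit("blog/entries/1", entry)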


def delete(document):
    """Delete a document from the DB."""
    path = mkpath(document)
    if os.path.isfile(path):
        logger.info("Delete DB document: {}".format(path))
        os.unlink(path)
        del_cache(document)


def exists(document):
    """Query whether a document exists."""
    path = mkpath(document)
    return os.path.isfile(path)


def list_docs(path, recursive=False):
    """List all the documents at the path."""
    root = os.path.join(Config.db.db_root, path)
    docs = list()

    for item in sorted(os.listdir(root)):
        target = os.path.join(root, item)
        db_path = os.path.join(path, item)

        # Descend into subdirectories? Propagate the recursive flag so that
        # deeply nested folders are walked too, not just the first level.
        if os.path.isdir(target):
            if recursive:
                docs += [
                    os.path.join(item, name)
                    for name in list_docs(db_path, recursive=True)
                ]
            else:
                continue

        if target.endswith(".json"):
            name = re.sub(r'\.json$', '', item)
            docs.append(name)

    return docs
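

# Usage sketch (hypothetical layout): with blog/index.json and
# blog/entries/1.json under the DB root,
#
#     list_docs("blog")                  # -> ["index"]
#     list_docs("blog", recursive=True)  # -> ["entries/1", "index"]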


def mkpath(document):
    """Turn a DB path into a JSON file path."""
    if document.endswith(".json"):
        # Let's not do that.
        raise Exception("mkpath: document path already includes .json extension!")
    return "{}/{}.json".format(Config.db.db_root, str(document))
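

# Example (assuming Config.db.db_root is "db"): mkpath("blog/entries/1")
# returns "db/blog/entries/1.json".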


def read_json(path):
    """Slurp, decode and return the data from a JSON document."""
    path = str(path)
    if not os.path.isfile(path):
        raise Exception("Can't read JSON file {}: file not found!".format(path))

    # Don't allow any fishy looking paths.
    if ".." in path:
        logger.error("ERROR: JsonDB tried to read a path with two dots: {}".format(path))
        raise Exception("JsonDB: refusing to read a path containing '..'")

    # Open and lock the file.
    fh = codecs.open(path, 'r', 'utf-8')
    flock(fh, LOCK_SH)
    text = fh.read()
    flock(fh, LOCK_UN)
    fh.close()

    # Decode.
    try:
        data = json.loads(text)
    except Exception:
        logger.error("Couldn't decode JSON data from {}".format(path))
        handle_exception(Exception("Couldn't decode JSON from {}\n{}".format(
            path,
            text,
        )))
        data = None

    return data


def write_json(path, data):
    """Write a JSON document."""
    path = str(path)

    # Don't allow any fishy looking paths.
    if ".." in path:
        logger.error("ERROR: JsonDB tried to write a path with two dots: {}".format(path))
        raise Exception("JsonDB: refusing to write a path containing '..'")

    logger.debug("JsonDB: WRITE > {}".format(path))

    # Open and lock the file.
    fh = codecs.open(path, 'w', 'utf-8')
    flock(fh, LOCK_EX)

    # Write it.
    fh.write(json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')))

    # Unlock and close.
    flock(fh, LOCK_UN)
    fh.close()


############################################################################
# Redis Caching Functions                                                  #
############################################################################

disable_redis = False


def get_redis():
    """Connect to Redis or return the existing connection."""
    global redis_client
    global disable_redis

    if not redis_client and not disable_redis:
        try:
            redis_client = redis.StrictRedis(
                host=Config.db.redis_host,
                port=Config.db.redis_port,
                db=Config.db.redis_db,
            )
            redis_client.ping()
        except Exception as e:
            # Exceptions have no .message attribute on Python 3; format the
            # exception itself instead.
            logger.error("Couldn't connect to Redis; memory caching will be disabled! {}".format(e))
            redis_client = None
            disable_redis = True

    return redis_client


def set_cache(key, value, expires=None):
    """Set a key in the Redis cache."""
    key = Config.db.redis_prefix + key
    client = get_redis()
    if not client:
        return

    try:
        client.set(key, json.dumps(value))

        # Expiration date?
        if expires:
            client.expire(key, expires)
    except Exception:
        logger.error("Redis exception: couldn't set_cache {}".format(key))


def get_cache(key):
    """Get a cached item."""
    key = Config.db.redis_prefix + key
    value = None
    client = get_redis()
    if not client:
        return None

    try:
        value = client.get(key)
        if value:
            value = json.loads(value)
    except Exception:
        logger.warning("Redis exception: couldn't get_cache {}".format(key))
        value = None

    return value


def del_cache(key):
    """Delete a cached item."""
    key = Config.db.redis_prefix + key
    client = get_redis()
    if not client:
        return
    client.delete(key)


def lock_cache(key, timeout=5, expire=20):
    """Cache level 'file locking' implementation.

    The `key` will be automatically suffixed with `_lock`.
    The `timeout` is the max amount of time to wait for a lock.
    The `expire` is how long a lock may exist before it's considered stale.

    Returns the lock object on success, None if Redis is unavailable or the
    lock couldn't be acquired within `timeout` seconds."""
    client = get_redis()
    if not client:
        return None

    # Take the lock, honoring the `_lock` suffix and wait limit promised in
    # the docstring.
    lock = client.lock(key + "_lock", timeout=expire, blocking_timeout=timeout)
    if not lock.acquire():
        logger.error("Couldn't acquire cache lock on {} within {}s".format(key, timeout))
        return None

    logger.debug("Cache lock acquired: {}, expires in {}s".format(key, expire))
    return lock
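

# Usage sketch (hypothetical key): pair with unlock_cache() below, ideally in
# a try/finally so the lock is always released:
#
#     lock = lock_cache("blog/index")
#     try:
#         pass  # critical section: read/modify/write the document
#     finally:
#         unlock_cache(lock)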


def unlock_cache(lock):
    """Release the lock on a cache key."""
    if lock:
        lock.release()
        logger.debug("Cache lock released")
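
A minimal end-to-end sketch of the module in use, assuming a configured rophako install (a writable `Config.db.db_root` and, optionally, a reachable Redis; without Redis the module falls back to plain disk reads). The `demo/greeting` document name is hypothetical:

    from rophako import jsondb

    # Write a document; parent folders under the DB root are created as needed.
    jsondb.commit("demo/greeting", {"message": "Hello, world!"})

    # Read it back; repeat reads are served from the Redis cache until the
    # file's mtime changes on disk.
    print(jsondb.get("demo/greeting"))

    # Enumerate and clean up.
    print(jsondb.list_docs("demo"))  # -> ["greeting"]
    jsondb.delete("demo/greeting")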