A Python content management system designed for kirsle.net, featuring a blog, comments, and photo albums. https://rophako.kirsle.net/

jsondb.py (7.6 KB)

# -*- coding: utf-8 -*-
"""JSON flat file database system."""

from __future__ import unicode_literals, print_function, absolute_import

import codecs
import os
import os.path
import re
from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
import redis
import json
import time

from rophako.settings import Config
from rophako.utils import handle_exception
from rophako.log import logger

redis_client = None
cache_lifetime = 60*60  # 1 hour
def get(document, cache=True):
    """Get a specific document from the DB."""
    logger.debug("JsonDB: GET {}".format(document))

    # Exists?
    if not exists(document):
        logger.debug("Requested document doesn't exist")
        return None

    path = mkpath(document)
    stat = os.stat(path)

    # Do we have it cached?
    data = get_cache(document) if cache else None
    if data:
        # Check if the cache is fresh; if the cached mtime is missing,
        # treat the cache as stale.
        if stat.st_mtime > (get_cache(document+"_mtime") or 0):
            del_cache(document)
            del_cache(document+"_mtime")
        else:
            return data

    # Get a lock for reading.
    lock = lock_cache(document)

    # Get the JSON data.
    data = read_json(path)

    # Unlock!
    unlock_cache(lock)

    # Cache and return it.
    if cache:
        set_cache(document, data, expires=cache_lifetime)
        set_cache(document+"_mtime", stat.st_mtime, expires=cache_lifetime)

    return data
def commit(document, data, cache=True):
    """Insert/update a document in the DB."""
    # Only allow one commit at a time.
    lock = lock_cache(document)

    # Need to create the file?
    path = mkpath(document)
    if not os.path.isfile(path):
        parts = path.split("/")
        parts.pop()  # Remove the file part
        directory = list()

        # Create all the folders.
        for part in parts:
            directory.append(part)
            segment = "/".join(directory)
            if len(segment) > 0 and not os.path.isdir(segment):
                logger.debug("JsonDB: mkdir {}".format(segment))
                os.mkdir(segment, 0o755)

    # Write the JSON.
    write_json(path, data)

    # Update the cached document.
    if cache:
        set_cache(document, data, expires=cache_lifetime)
        set_cache(document+"_mtime", time.time(), expires=cache_lifetime)

    # Release the lock.
    unlock_cache(lock)
def delete(document):
    """Delete a document from the DB."""
    path = mkpath(document)
    if os.path.isfile(path):
        logger.debug("Delete DB document: {}".format(path))
        os.unlink(path)
        del_cache(document)


def exists(document):
    """Query whether a document exists."""
    path = mkpath(document)
    return os.path.isfile(path)
def list_docs(path, recursive=False):
    """List all the documents at the path."""
    root = os.path.join(Config.db.db_root, path)
    docs = list()
    if not os.path.isdir(root):
        return []

    for item in sorted(os.listdir(root)):
        target = os.path.join(root, item)
        db_path = os.path.join(path, item)

        # Descend into subdirectories?
        if os.path.isdir(target):
            if recursive:
                docs += [
                    os.path.join(item, name)
                    for name in list_docs(db_path, recursive=True)
                ]
            else:
                continue

        if target.endswith(".json"):
            name = re.sub(r'\.json$', '', item)
            docs.append(name)

    return docs
def mkpath(document):
    """Turn a DB path into a JSON file path."""
    if document.endswith(".json"):
        # Let's not do that.
        raise Exception("mkpath: document path already includes .json extension!")
    return "{}/{}.json".format(Config.db.db_root, str(document))
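
# Illustrative example (the document names and db_root value are hypothetical):
# with Config.db.db_root set to "db", mkpath("blog/entries/my-post") returns
# "db/blog/entries/my-post.json", and once that document is committed,
# list_docs("blog/entries") would return ["my-post"] (the ".json" suffix is
# stripped from listed names).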
def read_json(path):
    """Slurp, decode and return the data from a JSON document."""
    path = str(path)
    if not os.path.isfile(path):
        raise Exception("Can't read JSON file {}: file not found!".format(path))

    # Don't allow any fishy looking paths.
    if ".." in path:
        logger.error("ERROR: JsonDB tried to read a path with two dots: {}".format(path))
        raise Exception("Refusing to read a path containing '..': {}".format(path))

    # Open and lock the file.
    fh = codecs.open(path, 'r', 'utf-8')
    flock(fh, LOCK_SH)
    text = fh.read()
    flock(fh, LOCK_UN)
    fh.close()

    # Decode.
    try:
        data = json.loads(text)
    except ValueError:
        logger.error("Couldn't decode JSON data from {}".format(path))
        handle_exception(Exception("Couldn't decode JSON from {}\n{}".format(
            path,
            text,
        )))
        data = None

    return data
def write_json(path, data):
    """Write a JSON document."""
    path = str(path)

    # Don't allow any fishy looking paths.
    if ".." in path:
        logger.error("ERROR: JsonDB tried to write a path with two dots: {}".format(path))
        raise Exception("Refusing to write a path containing '..': {}".format(path))

    logger.debug("JsonDB: WRITE > {}".format(path))

    # Open and lock the file.
    fh = codecs.open(path, 'w', 'utf-8')
    flock(fh, LOCK_EX)

    # Write it.
    fh.write(json.dumps(data, indent=4, separators=(',', ': ')))

    # Unlock and close.
    flock(fh, LOCK_UN)
    fh.close()
############################################################################
# Redis Caching Functions                                                  #
############################################################################

disable_redis = False


def get_redis():
    """Connect to Redis or return the existing connection."""
    global redis_client
    global disable_redis

    if not redis_client and not disable_redis:
        try:
            redis_client = redis.StrictRedis(
                host = Config.db.redis_host,
                port = Config.db.redis_port,
                db   = Config.db.redis_db,
            )
            redis_client.ping()
        except Exception as e:
            logger.error("Couldn't connect to Redis; memory caching will be disabled! {}".format(e))
            redis_client = None
            disable_redis = True

    return redis_client
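
# Note on the fail-open design: if the first connection attempt fails,
# disable_redis stays True for the life of the process, so every cache helper
# below returns early and jsondb falls back to plain filesystem reads/writes.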
def set_cache(key, value, expires=None):
    """Set a key in the Redis cache."""
    key = Config.db.redis_prefix + key
    client = get_redis()
    if not client:
        return

    try:
        client.set(key, json.dumps(value))

        # Expiration date?
        if expires:
            client.expire(key, expires)
    except Exception:
        logger.error("Redis exception: couldn't set_cache {}".format(key))


def get_cache(key):
    """Get a cached item."""
    key = Config.db.redis_prefix + key
    value = None
    client = get_redis()
    if not client:
        return

    try:
        value = client.get(key)
        if value:
            value = json.loads(value)
    except Exception:
        logger.debug("Redis exception: couldn't get_cache {}".format(key))
        value = None

    return value
def del_cache(key):
    """Delete a cached item."""
    key = Config.db.redis_prefix + key
    client = get_redis()
    if not client:
        return
    client.delete(key)


def lock_cache(key, timeout=5, expire=20):
    """Cache level 'file locking' implementation.

    The `expire` value is how long a lock may exist before Redis considers it
    stale and expires it automatically. The `timeout` parameter is reserved
    for a maximum wait time but is currently unused: acquire() blocks until
    the lock is free.

    Returns the lock object on success, or None if Redis is unavailable."""
    client = get_redis()
    if not client:
        return

    # Take the lock.
    lock = client.lock(key, timeout=expire)
    lock.acquire()
    logger.debug("Cache lock acquired: {}, expires in {}s".format(key, expire))
    return lock


def unlock_cache(lock):
    """Release the lock on a cache key."""
    if lock:
        lock.release()
        logger.debug("Cache lock released")
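
For context, here is a minimal usage sketch of the module's public surface. The document names and data are hypothetical, and it assumes the module is importable as rophako.jsondb (its imports suggest it lives in the rophako package), that Config.db.db_root points at a writable directory, and that Redis is either reachable or simply absent (in which case caching is skipped):

from rophako import jsondb

# Write (or overwrite) a document; intermediate folders are created as needed.
jsondb.commit("demo/users/alice", {"name": "Alice", "admin": True})

# Read it back; the first read comes from disk, later reads hit the Redis
# cache until the file's mtime changes or the 1-hour cache lifetime passes.
user = jsondb.get("demo/users/alice")

if jsondb.exists("demo/users/alice"):
    print(user["name"])

# Enumerate documents under a folder (names are returned without ".json").
print(jsondb.list_docs("demo/users"))    # e.g. ["alice"]

# Remove the document and its cached copy.
jsondb.delete("demo/users/alice")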