A Python content management system designed for kirsle.net featuring a blog, comments and photo albums. https://rophako.kirsle.net/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
4.6KB

  1. # -*- coding: utf-8 -*-
  2. from __future__ import unicode_literals, absolute_import
  3. """User account models."""
  4. import bcrypt
  5. import time
  6. from rophako.settings import Config
  7. import rophako.jsondb as JsonDB
  8. import rophako.model.photo as Photo
  9. from rophako.log import logger
  10. def create(username, password, name=None, uid=None, role="user"):
  11. """Create a new user account.
  12. Returns the user ID number assigned to this user."""
  13. # Name defaults to username.
  14. if name is None:
  15. name = username
  16. username = username.lower()
  17. # Provided with a user ID?
  18. if uid is not None:
  19. # See if it's available.
  20. if exists(uid=uid):
  21. logger.warning("Wanted to use UID {} for user {} but it wasn't available.".format(uid, username))
  22. uid = None
  23. # Need to generate a UID?
  24. if uid is None:
  25. uid = get_next_uid()
  26. uid = int(uid)
  27. # Username musn't exist.
  28. if exists(username):
  29. # The front-end shouldn't let this happen.
  30. raise Exception("Can't create username {}: already exists!".format(username))
  31. # Crypt their password.
  32. hashedpass = hash_password(password)
  33. logger.info("Create user {} with username {}".format(uid, username))
  34. # Create the user file.
  35. JsonDB.commit("users/by-id/{}".format(uid), dict(
  36. uid=uid,
  37. username=username,
  38. name=name,
  39. picture="",
  40. role=role,
  41. password=hashedpass,
  42. created=time.time(),
  43. ))
  44. # And their username to ID map.
  45. JsonDB.commit("users/by-name/{}".format(username), dict(
  46. uid=uid,
  47. ))
  48. return uid
  49. def update_user(uid, data):
  50. """Update the user's data."""
  51. if not exists(uid=uid):
  52. raise Exception("Can't update user {}: doesn't exist!".format(uid))
  53. db = get_user(uid=uid)
  54. # Change of username?
  55. if "username" in data and len(data["username"]) and data["username"] != db["username"]:
  56. JsonDB.delete("users/by-name/{}".format(db["username"]))
  57. JsonDB.commit("users/by-name/{}".format(data["username"]), dict(
  58. uid=int(uid),
  59. ))
  60. db.update(data)
  61. JsonDB.commit("users/by-id/{}".format(uid), db)
  62. def delete_user(uid):
  63. """Delete a user account."""
  64. if not exists(uid=uid):
  65. return
  66. db = get_user(uid=uid)
  67. username = db["username"]
  68. # Mark the account deleted.
  69. update_user(uid, dict(
  70. username="",
  71. name="",
  72. role="deleted",
  73. password="!",
  74. ))
  75. # Delete their username.
  76. JsonDB.delete("users/by-name/{}".format(username))
  77. def list_users():
  78. """Get a sorted list of all users."""
  79. uids = JsonDB.list_docs("users/by-id")
  80. users = list()
  81. for uid in sorted(map(lambda x: int(x), uids)):
  82. db = get_user(uid=uid)
  83. if db["role"] == "deleted": continue
  84. users.append(db)
  85. return users
  86. def get_uid(username):
  87. """Turn a username into a user ID."""
  88. db = JsonDB.get("users/by-name/{}".format(username))
  89. if db:
  90. return int(db["uid"])
  91. return None
  92. def get_user(uid=None, username=None):
  93. """Get a user's DB file, or None if not found."""
  94. if username:
  95. uid = get_uid(username)
  96. logger.debug("get_user: resolved username {} to UID {}".format(username, uid))
  97. return JsonDB.get("users/by-id/{}".format(uid))
  98. def get_picture(uid):
  99. """Get the chosen profile photo for the user."""
  100. data = get_user(uid=uid)
  101. pic = data["picture"]
  102. if len(pic):
  103. photo = Photo.get_photo(pic)
  104. if photo:
  105. return photo["avatar"]
  106. return None
  107. def exists(uid=None, username=None):
  108. """Query whether a user ID or name exists."""
  109. if uid:
  110. return JsonDB.exists("users/by-id/{}".format(uid))
  111. elif username:
  112. return JsonDB.exists("users/by-name/{}".format(username.lower()))
  113. def hash_password(password):
  114. return bcrypt.hashpw(str(password).encode("utf-8"), bcrypt.gensalt(int(Config.security.bcrypt_iterations))).decode("utf-8")
  115. def check_auth(username, password):
  116. """Check the authentication credentials for the username and password.
  117. Returns a boolean true or false. On error, an error is logged."""
  118. # Check if the username exists.
  119. if not exists(username=username):
  120. logger.error("User authentication failed: username {} not found!".format(username))
  121. return False
  122. # Get the user's file.
  123. db = get_user(username=username)
  124. # Check the password.
  125. test = bcrypt.hashpw(str(password).encode("utf-8"), str(db["password"]).encode("utf-8")).decode("utf-8")
  126. return test == db["password"]
  127. def get_next_uid():
  128. """Get the next available user ID."""
  129. uid = 1
  130. while exists(uid=uid):
  131. uid += 1
  132. return uid