A Python content management system designed for kirsle.net featuring a blog, comments and photo albums. https://rophako.kirsle.net/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
4.2KB

  1. # -*- coding: utf-8 -*-
  2. """User account models."""
  3. import bcrypt
  4. import time
  5. import config
  6. import rophako.jsondb as JsonDB
  7. from rophako.log import logger
  8. def create(username, password, name=None, uid=None, role="user"):
  9. """Create a new user account.
  10. Returns the user ID number assigned to this user."""
  11. # Name defaults to username.
  12. if name is None:
  13. name = username
  14. username = username.lower()
  15. # Provided with a user ID?
  16. if uid is not None:
  17. # See if it's available.
  18. if exists(uid=uid):
  19. logger.warning("Wanted to use UID {} for user {} but it wasn't available.".format(uid, username))
  20. uid = None
  21. # Need to generate a UID?
  22. if uid is None:
  23. uid = get_next_uid()
  24. uid = int(uid)
  25. # Username musn't exist.
  26. if exists(username):
  27. # The front-end shouldn't let this happen.
  28. raise Exception("Can't create username {}: already exists!".format(username))
  29. # Crypt their password.
  30. hashedpass = hash_password(password)
  31. logger.info("Create user {} with username {}".format(uid, username))
  32. # Create the user file.
  33. JsonDB.commit("users/by-id/{}".format(uid), dict(
  34. uid=uid,
  35. username=username,
  36. name=name,
  37. role=role,
  38. password=hashedpass,
  39. created=time.time(),
  40. ))
  41. # And their username to ID map.
  42. JsonDB.commit("users/by-name/{}".format(username), dict(
  43. uid=uid,
  44. ))
  45. return uid
  46. def update_user(uid, data):
  47. """Update the user's data."""
  48. if not exists(uid=uid):
  49. raise Exception("Can't update user {}: doesn't exist!".format(uid))
  50. db = get_user(uid=uid)
  51. # Change of username?
  52. if "username" in data and len(data["username"]) and data["username"] != db["username"]:
  53. JsonDB.delete("users/by-name/{}".format(db["username"]))
  54. JsonDB.commit("users/by-name/{}".format(data["username"]), dict(
  55. uid=int(uid),
  56. ))
  57. db.update(data)
  58. JsonDB.commit("users/by-id/{}".format(uid), db)
  59. def delete_user(uid):
  60. """Delete a user account."""
  61. if not exists(uid=uid):
  62. return
  63. db = get_user(uid=uid)
  64. username = db["username"]
  65. # Mark the account deleted.
  66. update_user(uid, dict(
  67. username="",
  68. name="",
  69. role="deleted",
  70. password="!",
  71. ))
  72. # Delete their username.
  73. JsonDB.delete("users/by-name/{}".format(username))
  74. def list_users():
  75. """Get a sorted list of all users."""
  76. uids = JsonDB.list_docs("users/by-id")
  77. users = list()
  78. for uid in sorted(map(lambda x: int(x), uids)):
  79. db = get_user(uid=uid)
  80. if db["role"] == "deleted": continue
  81. users.append(db)
  82. return users
  83. def get_uid(username):
  84. """Turn a username into a user ID."""
  85. db = JsonDB.get("users/by-name/{}".format(username))
  86. if db:
  87. return int(db["uid"])
  88. return None
  89. def get_user(uid=None, username=None):
  90. """Get a user's DB file, or None if not found."""
  91. if username:
  92. uid = get_uid(username)
  93. logger.debug("get_user: resolved username {} to UID {}".format(username, uid))
  94. return JsonDB.get("users/by-id/{}".format(uid))
  95. def exists(uid=None, username=None):
  96. """Query whether a user ID or name exists."""
  97. if uid:
  98. return JsonDB.exists("users/by-id/{}".format(uid))
  99. elif username:
  100. return JsonDB.exists("users/by-name/{}".format(username.lower()))
  101. def hash_password(password):
  102. return bcrypt.hashpw(str(password), bcrypt.gensalt(config.BCRYPT_ITERATIONS))
  103. def check_auth(username, password):
  104. """Check the authentication credentials for the username and password.
  105. Returns a boolean true or false. On error, an error is logged."""
  106. # Check if the username exists.
  107. if not exists(username=username):
  108. logger.error("User authentication failed: username {} not found!".format(username))
  109. return False
  110. # Get the user's file.
  111. db = get_user(username=username)
  112. print db
  113. # Check the password.
  114. return bcrypt.hashpw(str(password), str(db["password"])) == db["password"]
  115. def get_next_uid():
  116. """Get the next available user ID."""
  117. uid = 1
  118. while exists(uid=uid):
  119. uid += 1
  120. return uid