A Python content management system designed for kirsle.net featuring a blog, comments and photo albums. https://rophako.kirsle.net/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

user.py 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. # -*- coding: utf-8 -*-
  2. """User account models."""
  3. import bcrypt
  4. import time
  5. import config
  6. import rophako.jsondb as JsonDB
  7. import rophako.model.photo as Photo
  8. from rophako.log import logger
  9. def create(username, password, name=None, uid=None, role="user"):
  10. """Create a new user account.
  11. Returns the user ID number assigned to this user."""
  12. # Name defaults to username.
  13. if name is None:
  14. name = username
  15. username = username.lower()
  16. # Provided with a user ID?
  17. if uid is not None:
  18. # See if it's available.
  19. if exists(uid=uid):
  20. logger.warning("Wanted to use UID {} for user {} but it wasn't available.".format(uid, username))
  21. uid = None
  22. # Need to generate a UID?
  23. if uid is None:
  24. uid = get_next_uid()
  25. uid = int(uid)
  26. # Username musn't exist.
  27. if exists(username):
  28. # The front-end shouldn't let this happen.
  29. raise Exception("Can't create username {}: already exists!".format(username))
  30. # Crypt their password.
  31. hashedpass = hash_password(password)
  32. logger.info("Create user {} with username {}".format(uid, username))
  33. # Create the user file.
  34. JsonDB.commit("users/by-id/{}".format(uid), dict(
  35. uid=uid,
  36. username=username,
  37. name=name,
  38. picture="",
  39. role=role,
  40. password=hashedpass,
  41. created=time.time(),
  42. ))
  43. # And their username to ID map.
  44. JsonDB.commit("users/by-name/{}".format(username), dict(
  45. uid=uid,
  46. ))
  47. return uid
  48. def update_user(uid, data):
  49. """Update the user's data."""
  50. if not exists(uid=uid):
  51. raise Exception("Can't update user {}: doesn't exist!".format(uid))
  52. db = get_user(uid=uid)
  53. # Change of username?
  54. if "username" in data and len(data["username"]) and data["username"] != db["username"]:
  55. JsonDB.delete("users/by-name/{}".format(db["username"]))
  56. JsonDB.commit("users/by-name/{}".format(data["username"]), dict(
  57. uid=int(uid),
  58. ))
  59. db.update(data)
  60. JsonDB.commit("users/by-id/{}".format(uid), db)
  61. def delete_user(uid):
  62. """Delete a user account."""
  63. if not exists(uid=uid):
  64. return
  65. db = get_user(uid=uid)
  66. username = db["username"]
  67. # Mark the account deleted.
  68. update_user(uid, dict(
  69. username="",
  70. name="",
  71. role="deleted",
  72. password="!",
  73. ))
  74. # Delete their username.
  75. JsonDB.delete("users/by-name/{}".format(username))
  76. def list_users():
  77. """Get a sorted list of all users."""
  78. uids = JsonDB.list_docs("users/by-id")
  79. users = list()
  80. for uid in sorted(map(lambda x: int(x), uids)):
  81. db = get_user(uid=uid)
  82. if db["role"] == "deleted": continue
  83. users.append(db)
  84. return users
  85. def get_uid(username):
  86. """Turn a username into a user ID."""
  87. db = JsonDB.get("users/by-name/{}".format(username))
  88. if db:
  89. return int(db["uid"])
  90. return None
  91. def get_user(uid=None, username=None):
  92. """Get a user's DB file, or None if not found."""
  93. if username:
  94. uid = get_uid(username)
  95. logger.debug("get_user: resolved username {} to UID {}".format(username, uid))
  96. return JsonDB.get("users/by-id/{}".format(uid))
  97. def get_picture(uid):
  98. """Get the chosen profile photo for the user."""
  99. data = get_user(uid=uid)
  100. pic = data["picture"]
  101. if len(pic):
  102. photo = Photo.get_photo(pic)
  103. if photo:
  104. return photo["avatar"]
  105. return None
  106. def exists(uid=None, username=None):
  107. """Query whether a user ID or name exists."""
  108. if uid:
  109. return JsonDB.exists("users/by-id/{}".format(uid))
  110. elif username:
  111. return JsonDB.exists("users/by-name/{}".format(username.lower()))
  112. def hash_password(password):
  113. return bcrypt.hashpw(str(password), bcrypt.gensalt(config.BCRYPT_ITERATIONS))
  114. def check_auth(username, password):
  115. """Check the authentication credentials for the username and password.
  116. Returns a boolean true or false. On error, an error is logged."""
  117. # Check if the username exists.
  118. if not exists(username=username):
  119. logger.error("User authentication failed: username {} not found!".format(username))
  120. return False
  121. # Get the user's file.
  122. db = get_user(username=username)
  123. # Check the password.
  124. return bcrypt.hashpw(str(password), str(db["password"])) == db["password"]
  125. def get_next_uid():
  126. """Get the next available user ID."""
  127. uid = 1
  128. while exists(uid=uid):
  129. uid += 1
  130. return uid