blog/cmd/tumblr-import/tumblr-import.py

245 lines
7.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Import a Tumblr blog.
This is written in Python because the Tumblr JSON is too hairy for Go (dynamic
keys and such).
Usage: tumblr-import.py --root /path/to/blog/root example.tumblr.com
It initializes your blog settings with the metadata (title/description), then
starts crawling all of your blog posts from oldest to newest, creating blog
posts in the new Blog CMS. This way post ID #1 matches with your first Tumblr
post!
All images referenced in photo posts are downloaded to ``<root>/static/photos/``
with their original ``tumblr_XXXXXXX.jpg`` file names. The blog entries simply
list these images with ``<img>`` tags.
"""
import argparse
from datetime import datetime
import json
import logging
import os
import re
import requests
logging.basicConfig()
log = logging.getLogger("*")
log.setLevel(logging.INFO)
def main(tumblr, root, scheme):
log.info("Starting tumblr-import from blog: %s", tumblr)
API_ROOT = "{}://{}/api/read/json".format(scheme, tumblr)
log.info("Root Tumblr API URL: %s", API_ROOT)
try:
r = api_get(API_ROOT)
except Exception as e:
log.info("Failed to get Tumblr API root at %s: %s", API_ROOT, e)
exit(1)
# Ingest the site settings from the first page of results.
SiteSettings = {
"site": {
"title": r["tumblelog"]["title"],
"description": r["tumblelog"]["description"],
},
}
log.info("Blog title: %s", SiteSettings["site"]["title"])
log.info("Description: %s", SiteSettings["site"]["description"])
write_db(root, "app/settings", SiteSettings)
posts_total = r["posts-total"] # n
log.info("Total Posts: %d", posts_total)
# Go to the last page and work backwards.
per_page = 50
posts_start = posts_total - per_page
if posts_start < 0:
posts_start = 0
POST_ID = 0
while posts_start >= 0:
log.info("GET PAGE start=%d num=%d\n", posts_start, per_page)
r = api_get(API_ROOT, start=posts_start, num=per_page)
with open("tumblr-import.json", "w") as fh:
fh.write(json.dumps(r, indent=4))
# Ingest the posts in reverse order.
for post in r["posts"][::-1]:
log.info("{id}: {type} [{date_gmt}]".format(
id=post["id"],
type=post["type"],
date_gmt=post["date-gmt"],
))
timestamp = datetime.fromtimestamp(post["unix-timestamp"])
timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
slug = post.get("slug")
if not slug:
slug = re.sub(r'.*{}/'.format(tumblr), '', post.get("url-with-slug"))
# Post model to build up from this.
POST_ID += 1
Post = dict(
id=POST_ID,
title="",
fragment=slug,
contentType="html",
author=1,
body="",
privacy="public",
sticky=False,
enableComments=True,
tags=post.get("tags", []),
created=timestamp,
updated=timestamp,
)
# Photo posts
if post["type"] == "photo":
# Search it for the best quality image.
images = _download_images(root, _all_photos(post))
img_tags = "\n".join([ '<p><img src="{}"></p>\n'.format(i) for i in images ])
Post["body"] = (
'{}\n\n{}'
.format(img_tags, post.get("photo-caption"))
.strip()
)
elif post["type"] == "answer":
Post["body"] = (
'<blockquote><strong>Anonymous</strong> asked:\n'
'<p>{question}</p>\n'
'</blockquote>\n\n{answer}'
.format(question=post.get("question"), answer=post.get("answer"))
)
elif post["type"] == "regular":
Post["title"] = post["regular-title"]
Post["body"] = post["regular-body"]
elif post["type"] == "video":
log.error("No support for video posts!")
else:
print("UNKNOWN TYPE OF POST!")
print(json.dumps(post, indent=2))
input()
# Add an import notice suffix.
Post["body"] += (
"\n\n"
'<span class="text-muted">Imported from Tumblr where it had {} notes.</span>'
.format(post.get("note-count"))
)
write_db(root, "blog/posts/{}".format(Post["id"]), Post)
write_db(root, "blog/fragments/{}".format(Post["fragment"]), dict(
id=Post["id"],
))
if posts_start == 0:
break
posts_start -= per_page
if posts_start < 0:
posts_start = 0
def write_db(root, path, data):
"""
Write JSON data to a DB path in the Blog DB.
Parameters:
path (str): DB path like `blog/posts/123`
data (dict): dict of JSON serializable data to write.
"""
dirs, filename = path.rsplit("/", 1)
dirs = os.path.join(root, ".private", dirs)
if not os.path.isdir(dirs):
log.info("CREATE DIRECTORY: %s", dirs)
os.makedirs(dirs)
with open(os.path.join(dirs, filename+".json"), "w") as fh:
fh.write(json.dumps(data, indent=4))
def api_get(root, start=0, num=20):
url = root + "?start={}&num={}".format(start, num)
log.info("API GET: %s", url)
r = requests.get(url)
if r.ok:
body = r.content.decode()
m = re.match(r'var tumblr_api_read = (.+);', body)
if m:
return json.loads(m.group(1))
raise Exception("didn't get expected JSON wrapped response")
def _all_photos(data):
"""Extract all photos from a photo post."""
main_photo = _best_resolution(data)
result = [ main_photo ]
if "photos" in data:
for photo in data["photos"]:
best = _best_resolution(photo)
result.append(best)
return result
def _best_resolution(data):
"""Find the best image resolution from post json."""
highest = 0
best = ""
for k, v in data.items():
m = re.match(r'^photo-url-(\d+)$', k)
if m:
width = int(m.group(1))
if width > highest:
highest = width
best = v
return best
def _download_images(root, images):
"""Download an array of images. Return array of file paths."""
result = []
if not os.path.isdir(os.path.join(root, "static/photos")):
os.makedirs(os.path.join(root, "static/photos"))
for i, url in enumerate(images):
log.info("DOWNLOAD: %s", url)
filename = url.rsplit("/", 1)[-1]
r = requests.get(url)
if r.ok:
filename = "static/photos/"+filename
if os.path.isfile(os.path.join(root, filename)):
continue
with open(os.path.join(root, filename), "wb") as fh:
fh.write(r.content)
result.append("/"+filename)
else:
result.append(url)
return result
if __name__ == "__main__":
parser = argparse.ArgumentParser("Tumblr importer")
parser.add_argument("--root",
help="Output website root for the blog.",
type=str,
required=True,
)
parser.add_argument("--nossl",
help="Don't use SSL when connecting to the tumblr site",
action="store_true",
)
parser.add_argument("tumblr",
help="Tumblr blog name, in username.tumblr.com format.",
)
args = parser.parse_args()
main(
tumblr=args.tumblr,
root=args.root,
scheme="http" if args.nossl else "https",
)