#!/usr/bin/env python
# Command Line Movable Type Client
# ================================
# This application allows you to edit/post entries on a Movable Type site via
# XML-RPC calls. For more information about this script, visit:
#
# http://scott.yang.id.au/2002/12/mtsendpy/
#
# HISTORY
# =======
#
# 1.1 - 19 Nov 2005
# + Add SSL support for proxy.
#
# 1.0 - 20 May 2005
# + time module related fix for Python 2.4.
# + Ensure all cells passed to print_table() function are in string-type.
#
# 0.6.1 - 6 Apr 2004
# + Properly handles mt_allow_comments for MT2.6 servers.
#
# 0.6 - 1 Apr 2004
# + Add build-in support for HTTP proxy server, which is detected via
# environment variable HTTP_PROXY.
# + Alternative encoding for XML-RPC packets.
#
# 0.5 - 14 Oct 2003
# + Remove the support of MT2.5. Use the older version of mtsend.py if you
# need these supports.
# + Support KEYWORDS and PING into the header.
# + Add new functionalities provided by MT2.6's backend.
# - List out trackback pings of a post.
# - List out text filters installed.
# + Documentation in the source code.
#
# 0.4 - 10 Mar 2003
# + Support the new metaWeblog.newMediaObject() function via mtsend.py -U
# filename, i.e. you can now upload text/binary files to your
# MovableType site via mtsend.py!
# + Use mt.getRecentPostTitles() function in MT2.6 to save bandwidth.
# + Some bug fixes due to some inconsistency between MT2.6 and MT2.5.
#
# 0.3 - 3 Jan 2003
# + Make it to work on Python 2.1. "xmlrpclib" needs to be downloaded
# separately. It is tested
# on Python 2.1.2 for Windows.
#
# 0.2 - 30 Dec 2002
# + Fixed a bug in saving the post entry back, where new line characters
# are stripped.
#
# 0.1 - 30 Dec 2002
# + Initial public version
#
#
# CONFIGURATION FILE
# ==================
# Configuration file for mtsend.py is in the style of Windows INI files, which
# consist of sections and key/value pairs. There are 3 main sections - global,
# site and blog.
#
# Global Section:
# There is only one key/value in this section, and it is used to note the
# default blog alias to use if it is not provided on the command line.
# For example:
#
# [global]
# default=example
#
# It shows the default blog alias will be 'example'
#
# Site Section:
# You can have multiple site sections for each Movable Type installation
# you have access to. The section name will be [site-"site name"]. For
# example:
#
# [site-test]
# url=http://testdomain.com/mtinstall/mt-xmlrpc.cgi
# username=foo
# password=bar
# encoding=UTF-8
#
# It defines site "test" with the URL to the MovableType's XML-RPC CGI
# script, and the username/password used to access that site. "encoding" is
# optional, and defaults to UTF-8.
#
# Blog Section:
# You can have multiple blog sections for each Movable Type blogs you have
# on the sites you have access to. Blogs are distinguished by their 'alias',
# which you can select in the command line using -a. The section name for
# this blog will be [blog-"blog alias"]. For example,
#
# [blog-example]
# site=test
# blogid=3
#
# Each blog section must have "site", which indicates the site this blog
# belongs to, so that mtsend would be able to locate site-related
# information from the configuration file. It also needs the blog ID on that
# site. To find out all the blog IDs, you can use -B "site name" to print
# out the list.
#
#
# POST FORMAT
# ===========
# When editing or posting via mtsend, the post needs to be in a specific
# format. The format is very close to Movable Type's import/export format,
# which is documented here:
#
# http://www.movabletype.org/docs/mtmanual_importing.html
#
# It consists of a header and body. For example:
#
# [header1]: [value1]
# [header2]: [value2]
# [header3]: [value3]
# -----
# BODY:
# ....
# -----
# EXTENDED BODY:
# ....
# -----
# EXCERPT:
# ....
#
# Extended body and excerpt are optional in a post. Most header elements are
# optional when you are creating a new post. If they do not provide a value,
# then the default value configured by Movable Type will be used.
#
# These are the header keys/values:
#
# TITLE:
# The title of this post.
#
# ALLOW COMMENTS: 0/1
# Whether this post allows comments.
#
# ALLOW PINGS: 0/1
# Whether this post allows trackback pings.
#
# CATEGORY:
# The category associated with this post entry. You can have multiple
# CATEGORY in the header. The first CATEGORY automatically becomes the
# primary category, if PRIMARY CATEGORY is not specified.
#
# CONVERT BREAKS: 0/1/customised text filter name.
# Whether the line break will be automatically converted into
and
#
when posted. It can also be the name of an installed text filer. To
# get the list of installed text filter, use mtsend.py -T
#
# DATE: dd/mm/yyyy HH:MM:SS [AM|PM]
# The post date. It might not work if you are creating a new post.
#
# KEYWORDS:
# The keywords of your post.
#
# PING:
# The URL to be pinged during posting. You can have multiple PING in the
# header.
'''\
Usage:
mtsend.py [action] [options]
Actions:
-B site List all the blogs you can access in [site]. Site has to be in
the configuration file.
-C Print out a list of existing categories.
-E postid Edit an old post. It will read the post entry from the
standard input, in Movable Type's import/export format, and
then save it back to the server. If the value is '-', then it
will try to detect the postid from the input message itself.
-G postid Retrieve/get post from the blog. If the value is '-', it will
then try to get the most recent blog entry. Retrieved entry
will be printed to the standard output.
-L num List the most recent [num] posts.
-N Posting a new blog. The entry, in the Movable Type
import/export format, is read from the standard input.
-P postid List out trackback pings to this post.
-R postid Rebuild all the static files related to this entry.
-T List out the text filters installed on the server.
-U filename Upload a file, reading from standard input, to the blog site,
with destination filename provided.
-V Show version information.
Options:
-a alias Use "alias" as the blog alias. This script will locate
relavent site URL/username/password information using this
alias.
-c config Load "config" as configuration file, instead of $HOME/.mtsendrc
-h Display this help message.
-q Decrease verbose level.
-v Increase verbose level. Message goes to standard error.
For more information, please visit:
http://scott.yang.id.au/2002/12/mtsendpy/
'''
__author__ = 'Scott Yang '
__copyright__ = 'Copyright (c) 2002-2005 Scott Yang'
__date__ = '2005-11-19'
__version__ = '1.1'
import ConfigParser
import httplib
import os
import re
import sys
import time
import urllib
class MTSend(object):
def __init__(self):
self.alias = None
self.input = None
self.config = None
self.mode = None
self.verbose = 1
self.rpcsrv = None
self.site = None
self.modeopt = None
def execute(self):
try:
handler = getattr(self, 'execute_%s' % self.mode)
except AttributeError:
raise Exception, 'Unknown execution mode: %s' % self.mode
else:
handler()
def execute_b(self):
self.site = self.modeopt
srv = self.getRPCServer()
blogs = srv.blogger.getUsersBlogs('', self.get_username(),
self.get_password())
result = [['ID', 'Blog Name', 'URL']]
for blog in blogs:
result.append([blog['blogid'], blog['blogName'], blog['url']])
print_table(result)
def execute_c(self):
srv = self.getRPCServer()
cts = srv.mt.getCategoryList(self.get_blogid(), self.get_username(),
self.get_password())
result = []
for cat in cts:
result.append([cat['categoryId'], cat['categoryName']])
result.sort(lambda x, y: cmp(x[1], y[1]))
result[0:0] = [['ID', 'Category Name']]
print_table(result)
def execute_e(self):
self.log(1, 'Parsing post entry from standard input...')
post, cts, publish = parse_post()
postid = self.modeopt
if self.modeopt == '-':
try:
postid = post['postid']
except KeyError:
raise Exception, 'Cannot discover post ID from the input.'
elif post.has_key('postid') and (post['postid'] != postid):
raise Exception, \
'Post ID does not match. ID in the input is "%s"' % \
post['postid']
srv = self.getRPCServer()
self.log(1, 'Saving post entry "%s"...', postid)
srv.metaWeblog.editPost(postid, self.get_username(),
self.get_password(), post, publish)
cts = self._fixCategories(cts)
if len(cts) > 0:
self.log(1, 'Add categories "%s" to post entry "%s"...',
','.join([cat['categoryId'] for cat in cts]), postid)
srv.mt.setPostCategories(postid, self.get_username(),
self.get_password(), cts)
def execute_g(self):
srv = self.getRPCServer()
if self.modeopt.lower() == '-':
self.log(1, 'Retrieve most recent post entry...')
post = srv.metaWeblog.getRecentPosts(self.get_blogid(),
self.get_username(), self.get_password(), 1)
if len(post) > 0:
post = post[0]
else:
raise Exception, 'The current blog does not have any entry.'
else:
self.log(1, 'Retrieve post entry "%s"...', self.modeopt)
post = srv.metaWeblog.getPost\
( self.modeopt, self.get_username(), self.get_password() )
# Get the categories of this post.
self.log(1, 'Retrieve categories for post entry "%s"...',
post['postid'])
cts = srv.mt.getPostCategories(post['postid'], self.get_username(),
self.get_password())
print_post(post, cts)
def execute_l(self):
srv = self.getRPCServer()
try:
num = int(self.modeopt)
except:
num = 5
func = srv.mt.getRecentPostTitles
posts = func(self.get_blogid(), self.get_username(),
self.get_password(), num)
self.log(1, 'Retrieve "%d" recent posts...', num)
result = [['ID', 'Date', 'Title']]
for post in posts:
result.append([
post['postid'],
time.strftime('%Y-%m-%d %H:%M:%S',
decode_iso8601(post['dateCreated'].value)),
post['title']
])
print_table(result)
def execute_n(self):
self.log(1, 'Parsing post entry from standard input...')
post, cts, publish = parse_post()
srv = self.getRPCServer()
self.log(1, 'Saving new post entry...')
postid = srv.metaWeblog.newPost(self.get_blogid(), self.get_username(),
self.get_password(), post, publish)
cts = self._fixCategories(cts)
if len(cts) > 0:
self.log(1, 'Add categories "%s" to post entry "%s"...',
','.join([cat['categoryId'] for cat in cts]), postid)
srv.mt.setPostCategories(postid, self.get_username(),
self.get_password(), cts)
# Somehow under MovableType 2.5, the new post will not trigger a
# rebuild. Therefore we will force a rebuild here.
#
# XXX: Apparently this behaviour no longer exists in later version of
# MT.
if False:
self.modeopt = postid
self.execute_r()
print postid
def execute_p(self):
srv = self.getRPCServer()
result = [[
val['pingTitle'],
val['pingURL'],
val['pingIP'],
] for val in srv.mt.getTrackbackPings(self.modeopt)]
result.insert(0, ['Title', 'URL', 'IP'])
print_table(result)
def execute_r(self):
srv = self.getRPCServer()
srv.mt.publishPost(self.modeopt, self.get_username(),
self.get_password())
def execute_t(self):
srv = self.getRPCServer()
result = []
for val in srv.mt.supportedTextFilters():
result.append([val['key'], val['label']])
result.sort()
result.insert(0, ['Key', 'Label'])
print_table(result)
def execute_u(self):
srv = self.getRPCServer()
bin = sys.stdin.read()
self.log(1, 'Uploading "%s" (%d bytes)...', self.modeopt, len(bin))
media_object = {
'name': self.modeopt,
'bits': xmlrpclib.Binary(bin),
}
result = srv.metaWeblog.newMediaObject(self.get_blogid(),
self.get_username(), self.get_password(), media_object)
print result['url']
def getRPCServer(self):
if self.rpcsrv is not None:
return self.rpcsrv
httptype = urllib.splittype(self._getSite('url'))[0]
transport = get_rpc_transport(httptype)
# Default we will use 'UTF-8' encoding, if the site encoding option is
# not provided.
return xmlrpclib.ServerProxy(self._getSite('url'), transport,
self._getSite('encoding', 'UTF-8'))
def get_blogid(self):
return self._getBlog('blogid')
def get_password(self):
return self._getSite('password')
def get_username(self):
return self._getSite('username')
def loadConfig(self, config):
if config is None:
config = os.path.join(os.environ['HOME'], '.mtsendrc')
if not os.access(config, os.R_OK):
raise Exception, \
'Configuration file "%s" is not readable' % config
self.config = ConfigParser.ConfigParser()
self.config.read([config])
def log(self, level, msg, *fmt):
if self.verbose >= level:
print >> sys.stderr, msg % fmt
def setMode(self, mode, modeopt=None):
if self.mode is None:
self.mode = mode
self.modeopt = modeopt
else:
raise Exception, 'Conflicting operational mode.'
def _fixCategories(self, cts):
if len(cts) > 0:
srv = self.getRPCServer()
new = []
self.log(1, 'Retrieve available categories...')
old = srv.mt.getCategoryList(self.get_blogid(),
self.get_username(), self.get_password())
ctsmap = {}
for cat in old:
ctsmap[cat['categoryName'].lower()] = cat['categoryId']
for cat in cts:
try:
new.append({'categoryId': ctsmap[cat]})
del ctsmap[cat]
except KeyError:
pass
return new
else:
return []
def _getBlog(self, option, default=None):
if self.config is None:
raise Exception, 'Configuration has not been loaded.'
if self.alias is None:
try:
alias = self._getGlobal('default')
except KeyError:
raise Exception, 'Blog alias has not been specified.'
else:
alias = self.alias
try:
return self.config.get('blog-%s' % alias, option)
except ConfigParser.Error:
if default is not None:
return default
else:
raise KeyError, option
def _getGlobal(self, option, default=None):
if self.config is None:
raise Exception, 'Configuration has not been loaded.'
try:
return self.config.get('global', option)
except ConfigParser.Error:
if default is not None:
return default
else:
raise KeyError, option
def _getSite(self, option, default=None):
try:
if self.site is None:
self.site = self._getBlog('site')
return self.config.get('site-%s' % self.site, option)
except (ConfigParser.Error, KeyError):
if default is not None:
return default
else:
raise KeyError, option
try:
import xmlrpclib
except ImportError:
# Error reporting will be raised in the main() function. We will simply
# ignore the error here.
xmlrpclib = None
else:
class HTTP(httplib.HTTP):
def __init__(self, conn):
httplib.HTTP.__init__(self)
self._setup(conn)
class ProxyTransport(xmlrpclib.Transport):
"""Transport class for the XMLRPC.
Instead of using the HTTP/HTTPS transport, it tries to use a proxy
server to send/receive XMLRPC messages. This transport must be
initialised with the hostname and port number of the proxy server,
e.g.
transport = ProxyTransport('proxy.mydomain.com', 3128)
server = Server("http://betty.userland.com", transport)
print server.examples.getStateName(41)
"""
def __init__(self, host, port=3128, username=None, password=None,
ssl=False):
self.__host = host
self.__port = port
self.__username = username
self.__password = password
self.__ssl = ssl
self.__target_host = None
def get_authentication(self):
import base64
auth_token = '%s:%s' % (self.__username, self.__password)
auth_token = base64.encodestring(urllib.unquote(auth_token))
auth_token = auth_token.strip()
return 'Basic '+auth_token
def make_connection(self, host):
"Make a connection to the proxy server"
# Note that we will try to connect to the proxy server instead of
# our target host. It also needs to store the information about
# the target host so that we can use that information in
# send_request() call.
import socket
if self.__ssl:
# XXX: Code pieces taken from
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/301740
header = [
'CONNECT %s:443 HTTP/1.0' % host,
'User-Agent: mtsend.py/%s' % __version__,
]
if self.__username and self.__password:
header.append('Proxy-Authentication: %s' %
self.get_authentication())
proxy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy.connect((self.__host, self.__port))
proxy.sendall('\r\n'.join(header)+'\r\n\r\n')
response = proxy.recv(8192)
status = response.split()[1]
if status != '200':
print response
raise Exception, 'Invalid CONNECT response "%s"' % status
ssl = socket.ssl(proxy, None, None)
sock = httplib.FakeSocket(proxy, ssl)
conn = httplib.HTTPConnection('localhost')
conn.sock = sock
return HTTP(conn)
else:
self.__target_host = host
return httplib.HTTP('%s:%d' % (self.__host, self.__port))
def send_content(self, connection, request_body):
"""Send the content of the XML-RPC request to the server.
This method override the default send_content. If the proxy
username and password has been configured, then we will place an
extra header here so the connection can be authenticated.
"""
if (self.__username is not None) and (self.__password is not None):
connection.putheader("Proxy-Authorization",
self.get_authentication())
xmlrpclib.Transport.send_content(self, connection, request_body)
def send_request(self, connection, handler, request_body):
if not self.__ssl:
handler = 'http://' + self.__target_host + handler
connection.putrequest("POST", handler)
def decode_iso8601(date):
# Translate an ISO8601 date to the tuple format used in Python's time
# module.
regex = r'^(\d{4})(\d{2})(\d{2})T(\d{2}):(\d{2}):(\d{2})'
match = re.search(regex, str(date))
if not match:
raise Exception, '"%s" is not a correct ISO8601 date format' % date
else:
result = match.group(1, 2, 3, 4, 5, 6)
result = map(int, result)
result += [0, 1, -1]
return tuple(result)
def get_rpc_transport(httptype):
# Detect whether we need to use 'ProxyTranspory'. Proxy detection is
# done using HTTP_PROXY or http_proxy environment variable.
proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
if proxy:
match = re.match(r'^(http://)?(([^:@]+)(:([^@]*))?@)?([^:]+):(\d+)',
proxy)
if match:
username = match.group(3) or None
password = match.group(5) or None
hostname = match.group(6)
bindport = int(match.group(7))
return ProxyTransport(hostname, bindport, username, password,
httptype=='https')
# Letting ServerProxy to pick the best suitable transport
return None
re_date = r'^(\d{2})/(\d{2})/(\d{4}) (\d{2}):(\d{2}):(\d{2})( ([AP]M))?$'
re_date = re.compile(re_date).search
def parse_date(val):
match = re_date(val.upper())
if match is None:
raise Exception, 'Date value "%s" is invalid.' % val
result = map(int, match.group(1, 2, 3, 4, 5, 6))
try:
ampm = match.group(8)
except IndexError:
pass
else:
if ampm == 'PM':
if result[3] != 12:
result[3] += 12
elif ampm == 'AM':
if result[3] == 12:
result[3] = 0
elif ampm is not None:
raise Exception, 'Expect (AM|PM) get "%s"' % ampm
result[0:3] = [result[2], result[0], result[1]]
result += [0, 1, -1]
return tuple(result)
def parse_post():
state = 0
code = None
post = {}
cts = []
publish = xmlrpclib.Boolean(0)
for line in sys.stdin:
line = line.rstrip()
if state == 0:
if line == '-----':
state = 1
else:
idx = line.find(':')
if idx < 0:
continue # Invalid entry
key, val = line[:idx].strip().upper(), line[idx+1:].strip()
if key == 'TITLE':
post['title'] = val
elif key == 'DATE':
val = time.strftime('%Y%m%dT%H:%M:%S', parse_date(val))
post['dateCreated'] = xmlrpclib.DateTime(val)
elif key == 'STATUS':
publish = xmlrpclib.Boolean(val.lower() == 'publish')
elif key == 'ALLOW COMMENTS':
val = int(val)
if val not in (0, 1, 2):
raise Exception, \
'ALLOW COMMENTS must be either 0, 1 or 2'
post['mt_allow_comments'] = val
elif key == 'ALLOW PINGS':
post['mt_allow_pings'] = int(val)
elif key == 'PING':
try:
post['mt_tb_ping_urls'].append(val)
except KeyError:
post['mt_tb_ping_urls'] = [val]
elif key == 'CONVERT BREAKS':
# MT2.6 - mt_convert_breaks has changed its value from
# XML-RPC boolean to string.
post['mt_convert_breaks'] = val
elif key == 'POSTID':
post['postid'] = val
elif key == 'PRIMARY CATEGORY':
cts.insert(0, val.lower())
elif key == 'CATEGORY':
cts.append(val.lower())
elif key == 'KEYWORDS':
post['mt_keywords'] = val
else:
raise Exception, 'Invalid field key: %s' % key
elif state == 1:
line = line.upper()
if line == 'BODY:':
code = 'description'
elif line == 'EXTENDED BODY:':
code = 'mt_text_more'
elif line == 'EXCERPT:':
code = 'mt_excerpt'
else:
raise Exception, 'Invalid line in the current state: %s' % line
state = 2
elif state == 2:
if line.startswith('-----') and (not line.rstrip('-')):
code = None
state = 1
else:
if post.has_key(code):
post[code] += '\n' + line
else:
post[code] = line
return post, cts, publish
def print_post(post, cts):
if post.has_key('title'):
print 'TITLE:', post['title']
print 'DATE:', time.strftime('%m/%d/%Y %H:%M:%S',
decode_iso8601(post['dateCreated'].value))
for cat in cts:
if cat['isPrimary']:
print 'PRIMARY CATEGORY:', cat['categoryName']
print 'CATEGORY:', cat['categoryName']
# We cannot really determine whether the post has been published.
# Therefore we assume that it is.
print 'STATUS: publish'
if post.has_key('mt_allow_comments'):
print 'ALLOW COMMENTS:', post['mt_allow_comments']
if post.has_key('mt_allow_pings'):
print 'ALLOW PINGS:', post['mt_allow_pings']
if post.has_key('mt_convert_breaks'):
print 'CONVERT BREAKS:', post['mt_convert_breaks']
if post.get('mt_keywords'):
print 'KEYWORDS:', post['mt_keywords']
# We will also print the postid so that it can be verified later.
print 'POSTID:', post['postid']
# Start printing the body
if post.get('description'):
print '-----'
print 'BODY:'
print post['description']
if post.get('mt_text_more'):
print '-----'
print 'EXTENDED BODY:'
print post['mt_text_more']
if post.get('mt_excerpt'):
print '-----'
print 'EXCERPT:'
print post['mt_excerpt']
def print_table(table, heading=1):
# We have to work out the maximum width first.
if not table:
return
widths = [0] * len(table[0])
for row in table:
for idx, cell in zip(range(len(row)), row):
if isinstance(cell, unicode):
cell = cell.encode(DEFAULT_ENCODING)
elif not isinstance(cell, str):
cell = str(cell)
row[idx] = cell
if len(cell) > widths[idx]:
widths[idx] = len(cell)
border = '+'+('+'.join(['-'*(width + 2) for width in widths]))+'+'
format = '|'+('|'.join([' %%-%ds ' % width for width in widths]))+'|'
hdrs = 0
print border
for row in table:
print format % tuple(row)
if (not hdrs) and heading and (len(table) > 1):
print border
hdrs = 1
print border
DEFAULT_ENCODING = 'utf-8'
def main(args):
import getopt
try:
opts, args = getopt.getopt(args, 'a:B:Cc:E:G:hL:NP:qR:TU:vV')
except getopt.GetoptError, ex:
print >> sys.stderr, 'Error: '+str(ex)
print >> sys.stderr, __doc__
sys.exit(1)
mtsend = MTSend()
config = None
for opt, arg in opts:
if opt == '-a':
mtsend.alias = arg
elif opt == '-B':
mtsend.setMode('b', arg)
elif opt == '-C':
mtsend.setMode('c')
elif opt == '-c':
config = arg
elif opt == '-E':
mtsend.setMode('e', arg)
elif opt == '-G':
mtsend.setMode('g', arg)
elif opt == '-h':
print >> sys.stderr, __doc__
sys.exit(0)
elif opt == '-L':
mtsend.setMode('l', arg)
elif opt == '-N':
mtsend.setMode('n')
elif opt == '-P':
mtsend.setMode('p', arg)
elif opt == '-q':
mtsend.verbose -= 1
elif opt == '-R':
mtsend.setMode('r', arg)
elif opt == '-T':
mtsend.setMode('t')
elif opt == '-U':
mtsend.setMode('u', arg)
elif opt == '-v':
mtsend.verbose += 1
elif opt == '-V':
print >> sys.stderr, 'Version %s' % __version__
sys.exit(0)
else:
print >> sys.stderr, 'Warning: Option "%s" is not handled.' % opt
if mtsend.mode is None:
print >> sys.stderr, 'Error: Action is not specified'
print >> sys.stderr, __doc__
sys.exit(1)
if xmlrpclib is None:
print >> sys.stderr, '''Error: Cannot import "xmlrpclib" module.
You should either upgrade to Python 2.2+, or download and install the
"xmlrpclib" from the following website:
http://www.pythonware.com/products/xmlrpc/
'''
sys.exit(1)
try:
mtsend.loadConfig(config)
mtsend.execute()
except Exception, ex:
if mtsend.verbose > 1:
raise
else:
print >> sys.stderr, 'Error:', ex
if __name__ == '__main__':
main(sys.argv[1:])