imap.py ( imap access routines in python )

Steven D. Majewski (sdm7g@elvis.med.virginia.edu)
Sat, 9 Jul 1994 20:54:53 -0400

Here is another INCOMPLETE, but BETTER set of classes to read
imap mailboxes. The original kernel of the previous version was
quite old, and I didn't realize how BAD it actually was until
I got into extending it. This version is not quite complete,
either, but what's there mostly works. Perhaps I shouldn't
distribute THIS version either, but I'm leaving on a vacation
for two weeks and I have an uncontrollable urge to try to
cover my tracks on the previous version! :->

However, on the plus side, it *does* implement an IMAP version
of "frm" (sort of) for which I've heard a desire expressed on
a couple of occasions. And without too much difficulty, it
could be turned into a sleep -> check for /NEW -> FETCH ENVELOPE
IMAP remote biff program.
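
A rough sketch of that biff loop, written in present-day Python 3 rather
than the dialect of the listing below. The names count_exists and
fetch_envelope are hypothetical callables standing in for something like
Mailbox.exists() and Message.envelope(); using the EXISTS count as the
"new mail" check is an assumption, not something the code below does yet.

```python
import time

def biff(count_exists, fetch_envelope, interval=60, rounds=3, sleep=time.sleep):
    """Poll a mailbox and collect the envelopes of newly arrived messages.

    count_exists() returns the current message count and fetch_envelope(n)
    returns the envelope of message n; both are hypothetical stand-ins.
    """
    announced = []
    seen = count_exists()               # messages already present at start-up
    for _ in range(rounds):
        sleep(interval)                 # the "sleep" step
        now = count_exists()            # the "check for new" step
        for n in range(seen + 1, now + 1):
            announced.append(fetch_envelope(n))   # the "FETCH ENVELOPE" step
        seen = max(seen, now)
    return announced
```

A real biff would loop forever and print each envelope as it arrives; the
fixed number of rounds here only keeps the sketch easy to try out.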

Known bugs:

LocalSession()'s that aren't explicitly closed can leave
a process hanging. I need to fix __del__ to clean up
properly.

LocalSession()'s can be killed by control-c and other signals.

"mis-features" or To-Do's:

Things are currently a mix of old access codes that didn't
parse everything, and the new methods which DO fully parse
things. So there are probably several holes where an unexpected
return isn't handled.

Keeping Token('EXISTS') distinct from the string "EXISTS"
is probably not necessary now. It was useful for debugging
to keep that additional context, but it now gets awkward
to remember that I'm required to test equality to a Token
class, and not a string.
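
One possible way to relax that rule, sketched in present-day Python 3 and
not taken from the listing below: let a token compare equal to its own
upper-cased string, so either form can be used in a test. This Token is a
hypothetical simplification of the class in the listing.

```python
class Token:
    """Keyword token that also compares equal to its upper-case string."""
    def __init__(self, text):
        self.str = text.upper()
    def __eq__(self, other):
        if isinstance(other, Token):
            return self.str == other.str
        if isinstance(other, str):
            # Case-sensitive on purpose, so equal objects hash alike.
            return self.str == other
        return NotImplemented
    def __hash__(self):
        return hash(self.str)
    def __repr__(self):
        return '<' + self.str + '>'
```

With this, Token('exists') == 'EXISTS' holds, and a dictionary keyed by
tokens can also be indexed with the plain string.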

Commands for a sequence of messages are not yet supported,
nor is having more than one command pending. ( And I may
not actually implement it the way the comments suggest. )
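
As an illustration of what a sequence command would look like on the
wire, here is a small sketch in present-day Python 3. The function name
fetch_command and its arguments are hypothetical; only the
"FETCH 100:200 ENVELOPE" form comes from the comments in the listing.

```python
def fetch_command(tag, first, last, what):
    """Build one tagged FETCH command covering messages first..last."""
    if first < 1 or last < first:
        raise ValueError('bad message range: %r:%r' % (first, last))
    if first == last:
        seq = str(first)                 # a single message needs no range
    else:
        seq = '%d:%d' % (first, last)    # contiguous range, as in 100:200
    return '%s FETCH %s %s\r\n' % (tag, seq, what)
```

One such command replaces a whole series of single-message FETCH round
trips, which is where most of the time goes at the moment.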

The code can print my proposed imap URL's, but there is
as yet no code to parse URL's and fetch. ( But most of the
basic parsing is in urllib. I just need some glue. )
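
A minimal sketch of that glue, using urllib.parse.urlsplit from
present-day Python 3 (not the 1994 urllib). It assumes the URL shape the
listing prints, imap://user:passwd@host/mailbox#mid:message-id, with the
angle brackets stripped from the message id. The function name
parse_imap_url and the example host are hypothetical.

```python
from urllib.parse import urlsplit

def parse_imap_url(url):
    """Split a proposed imap URL into the pieces needed to fetch a message."""
    parts = urlsplit(url)
    if parts.scheme != 'imap':
        raise ValueError('not an imap URL: %r' % url)
    mid = None
    if parts.fragment.startswith('mid:'):
        # The listing strips the <...> around the message id; put them back.
        mid = '<' + parts.fragment[4:] + '>'
    return {
        'user': parts.username,      # None for a local "imap:" URL
        'passwd': parts.password,
        'host': parts.hostname,
        'mailbox': parts.path.lstrip('/'),
        'mid': mid,
    }
```

For example, parse_imap_url('imap://steve:secret@mail.example.org/INBOX')
gives host 'mail.example.org' and mailbox 'INBOX', with mid left as None.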

SLOW-SLOW-SLOW! --- I hope adding fetch-range and cleaning
up the parsing and a few other things will fix this. Otherwise,
I'll be forced to write a python-C interface to c-client.

rfc822.py in the standard library should have code to parse
the rfc822 header into a dictionary
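
Something along these lines is what I have in mind, sketched in
present-day Python 3. The function name header_dict is hypothetical, and
joining repeated fields (such as Received:) with a newline is an
arbitrary choice made for the sketch.

```python
def header_dict(header_text):
    """Parse an rfc822 header block into a dictionary keyed by field name."""
    fields = {}
    name = None
    for line in header_text.splitlines():
        if not line:
            break                        # blank line ends the header
        if line[0] in ' \t' and name is not None:
            # Continuation line: unfold it onto the previous field.
            fields[name] += ' ' + line.strip()
        elif ':' in line:
            name, value = line.split(':', 1)
            name = name.strip().lower()  # field names are case-insensitive
            value = value.strip()
            if name in fields:
                fields[name] += '\n' + value
            else:
                fields[name] = value
    return fields
```

The text returned for an RFC822.HEADER fetch could be passed straight in,
so that msg header lookups become plain dictionary accesses.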

Obviously, no IMAP4 specific support yet.

- I'm off for 2 weeks at the Outer Banks tomorrow!

If my desk is not piled TOO high when I get back, I hope to finish
the above, and soon after have a WWW proxy CGI that accepts imap URL's
and spits out HTML, making Mosaic or other WWW-clients into IMAP
readers. ( Including turning In-Reply-To:'s into anchors, etc. )

In the meantime, the Dutch soccer team has my best wishes!
My sister-in-law has been warming up the trans-atlantic
phones after every game, talking to the family in Einthoven (sp?),
so I expect that even a few souls at CWI are a bit excited.

- Steve Majewski (804-982-0831) <sdm7g@Virginia.EDU>
- UVA Department of Molecular Physiology and Biological Physics
------------------------------------------------------------------

#!/usr/local/bin/python
#
# WARNING: The documentation is probably leading the implementation
# in a number of places, and actually documents the PLAN of the
# implementation.
# Some of these routines haven't been converted to use the new
# parse routines or to properly check their values.
#
# A BAD should raise an error, while a NO could raise an error
# in some contexts, or return a false (None/Empty-List) value.
# [ This is certainly not yet true! ]
#
# Operation is (so far) strictly sequential, with no outstanding
# commands. [ That should be coming soon. ]
#
# Classes:
#   IMAP Sessions
#       generic _Session() [ A stub containing the common routines for: ]
#       - RemoteSession( host, user, passwd )
#       - LocalSession() # PREAUTHENTICATED
#   Mailboxes
#       are a Session context, plus the mailbox name and context.
#   Messages
#       are a Mailbox and a sequence number. ( Should be UID! )
#       Data fetched from the mailbox while the session is open
#       will be kept when the Session is closed, but no new
#       data fields are available unless the Session is reopened.
#       Thus: If a Message Envelope is fetched, it can be
#       referenced when the mailbox's session is closed, but
#       another field, like RFC822.TEXT, that hasn't been cached,
#       for example, will raise an InvalidState Error.
#   MailboxSequences /* not yet! */
#       MailboxSequences are (will be) more efficient than
#       sequences of Messages ( but have the restriction that
#       they must all be from the same mailbox and contiguous
#       in sequence. )
#           mbxSeq[100:200].envelope()
#       maps to a "FETCH 100:200 ENVELOPE" rather than
#       101 separate "FETCH <n> ENVELOPE" commands.
#   Lists of Messages.
#       Messages from the same or different Mailbox, or
#       even from different Sessions can be combined into
#       ordinary lists or other sequence type containers.
#
#

from socket import gethostbyname,getservbyname,socket,AF_INET,SOCK_STREAM
from rand import choice
import string, sys, urllib, posix

# generate an increasing sequence of symbols
# all elements of sequence share a common "random" prefix
#
class GenSym:
    def __init__( self ):
        self._prefix = choice( string.lowercase ) + choice( string.uppercase )
        self._current = 0
        self._outstanding = []
    def last( self ):
        return self._prefix + '%04d' % self._current
    def next( self ):
        self._current = self._current + 1
        self._outstanding.append( self._current )
        return self.last()
    def retire( self ):
        last = self.last()
        self._outstanding = self._outstanding[1:]
        return( last )

DEBUG = None
_BUFSIZ = 8192
_S_CONNECTED = 'Connected'
_S_AUTHENTICATED = 'Authorized'
_S_SELECTED = 'Mailbox Selected'
InvalidState = 'InvalidState'
ImapError = 'ImapError'

class _Session:
    # _Session is a generic "stub" class that defines some common behaviour
    # for LocalSession and RemoteSession ( and any other derived classes ).
    # It is not to be instantiated itself, but only here as part of the
    # implementation of those other classes.
    def __init__( self ):
        raise RuntimeError, 'Generic Stub class - use RemoteSession or LocalSession!'
    def _init_( self ):
        self.log = sys.stderr.write
        # can be another file write method or [].append or None
        self.state = None
        self._gensym = GenSym()
        self.nextseq = self._gensym.next
        self.lastseq = self._gensym.retire
    def __repr__( self ):
        if hasattr( self, "hostname" ):
            hostname = ' at: ' + self.hostname
        else: hostname = ' (LOCAL connection)'
        state = repr( self.state )
        return '<imap server connection: ' + state + hostname + '>'
    def __str__( self ): return self._url
    def url( self ): return self._url
    # Basic routines send & recv must be defined in subclass
    # Common Basic Routines
    def close( self ):
        if self.state:
            self.logout()
        self.state = None
        self.file.close()
    def __del__( self ):
        self.close()
    def read( self ):
        block = ticket = ''
        while ticket <> self.lastseq():
            tmp = self.recv()
            block = block + tmp
            if block[-2:] <> _CRLF : continue
            lastline = string.splitfields( tmp, _CRLF )[-2]
            ticket = string.split( lastline )[0]
        if DEBUG and self.log : self.log( block )
        if self.set_status_check( lastline ) : return block
    def readline( self ):
        return self.file.readline()
    def readlines( self ):
        lines = []
        last = self.lastseq()
        slen = len(last)
        lines.append( self.file.readline() )
        while ( last <> lines[-1][:slen] ) :
            lines.append( self.file.readline() )
            if DEBUG and self.log : self.log( lines[-1] )
        self.set_status( lines[-1] )
        return lines
    def set_status( self, line ):
        line = string.strip( line )
        if self.log : self.log( line + "\n" )
        status = string.split( line )
        if status[0] <> self.lastseq():
            raise InvalidState, 'message sequence out of order'
        self.status = status[:2]
        self.status.append( string.join( status[2:] ) )
        return self.status[1]
    def check_status( self ):
        if self.status[1] <> 'OK' : raise ImapError, string.join( self.status )
        else: return self.status[1]
    def set_status_check( self, line ):
        self.set_status( line )
        return self.check_status()
    def logout( self ):
        self.send( 'LOGOUT' )
        junk = self.readlines()
        self.state = None
    # Most Higher Level Routines are Common
    def select( self, mbox ):
        return self.select_examine( mbox, 'SELECT' )
    def examine( self, mbox ):
        return self.select_examine( mbox, 'EXAMINE' )
    def select_examine( self, mbox, cmd ):
        self.send( cmd + ' ' + mbox )
        reply = self.read()
        self.state = cmd
        self.mboxname = mbox
        return reply
    def find( self, match ):
        self.send( 'FIND ALL.MAILBOXES ' + match )
        reply = self.readlines()
        self.check_status()
        mboxlist = []
        for line in reply:
            line = string.split( line )
            if line[1] == 'MAILBOX' : mboxlist.append( line[-1] )
        return mboxlist
    def Mailbox( self, mbox ):
        return Mailbox( self, mbox )
    def findmailboxes( self, match ):
        mboxlist = []
        for each in self.find( match ):
            mboxlist.append( self.Mailbox( each ) )
        return mboxlist

class RemoteSession( _Session ) :
    def __init__( self, *args ):
        _Session._init_(self)
        if args : junk = self.connect( args[0] )
        if args[1:] : junk = apply( self.login, args[1:3] )
    def reopen( self ):
        if self.state : raise InvalidState, "Can't reopen an open Session!"
        self.connect( self.hostname )
        self.login( self.user, self.passwd )
    def connect( self, hostname ):
        if self.state : raise InvalidState, `self.state`
        self.hostname = hostname
        self.host = gethostbyname( hostname )
        self.port = getservbyname( 'imap', 'tcp' )
        self.sock = socket( AF_INET, SOCK_STREAM )
        self.sock.connect( (self.host, self.port) )
        self.file = self.sock.makefile( 'rw' )
        self.state = _S_CONNECTED
        self.banner = self.readline()
        if self.log: self.log( self.banner )
        self.status = string.split( self.banner )[1]
        return self
    def login( self, user, passwd ):
        self.user = user
        self.passwd = passwd
        if self.state <> _S_CONNECTED : raise InvalidState, `self.state`
        self.send( 'LOGIN ' + user + ' ' + passwd )
        self.set_status( self.recv() )
        if ( self.status[1] == 'OK' ) : self.state = _S_AUTHENTICATED
        self._url = 'imap://'+self.user+':'+self.passwd+'@'+self.hostname+'/'
    # low level Socket routines:
    def send( self, cmd ):
        self.sock.send( self.nextseq() + ' ' + cmd + _CRLF )
    def recv( self ):
        return self.sock.recv( _BUFSIZ )
    def close( self ):
        _Session.close( self )
        self.sock.close()

ImapSession = Session = RemoteSession # qualified name synonym, in case of "from imap import *"

#
# LocalSessions are run from a logged in process and are PREAUTHENTICATED
#

IMAPD = '/usr/local/etc/imapd' # /*LOCAL*/

class LocalSession( _Session ):
    def __init__( self ):
        _Session._init_(self)
        r, self.W = posix.pipe()
        self.R, w = posix.pipe()
        self.pid = posix.fork()
        if not self.pid:
            try:
                posix.dup2( w, 1 )
                posix.dup2( r, 0 )
                posix.execv( IMAPD, () )
            except: sys.exit( 1 )
        self.file = posix.fdopen( self.R, 'r' )
        self.banner = self.file.readline()
        if self.log: self.log( self.banner )
        self.status = string.split( self.banner )[1]
        if self.status == "PREAUTH" : self.state = _S_AUTHENTICATED
        self._url = "imap:"
    def reopen( self ):
        if not self.state : self.__init__()
        else: raise InvalidState, "Can't reopen() an open Session!"
    def send( self, cmd ):
        n = posix.write( self.W, self.nextseq() + ' ' + cmd + '\r\n' )
    def recv( self ):
        return posix.read( self.R, _BUFSIZ )

class Mailbox:
    def __init__( self, session, name ):
        self.session = session
        self.name = name
        self.dict = {}
        self.state = None   # must exist before select_examine() compares it
        self.examine()
    def exists( self ):
        self.examine()
        return self.dict['EXISTS']
    def recent( self ):
        self.examine()
        return self.dict['RECENT']
    def select_examine( self, cmd ):
        if self.session.state == cmd == self.state and self.session.mboxname == self.name :
            return
        reply = self.session.select_examine( self.name, cmd )
        for resp in parse( reply )[:-1] :
            if resp[1] == Token('FLAGS') :
                self.dict['FLAGS'] = resp[2]
            else: self.dict[str(resp[2])] = resp[1]
        self.state = cmd
    def select( self ):
        self.select_examine( 'SELECT' )
    def examine( self ):
        self.select_examine( 'EXAMINE' )
    #
    # len() returns the cached length, not necessarily the actual current value.
    # I don't want 'for' to make N calls to EXAMINE to check the value.
    # When Guido changes the implementation of for to raise END signals this
    # won't be a problem. Till then, I haven't figured out all of the possibilities
    # for IMAP to return an unsolicited EXISTS.
    #
    def __len__( self ):
        return self.dict['EXISTS'] - 1
    def __getitem__( self, i ):
        if i < 0 : i = self.__len__() + i
        if i < self.__len__() :
            return self.Message(i+1)
        else: raise IndexError
    def Message( self, i ):
        return Message( self, i )
    def __getslice__( self, i, j ):
        list = []
        for n in range( i, j ):
            list.append( self[n] )
        return list
    def search( self, match ):
        if not self.state : self.examine()
        self.session.send( 'SEARCH ' + match )
        reply = self.session.readlines()
        stat = string.split(reply[-1])
        if stat[1] <> 'OK' or stat[2] <> 'SEARCH' : return None
        list = map( string.atoi, string.split(reply[-2])[2:] )
        return map( self.Message, list )
    def url( self ):
        return self.session.url() + self.name
    def __repr__( self ):
        return self.url()

class Message:
    def __init__( self, mbox, seqn ):
        self.mbox = mbox
        self.seqn = seqn
        self.session = self.mbox.session
    def fetch( self, what ):
        self.mbox.examine()
        self.session.send( 'FETCH ' + `self.seqn` + ' ' + what )
        self.parsed_reply = parse(self.session.read())
        for S in self.parsed_reply:
            if S[2] == Token('FETCH') and S[1] == self.seqn :
                return S[3]
    def __getitem__( self, key ):
        key = string.upper(key)
        getter = string.lower(key)
        if hasattr(self, key) : return getattr( self, key )
        elif hasattr( self, getter ):
            val = getattr( self, getter )()
            setattr( self, key, val )
            return val
        else: raise KeyError, key
    def header( self ):
        return self.fetch( 'RFC822.HEADER' )[1]
    def envelope( self ):
        tmp = self.fetch('ENVELOPE')
        if tmp[0] == Token('ENVELOPE'):
            return tmp[1]
    def text( self ):
        return self.fetch('RFC822.TEXT')[1]
    def mid(self):
        return self['ENVELOPE'][-1]
    def subj(self):
        return self['ENVELOPE'][1] or ''
    def date( self ):
        return self['ENVELOPE'][0]
    def fromname(self):
        return self['ENVELOPE'][2][0][0] or ''
    def fromaddress(self):
        fr1 = self['ENVELOPE'][2][0]
        return fr1[-2]+'@'+fr1[-1]
    def personfrom( self ):
        return self.fromname() or self['ENVELOPE'][2][0][2] or ''
    def uid( self ):
        # NOTE: FETCH uid and some other FETCH commands in IMAP2bis don't
        pass;
    def id(self):
        return `self.seqn` # TEMP: should be uid or message id
    def url( self ):
        return self.mbox.url() + '#mid:' + self['MID'][1:-1]

#------------------ parsing routines -----------

import string

_CRLF = '\015\012'
_SPACE = ' '
_TAB = '\t'
_WHITESPACE = _SPACE + _TAB
_ENDTOKEN = _WHITESPACE + ')' + _CRLF
_QUOTE = '"'
SyntaxError = 'SyntaxError'

class Token: # to designate keywords and non quoted alpha strings
    def __init__( self, istr ):
        self.str = string.upper(istr)
    def __repr__( self ):
        return '<' + str(self.str) + '>'
    def __hash__( self ):
        return hash( self.__class__ ) ^ hash( self.str )
    def __cmp__( self, other ):
        return hash( self ) <> hash( other )
    def __len__( self ):
        return len(self.str)
    def __getitem__( self, i ):
        return self.str[i]
    def __getslice__( self, i, j ):
        return self.str[i:j]
    def __str__( self ):
        return self.str

class Special( Token ):
    def __repr__( self ):
        if self.str == _CRLF : return '{CRLF}'
        else: return str(self.str)

class Flag( Token ): # for /flags
    def __init__( self, istr ):
        self.str = istr[0] + string.upper( istr[1] ) + string.lower( istr[2:] )
    def __repr__( self ):
        return str( self.str )

def Literal( istr ): # noop class for literal strings
    return istr


Nil = None
Unsolicited = Special( '*' )
EOL = Special( _CRLF )

def parse( s ):
    list = []; line = []
    while s:
        tok, s = nexttok( s )
        if tok <> '' : line.append( tok )
        if tok == EOL :
            list.append( line )
            line = []
    if line : list.append( line )
    return list

def eatSpaces( s ):
    for i in range( len(s) ):
        if s[i] not in _WHITESPACE : return s[i:]
    return ''

def nexttok( str ):
    str = eatSpaces( str )
    if not str : return ( str, str )

    if str[0] == _QUOTE : # quoted string
        i = string.index( str[1:], _QUOTE ) + 1
        return ( Literal(str[1:i]), str[i+1:] )
    elif str[0] == '{' : # literal string
        i = string.index( str, '}' )
        size = string.atoi( str[1:i] )
        i = i + 1
        while str[i] not in _CRLF : i = i + 1
        while str[i] in _CRLF : i = i + 1
        return ( Literal(str[i:i+size]), str[i+size:] )
    elif str[0] == '(' : # S-EXPR
        SExpr = ()
        str = str[1:]
        while str:
            tok, str = nexttok( str )
            if tok == ')' : return( SExpr, str )
            else: SExpr = SExpr + ( tok, )
        return ( SExpr, str )
    elif str[0] == ')' : # end of S-EXPR
        return ( ')', str[1:] )
    elif str[0] == '*' :
        return ( Unsolicited, str[1:] )
    elif str[0:2] == _CRLF : return ( EOL, str[2:] )
    elif string.upper(str[:3]) == 'NIL' : # NIL
        return ( Nil, str[3:] )
    elif str[0] in string.digits:
        for i in range( len(str) ):
            if str[i] not in string.digits : break
        if i == len( str )-1 and str[i] in string.digits: i = i + 1
        return ( string.atoi( str[:i] ), str[i:] )
    elif str[0] == '\\' :
        for i in range( len(str) ):
            if str[i] in _ENDTOKEN :
                return ( Flag(str[:i]), str[i:] )
        return ( Flag( str ), '' )
    else:
        for i in range( len(str) ):
            if str[i] in _ENDTOKEN :
                return ( Token(str[:i]), str[i:] )
        return ( Token( str ), '' )

def dict( plist ):
    # plist must be a list that can be paired, i.e. even number length
    # and the first of each pair should be a Keyword
    # and it BETTER have a "str" attribute.
    d = {}
    for i in range( 0, len(plist), 2 ):
        d[ plist[i].str ] = plist[i+1]
    return d

#### TEST ####

def frm( n, *args ):
    lj = string.ljust
    if args : mbox, rmt = args[0], args[1:]
    else: mbox, rmt = "INBOX", ()   # rmt must be bound even with no args
    if rmt : Imapd = RemoteSession( rmt[0], rmt[1], rmt[2] )
    else: Imapd = LocalSession()
    mbox = Imapd.Mailbox( mbox )
    Imapd.log = None
    for msg in mbox[-n:]:
        print ( "%03d:" % msg.seqn ), lj(msg.personfrom(),20), ">" + msg['SUBJ'][:55]

def urltest( n, *args ):
    if args : mbox, rmt = args[0], args[1:]
    else: mbox, rmt = "INBOX", ()   # rmt must be bound even with no args
    if rmt : Imapd = RemoteSession( rmt[0], rmt[1], rmt[2] )
    else: Imapd = LocalSession()
    mbox = Imapd.Mailbox( mbox )
    Imapd.log = None
    for msg in mbox[-n:]: print msg.url()

if __name__ == "__main__" :
    import sys
    if sys.argv[1:] : n = string.atoi( sys.argv[1] ) or 0
    else: n = 20
    if sys.argv[2:] : mbox = sys.argv[2]
    else: mbox = "INBOX"
    frm( n, mbox )