# -*- coding: cp1252 -*-

##
# Implements the minimal functionality required
# to extract a "Workbook" or "Book" stream (as one big string)
# from an OLE2 Compound Document file.
#
# Copyright © 2005-2006 Stephen John Machin, Lingfo Pty Ltd
#
# This module is part of the xlrd package, which is released under a BSD-style licence.
## # No part of the content of this file was derived from the works of David Giffin. # 2007-04-22 SJM Missing "<" in a struct.unpack call => can't open files on bigendian platforms. # 2007-05-07 SJM Meaningful exception instead of IndexError if a SAT (sector allocation table) is corrupted. import sys from struct import unpack from timemachine import * ## # Magic cookie that should appear in the first 8 bytes of the file. SIGNATURE = "\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1" EOCSID = -2 FREESID = -1 SATSID = -3 MSATSID = -4 class CompDocError(Exception): pass class DirNode(object): def __init__(self, DID, dent, DEBUG=0): # dent is the 128-byte directory entry self.DID = DID # (cbufsize, self.etype, self.colour, self.left_DID, self.right_DID, # self.root_DID, # self.first_SID, # self.tot_size) = \ # unpack('> logfile, "\nCompDoc format: version=0x%04x revision=0x%04x" % (version, revision) self.mem = mem ssz, sssz = unpack('> logfile, \ "WARNING *** file size (%d) not 512 + multiple of sector size (%d)" \ % (len(mem), sec_size) if DEBUG: print >> logfile, 'sec sizes', ssz, sssz, sec_size, self.short_sec_size print >> logfile, "mem data: %d bytes == %d sectors" % (mem_data_len, mem_data_secs) print >> logfile, "SAT_tot_secs=%d, dir_first_sec_sid=%d, min_size_std_stream=%d" \ % (SAT_tot_secs, self.dir_first_sec_sid, self.min_size_std_stream,) print >> logfile, "SSAT_first_sec_sid=%d, SSAT_tot_secs=%d" % (SSAT_first_sec_sid, SSAT_tot_secs,) print >> logfile, "MSAT_first_sec_sid=%d, MSAT_tot_secs=%d" % (MSAT_first_sec_sid, MSAT_tot_secs,) nent = int_floor_div(sec_size, 4) # number of SID entries in a sector fmt = "<%di" % nent trunc_warned = 0 # # === build the MSAT === # MSAT = list(unpack('<109i', mem[76:512])) sid = MSAT_first_sec_sid while sid >= 0: if sid >= mem_data_secs: raise CompDocError( "MSAT extension: accessing sector %d but only %d in file" % (sid, mem_data_secs) ) offset = 512 + sec_size * sid news = list(unpack(fmt, mem[offset:offset+sec_size])) sid = news.pop() 
MSAT.extend(news) if DEBUG: print >> logfile, "MSAT: len =", len(MSAT) print >> logfile, MSAT # # === build the SAT === # self.SAT = [] for msid in MSAT: if msid == FREESID: continue if msid >= mem_data_secs: if not trunc_warned: print >> logfile, "WARNING *** File is truncated, or OLE2 MSAT is corrupt!!" print >> logfile, \ "INFO: Trying to access sector %d but only %d available" \ % (msid, mem_data_secs) trunc_warned = 1 continue offset = 512 + sec_size * msid news = list(unpack(fmt, mem[offset:offset+sec_size])) self.SAT.extend(news) if DEBUG: print >> logfile, "SAT", self.SAT # print >> logfile, "SAT ", # for i, s in enumerate(self.SAT): # print >> logfile, "entry: %4d offset: %6d, next entry: %4d" % (i, 512 + sec_size * i, s) # print >> logfile, "%d:%d " % (i, s), print # === build the directory === # dbytes = self._get_stream( self.mem, 512, self.SAT, self.sec_size, self.dir_first_sec_sid, name="directory") dirlist = [] did = -1 for pos in xrange(0, len(dbytes), 128): did += 1 dirlist.append(DirNode(did, dbytes[pos:pos+128], 0)) self.dirlist = dirlist _build_family_tree(dirlist, 0, dirlist[0].root_DID) # and stand well back ... 
if DEBUG: for d in dirlist: d.dump(DEBUG) # # === get the SSCS === # sscs_dir = self.dirlist[0] assert sscs_dir.etype == 5 # root entry self.SSCS = self._get_stream( self.mem, 512, self.SAT, sec_size, sscs_dir.first_SID, sscs_dir.tot_size, name="SSCS") # if DEBUG: print >> logfile, "SSCS", repr(self.SSCS) # # === build the SSAT === # self.SSAT = [] if SSAT_tot_secs > 0 and sscs_dir.tot_size == 0: print >> logfile, \ "WARNING *** OLE2 inconsistency: SSCS size is 0 but SSAT size is non-zero" if sscs_dir.tot_size > 0: sid = SSAT_first_sec_sid nsecs = SSAT_tot_secs while sid >= 0 and nsecs > 0: nsecs -= 1 start_pos = 512 + sid * sec_size news = list(unpack(fmt, mem[start_pos:start_pos+sec_size])) self.SSAT.extend(news) sid = self.SAT[sid] # assert SSAT_tot_secs == 0 or sid == EOCSID if DEBUG: print >> logfile, "SSAT last sid %d; remaining sectors %d" % (sid, nsecs) assert nsecs == 0 and sid == EOCSID if DEBUG: print >> logfile, "SSAT", self.SSAT def _get_stream(self, mem, base, sat, sec_size, start_sid, size=None, name=''): # print >> self.logfile, "_get_stream", base, sec_size, start_sid, size sectors = [] s = start_sid if size is None: # nothing to check against while s >= 0: start_pos = base + s * sec_size sectors.append(mem[start_pos:start_pos+sec_size]) try: s = sat[s] except IndexError: raise CompDocError( "OLE2 stream %r: sector allocation table invalid entry (%d)" % (name, s) ) assert s == EOCSID else: todo = size while s >= 0: start_pos = base + s * sec_size grab = sec_size if grab > todo: grab = todo todo -= grab sectors.append(mem[start_pos:start_pos+grab]) try: s = sat[s] except IndexError: raise CompDocError( "OLE2 stream %r: sector allocation table invalid entry (%d)" % (name, s) ) assert s == EOCSID if todo != 0: print >> self.logfile, \ "WARNING *** OLE2 stream %r: expected size %d, actual size %d" \ % (name, size, size - todo) return ''.join(sectors) def _dir_search(self, path, storage_DID=0): # Return matching DirNode instance, or None head = path[0] 
tail = path[1:] dl = self.dirlist for child in dl[storage_DID].children: if dl[child].name.lower() == head.lower(): et = dl[child].etype if et == 2: return dl[child] if et == 1: if not tail: raise CompDocError("Requested component is a 'storage'") return self._dir_search(tail, child) dl[child].dump(1) raise CompDocError("Requested stream is not a 'user stream'") return None ## # Interrogate the compound document's directory; return the stream as a string if found, otherwise # return None. # @param qname Name of the desired stream e.g. u'Workbook'. Should be in Unicode or convertible thereto. def get_named_stream(self, qname): d = self._dir_search(qname.split("/")) if d is None: return None if d.tot_size >= self.min_size_std_stream: return self._get_stream( self.mem, 512, self.SAT, self.sec_size, d.first_SID, d.tot_size, name=qname) else: return self._get_stream( self.SSCS, 0, self.SSAT, self.short_sec_size, d.first_SID, d.tot_size, name=qname + " (from SSCS)") ## # Interrogate the compound document's directory. # If the named stream is not found, (None, 0, 0) will be returned. # If the named stream is found and is contiguous within the original byte sequence ("mem") # used when the document was opened, # then (mem, offset_to_start_of_stream, length_of_stream) is returned. # Otherwise a new string is built from the fragments and (new_string, 0, length_of_stream) is returned. # @param qname Name of the desired stream e.g. u'Workbook'. Should be in Unicode or convertible thereto. 
def locate_named_stream(self, qname): d = self._dir_search(qname.split("/")) if d is None: return (None, 0, 0) if d.tot_size >= self.min_size_std_stream: return self._locate_stream(self.mem, 512, self.SAT, self.sec_size, d.first_SID, d.tot_size) else: return ( self._get_stream( self.SSCS, 0, self.SSAT, self.short_sec_size, d.first_SID, d.tot_size, qname + " (from SSCS)"), 0, d.tot_size ) return (None, 0, 0) # not found def _locate_stream(self, mem, base, sat, sec_size, start_sid, size): # print >> self.logfile, "_locate_stream", base, sec_size, start_sid, size s = start_sid if s < 0: raise CompDocError("_locate_stream: start_sid (%d) is -ve" % start_sid) p = -99 # dummy previous SID start_pos = -9999 end_pos = -8888 slices = [] while s >= 0: if s == p+1: # contiguous sectors end_pos += sec_size else: # start new slice if p >= 0: # not first time slices.append((start_pos, end_pos)) start_pos = base + s * sec_size end_pos = start_pos + sec_size p = s s = sat[s] assert s == EOCSID # print >> self.logfile, len(slices) + 1, "slices" if not slices: # The stream is contiguous ... just what we like! return (mem, start_pos, size) slices.append((start_pos, end_pos)) return (''.join([mem[start_pos:end_pos] for start_pos, end_pos in slices]), 0, size) # ==========================================================================================