您的位置:首页 > 编程语言 > Python开发

libreoffice python 操作word及excel文档的方法

2019-07-04 15:10 1151 查看

1、开始、关闭libreoffice服务;

开始之前同步字体文件时间,是因为创建soffice服务时,服务会检查所需加载的文件的时间,如果其认为时间不符,则其可能会重新加载,耗时较长,因此需事先统一时间。

使用时如果需要多次调用,最后每次调用均开启后关闭,否则libreoffice会创建一个缓存文档并越用越大,处理时间会增加。

class OfficeProcess(object):
def __init__(self):
self.p = 0
subprocess.Popen('find /usr/share/fonts | xargs touch -m -t 201801010000.00', shell=True)

def start_office(self):
self.p = subprocess.Popen('soffice --pidfile=sof.pid --invisible --accept="socket,host=localhost,port=2002;urp;"', shell=True)
while True:
try:
local_context = uno.getComponentContext()
resolver = local_context.getServiceManager().createInstanceWithContext('com.sun.star.bridge.UnoUrlResolver', local_context)
resolver.resolve('uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
return
except:
print(ts(), "wait for connecting soffice...")
time.sleep(1)
continue

def stop_office(self):
with open("sof.pid", "rb") as f:
try:
os.kill(int(f.read()), signal.SIGTERM)
self.p.wait()
except:
pass

2、init service manager

local_context = uno.getComponentContext()
service_manager = local_context.getServiceManager()
resolver = service_manager.createInstanceWithContext('com.sun.star.bridge.UnoUrlResolver', local_context)
self.ctx = resolver.resolve('uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
self.smgr = self.ctx.ServiceManager
self.desktop = self.smgr.createInstanceWithContext('com.sun.star.frame.Desktop', self.ctx)

3、从二进制数据中读取doc文档

def ImportFromMemory(self, data):
istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
istream.initialize((uno.ByteSequence(data), ))
pv = PropertyValue()
pv.Name = 'InputStream'
pv.Value = istream
self.doc = {'doc': []}
try:
self.document = self.desktop.loadComponentFromURL('private:stream/swriter', '_blank', 0, (pv, ))
self.text = self.document.getText()
except:
self.text = None

4、读取doc文档中的数据

def ExportToJson(self):
try:
l = self.__ParseText(self.text, self.__Callback(self.doc['doc']))
self.doc['length'] = l
except:
self.doc = {'doc': [], 'length': 0}
return json.dumps(self.doc)

@staticmethod
def __Callback(alist):
def Append(sth):
alist.append(sth)
return Append
def __ParseText(self, text, func):
l = 0
text_it = text.createEnumeration()
while text_it.hasMoreElements():
element = text_it.nextElement()
if element.supportsService('com.sun.star.text.Paragraph'):
l += self.__ParseParagraph(element, func)
elif element.supportsService('com.sun.star.text.TextTable'):
l += self.__ParseTable(element, func)
else:
pass
return l
def __ParseParagraph(self, paragraph, func):
p = {'paragraph': []}
l = 0
paragraph_it = paragraph.createEnumeration()
while paragraph_it.hasMoreElements():
portion = paragraph_it.nextElement()
if portion.TextPortionType == 'Text':
l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
elif portion.TextPortionType == 'SoftPageBreak':
pass
elif portion.TextPortionType == 'TextField':
l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
else:
l += self.__ParseTextContent(portion, self.__Callback(p['paragraph']))
if hasattr(paragraph, 'createContentEnumeration'):
l += self.__ParseTextContent(paragraph, self.__Callback(p['paragraph']))
p['length'] = l
func(p)
return l

def __ParseTextContent(self, textcontent, func):
l = 0
content_it = textcontent.createContentEnumeration('com.sun.star.text.TextContent')
while content_it.hasMoreElements():
element = content_it.nextElement()
if element.supportsService('com.sun.star.text.TextGraphicObject'):
l += self.__ParsePortionGraphic(element, func)
elif element.supportsService('com.sun.star.text.TextEmbeddedObject'):
pass
elif element.supportsService('com.sun.star.text.TextFrame'):
l += self.__ParseFrame(element, func)
elif element.supportsService('com.sun.star.drawing.GroupShape'):
l += self.__ParseGroup(element, func)
else:
pass
return l

def __ParseFrame(self, frame, func):
f = {'frame': []}
l = self.__ParseText(frame.getText(), self.__Callback(f['frame']))
f['length'] = l
func(f)
return l

def __ParseGroup(self, group, func):
l = 0
for i in range(group.getCount()):
it = group.getByIndex(i)
if it.supportsService('com.sun.star.drawing.Text'):
l += self.__ParseFrame(it, func)
else:
pass
return l

def __ParsePortionText(self, portion_text, func):
func({'portion': portion_text.String, 'length': len(portion_text.String)})
return len(portion_text.String)

def __ParsePortionGraphic(self, portion_graphic, func):
gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
stream = self.smgr.createInstanceWithContext('com.sun.star.io.TempFile', self.ctx)
pv1 = PropertyValue()
pv1.Name = 'OutputStream'
pv1.Value = stream
pv2 = PropertyValue()
pv2.Name = 'MimeType'
pv2.Value = 'image/png'
gp.storeGraphic(portion_graphic.Graphic, (pv1, pv2))
stream.getOutputStream().flush()
stream.seek(0)
l = stream.getInputStream().available()
b = uno.ByteSequence(b'')
stream.seek(0)
l, b = stream.getInputStream().readBytes(b, l)
img = {'image': base64.b64encode(b.value).decode('ascii')}
img['height'] = portion_graphic.Height
img['width'] = portion_graphic.Width
img['actualheight'] = portion_graphic.ActualSize.Height
img['actualwidth'] = portion_graphic.ActualSize.Width
img['croptop'] = portion_graphic.GraphicCrop.Top
img['cropbottom'] = portion_graphic.GraphicCrop.Bottom
img['cropleft'] = portion_graphic.GraphicCrop.Left
img['cropright'] = portion_graphic.GraphicCrop.Right
img['length'] = 0
func(img)
return 0

def __ParseTable(self, table, func):
l = 0
try:
matrix = self.__GetTableMatrix(table)
seps = self.__GetTableSeparators(table)
t = {}
count = 0
for ri in matrix.keys():
t[ri] = {}
for ci in matrix[ri].keys():
t[ri][ci] = dict(matrix[ri][ci])
del t[ri][ci]['cell']
t[ri][ci]['content'] = []
l += self.__ParseText(matrix[ri][ci]['cell'], self.__Callback(t[ri][ci]['content']))
count += t[ri][ci]['rowspan'] * t[ri][ci]['colspan']
if count != len(t) * len(seps):
raise ValueError('count of cells error')
func({'table': t, 'row': len(t), 'column': len(seps), 'length': l, 'tableid': self.table_id})
self.table_id += 1
except:
l = 0
print('discard wrong table')
return l

@staticmethod
def __GetTableSeparators(table):
result = [table.TableColumnRelativeSum]
for ri in range(table.getRows().getCount()):
result += [s.Position for s in table.getRows().getByIndex(ri).TableColumnSeparators]
result = sorted(set(result))
for i in range(len(result) - 1):
result[i] += 1 if result[i] + 1 == result[i + 1] else 0
return sorted(set(result))

@staticmethod
def __NameToRC(name):
r = int(re.sub('[A-Za-z]', '', name)) - 1
cstr = re.sub('[0-9]', '', name)
c = 0
for i in range(len(cstr)):
if cstr[i] >= 'A' and cstr[i] <= 'Z':
c = c * 52 + ord(cstr[i]) - ord('A')
else:
c = c * 52 + 26 + ord(cstr[i]) - ord('a')
return r, c

@staticmethod
def __GetTableMatrix(table):
result = {}
for name in table.getCellNames():
ri, ci = WordToJson.__NameToRC(name)
cell = table.getCellByName(name)
if ri not in result:
result[ri] = {}
result[ri][ci] = {'cell': cell, 'rowspan': cell.RowSpan, 'name': name}

seps = WordToJson.__GetTableSeparators(table)
for ri in result.keys():
sep = [s.Position for s in table.getRows().getByIndex(ri).TableColumnSeparators] + [table.TableColumnRelativeSum]
sep = sorted(set(sep))
for ci in result[ri].keys():
right = seps.index(sep[ci]) if sep[ci] in seps else seps.index(sep[ci] + 1)
left = -1 if ci == 0 else seps.index(sep[ci - 1]) if sep[ci - 1] in seps else seps.index(sep[ci - 1] + 1)
result[ri][ci]['colspan'] = right - left
return result

5、写doc文档

self.doco = self.desktop.loadComponentFromURL('private:factory/swriter', '_blank', 0, ())
self.texto = self.doco.getText()
self.cursoro = self.texto.createTextCursor()
self.cursoro.ParaBottomMargin = 500
def __WriteText(self, text, texto, cursoro):
for it in text:
if 'paragraph' in it:
self.__WriteParagraph(it, texto, cursoro)
elif 'image' in it:
self.__WritePortionGraphic(it, texto, cursoro)
elif 'table' in it:
self.__WriteTable(it, texto, cursoro)

def __WriteParagraph(self, paragraph, texto, cursoro):
if paragraph['length'] > 0:
if 'result' in paragraph:
for it in paragraph['result']:
texto.insertString(cursoro, it['trans_sen'], False)
else:
texto.insertString(cursoro, paragraph['paragraph'], False)
texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)

def __WritePortionGraphic(self, portion_graphic, texto, cursoro):
png_base64 = portion_graphic['image']
png = base64.b64decode(png_base64)
gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
istream.initialize((uno.ByteSequence(png), ))
pv = PropertyValue()
pv.Name = 'InputStream'
pv.Value = istream

actualsize = uno.createUnoStruct('com.sun.star.awt.Size')
actualsize.Height = portion_graphic['actualheight'] if 'actualheight' in portion_graphic else portion_graphic['height']
actualsize.Width = portion_graphic['actualwidth'] if 'actualwidth' in portion_graphic else portion_graphic['width']
graphiccrop = uno.createUnoStruct('com.sun.star.text.GraphicCrop')
graphiccrop.Top = portion_graphic['croptop'] if 'croptop' in portion_graphic else 0
graphiccrop.Bottom = portion_graphic['cropbottom'] if 'cropbottom' in portion_graphic else 0
graphiccrop.Left = portion_graphic['cropleft'] if 'cropleft' in portion_graphic else 0
graphiccrop.Right = portion_graphic['cropright'] if 'cropright' in portion_graphic else 0

image = self.doco.createInstance('com.sun.star.text.TextGraphicObject')
image.Surround = NONE
image.Graphic = gp.queryGraphic((pv, ))
image.Height = portion_graphic['height']
image.Width = portion_graphic['width']
image.setPropertyValue('ActualSize', actualsize)
image.setPropertyValue('GraphicCrop', graphiccrop)
texto.insertTextContent(cursoro, image, False)
texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)

def __WriteTable(self, table, texto, cursoro):
tableo = self.doco.createInstance('com.sun.star.text.TextTable')
tableo.initialize(table['row'], table['column'])
texto.insertTextContent(cursoro, tableo, False)
#    texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)
tcursoro = tableo.createCursorByCellName("A1")

hitbug = False
if table['row'] > 1:
tcursoro.goDown(1, True)
hitbug = tcursoro.getRangeName() == 'A1'

for ri in sorted([int(r) for r in table['table'].keys()]):
rs = table['table'][str(ri)]
for ci in sorted([int(c) for c in rs.keys()]):
cell = rs[str(ci)]
if hitbug == False and (cell['rowspan'] > 1 or cell['colspan'] > 1):
tcursoro.gotoCellByName(cell['name'], False)
if cell['rowspan'] > 1:
tcursoro.goDown(cell['rowspan'] - 1, True)
if cell['colspan'] > 1:
tcursoro.goRight(cell['colspan'] - 1, True)
tcursoro.mergeRange()
ctexto = tableo.getCellByName(cell['name'])
if ctexto == None:
continue
ccursoro = ctexto.createTextCursor()
ccursoro.CharWeight = FontWeight.NORMAL
ccursoro.CharWeightAsian = FontWeight.NORMAL
ccursoro.ParaAdjust = LEFT
self.__WriteText(cell['content'], ctexto, ccursoro)

6、生成二进制的doc文档数据

streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.doco.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'MS Word 2007 XML', 0), PropertyValue('OutputStream', 0, streamo, 0)))
streamo.flush()
_, datao = streamo.readBytes(None, streamo.available())

7、从doc文档数据生成pdf的二进制数据

streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.doco.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'writer_pdf_Export', 0), PropertyValue('OutputStream', 0, streamo, 0)))
streamo.flush()
_, datap = streamo.readBytes(None, streamo.available())

8、读取excel二进制数据

  def ImportFromMemory(self, data):
istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
istream.initialize((uno.ByteSequence(data), ))
pv = PropertyValue()
pv.Name = 'InputStream'
pv.Value = istream
self.doc = {'doc': []}
try:
print("before loadComponentFromURL")
self.document = self.desktop.loadComponentFromURL('private:stream/scalc', '_blank', 0, (pv, ))
self.sheets = self.document.getSheets()
print("ImportFromMemory done")
except:
print("ImportFromMemory failed")
self.sheets = None

9、读取excel的文本数据

def ExportToJson(self):
try:
l = self.__ParseText(self.sheets, self.__Callback(self.doc['doc']))
self.doc['length'] = l
except:
self.doc = {'doc': [], 'length': 0}
return json.dumps(self.doc)
def __ParseText(self, sheets, func):
l = 0
sheets_it = sheets.createEnumeration()
while sheets_it.hasMoreElements():
element = sheets_it.nextElement()
if element.supportsService('com.sun.star.sheet.Spreadsheet'):
l += self.__ParseSpreadsheet(element, func)
return l

def __ParseSpreadsheet(self, spreadsheet, func):
l = 0
p = {'spreadsheet': []}
visible_cells_it = spreadsheet.queryVisibleCells().getCells().createEnumeration()
while visible_cells_it.hasMoreElements():
cell = visible_cells_it.nextElement()
type = cell.getType()
if type == self.EMPTY:
print("cell.type==empty")
elif type == self.VALUE:
print("cell.type==VALUE", "value=", cell.getValue(), cell.getCellAddress ())
elif type == self.TEXT:
print("cell.type==TEXT","content=", cell.getString().encode("UTF-8"), cell.getCellAddress ())
l += self.__ParseCellText(spreadsheet, cell, self.__Callback(p['spreadsheet']))
print("__ParseCellText=", p)
elif type == self.FORMULA:
print("cell.type==FORMULA", "formula=", cell.getValue())
p['length'] = l
func(p)
return l

def __ParseCellText(self, sheet, cell, func):
try:
x = cell.getCellAddress().Column
y = cell.getCellAddress().Row
sheetname = sheet.getName()
except:
x = -1
y = -1
sheetname = None
func({'celltext': cell.getString(), 'x': x, 'y': y, 'sheetname': sheetname, 'length': len(cell.getString())})
return len(cell.getString())
     self.EMPTY = uno.Enum("com.sun.star.table.CellContentType", "EMPTY")
self.TEXT = uno.Enum("com.sun.star.table.CellContentType", "TEXT")
self.FORMULA = uno.Enum("com.sun.star.table.CellContentType", "FORMULA")
self.VALUE = uno.Enum("com.sun.star.table.CellContentType", "VALUE")

10、替换excel的文本信息

def ImportFromJson(self, data):
doc = json.loads(data)
try:
self.__WriteText(doc['doc'])
except:
pass
def __WriteText(self, text):
print("__WriteText begin:", text)
sheet = None
for it in text:
if 'paragraph' in it and 'sheetname' in it:
if sheet == None or sheet.getName() != it['sheetname']:
try:
sheet = self.sheets.getByName(it['sheetname'])
print("getsheet:", it['sheetname'], "=", sheet.getName())
except:
sheet = None
continue
self.__WriteParagraph(it, sheet)

def __WriteParagraph(self, paragraph, sheet):
print("__WriteParagraph")
if paragraph['length'] > 0:
try:
x = paragraph['x']
y = paragraph['y']
print("getcell:", x, y)
cell = sheet.getCellByPosition(x, y)
print("getcell done")
except:
return
if 'result' in paragraph:
for it in paragraph['result']:
print("cell=", cell.getString())
cell.setString(it['trans_sen'])
print("cell,", cell.getString(), ",done")

11、生成excel文档二进制数据

     streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.document.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'Calc MS Excel 2007 XML', 0), PropertyValue('OutputStream', 0, streamo, 0)))
streamo.flush()
_, datao = streamo.readBytes(None, streamo.available())

12、生成excel的pdf文档

streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.document.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'calc_pdf_Export', 0), PropertyValue('OutputStream', 0, streamo, 0)))
streamo.flush()
_, datap = streamo.readBytes(None, streamo.available())

以上就是本文的全部内容,希望对大家的学习有所帮助

您可能感兴趣的文章:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息