test

testclass.py

import uno
import subprocess
import time
import os
import threading
import re
import platform
from datetime import datetime, timedelta

# =========================
# Singleton
# =========================
class _Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

# =========================
# LibreOffice Manager
# =========================
class LibreOfficeManager(_Singleton):

    def __init__(self, host="localhost", port=2002, soffice_path=None):
        if hasattr(self, "_initialized"):
            return

        self.host = host
        self.port = port

        # OS別パス
        if soffice_path:
            self.soffice_path = soffice_path
        else:
            if platform.system() == "Windows":
                self.soffice_path = r"C:\Program Files\LibreOffice\program\soffice.exe"
            elif platform.system() == "Darwin":
                self.soffice_path = "/Applications/LibreOffice.app/Contents/MacOS/soffice"
            else:
                self.soffice_path = "soffice"

        self.ctx = None
        self.smgr = None
        self.desktop = None
        self.process = None

        self._ensure_connection()
        self._initialized = True

    def _ensure_connection(self):
        if not self._try_connect():
            self._start()
            self._wait()

    def _try_connect(self):
        try:
            local_ctx = uno.getComponentContext()
            resolver = local_ctx.ServiceManager.createInstanceWithContext(
                "com.sun.star.bridge.UnoUrlResolver", local_ctx)

            self.ctx = resolver.resolve(
                f"uno:socket,host={self.host},port={self.port};urp;StarOffice.ComponentContext")

            self.smgr = self.ctx.ServiceManager
            self.desktop = self.smgr.createInstanceWithContext(
                "com.sun.star.frame.Desktop", self.ctx)
            return True
        except Exception:
            return False

    def _start(self):
        self.process = subprocess.Popen([
            self.soffice_path,
            "--headless",
            "--nologo",
            "--nodefault",
            "--nofirststartwizard",
            "--norestore",
            f"--accept=socket,host={self.host},port={self.port};urp;"
        ])

    def _kill(self):
        if platform.system() == "Windows":
            os.system("taskkill /IM soffice.exe /F")
        else:
            os.system("pkill -f soffice")

    def restart(self):
        self._kill()
        time.sleep(2)
        self._start()
        self._wait()

    def _wait(self, timeout=20):
        start = time.time()
        while time.time() - start < timeout:
            if self._try_connect():
                return
            time.sleep(1)
        raise RuntimeError("LibreOffice 起動失敗")

    def safe_execute(self, func, retries=3):
        for _ in range(retries):
            try:
                return func()
            except Exception:
                self.restart()
        raise RuntimeError("リカバリ失敗")

# =========================
# Excel操作
# =========================
class LibreOfficeExcel:

    def __init__(self):
        self.manager = LibreOfficeManager()
        self.desktop = self.manager.desktop
        self.document = None
        self.sheet = None

    # -------------------------
    # ファイル
    # -------------------------
    def open(self, path):
        def _open():
            url = uno.systemPathToFileUrl(path)
            self.document = self.desktop.loadComponentFromURL(url, "_blank", 0, ())
            self.sheet = self.document.Sheets.getByIndex(0)
        self.manager.safe_execute(_open)

    def save(self, path=None):
        def _save():
            if path:
                self.document.storeAsURL(uno.systemPathToFileUrl(path), ())
            else:
                self.document.store()
        self.manager.safe_execute(_save)

    def close(self):
        self.manager.safe_execute(lambda: self.document.close(True))

    # -------------------------
    # テンプレート
    # -------------------------
    def create_from_template(self, template, output):
        import shutil
        shutil.copy(template, output)
        self.open(output)

    # -------------------------
    # A1変換
    # -------------------------
    def _a1_to_index(self, cell):
        m = re.match(r"([A-Z]+)([0-9]+)", cell.upper())
        col_str, row = m.groups()

        col = 0
        for c in col_str:
            col = col * 26 + (ord(c) - 64)

        return int(row) - 1, col - 1

    # -------------------------
    # 型変換
    # -------------------------
    def _convert_value(self, cell):
        val = cell.getValue()
        if cell.Type.value == "VALUE":
            if val > 20000:
                return datetime(1899, 12, 30) + timedelta(days=val)
            return val
        return cell.getString()

    # -------------------------
    # セル
    # -------------------------
    def get_cell(self, cell, formatted=True):
        def _get():
            r, c = self._a1_to_index(cell)
            obj = self.sheet.getCellByPosition(c, r)
            return obj.getString() if formatted else self._convert_value(obj)
        return self.manager.safe_execute(_get)

    def set_cell(self, cell, value):
        def _set():
            r, c = self._a1_to_index(cell)
            obj = self.sheet.getCellByPosition(c, r)
            if isinstance(value, (int, float)):
                obj.setValue(value)
            else:
                obj.setString(str(value))
        self.manager.safe_execute(_set)

    # -------------------------
    # 範囲(高速)
    # -------------------------
    def set_range_fast(self, start_cell, data):
        def _set():
            r, c = self._a1_to_index(start_cell)
            rows = len(data)
            cols = len(data[0])
            self.sheet.getCellRangeByPosition(
                c, r, c+cols-1, r+rows-1
            ).setDataArray(tuple(tuple(row) for row in data))
        self.manager.safe_execute(_set)

    def get_range_fast(self, range_str):
        def _get():
            s, e = range_str.split(":")
            r1, c1 = self._a1_to_index(s)
            r2, c2 = self._a1_to_index(e)
            return list(self.sheet.getCellRangeByPosition(c1, r1, c2, r2).getDataArray())
        return self.manager.safe_execute(_get)

    # -------------------------
    # pandas
    # -------------------------
    def dataframe_to_sheet(self, df, start_cell="A1", header=True):
        data = df.values.tolist()
        if header:
            data.insert(0, df.columns.tolist())
        self.set_range_fast(start_cell, data)

    def sheet_to_dataframe(self, range_str, header=True):
        import pandas as pd
        data = self.get_range_fast(range_str)
        if header:
            return pd.DataFrame(data[1:], columns=data[0])
        return pd.DataFrame(data)

    # -------------------------
    # 並列
    # -------------------------
    @staticmethod
    def create_parallel_instance(port):
        inst = LibreOfficeExcel()
        inst.manager = LibreOfficeManager(port=port)
        inst.desktop = inst.manager.desktop
##         return inst

test.py

import pandas as pd

excel = LibreOfficeExcel()

# テンプレート
excel.create_from_template("template.xlsx", "output.xlsx")

# DataFrame書き込み
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
excel.dataframe_to_sheet(df, "A1")

# 高速読み込み
print(excel.get_range_fast("A1:B3"))

excel.save()
excel.close()

xxxclass.py

# excel_libreoffice.py
import uno
from com.sun.star.beans import PropertyValue
from com.sun.star.sheet import CellAddress, CellRangeAddress
import os

class ExcelLibreOffice:
    """
    LibreOffice UNO を使って Excel ファイルを操作するクラス。
    """

    def __init__(self, libreoffice_path: str = None):
        """
        コンストラクタ。LibreOffice が起動していない場合は自動起動。

        :param libreoffice_path: LibreOffice 実行ファイルへのパス
                                 (省略時は PATH から探す)
        """
        self._context = uno.getComponentContext()
        self._resolver = self._context.ServiceManager.createInstanceWithContext(
            "com.sun.star.bridge.UnoUrlResolver", self._context)

        # LibreOffice が起動していない場合は自動で起動
        try:
            self._desktop = self._resolver.resolve("uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext")
        except Exception:
            if libreoffice_path is None:
                libreoffice_path = "libreoffice"  # PATH 上にあることを想定
            os.system(f'{libreoffice_path} --accept="socket,host=localhost,port=2002;urp;" &')
            # 少し待つ(実際は適切な同期処理が必要)
            import time
            time.sleep(3)
            self._desktop = self._resolver.resolve("uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext")

        self._desktop_service = self._context.ServiceManager.createInstanceWithContext(
            "com.sun.star.frame.Desktop", self._context)

    # ------------------------------------------------------------------
    # ファイル操作
    # ------------------------------------------------------------------
    def open_file(self, file_path: str):
        """
        Excel ファイルを開く(既存のドキュメントに対しては新規ウィンドウで開きます)。

        :param file_path: 開くファイルへの絶対パス
        """
        abs_path = os.path.abspath(file_path)
        url = f"file://{abs_path}"
        self._document = self._desktop_service.loadComponentFromURL(
            url, "_blank", 0,
            (PropertyValue(Name="Hidden", Value=True),))
        # ドキュメントが Spreadsheet であることを確認
        if not hasattr(self._document, "Sheets"):
            raise RuntimeError("開いたドキュメントはスプレッドシートではありません")

    def save_file(self, file_path: str = None):
        """
        現在のドキュメントを保存。file_path が None の場合は元ファイルに上書き。

        :param file_path: 保存先パス(省略時はオリジナル)
        """
        if not hasattr(self, "_document"):
            raise RuntimeError("まだドキュメントが開かれていません")

        url = f"file://{os.path.abspath(file_path or self._document.URL)}"
        props = (PropertyValue(Name="FilterName", Value="Calc MS Excel 2007 XML"),)
        self._document.storeToURL(url, props)

    def close_file(self):
        """
        ドキュメントを閉じる。
        """
        if hasattr(self, "_document"):
            self._document.close(True)

    # ------------------------------------------------------------------
    # シート操作
    # ------------------------------------------------------------------
    def select_sheet_by_name(self, sheet_name: str):
        """
        名前でシートを選択し、内部状態に保持します。

        :param sheet_name: シート名
        """
        sheets = self._document.Sheets
        if not sheets.hasByName(sheet_name):
            raise ValueError(f"シート {sheet_name} が存在しません")
        self._current_sheet = sheets.getByName(sheet_name)

    def get_active_sheet(self):
        return self._current_sheet

    # ------------------------------------------------------------------
    # セルアドレス変換(Excel 形式 ↔ CellAddress)
    # ------------------------------------------------------------------
    @staticmethod
    def _excel_to_cell_address(excel_ref: str, sheet_index=0) -> CellAddress:
        """
        Excel 形式(A1, BC12…)を CellAddress に変換。

        :param excel_ref: A1 形式の文字列
        :return: uno.generic.CellAddress
        """
        import re

        match = re.match(r"^([A-Za-z]+)(\d+)$", excel_ref)
        if not match:
            raise ValueError(f"無効なセル参照: {excel_ref}")

        col_letters, row_number = match.groups()
        # 列番号計算(A=1 → 0)
        col_index = 0
        for ch in col_letters.upper():
            col_index = col_index * 26 + (ord(ch) - ord('A') + 1)
        return CellAddress(Sheet=sheet_index, Row=int(row_number)-1, Column=col_index-1)

    @staticmethod
    def _cell_address_to_excel(cell: CellAddress) -> str:
        """
        CellAddress を Excel 形式に変換。

        :param cell: uno.generic.CellAddress
        :return: A1 形式文字列
        """
        col = cell.Column + 1
        letters = ""
        while col > 0:
            col, rem = divmod(col - 1, 26)
            letters = chr(rem + ord('A')) + letters
        return f"{letters}{cell.Row+1}"

    # ------------------------------------------------------------------
    # 行・列操作
    # ------------------------------------------------------------------
    def insert_row(self, row_index: int):
        """
        指定行を挿入。

        :param row_index: 0‑based インデックス
        """
        self._current_sheet.Rows.insertByIndex(row_index, 1)

    def delete_row(self, row_index: int):
        """
        指定行を削除。

        :param row_index: 0‑based インデックス
        """
        self._current_sheet.Rows.removeByIndex(row_index, 1)

    def insert_column(self, col_index: int):
        """
        指定列を挿入。

        :param col_index: 0‑based インデックス
        """
        self._current_sheet.Columns.insertByIndex(col_index, 1)

    def delete_column(self, col_index: int):
        """
        指定列を削除。

        :param col_index: 0‑based インデックス
        """
        self._current_sheet.Columns.removeByIndex(col_index, 1)

    # ------------------------------------------------------------------
    # セル読み書き
    # ------------------------------------------------------------------
    def read_cell(self, ref: str):
        """
        セルの値を取得。

        :param ref: A1 形式
        :return: 値(文字列・数値など)
        """
        addr = self._excel_to_cell_address(ref)
        cell = self._current_sheet.getCellByPosition(addr.Column, addr.Row)
        return cell.Value if cell.Type == 3 else cell.String

    def write_cell(self, ref: str, value):
        """
        セルに値を書き込む。

        :param ref: A1 形式
        :param value: 書き込みたい値(数値、文字列など)
        """
        addr = self._excel_to_cell_address(ref)
        cell = self._current_sheet.getCellByPosition(addr.Column, addr.Row)
        if isinstance(value, (int, float)):
            cell.Value = value
        else:
            cell.String = str(value)

    # ------------------------------------------------------------------
    # 書式コピー/ペースト
    # ------------------------------------------------------------------
    def copy_cell_format(self, src_ref: str, dst_ref: str):
        """
        1セルの書式を別セルへコピー。

        :param src_ref: A1 形式(コピー元)
        :param dst_ref: A1 形式(コピー先)
        """
        src = self._current_sheet.getCellRangeByName(src_ref)
        dst = self._current_sheet.getCellRangeByName(dst_ref)
        dst.CellStyle = src.CellStyle

    # ------------------------------------------------------------------
    # 幅・高さ操作
    # ------------------------------------------------------------------
    def get_column_width(self, col_index: int) -> float:
        """
        列幅をポイント単位で取得。

        :param col_index: 0‑based インデックス
        :return: 幅(ポイント)
        """
        return self._current_sheet.Columns.getByIndex(col_index).Width

    def set_column_width(self, col_index: int, width_pt: float):
        """
        列幅を設定。

        :param col_index: 0‑based インデックス
        :param width_pt: 幅(ポイント)
        """
        self._current_sheet.Columns.getByIndex(col_index).Width = width_pt

    def get_row_height(self, row_index: int) -> float:
        """
        行高さをポイント単位で取得。

        :param row_index: 0‑based インデックス
        :return: 高さ(ポイント)
        """
        return self._current_sheet.Rows.getByIndex(row_index).Height

    def set_row_height(self, row_index: int, height_pt: float):
        """
        行高さを設定。

        :param row_index: 0‑based インデックス
        :param height_pt: 高さ(ポイント)
        """
        self._current_sheet.Rows.getByIndex(row_index).Height = height_pt

# ----------------------------------------------------------------------
# 使用例(コメントアウトしておきます)
# ----------------------------------------------------------------------
# if __name__ == "__main__":
#     xl = ExcelLibreOffice()
#     xl.open_file("/path/to/your/file.xlsx")
#     xl.select_sheet_by_name("Sheet1")
#
#     # 行追加
#     xl.insert_row(2)
#
#     # セル書き込み
#     xl.write_cell("B3", "Hello LibreOffice!")
#
#     # 列幅設定(ポイント単位)
#     xl.set_column_width(1, 80.0)
#
#     # 保存
#     xl.save_file("/path/to/your/file_modified.xlsx")
#     xl.close_file()

example

使い方の簡単な流れ

ステップ コマンド例
LibreOffice を起動(自動) & 接続 xl = ExcelLibreOffice()
ファイルを開く xl.open_file("sample.xlsx")
シート選択 xl.select_sheet_by_name("Sheet1")
行・列操作 xl.insert_row(2) / xl.delete_column(3)
セル読み書き xl.write_cell("C5", 123) / value = xl.read_cell("C5")
書式コピー xl.copy_cell_format("A1", "B1")
幅・高さ設定 xl.set_column_width(0, 90.0) / xl.set_row_height(1, 25.0)
保存 & 閉じる xl.save_file() / xl.close_file()

備考 列幅や行高さはポイント(pt)単位で扱います。必要に応じて mm や in への変換を追加してください。 書式コピーはセルのスタイルだけです。数値書式やフォント属性も同様に取得・設定できます。 uno.getComponentContext() はスクリプトが LibreOffice と同じ JVM 上で動作する場合に有効ですが、独立した Python プロセスから呼び出す場合は com.sun.star.bridge.UnoUrlResolver を通じて接続します。上記コードではその部分をカバーしています。
これで LibreOffice の UNO API を使って Excel ファイルの基本操作が可能になります。ぜひプロジェクトに組み込んで試してみてください!

コメント

タイトルとURLをコピーしました