Skip to content

Actual

Actual

Actual(
    base_url: str = "http://localhost:5006",
    token: str | None = None,
    password: str | None = None,
    file: str | None = None,
    encryption_password: str | None = None,
    data_dir: str | Path | None = None,
    cert: bool | SSLContext | str = True,
    bootstrap: bool = False,
    sa_kwargs: dict | None = None,
    extra_headers: dict[str, str] | None = None,
    timeout: float | Timeout | None = 60.0,
)

Bases: ActualServer

Implements the Python API for the Actual Server to be able to read and modify information on Actual using Python.

Parts of the implementation are available at the following file.

The client is expected to be used as a context manager, just like the following:

from actual import Actual
from actual.queries import get_transactions

with Actual(
        # Url of the Actual Server
        base_url="http://localhost:5006",
        # Password for authentication
        password="<your_password>",
        # Set the file to work with.
        # Can be either the file id or filename (if the name is unique)
        file="<file_id_or_name>",
        # Optional: Password for the file encryption.
        # Will not use it if set to None.
        encryption_password=None,
        # Optional: Directory to store downloaded files.
        # Will use a temporary if not provided
        data_dir="<path_to_data_directory>",
        # Optional: Path to the certificate file to use for the connection.
        # Can be set to `False` to disable SSL verification
        cert="<path_to_cert_file>"
) as actual:
    transactions = get_transactions(actual.session)

Parameters:

  • base_url

    (str, default: 'http://localhost:5006' ) –

    Url of the running Actual server

  • token

    (str | None, default: None ) –

    The token for authentication, if this is available (optional)

  • password

    (str | None, default: None ) –

    The password for authentication. It will be used on the .login() method to retrieve the token.

  • file

    (str | None, default: None ) –

    The name or id of the file to be set

  • encryption_password

    (str | None, default: None ) –

    Password used to configure encryption, if existing. Will raise an exception if it is not set but is required.

  • data_dir

    (str | Path | None, default: None ) –

    Path where to store the downloaded files from the server. If not specified, a temporary folder will be created instead. If database files are already present on the path, the library will try to reuse them by re-computing the sync request. Providing a path should speed up the download process considerably on the next call.

  • cert

    (bool | SSLContext | str, default: True ) –

    If a custom certificate should be used (e.g., self-signed certificate), its path can be provided as a string or as custom ssl.SSLContext. Set to False for no certificate check.

  • bootstrap

    (bool, default: False ) –

    If the server is not bootstrapped, bootstrap it with the password.

  • sa_kwargs

    (dict | None, default: None ) –

    Additional kwargs passed to the SQLAlchemy session maker. Examples are autoflush (enabled by default), autocommit (disabled by default). For a list of all parameters, check the SQLAlchemy documentation.

  • extra_headers

    (dict[str, str] | None, default: None ) –

    Additional headers to be attached to each request to the Actual server.

  • timeout

    (float | Timeout | None, default: 60.0 ) –

    Timeout in seconds applied to all HTTP requests. Set to None to disable. Accepts a float or an httpx.Timeout object for fine-grained control. Defaults to 60 seconds.

Methods:

  • set_file

    Sets the file id for the class for further requests.

  • run_migrations

    Runs the migration files, skipping the ones that have already been run.

  • create_budget

    Creates a budget using the remote server default database and migrations.

  • rename_budget

    Renames the budget with the given name.

  • delete_budget

    Deletes the currently loaded file from the server.

  • cleanup

    Cleans up the database from all deleted transactions, message caches and runs a VACUUM.

  • export_data

    Export your data as a zip file containing db.sqlite and metadata.json files.

  • encrypt

    Encrypts the local database using a new key, and re-uploads to the server.

  • upload_budget

    Uploads the current file to the Actual server.

  • reupload_budget

    Resets the user file on the backend and re-uploads the current copy instead.

  • apply_changes

    Applies a list of sync changes, based on what the sync method returned on the remote.

  • get_metadata

    Gets the content of the metadata.json file.

  • update_metadata

    Updates the metadata.json from the Actual file with the patch fields.

  • download_budget

    Downloads the budget file from the remote, applying all following changes required by the server.

  • download_master_encryption_key

    Downloads and assembles the key for decrypting the budget based on the provided encryption password.

  • import_zip

    Imports a zip file as the current database, as well as generating the local reflected session.

  • create_engine

    Internally creates the engine for the database, and loads the reflected metadata.

  • sync

    Does a sync request and applies all changes that are stored on the server on the local copy of the database.

  • commit

    Adds all pending entries done to the local database, and sends a sync request to the remote server.

  • run_rules

    Runs all the stored rules on the database on all transactions, without any filters.

  • run_bank_sync

    Runs the bank synchronization for the selected account. If missing, all accounts are synchronized.

Attributes:

Source code in actual/__init__.py
def __init__(
    self,
    base_url: str = "http://localhost:5006",
    token: str | None = None,
    password: str | None = None,
    file: str | None = None,
    encryption_password: str | None = None,
    data_dir: str | pathlib.Path | None = None,
    cert: bool | ssl.SSLContext | str = True,
    bootstrap: bool = False,
    sa_kwargs: dict | None = None,
    extra_headers: dict[str, str] | None = None,
    timeout: float | httpx.Timeout | None = 60.0,
):
    """
    Parts of the implementation are [available at the following file.](
    https://github.com/actualbudget/actual/blob/2178da0414958064337b2c53efc95ff1d3abf98a/packages/loot-core/src/server/cloud-storage.ts)

    The client is expected to be used as a context manager, just like the following:

    ```python
    from actual import Actual
    from actual.queries import get_transactions

    with Actual(
            # Url of the Actual Server
            base_url="http://localhost:5006",
            # Password for authentication
            password="<your_password>",
            # Set the file to work with.
            # Can be either the file id or filename (if the name is unique)
            file="<file_id_or_name>",
            # Optional: Password for the file encryption.
            # Will not use it if set to None.
            encryption_password=None,
            # Optional: Directory to store downloaded files.
            # Will use a temporary if not provided
            data_dir="<path_to_data_directory>",
            # Optional: Path to the certificate file to use for the connection.
            # Can be set to `False` to disable SSL verification
            cert="<path_to_cert_file>"
    ) as actual:
        transactions = get_transactions(actual.session)
    ```

    :param base_url: Url of the running Actual server
    :param token: The token for authentication, if this is available (optional)
    :param password: The password for authentication. It will be used on the .login() method to retrieve the token.
    :param file: The name or id of the file to be set
    :param encryption_password: Password used to configure encryption, if existing. Will raise an exception if
                                it is not set but is required.
    :param data_dir: Path where to store the downloaded files from the server. If not specified, a temporary folder
                     will be created instead. If database files are already present on the path, the library will
                     try to reuse them by re-computing the sync request. **Providing a path should speed up
                     the download process considerably on the next call**.
    :param cert: If a custom certificate should be used (e.g., self-signed certificate), its path can be provided
                 as a string or as custom [ssl.SSLContext][ssl.SSLContext]. Set to `False` for no certificate check.
    :param bootstrap: If the server is not bootstrapped, bootstrap it with the password.
    :param sa_kwargs: Additional `kwargs` passed to the SQLAlchemy session maker. Examples are `autoflush` (enabled
                      by default), `autocommit` (disabled by default). For a list of all parameters, check the
                      [SQLAlchemy documentation.](
                      https://docs.sqlalchemy.org/en/20/orm/session_api.html#sqlalchemy.orm.Session.__init__)
    :param extra_headers: Additional headers to be attached to each request to the Actual server.
    :param timeout: Timeout in seconds applied to all HTTP requests. Set to `None` to disable. Accepts a float or
                    an `httpx.Timeout` object for fine-grained control. Defaults to 60 seconds.
    """
    super().__init__(base_url, token, password, bootstrap, cert, extra_headers, timeout)
    self._file: RemoteFileListDTO | None = None
    self._data_dir: pathlib.Path | None = pathlib.Path(data_dir) if data_dir else None
    self.engine: Engine | None = None
    self._session: Session | None = None
    self._hulc_client: HULC_Client | None = None
    self._database_metadata: MetaData | None = None  # stores the metadata loaded from remote
    # set the correct file
    if file:
        self.set_file(file)
    self._encryption_password = encryption_password
    self._master_key: bytes | None = None
    self._in_context = False
    self._sa_kwargs = sa_kwargs or {}
    if "autoflush" not in self._sa_kwargs:
        self._sa_kwargs["autoflush"] = True

file property

Returns the current file. Raises if no file is set.

data_dir property

data_dir: Path

Returns the data directory. Raises if not set.

If the data_dir is not provided on budget creation, the default data directory will be created using a temporary directory using the fileId as primary key.

session property

session: Session

Returns a session for using with the queries.

set_file

set_file(
    file_id: str | RemoteFileListDTO,
) -> RemoteFileListDTO

Sets the file id for the class for further requests.

The file_id argument can be either the name, the remote id or the group id (also known as sync_id) from the file. If there are duplicates for the name, this method will raise UnknownFileId.

Source code in actual/__init__.py
def set_file(self, file_id: str | RemoteFileListDTO) -> RemoteFileListDTO:
    """
    Sets the file id for the class for further requests.

    The file_id argument can be either the name, the remote id or the group id (also known as sync_id) from the
    file. If there are duplicates for the name, this method will raise `UnknownFileId`.
    """
    if isinstance(file_id, RemoteFileListDTO):
        self._file = file_id
        return file_id
    selected_files = []
    user_files = self.list_user_files()
    for file in user_files.data:
        if (file.file_id == file_id or file.name == file_id or file.group_id == file_id) and file.deleted == 0:
            selected_files.append(file)
    if len(selected_files) == 0:
        raise UnknownFileId(f"Could not find a file id or identifier '{file_id}'")
    elif len(selected_files) > 1:
        raise UnknownFileId(f"Multiple files found with identifier '{file_id}'")
    return self.set_file(selected_files[0])

run_migrations

run_migrations(migration_files: list[str])

Runs the migration files, skipping the ones that have already been run.

The files can be retrieved from data_file_index method. This first file is the base database, and the following files are migrations. Migrations can also be .js files. In this case, we have to extract and execute queries from the standard JS.

Source code in actual/__init__.py
def run_migrations(self, migration_files: list[str]):
    """
    Runs the migration files, skipping the ones that have already been run.

    The files can be retrieved from [data_file_index][actual.Actual.data_file_index] method. This first file is
    the base database, and the following files are migrations. Migrations can also be `.js` files. In this case,
    we have to extract and execute queries from the standard JS.
    """
    with sqlite3.connect(self.data_dir / "db.sqlite") as conn:
        for file in migration_files:
            if not file.startswith("migrations"):
                continue  # in case db.sqlite file gets passed as one of the migrations files
            file_id = file.split("_")[0].split("/")[1]
            if conn.execute(f"SELECT id FROM __migrations__ WHERE id = '{file_id}';").fetchall():
                continue  # skip migration as it was already ran
            migration = self.data_file(file)  # retrieves file from actual server
            sql_statements = migration.decode()
            if file.endswith(".js"):
                # There is at least one migration which is a Javascript file.
                # All entries inside db.execQuery(`...`) must be executed
                exec_entries = js_migration_statements(sql_statements)
                sql_statements = "\n".join(exec_entries)
            conn.executescript(sql_statements)
            conn.execute(f"INSERT INTO __migrations__ (id) VALUES ({file_id});")
        conn.commit()
    conn.close()
    # update the metadata by reflecting the model
    if self.engine is None:
        raise ActualError("Engine not initialized. Download or create a budget first.")
    self._database_metadata = reflect_model(self.engine)

create_budget

create_budget(budget_name: str)

Creates a budget using the remote server default database and migrations.

If a password is provided, the budget will be encrypted. It's important to note that create_budget depends on the migration files from the Actual server, and those could be written in Javascript. Even though the library tries to execute all statements in those files, it is not an exact match. It is recommended to create budgets via frontend instead.

Source code in actual/__init__.py
def create_budget(self, budget_name: str):
    """
    Creates a budget using the remote server default database and migrations.

    If a password is provided, the budget will be encrypted. It's important to note that `create_budget`
    depends on the migration files from the Actual server, and those could be written in Javascript. Even though
    the library tries to execute all statements in those files, it is not an exact match. It is recommended
    to create budgets via frontend instead.
    """
    warnings.warn("Creating budgets via actualpy is not recommended due to custom code migrations.")
    migration_files = self.data_file_index()
    file_id = str(uuid.uuid4())
    # create folder for the files
    if not self._data_dir:
        self._data_dir = get_tmp_folder(file_id)
    # first migration file is the default database
    migration = self.data_file(migration_files[0])
    (self.data_dir / "db.sqlite").write_bytes(migration)
    # also write the metadata file with default fields
    random_id = str(uuid.uuid4()).replace("-", "")[:7]
    self.update_metadata(
        {
            "id": f"My-Finances-{random_id}",
            "budgetName": budget_name,
            "userId": self._token,
            "cloudFileId": file_id,
            "resetClock": True,
        }
    )
    self._file = RemoteFileListDTO(name=budget_name, fileId=file_id, groupId=None, deleted=0, encryptKeyId=None)
    # generate a session
    self.engine = create_engine(f"sqlite:///{self._data_dir}/db.sqlite")
    # create engine for downloaded database and run migrations
    self.run_migrations(migration_files[1:])
    if self._in_context:
        self._session = strong_reference_session(Session(self.engine, **self._sa_kwargs))
    # create a clock. Since the clock entry is not tracked, we use a separate session
    with Session(self.engine) as session:
        self._hulc_client = HULC_Client()
        get_or_create_clock(session, self._hulc_client)
        session.commit()

rename_budget

rename_budget(budget_name: str)

Renames the budget with the given name.

Source code in actual/__init__.py
def rename_budget(self, budget_name: str):
    """Renames the budget with the given name."""
    self.update_user_file_name(self.file.file_id, budget_name)

delete_budget

delete_budget()

Deletes the currently loaded file from the server.

Source code in actual/__init__.py
def delete_budget(self):
    """Deletes the currently loaded file from the server."""
    self.delete_user_file(self.file.file_id)
    # reset group id, as file cannot be synced anymore
    self.file.group_id = None

cleanup

cleanup()

Cleans up the database from all deleted transactions, message caches and runs a VACUUM.

Useful to reduce the size of the database before exporting it.

Taken from source code at [actual/packages/loot-core/src/server/sync/reset.ts] (https://github.com/actualbudget/actual/blob/89006275a092d2309ab03162a047e07663789198/packages/loot-core/src/server/sync/reset.ts#L37-L47)

Source code in actual/__init__.py
def cleanup(self):
    """
    Cleans up the database from all deleted transactions, message caches and runs a `VACUUM`.

    Useful to reduce the size of the database before exporting it.

    Taken from source code at [actual/packages/loot-core/src/server/sync/reset.ts]
    (https://github.com/actualbudget/actual/blob/89006275a092d2309ab03162a047e07663789198/packages/loot-core/src/server/sync/reset.ts#L37-L47)
    """
    with sqlite3.connect(self.data_dir / "db.sqlite") as conn:
        conn.executescript(
            """
            DELETE FROM messages_crdt;
            DELETE FROM messages_clock;
            DELETE FROM transactions WHERE tombstone = 1;
            DELETE FROM accounts WHERE tombstone = 1;
            DELETE FROM payees WHERE tombstone = 1;
            DELETE FROM categories WHERE tombstone = 1;
            DELETE FROM category_groups WHERE tombstone = 1;
            DELETE FROM schedules WHERE tombstone = 1;
            DELETE FROM rules WHERE tombstone = 1;
            ANALYZE;
            VACUUM;
        """
        )
    conn.close()

export_data

export_data(
    output_file: (
        str | PathLike[str] | IO[bytes] | None
    ) = None,
    cleanup: bool = True,
) -> bytes

Export your data as a zip file containing db.sqlite and metadata.json files.

It can be imported into another Actual instance by closing an open file (if any), then clicking the “Import file” button, then choosing “Actual”. Even when encryption is enabled, the exported zip file will not have any encryption.

The export will clean up the budget (i.e., remove up all changeset objects and entries marked for deletion), then writing a zip file to disk. See Actual.cleanup method for more information.

Source code in actual/__init__.py
def export_data(self, output_file: str | PathLike[str] | IO[bytes] | None = None, cleanup: bool = True) -> bytes:
    """
    Export your data as a zip file containing db.sqlite and metadata.json files.

    It can be imported into another Actual instance by closing an open file (if any), then clicking the
    _“Import file”_ button, then choosing _“Actual”_. Even when encryption is enabled, the exported zip file
    **will not have any encryption**.

    The export will clean up the budget (i.e., remove up all changeset objects and entries marked for deletion),
    then writing a zip file to disk. See [Actual.cleanup][actual.Actual.cleanup] method for more information.
    """
    if cleanup:
        self.cleanup()
    temp_file = io.BytesIO()
    with zipfile.ZipFile(temp_file, "a", zipfile.ZIP_DEFLATED, False) as z:
        z.write(self.data_dir / "db.sqlite", "db.sqlite")
        z.write(self.data_dir / "metadata.json", "metadata.json")
    content = temp_file.getvalue()
    if output_file:
        if isinstance(output_file, (str, PathLike)):
            with open(output_file, "wb") as f:
                f.write(content)
        else:
            output_file.write(content)
    return content

encrypt

encrypt(encryption_password: str)

Encrypts the local database using a new key, and re-uploads to the server.

WARNING: this resets the file on the server. Make sure you have a copy of the database before attempting this operation.

Source code in actual/__init__.py
def encrypt(self, encryption_password: str):
    """
    Encrypts the local database using a new key, and re-uploads to the server.

    WARNING: this resets the file on the server. Make sure you have a copy of the database before attempting this
    operation.
    """
    if encryption_password and not self.file.encrypt_key_id:
        # password was provided, but encryption key not, create one
        key_id = str(uuid.uuid4())
        salt = make_salt()
        self.user_create_key(self.file.file_id, key_id, encryption_password, salt)
        self.update_metadata({"encryptKeyId": key_id})
        self.file.encrypt_key_id = key_id
    elif self.file.encrypt_key_id:
        key_info = self.user_get_key(self.file.file_id)
        if key_info.data.salt is None:
            raise ActualDecryptionError("Encryption key has no salt.")
        salt = key_info.data.salt
    else:
        raise ActualError("Budget is encrypted but password was not provided")
    self._master_key = create_key_buffer(encryption_password, salt)
    # encrypt binary data with
    encrypted = encrypt(self.file.encrypt_key_id, self._master_key, self.export_data())
    binary_data = io.BytesIO(base64.b64decode(encrypted["value"]))
    encryption_meta = encrypted["meta"]
    self.reset_user_file(self.file.file_id)
    self.upload_user_file(binary_data.getvalue(), self.file.file_id, self.file.name, encryption_meta)
    self.set_file(self.file.file_id)

upload_budget

upload_budget()

Uploads the current file to the Actual server.

If attempting to upload your first budget, make sure you use Actual.create_budget first.

Source code in actual/__init__.py
def upload_budget(self):
    """
    Uploads the current file to the Actual server.

    If attempting to upload your first budget, make sure you use [Actual.create_budget][actual.Actual.create_budget]
    first.
    """
    if not self._data_dir:
        raise UnknownFileId("No current file loaded.")
    if not self._file:
        file_id = str(uuid.uuid4())
        metadata = self.get_metadata()
        budget_name = metadata.get("budgetName", "My Finances")
        self._file = RemoteFileListDTO(name=budget_name, fileId=file_id, groupId=None, deleted=0, encryptKeyId=None)
    binary_data = io.BytesIO()
    with zipfile.ZipFile(binary_data, "a", zipfile.ZIP_DEFLATED, False) as z:
        z.write(self.data_dir / "db.sqlite", "db.sqlite")
        z.write(self.data_dir / "metadata.json", "metadata.json")
    # we have to first upload the user file so the reference id can be used to generate a new encryption key
    self.upload_user_file(binary_data.getvalue(), self._file.file_id, self._file.name)
    # reset local file id to retrieve the grouping id
    self.set_file(self._file.file_id)
    # encrypt the file and re-upload
    if self._encryption_password or self._master_key or self._file.encrypt_key_id:
        self.encrypt(self._encryption_password)

reupload_budget

reupload_budget()

Resets the user file on the backend and re-uploads the current copy instead.

Works similar to the reset sync option from the frontend.

This operation can be destructive, so make sure you generate a copy before attempting to re-upload your budget.

Source code in actual/__init__.py
def reupload_budget(self):
    """
    Resets the user file on the backend and re-uploads the current copy instead.

    Works similar to the reset sync option from the frontend.

    **This operation can be destructive**, so make sure you generate a copy before attempting to re-upload
    your budget.
    """
    self.reset_user_file(self._file.file_id)
    self.update_metadata({"groupId": None})  # since we don't know what the new group id will be
    self.upload_budget()

apply_changes

apply_changes(messages: list[Message]) -> list[Changeset]

Applies a list of sync changes, based on what the sync method returned on the remote.

Source code in actual/__init__.py
def apply_changes(self, messages: list[Message]) -> list[Changeset]:
    """Applies a list of sync changes, based on what the sync method returned on the remote."""
    if not self.engine:
        raise UnknownFileId("No valid file available, download one with download_budget()")
    with Session(self.engine) as s:
        changes = []
        # use the current value to group updates together to the same row
        current_table, current_id, current_value = None, None, {}
        for message in messages:
            if message.dataset == "prefs":
                # write it to metadata.json instead
                self.update_metadata({message.row: message.get_value()})
                continue
            table = get_class_from_reflected_table_name(self._reflected_metadata, message.dataset)
            if table is None:
                raise ActualError(
                    f"Actual found a table not supported by the library: table '{message.dataset}' not found\n"
                )
            column = get_attribute_from_reflected_table_name(
                self._reflected_metadata, message.dataset, message.column
            )
            if column is None:
                raise ActualError(
                    f"Actual found a column not supported by the library: "
                    f"column '{message.column}' at table '{message.dataset}' not found\n"
                )
            # if the current id exists, and it's different from the next one, we update the values
            next_id = message.row
            if current_id and current_table is not None and (current_id != next_id or table != current_table):
                apply_change(s, current_table, current_id, current_value)
                # update changes
                change = Changeset(get_class_by_table_name(str(current_table.name)), current_id, current_value)
                changes.append(change)
                # update local cache
                current_table, current_id, current_value = table, next_id, {column: message.get_value()}
            # otherwise update the cache with the current value
            else:
                current_table, current_id, current_value[column] = table, next_id, message.get_value()
        # if after finishing all values there is a value left, update it too
        if current_table is not None and current_id is not None and current_value is not None:
            apply_change(s, current_table, current_id, current_value)
            # return a list of changes on this endpoint
            change = Changeset(get_class_by_table_name(str(current_table.name)), current_id, current_value)
            changes.append(change)
        s.commit()
        return changes

get_metadata

get_metadata() -> dict

Gets the content of the metadata.json file.

Source code in actual/__init__.py
def get_metadata(self) -> dict:
    """Gets the content of the `metadata.json` file."""
    metadata_file = self.data_dir / "metadata.json"
    return json.loads(metadata_file.read_text())

update_metadata

update_metadata(patch: dict)

Updates the metadata.json from the Actual file with the patch fields.

The patch is a dictionary that will then be merged on the metadata and written again to a file.

Source code in actual/__init__.py
def update_metadata(self, patch: dict):
    """
    Updates the `metadata.json` from the Actual file with the patch fields.

    The patch is a dictionary that will then be merged on the metadata and written again to a file.
    """
    metadata_file = self.data_dir / "metadata.json"
    if metadata_file.is_file():
        config = self.get_metadata()
        config.update(patch)
    else:
        config = patch
    metadata_file.write_text(json.dumps(config, separators=(",", ":")))

download_budget

download_budget(encryption_password: str | None = None)

Downloads the budget file from the remote, applying all following changes required by the server.

After the file is downloaded, the sync endpoint queries for the list of pending changes. The changes are individual row updates that are then applied one by one to the downloaded database state. See the sync method for more information.

If the budget is password-protected, the password needs to be present to download the budget. Otherwise, the budget download will fail with ActualDecryptionError.

When a data_dir was provided, the method will try to use the local downloaded copy by first checking if the sync id (named group id) remains the same. If it does, then the sync is executed using the stored files. Otherwise, the file is re-downloaded.

Source code in actual/__init__.py
def download_budget(self, encryption_password: str | None = None):
    """
    Downloads the budget file from the remote, applying all following changes required by the server.

    After the file is downloaded, the sync endpoint queries for the list of pending changes. The changes are
    individual row updates that are then applied one by one to the downloaded database state. See the
    [sync method][actual.Actual.sync] for more information.

    If the budget is password-protected, the password needs to be present to download the budget. Otherwise, the
    budget download will fail with [ActualDecryptionError][actual.exceptions.ActualDecryptionError].

    When a `data_dir` was provided, the method will try to use the local downloaded copy by first checking if the
    sync id (named group id) remains the same. If it does, then the sync is executed using the stored files.
    Otherwise, the file is re-downloaded.
    """
    # check if file has an encryption key and retrieve it
    encryption_password = encryption_password or self._encryption_password
    self.download_master_encryption_key(encryption_password)
    # then download user file if the data_dir is set and both files are present
    if self._data_dir and all((self.data_dir / path).is_file() for path in ["db.sqlite", "metadata.json"]):
        group_id = self.get_metadata().get("groupId")
        # handle the case where a new group id exists and the file needs to be re-downloaded
        if self.file.group_id != group_id:
            warnings.warn("Sync id has been reset on remote database, re-downloading the budget.")
            (self.data_dir / "db.sqlite").unlink()
            (self.data_dir / "metadata.json").unlink()
            return self.download_budget(encryption_password)
        # resume budget
        self.create_engine()
    else:
        file_bytes = self.download_user_file(self.file.file_id)
        if encryption_password is not None and self.file.encrypt_key_id:
            if self._master_key is None:
                raise ActualDecryptionError("Master encryption key is not set.")
            file_info = self.get_user_file_info(self.file.file_id)
            # decrypt file bytes
            file_bytes = decrypt_from_meta(self._master_key, file_bytes, file_info.data.encrypt_meta)
        self.import_zip(io.BytesIO(file_bytes))
        # sometimes downloaded budgets will not have the groupId
        self.update_metadata({"groupId": self.file.group_id})
    # actual js always calls validation
    self.validate()
    # run migrations if needed
    migration_files = self.data_file_index()
    self.run_migrations(migration_files[1:])
    self.sync()
    # create session if not existing
    if self._in_context and not self._session:
        self._session = strong_reference_session(Session(self.engine, **self._sa_kwargs))

download_master_encryption_key

download_master_encryption_key(
    encryption_password: str | None,
) -> bytes | None

Downloads and assembles the key for decrypting the budget based on the provided encryption password.

If the user file is not encryption, no key will be returned. If the file was encrypted, the key is assembled using the key salt and the password with the PBKDF2HMAC algorithm.

Source code in actual/__init__.py
def download_master_encryption_key(self, encryption_password: str | None) -> bytes | None:
    """
    Downloads and assembles the key for decrypting the budget based on the provided encryption password.

    If the user file is not encryption, no key will be returned. If the file was encrypted, the key is assembled
    using the key salt and the password with the PBKDF2HMAC algorithm.
    """
    if self.file.encrypt_key_id and encryption_password is None:
        raise ActualDecryptionError("File is encrypted but no encryption password was provided.")
    if encryption_password is not None and self.file.encrypt_key_id:
        key_info = self.user_get_key(self.file.file_id)
        if key_info.data.salt is None:
            raise ActualDecryptionError("Encryption key has no salt.")
        self._master_key = create_key_buffer(encryption_password, key_info.data.salt)
    return self._master_key

import_zip

import_zip(file_bytes: str | PathLike[str] | IO[bytes])

Imports a zip file as the current database, as well as generating the local reflected session.

This function enables you to inspect backups by loading them directly, instead of unzipping the contents.

Source code in actual/__init__.py
def import_zip(self, file_bytes: str | PathLike[str] | IO[bytes]):
    """
    Imports a zip file as the current database, as well as generating the local reflected session.

    This function enables you to inspect backups by loading them directly, instead of unzipping the contents.
    """
    try:
        zip_file = zipfile.ZipFile(file_bytes)
    except zipfile.BadZipfile as e:
        raise InvalidZipFile(f"Invalid zip file: {e}") from None
    # try to extract the file_id from the metadata.json
    if not self._data_dir:
        try:
            metadata = zip_file.read("metadata.json")
            file_id = json.loads(metadata).get("cloudFileId", None)
        except (KeyError, ValueError):
            file_id = None  # can happen if zip does not contain the file or file is not proper JSON
        self._data_dir = get_tmp_folder(file_id)
    # this should extract 'db.sqlite' and 'metadata.json' to the folder
    zip_file.extractall(self._data_dir)
    self.create_engine()

create_engine

create_engine()

Internally creates the engine for the database, and loads the reflected metadata.

Source code in actual/__init__.py
def create_engine(self):
    """Internally creates the engine for the database, and loads the reflected metadata."""
    self.engine = create_engine(f"sqlite:///{self._data_dir}/db.sqlite")
    self._database_metadata = reflect_model(self.engine)
    # load the client id
    with Session(self.engine) as session:
        clock = get_or_create_clock(session)
        self._hulc_client = clock.get_timestamp()

sync

sync() -> list[Changeset]

Does a sync request and applies all changes that are stored on the server on the local copy of the database.

Returns a list of changes that were applied to the local database, so you can also use this method to retrieve all changes that were made to the budget. See Changeset for more information.

Source code in actual/__init__.py
def sync(self) -> list[Changeset]:
    """
    Does a sync request and applies all changes that are stored on the server on the local copy of the database.

    Returns a list of changes that were applied to the local database, so you can also use this method to retrieve
    all changes that were made to the budget. See [Changeset][actual.utils.changeset.Changeset] for more
    information.
    """
    # after downloading the budget, some pending transactions still need to be retrieved using sync
    request = SyncRequest(
        {
            "messages": [],
            "fileId": self.file.file_id,
            "groupId": self.file.group_id,
            "keyId": self.file.encrypt_key_id,
        }
    )
    request.set_timestamp(
        client_id=self._sync_client.client_id,
        now=self._sync_client.ts,
        initial_count=self._sync_client.initial_count,
    )
    changes = self.sync_sync(request)
    messages = changes.get_messages(self._master_key)
    changeset = self.apply_changes(messages)
    # after receiving changes, update the client clock with the latest value
    if messages:
        self._hulc_client = HULC_Client.from_timestamp(changes.messages[-1].timestamp)
        # store timestamp also inside database. Session might not be available here, so we create one
        with Session(self.engine) as session:
            get_or_create_clock(session, self._hulc_client)
            session.commit()
    return changeset

commit

commit()

Adds all pending entries done to the local database, and sends a sync request to the remote server.

Only after a commit operation will the remote frontend show the new data.

It's important to note that this process is not atomic, so if the process is interrupted before it completes successfully, the files would end up in a unknown state, leading you to have to redo the budget download.

Source code in actual/__init__.py
def commit(self):
    """
    Adds all pending entries done to the local database, and sends a sync request to the remote server.

    Only **after** a commit operation will the remote frontend show the new data.

    It's important to note that this process is not atomic, so if the process is interrupted
    before it completes successfully, the files would end up in a unknown state, leading you to have to redo
    the budget download."""
    session = self.session
    # create sync request based on the session reference that is tracked
    req = SyncRequest({"fileId": self.file.file_id, "groupId": self.file.group_id})
    if self.file.encrypt_key_id:
        req.keyId = self.file.encrypt_key_id
    req.set_null_timestamp(client_id=self._sync_client.client_id)
    # flush to database, so that all data is evaluated on the database for consistency
    session.flush()
    # first we add all new entries and modify is required
    if "messages" in session.info:
        req.set_messages(session.info["messages"], self._sync_client, master_key=self._master_key)
    # commit to local database to clear the current flush cache
    session.commit()
    # sync all changes to the server
    if self.file.group_id:  # only files with a group id can be synced
        self.sync_sync(req)

run_rules

run_rules(
    transactions: Sequence[Transactions] | None = None,
)

Runs all the stored rules on the database on all transactions, without any filters.

Source code in actual/__init__.py
def run_rules(self, transactions: Sequence[Transactions] | None = None):
    """Runs all the stored rules on the database on all transactions, without any filters."""
    if transactions is None:
        transactions = get_transactions(self.session)
    ruleset = get_ruleset(self.session)
    ruleset.run(transactions)

run_bank_sync

run_bank_sync(
    account: str | Accounts | None = None,
    start_date: date | None = None,
    run_rules: bool = False,
) -> list[Transactions]

Runs the bank synchronization for the selected account. If missing, all accounts are synchronized.

If a start_date is provided, is used as a reference; otherwise, the last timestamp of each account will be used. If the account does not have any transaction, the last 90 days are considered instead.

If the start_date is not provided and the account does not have any transaction, a reconciled transaction will be generated to match the expected balance of the account. This would correct the account balance with the remote one.

If run_rules is set, the rules will be run for the imported transactions. Please note that unlike Actual, the rules here are run at the final imported objects. This is unlikely to cause data mismatches, but if you find any issue, feel free to report it on the Github repository.

Source code in actual/__init__.py
def run_bank_sync(
    self, account: str | Accounts | None = None, start_date: datetime.date | None = None, run_rules: bool = False
) -> list[Transactions]:
    """
    Runs the bank synchronization for the selected account. If missing, all accounts are synchronized.

    If a `start_date` is provided, is used as a reference; otherwise, the last timestamp of each account will be
    used. If the account does not have any transaction, the last 90 days are considered instead.

    If the `start_date` is not provided and the account does not have any transaction, a reconciled transaction will
    be generated to match the expected balance of the account. This would correct the account balance with the
    remote one.

    If `run_rules` is set, the rules will be run for the imported transactions. Please note that unlike Actual,
    the rules here are run at the final imported objects. This is unlikely to cause data mismatches,
    but if you find any issue, feel free to report it on the Github repository.
    """
    # if no account is provided, sync all of them, otherwise just the account provided
    if account is None:
        accounts = get_accounts(self.session)
    else:
        matched_account = get_account(self.session, account)
        if matched_account is None:
            raise ActualError(f"Account '{account}' not found.")
        accounts = [matched_account]
    imported_transactions = []

    is_first_sync: bool = False
    for acct in accounts:
        sync_method = acct.account_sync_source
        account_id = acct.account_id
        if not (account_id and sync_method):
            continue
        status = self.bank_sync_status(sync_method.lower())
        if not status.data.configured:
            continue
        if start_date is not None:
            default_start_date = start_date
        else:
            all_transactions = get_transactions(self.session, account=acct)
            if all_transactions:
                default_start_date = all_transactions[0].get_date()
            else:
                is_first_sync = True
                default_start_date = datetime.date.today() - datetime.timedelta(days=90)
        transactions = self._run_bank_sync_account(acct, default_start_date, is_first_sync)
        imported_transactions.extend(transactions)
    if run_rules:
        self.run_rules(imported_transactions)
    return imported_transactions