File: //proc/self/root/lib/python3/dist-packages/UpdateManager/backend/__init__.py
#!/usr/bin/env python
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
"""Integration of package managers into UpdateManager"""
# (c) 2005-2009 Canonical, GPL
import gi
gi.require_version("Gtk", "3.0")
gi.require_version("Snapd", "2")
from gi.repository import GLib, Gtk, Snapd
from apt import Cache
import json
import logging
import os
import re
import subprocess
from gettext import gettext as _
from threading import Thread
from UpdateManager.Core.MyCache import MyCache
from UpdateManager.Core.utils import inhibit_sleep, get_dist_version
from UpdateManager.Dialogs import Dialog
class InstallBackend(Dialog):
    ACTION_UPDATE = 0
    ACTION_PRE_INSTALL = 1
    ACTION_INSTALL = 2
    def __init__(self, window_main, action):
        Dialog.__init__(self, window_main)
        self.action = action
        self.sleep_cookie = None
    def start(self):
        os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
        # Do not suspend during the update process
        self.sleep_cookie = inhibit_sleep()
        if self.action == self.ACTION_PRE_INSTALL:
            unfresh_cache = self.window_main.cache
            fresh_cache = Cache(rootdir=self.window_main.cache.rootdir)
            # Install OEM packages, update, then do ACTION_INSTALL
            pkgs_install_oem = []
            pkgs_upgrade_oem = []
            for pkg in self.window_main.oem_metapackages:
                unfresh_pkg = unfresh_cache[pkg]
                fresh_pkg = fresh_cache[pkg]
                if unfresh_pkg.marked_install and not fresh_pkg.is_installed:
                    pkgs_install_oem.append(pkg)
                elif unfresh_pkg.marked_upgrade and fresh_pkg.is_upgradable:
                    pkgs_upgrade_oem.append(pkg)
            self.commit_oem(pkgs_install_oem, pkgs_upgrade_oem)
        elif self.action == self.ACTION_INSTALL:
            # Get the packages which should be installed and update
            pkgs_install = []
            pkgs_upgrade = []
            pkgs_remove = []
            # Get a fresh cache in case update-manager's is outdated to
            # skip operations that already took place
            fresh_cache = Cache(rootdir=self.window_main.cache.rootdir)
            for pkg in self.window_main.cache:
                try:
                    if (
                        pkg.marked_install
                        and not fresh_cache[pkg.name].is_installed
                    ):
                        pkgname = pkg.name
                        if pkg.is_auto_installed:
                            pkgname += "#auto"
                        pkgs_install.append(pkgname)
                    elif (
                        pkg.marked_upgrade
                        and fresh_cache[pkg.name].is_upgradable
                    ):
                        pkgs_upgrade.append(pkg.name)
                    elif (
                        pkg.marked_delete
                        and fresh_cache[pkg.name].is_installed
                    ):
                        pkgs_remove.append(pkg.name)
                except KeyError:
                    # pkg missing from fresh_cache can't be modified
                    pass
            self.commit(pkgs_install, pkgs_upgrade, pkgs_remove)
        else:
            self.update()
    def update(self):
        """Run a update to refresh the package list"""
        raise NotImplementedError
    def commit_oem(self, pkgs_install_oem, pkgs_upgrade_oem):
        """Install these OEM packages"""
        self._action_done(
            self.ACTION_PRE_INSTALL,
            authorized=True,
            success=True,
            error_string=None,
            error_desc=None,
            trans_failed=None,
        )
    def commit(self, pkgs_install, pkgs_upgrade, pkgs_remove):
        """Commit the cache changes"""
        raise NotImplementedError
    def get_snap_seeds(self):
        seeded_snaps = {}
        unseeded_snaps = {}
        curr_channel = "stable/ubuntu-" + get_dist_version()
        cache = self.window_main.cache
        try:
            d2s_file = open(
                "/usr/share/ubuntu-release-upgrader/deb2snap.json", "r"
            )
            d2s = json.load(d2s_file)
            d2s_file.close()
            for snap in d2s["seeded"]:
                seed = d2s["seeded"][snap]
                deb = seed.get("deb", None)
                to_channel = seed.get("to_channel", curr_channel)
                metapkg = seed.get("metapkg", None)
                if metapkg not in cache:
                    continue
                if metapkg and cache[metapkg].is_installed is False:
                    continue
                seeded_snaps[snap] = (deb, to_channel)
            for snap in d2s["unseeded"]:
                unseed = d2s["unseeded"][snap]
                from_channel = unseed.get("from_channel", curr_channel)
                metapkg = seed.get("metapkg", None)
                if metapkg not in cache:
                    continue
                if metapkg and cache[metapkg].is_installed is False:
                    continue
                unseeded_snaps[snap] = from_channel
        except Exception as e:
            logging.debug("error reading deb2snap.json file (%s)" % e)
        return seeded_snaps, unseeded_snaps
    def get_deb2snap_dups(self):
        # update and grab the latest cache
        try:
            if self.window_main.cache is None:
                self.window_main.cache = MyCache(None)
            else:
                self.window_main.cache.open(None)
                self.window_main.cache._initDepCache()
            cache = self.window_main.cache
        except Exception as e:
            # just return an empty array for now, it's perfectly safe to
            # postpone this duplicates check to a later update.
            logging.debug("error reading cache (%s)" % e)
            return []
        duplicates = []
        seeded_snaps, _ = self.get_snap_seeds()
        for snap, (deb, _) in seeded_snaps.items():
            # if the deb is installed and was not manually installed,
            # replace it
            if deb in cache and cache[deb].is_installed:
                deb_is_auto = True
                cache[deb].mark_delete()
                for pkg in cache.get_changes():
                    if (
                        pkg.is_installed
                        and pkg.marked_delete
                        and not pkg.is_auto_installed
                    ):
                        deb_is_auto = False
                        break
                cache.clear()
                if deb_is_auto:
                    duplicates.append(deb)
        return duplicates
    def get_snap_transitions(self):
        # populate snap_list with deb2snap transitions
        snap_list = {}
        seeded_snaps, unseeded_snaps = self.get_snap_seeds()
        for snap, (deb, to_channel) in seeded_snaps.items():
            snap_object = {}
            # check if the snap is already installed
            snap_info = subprocess.Popen(
                ["snap", "info", snap],
                universal_newlines=True,
                stdout=subprocess.PIPE,
            ).communicate()
            if re.search("^installed: ", snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is installed" % snap)
                continue
            elif deb in self.window_main.duplicate_packages:
                # install the snap if the deb was just marked delete
                snap_object["command"] = "install"
                snap_object["channel"] = to_channel
                snap_list[snap] = snap_object
        for snap, (from_channel) in unseeded_snaps.items():
            snap_object = {}
            # check if the snap is already installed
            snap_info = subprocess.Popen(
                ["snap", "info", snap],
                universal_newlines=True,
                stdout=subprocess.PIPE,
            ).communicate()
            if re.search("^installed: ", snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is installed" % snap)
                # its not tracking the release channel so don't remove
                re_channel = "stable/ubuntu-[0-9][0-9].[0-9][0-9]"
                if not re.search(
                    r"^tracking:.*%s" % re_channel, snap_info[0], re.MULTILINE
                ):
                    logging.debug(
                        "Snap %s is not tracking the release channel" % snap
                    )
                    continue
                snap_object["command"] = "remove"
                # check if this snap is being used by any other snaps
                conns = subprocess.Popen(
                    ["snap", "connections", snap],
                    universal_newlines=True,
                    stdout=subprocess.PIPE,
                ).communicate()
                for conn in conns[0].split("\n"):
                    conn_cols = conn.split()
                    if len(conn_cols) != 4:
                        continue
                    plug = conn_cols[1]
                    slot = conn_cols[2]
                    if slot.startswith(snap + ":"):
                        plug_snap = plug.split(":")[0]
                        if (
                            plug_snap != "-"
                            and plug_snap not in unseeded_snaps
                        ):
                            logging.debug(
                                "Snap %s is being used by %s. "
                                "Switching it to stable track"
                                % (snap, plug_snap)
                            )
                            snap_object["command"] = "refresh"
                            snap_object["channel"] = "stable"
                            break
                snap_list[snap] = snap_object
        return snap_list
    def update_snap_cb(self, client, change, _, user_data):
        index, count, progress_bar = user_data
        if not progress_bar:
            return
        # determine how much of this change has been done
        task_total = 0
        task_done = 0
        for task in change.get_tasks():
            task_total += task.get_progress_total()
            task_done += task.get_progress_done()
        task_fraction = task_done / task_total
        # determine how much total progress has been made
        total_fraction = (task_fraction / count) + (index / count)
        # change.get_tasks() can increase between callbacks so we must
        # avoid jumping backward in progress here
        if total_fraction > progress_bar.get_fraction():
            GLib.idle_add(progress_bar.set_fraction, total_fraction)
    def update_snaps(self):
        # update status and progress bar
        def update_status(status):
            GLib.idle_add(self.label_details.set_label, status)
        def update_progress(progress_bar):
            progress_bar.pulse()
            return True
        update_status(_("Updating snaps"))
        progress_bar = None
        progress_timer = None
        progress_bars = self.progressbar_slot.get_children()
        if progress_bars and isinstance(progress_bars[0], Gtk.ProgressBar):
            progress_bar = progress_bars[0]
            progress_timer = GLib.timeout_add(
                100, update_progress, progress_bar
            )
        # populate snap_list with deb2snap transitions
        snap_list = self.get_snap_transitions()
        if progress_timer:
            GLib.source_remove(progress_timer)
            progress_bar.set_fraction(0)
        # (un)install (un)seeded snap(s)
        try:
            client = Snapd.Client()
            client.connect_sync()
            index = 0
            count = len(snap_list)
            for snap, snap_object in snap_list.items():
                command = snap_object["command"]
                if command == "refresh":
                    update_status(_("Refreshing %s snap" % snap))
                    client.refresh_sync(
                        snap,
                        snap_object["channel"],
                        self.update_snap_cb,
                        progress_callback_data=(index, count, progress_bar),
                    )
                elif command == "remove":
                    update_status(_("Removing %s snap" % snap))
                    client.remove_sync(
                        snap,
                        self.update_snap_cb,
                        progress_callback_data=(index, count, progress_bar),
                    )
                else:
                    update_status(_("Installing %s snap" % snap))
                    client.install_sync(
                        snap,
                        snap_object["channel"],
                        self.update_snap_cb,
                        progress_callback_data=(index, count, progress_bar),
                    )
                index += 1
        except GLib.Error as e:
            logging.debug("error updating snaps (%s)" % e)
            GLib.idle_add(
                self.window_main.start_error,
                False,
                _("Upgrade only partially completed."),
                _(
                    "An error occurred while updating snaps. "
                    "Please check your network connection."
                ),
            )
            return
        # continue with the rest of the updates
        GLib.idle_add(self.window_main.start_available)
    def _action_done(
        self,
        action,
        authorized,
        success,
        error_string,
        error_desc,
        trans_failed=False,
    ):
        # If the progress dialog should be closed automatically afterwards
        # settings = Gio.Settings.new("com.ubuntu.update-manager")
        # close_after_install = settings.get_boolean(
        #    "autoclose-install-window")
        # FIXME: confirm with mpt whether this should still be a setting
        # close_after_install = False
        if action == self.ACTION_PRE_INSTALL and success:
            # Now do the regular updates
            self.action = self.ACTION_INSTALL
            self.start()
        elif action == self.ACTION_INSTALL:
            if (
                success
                and os.path.exists("/usr/bin/snap")
                and hasattr(self, "pane_update_progress")
            ):
                Thread(target=self.update_snaps).start()
            elif success:
                self.window_main.start_available()
            elif error_string:
                self.window_main.start_error(
                    trans_failed, error_string, error_desc
                )
            else:
                # exit gracefuly, we can't just exit as this will trigger
                # a crash if system.exit() is called in a exception handler
                GLib.timeout_add(1, self.window_main.exit)
        else:
            if error_string:
                self.window_main.start_error(True, error_string, error_desc)
            elif (
                success
                and os.path.exists("/usr/bin/snap")
                and hasattr(self, "pane_update_progress")
            ):
                self.window_main.duplicate_packages = self.get_deb2snap_dups()
                self.window_main.start_available()
            else:
                is_cancelled_update = not success
                self.window_main.start_available(is_cancelled_update)
# try aptdaemon
if (
    os.path.exists("/usr/sbin/aptd")
    and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ
):
    # check if the gtkwidgets are installed as well
    try:
        from .InstallBackendAptdaemon import InstallBackendAptdaemon
    except ImportError:
        logging.exception("importing aptdaemon")
# try synaptic
if (
    os.path.exists("/usr/sbin/synaptic")
    and "UPDATE_MANAGER_FORCE_BACKEND_APTDAEMON" not in os.environ
):
    try:
        from .InstallBackendSynaptic import InstallBackendSynaptic
    except ImportError:
        logging.exception("importing synaptic")
def get_backend(*args, **kwargs):
    """Select and return a package manager backend."""
    # try aptdaemon
    if (
        os.path.exists("/usr/sbin/aptd")
        and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ
    ):
        # check if the gtkwidgets are installed as well
        try:
            return InstallBackendAptdaemon(*args, **kwargs)
        except NameError:
            logging.exception("using aptdaemon failed")
    # try synaptic
    if (
        os.path.exists("/usr/sbin/synaptic")
        and "UPDATE_MANAGER_FORCE_BACKEND_APTDAEMON" not in os.environ
    ):
        try:
            return InstallBackendSynaptic(*args, **kwargs)
        except NameError:
            pass
    # nothing found, raise
    raise Exception(
        "No working backend found, please try installing "
        "aptdaemon or synaptic"
    )