File: //proc/thread-self/root/usr/share/unattended-upgrades/unattended-upgrade-shutdown
#!/usr/bin/python3
# Copyright (c) 2009-2018 Canonical Ltd
#
# AUTHOR:
# Michael Vogt <mvo@ubuntu.com>
# Balint Reczey <rbalint@ubuntu.com>
#
# unattended-upgrade-shutdown - helper that checks if an
# unattended-upgrade is in progress and waits until it exits
#
# This file is part of unattended-upgrades
#
# unattended-upgrades is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# unattended-upgrades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with unattended-upgrades; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
import copy
import dbus
import signal
import sys
import time
import logging
import logging.handlers
import gettext
import subprocess
import os.path
# for dbus signal handling
try:
    from dbus.mainloop.glib import DBusGMainLoop
    from gi.repository import GLib
except ImportError:
    pass
from optparse import OptionParser, Values
Values  # pyflakes
from gettext import gettext as _
from threading import Event
try:
    import apt_pkg
except Exception:
    # if there is no python-apt, no unattended-upgrades can run, so there
    # is no need to stop the shutdown
    logging.exception("importing of apt_pkg failed, exiting")
    sys.exit(0)
def do_usplash(msg):
    # type: (str) -> None
    """ send msg as TEXT, then PULSATE, to usplash_write (if installed) """
    usplash_write = "/sbin/usplash_write"
    if not os.path.exists(usplash_write):
        # no usplash on this system, nothing to update
        return
    logging.debug("Running usplash_write")
    for command in (["TEXT", msg], ["PULSATE"]):
        subprocess.call([usplash_write] + command)
def do_plymouth(msg):
    # type: (str) -> None
    """ show every line of msg via "plymouth message" (if installed) """
    plymouth = "/bin/plymouth"
    if not os.path.exists(plymouth):
        # no plymouth on this system, nothing to display
        return
    # one plymouth invocation per line of the message
    for text in msg.split("\n"):
        logging.debug("Running plymouth --text")
        subprocess.call([plymouth, "message", "--text", text])
def log_msg(msg, level=logging.WARN):
    # type: (str, int) -> None
    """ log msg at the given level and show it on plymouth and usplash """
    logging.log(level, msg)
    # the splash helpers are no-ops when their binary is missing
    for show_on_splash in (do_plymouth, do_usplash):
        show_on_splash(msg)
def log_progress():
    # type: () -> None
    """ helper to log the install progress (if any)

    Logs the "upgrade in progress" notice through log_msg() and appends
    the content of the unattended-upgrades progress file when it exists.
    """
    msg = _("Unattended-upgrade in progress during shutdown, "
            "please don't turn off the computer")
    # progress info
    progress = "/var/run/unattended-upgrades.progress"
    try:
        # the context manager closes the file right away instead of
        # leaving that to the garbage collector
        with open(progress) as progress_file:
            msg += "\n" + progress_file.read()
    except FileNotFoundError:
        # no progress file (or it vanished meanwhile): log the plain
        # notice only
        pass
    # log it
    log_msg(msg)
def signal_stop_unattended_upgrade(
        pidfile="/var/run/unattended-upgrades.pid"):
    # type: (str) -> None
    """ send SIGTERM to running unattended-upgrade if there is any

    pidfile is the path of the file holding the pid to signal. A missing
    file, unparsable content or a non-positive pid results in no signal
    being sent. A pid that no longer exists is only logged.
    """
    try:
        with open(pidfile) as pid_fh:
            content = pid_fh.read()
    except FileNotFoundError:
        # no pid file, nothing to stop
        return
    try:
        pid = int(content)
    except ValueError:
        # e.g. an empty or partially written pid file
        logging.warning("ignoring unparsable pid file %s", pidfile)
        return
    if pid <= 0:
        # os.kill() would address process groups instead of a single
        # process for these values, never do that based on file content
        logging.warning("ignoring invalid pid %s in %s", pid, pidfile)
        return
    logging.debug("found running unattended-upgrades pid %s" % pid)
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        logging.debug("sending SIGTERM failed because unattended-upgrades "
                      "already stopped")
def exit_log_result(success):
    """ log the final result and exit with 0 on success, 1 otherwise """
    if not success:
        log_msg(_("Unattended-upgrades stopped. There may be upgrades"
                  " left to be installed in the next run."), logging.INFO)
        sys.exit(1)
    log_msg(_("All upgrades installed"), logging.INFO)
    sys.exit(0)
class UnattendedUpgradesShutdown():
    """ Stop or run unattended-upgrades around system shutdown

    The object takes a logind delay inhibitor lock (when possible), waits
    for PrepareForShutdown or a SIGTERM/SIGHUP and then calls iter()
    repeatedly. iter() either runs unattended-upgrade in "install on
    shutdown" mode or asks a running unattended-upgrades to stop, and
    terminates the process with sys.exit() when done or timed out.
    """

    def __init__(self, options):
        # type: (Values) -> None
        """ Store the parsed options and try to take the inhibitor lock """
        self.options = options
        # the --delay option is in minutes, the countdown uses seconds
        self.max_delay = options.delay * 60
        # True once the periodic GLib timer calling iter() is installed
        self.iter_timer_set = False
        # None: not tried yet, True/False: result of apt_pkg.init_config()
        self.apt_pkg_reinit_done = None
        self.shutdown_pending = False
        # None until try_iter_on_shutdown() decided about shutdown mode
        self.on_shutdown_mode = None
        self.on_shutdown_mode_uu_proc = None
        # time of the first iter() call, start of the max_delay countdown
        self.start_time = None
        self.lock_was_taken = False
        self.signal_sent = False
        self.stop_signal_received = Event()

        try:
            # raises NameError when the optional GLib import failed
            hasattr(GLib, "MainLoop")
            DBusGMainLoop(set_as_default=True)
        except NameError:
            pass
        try:
            self.inhibit_lock = self.get_inhibit_shutdown_lock()
        except dbus.exceptions.DBusException:
            logging.warning("Could not get delay inhibitor lock")
            self.inhibit_lock = None
        self.logind_proxy = None
        # iterate every 3 seconds, or every third of logind's maximum
        # inhibit delay when that is shorter
        self.wait_period = min(3, self.get_inhibit_max_delay() / 3)
        self.preparing_for_shutdown = False

    def get_logind_proxy(self):
        """ Get logind dbus proxy object """
        if not self.logind_proxy:
            bus = dbus.SystemBus()
            if self.inhibit_lock is None:
                # try to get inhibit_lock or throw exception quickly when
                # logind is down
                self.inhibit_lock = self.get_inhibit_shutdown_lock()
            self.logind_proxy = bus.get_object(
                'org.freedesktop.login1', '/org/freedesktop/login1')
        return self.logind_proxy

    def get_inhibit_shutdown_lock(self):
        """ Take delay inhibitor lock """
        bus = dbus.SystemBus()
        return bus.call_blocking(
            'org.freedesktop.login1', '/org/freedesktop/login1',
            'org.freedesktop.login1.Manager', 'Inhibit', 'ssss',
            ('shutdown', 'Unattended Upgrades Shutdown',
             _('Stop ongoing upgrades or perform upgrades before shutdown'),
             'delay'), timeout=2.0)

    def get_inhibit_max_delay(self):
        """ Return logind's InhibitDelayMaxUSec divided by one million

        Falls back to 3 when the property can't be read over dbus.
        """
        try:
            logind_proxy = self.get_logind_proxy()
            getter_interface = dbus.Interface(
                logind_proxy,
                dbus_interface='org.freedesktop.DBus.Properties')
            return (getter_interface.Get(
                "org.freedesktop.login1.Manager", "InhibitDelayMaxUSec")
                    / (1000 * 1000))
        except dbus.exceptions.DBusException:
            return 3

    def is_preparing_for_shutdown(self):
        """ Return logind's PreparingForShutdown property

        A true value is cached in self.shutdown_pending and not queried
        again. False is returned when the property can't be read.
        """
        if not self.shutdown_pending:
            try:
                logind_proxy = self.get_logind_proxy()
                getter_interface = dbus.Interface(
                    logind_proxy,
                    dbus_interface='org.freedesktop.DBus.Properties')
                self.shutdown_pending = getter_interface.Get(
                    "org.freedesktop.login1.Manager", "PreparingForShutdown")
            except dbus.exceptions.DBusException:
                return False
        return self.shutdown_pending

    def start_iterations(self):
        """ Schedule iter() on the GLib main loop

        The periodic timer is installed only once, an additional
        immediate iteration is scheduled on every call. Does nothing
        when GLib is not available.
        """
        try:
            if not self.iter_timer_set:
                # the interval is in milliseconds and has to be an
                # integer, wait_period may be a float
                GLib.timeout_add(int(self.wait_period * 1000), self.iter)
                # remember the timer to not add one per received signal
                self.iter_timer_set = True
            # schedule an iteration immediately; "and False" makes the
            # callback one-shot
            GLib.timeout_add(0, lambda: self.iter() and False)
        except NameError:
            pass

    def run_polling(self, signal_handler):
        """ Fallback main loop used when signals can't be monitored

        signal_handler is installed for SIGTERM and SIGHUP. With
        --wait-for-signal the method first waits for one of those
        signals or for logind reporting a pending shutdown, then it
        calls iter() every wait_period seconds.
        """
        logging.warning(
            _("Unable to monitor PrepareForShutdown() signal, polling "
              "instead."))
        logging.warning(
            _("To enable monitoring the PrepareForShutdown() signal "
              "instead of polling please install the python3-gi package"))

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGHUP, signal_handler)

        # poll for PrepareForShutdown then run final iterations
        if self.options.wait_for_signal:
            logging.debug("Waiting for signal to start operation ")
            while (not self.stop_signal_received.is_set()
                   and not self.is_preparing_for_shutdown()):
                self.stop_signal_received.wait(self.wait_period)
        else:
            logging.debug("Skip waiting for signals, starting operation "
                          "now")
        # iter() returns True as long as it wants to be called again and
        # leaves via sys.exit() when finished, thus keep looping on True
        while self.iter():
            # TODO iter on sigterm and sighup, too
            time.sleep(self.wait_period)

    def run(self):
        """ delay shutdown and wait for PrepareForShutdown or other signals"""

        # set signal handlers
        def signal_handler(signum, frame):
            # type: (object, object) -> None
            logging.warning(
                "SIGTERM or SIGHUP received, stopping unattended-upgrades "
                "only if it is running")
            self.stop_signal_received.set()
            self.start_iterations()

        # fall back to polling without GLib
        try:
            hasattr(GLib, "MainLoop")
        except NameError:
            self.run_polling(signal_handler)
            return

        for sig in (signal.SIGTERM, signal.SIGHUP):
            GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, sig,
                                 signal_handler, None, None)
        if self.options.wait_for_signal:
            def prepare_for_shutdown_handler(active):
                """ Handle PrepareForShutdown() """
                if not active:
                    logging.warning("PrepareForShutdown(false) received, "
                                    "this should not happen")
                # PrepareForShutdown arrived, starting final iterations
                self.start_iterations()
            try:
                self.get_logind_proxy().connect_to_signal(
                    "PrepareForShutdown", prepare_for_shutdown_handler)
            except dbus.exceptions.DBusException:
                logging.warning(
                    _("Unable to monitor PrepareForShutdown() signal, polling "
                      "instead."))
                logging.warning(
                    _("Maybe systemd-logind service is not running."))
                self.run_polling(signal_handler)
                return

            logging.debug("Waiting for signal to start operation ")
        else:
            # starting final iterations immediately
            logging.debug("Skip waiting for signals, starting operation "
                          "now")
            self.start_iterations()

        GLib.MainLoop().run()

    def try_iter_on_shutdown(self):
        """ Run unattended-upgrade in shutdown mode when configured

        Returns True while the started unattended-upgrade process is
        still running and False when shutdown mode is not active. Exits
        the process via exit_log_result(True) when the started
        unattended-upgrade finished.
        """
        # check if we need to run unattended-upgrades on shutdown and if
        # so, run it
        try:
            if self.apt_pkg_reinit_done is None:
                logging.debug("Initializing apt_pkg configuration")
                apt_pkg.init_config()
                self.apt_pkg_reinit_done = True
        except apt_pkg.Error as error:
            # apt may be in a transient state due to unattended-upgrades
            # running, thus assuming non shutdown mode is reasonable
            logging.error(_("Apt returned an error thus shutdown mode is "
                            "disabled"))
            logging.error(_("error message: '%s'"), error)
            self.apt_pkg_reinit_done = False

        if self.on_shutdown_mode is None:
            # decide only once, on the first call
            self.on_shutdown_mode = (
                not self.options.stop_only
                and not self.stop_signal_received.is_set()
                and self.apt_pkg_reinit_done
                and apt_pkg.config.find_b(
                    "Unattended-Upgrade::InstallOnShutdown", False))
            if self.on_shutdown_mode:
                env = copy.copy(os.environ)
                env["UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN"] = "1"
                logging.debug("starting unattended-upgrades in shutdown mode")
                self.on_shutdown_mode_uu_proc = subprocess.Popen(
                    ["unattended-upgrade"], env=env)
                log_msg(_("Running unattended-upgrades in shutdown mode"))
                # run u-u, but switch to stopping when receiving stop signal
                # because it means shutdown progressed despite holding the lock

        if self.on_shutdown_mode:
            log_progress()
            if self.on_shutdown_mode_uu_proc.poll() is not None:
                # unattended-upgrades stopped on its own
                exit_log_result(True)
            else:
                return True
        return False

    def iter(self):
        """ Perform one monitoring step

        Returns True to ask for another call later. The process is
        terminated with sys.exit() when the max_delay countdown is over,
        when the lock file could be locked (unattended-upgrades is not
        running) or when the upgrade started in shutdown mode finished.
        """
        if self.start_time is None:
            self.start_time = time.time()
            logging.debug("Starting countdown of %s minutes",
                          self.max_delay / 60)
        else:
            if (time.time() - self.start_time) > self.max_delay:
                logging.warning(_(
                    "Giving up on lockfile after %s minutes of delay"),
                                self.max_delay / 60)
                sys.exit(1)

        if not self.stop_signal_received.is_set():
            if self.try_iter_on_shutdown():
                return True

        # run monitoring and keep "UI" updated
        res = apt_pkg.get_lock(self.options.lock_file)
        logging.debug("get_lock returned %i", res)
        # exit here if there is no lock
        if res > 0:
            logging.debug("lock not taken")
            if self.lock_was_taken:
                exit_log_result(self.signal_sent)
            else:
                sys.exit(0)
        self.lock_was_taken = True
        signal_stop_unattended_upgrade()
        self.signal_sent = True
        # show log
        log_progress()
        return True
def main():
    """ set up gettext, options and logging, then run the shutdown helper """
    # setup gettext
    domain = "unattended-upgrades"
    gettext.bindtextdomain(domain, "/usr/share/locale")
    gettext.textdomain(domain)

    # log to a plain file instead of syslog, because syslog is too
    # easily killed during shutdown
    logdir = "/var/log/unattended-upgrades/"
    try:
        apt_pkg.init_config()
        logdir = apt_pkg.config.find_dir(
            "Unattended-Upgrade::LogDir", logdir)
    except apt_pkg.Error as error:
        logging.error(_("Apt returned an error when loading configuration, "
                        "using default values"))
        logging.error(_("error message: '%s'"), error)

    # command line options, --debug defaults to the apt configuration
    parser = OptionParser()
    debug_default = apt_pkg.config.find_b("Unattended-Upgrade::Debug", False)
    parser.add_option(
        "", "--debug", action="store_true", dest="debug",
        default=debug_default, help="print debug messages")
    parser.add_option(
        "", "--delay", default=25, type="int",
        help="delay in minutes to wait for unattended-upgrades")
    parser.add_option(
        "", "--lock-file", default="/var/run/unattended-upgrades.lock",
        help="lock file location")
    parser.add_option(
        "", "--stop-only", action="store_true", dest="stop_only",
        default=False,
        help="only stop running unattended-upgrades, don't "
        "start it even when "
        "Unattended-Upgrade::InstallOnShutdown is true")
    parser.add_option(
        "", "--wait-for-signal", action="store_true",
        dest="wait_for_signal", default=False,
        help="wait for TERM signal before starting operation")
    options, _positional = parser.parse_args()

    # setup logging
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    logging.basicConfig(
        filename=os.path.join(logdir, "unattended-upgrades-shutdown.log"),
        level=logging.DEBUG if options.debug else logging.INFO,
        format="%(asctime)s %(levelname)s - %(message)s")

    shutdown_helper = UnattendedUpgradesShutdown(options)
    shutdown_helper.run()
# run the helper only when executed as a script, not when imported
if __name__ == "__main__":
    main()