HEX
Server: Apache/2.4.58 (Ubuntu)
System: Linux ns3133907 6.8.0-86-generic #87-Ubuntu SMP PREEMPT_DYNAMIC Mon Sep 22 18:03:36 UTC 2025 x86_64
User: cssnetorguk (1024)
PHP: 8.2.28
Disabled: NONE
Upload Files
File: //proc/thread-self/root/usr/bin/add-apt-repository
#!/usr/bin/python3

import apt_pkg
import io
import os
import re
import sys
import gettext
import locale
import argparse
import subprocess

from softwareproperties.shortcuthandler import ShortcutException
from softwareproperties.shortcuts import shortcut_handler
from softwareproperties.ppa import PPAShortcutHandler
from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
from softwareproperties.sourceslist import SourcesListShortcutHandler
from softwareproperties.uri import URIShortcutHandler
from softwareproperties.sourceutils import *


from aptsources.sourceslist import SourcesList, SourceEntry
from aptsources.distro import get_distro
from copy import copy
from gettext import gettext as _


# Initialise apt_pkg at import time so its configuration can be queried below.
apt_pkg.init()

# Path of the file that apt's configuration names under "Dir::Etc::sourcelist".
# The component and pocket helpers in this file compare each entry's ``file``
# against this path and only touch entries that come from it.
SOURCESLIST = apt_pkg.config.find_file("Dir::Etc::sourcelist")


class AddAptRepository(object):
    def __init__(self):
        """Select the translation domain and load the distro's sources.

        The distribution object returned by ``get_distro()`` and a fresh
        ``SourcesList`` are stored on the instance as ``distro`` and
        ``sourceslist`` after the distribution has processed the list via
        ``get_sources()``.
        """
        gettext.textdomain("software-properties")
        distro = get_distro()
        sourceslist = SourcesList()
        distro.get_sources(sourceslist)
        self.distro = distro
        self.sourceslist = sourceslist

    def parse_args(self, args):
        description = "Only ONE of -P, -C, -U, -S, or old-style 'line' can be specified"

        parser = argparse.ArgumentParser(description=description)
        parser.add_argument("-d", "--debug", action="store_true",
                            help=_("Print debug"))
        parser.add_argument("-r", "--remove", action="store_true",
                            help=_("Disable repository"))
        parser.add_argument("-s", "--enable-source", action="count", default=0,
                            help=_("Allow downloading of the source packages from the repository"))
        parser.add_argument("-c", "--component", action="append", default=[],
                            help=_("Components to use with the repository"))
        parser.add_argument("-p", "--pocket",
                            help=_("Add entry for this pocket"))
        parser.add_argument("-y", "--yes", action="store_true",
                            help=_("Assume yes to all queries"))
        parser.add_argument("-n", "--no-update", dest="update", action="store_false",
                            help=_("Do not update package cache after adding"))
        parser.add_argument("-u", "--update", action="store_true", default=True,
                            help=argparse.SUPPRESS)
        parser.add_argument("-l", "--login", action="store_true",
                            help=_("Login to Launchpad."))
        parser.add_argument("--dry-run", action="store_true",
                            help=_("Don't actually make any changes."))

        group = parser.add_mutually_exclusive_group()
        group.add_argument("-L", "--list", action="store_true",
                           help=_("List currently configured repositories"))
        group.add_argument("-P", "--ppa",
                           help=_("PPA to add"))
        group.add_argument("-C", "--cloud",
                           help=_("Cloud Archive to add"))
        group.add_argument("-U", "--uri",
                           help=_("Archive URI to add"))
        group.add_argument("-S", "--sourceslist", nargs='+', default=[],
                           help=_("Full sources.list entry line to add"))
        group.add_argument("line", nargs='*', default=[],
                           help=_("sources.list line to add (deprecated)"))

        self.parser = parser
        self.options = self.parser.parse_args(args)

    @property
    def dry_run(self):
        return self.options.dry_run

    @property
    def enable_source(self):
        return self.options.enable_source > 0

    @property
    def components(self):
        return self.options.component

    @property
    def pocket(self):
        if self.options.pocket:
            return self.options.pocket.lower()
        return None

    @property
    def source_type(self):
        return self.distro.source_type

    @property
    def binary_type(self):
        return self.distro.binary_type

    def is_components(self, comps):
        if not comps:
            return False
        return set(comps.split()) <= set([comp.name for comp in self.distro.source_template.components])

    def apt_update(self):
        if self.options.update and not self.dry_run:
            # We prefer to run apt-get update here. The built-in update support
            # does not have any progress, and only works for shortcuts. Moving
            # it to something like save() and using apt.progress.text would
            # solve the problem, but the new errors might cause problems with
            # the dbus server or other users of the API. Also, it's unclear
            # how good the text progress is or how to pass it best.
            subprocess.run(['apt-get', 'update'])

    def prompt_user(self):
        if self.dry_run:
            print(_("DRY-RUN mode: no modifications will be made"))
            return
        if not self.options.yes and sys.stdin.isatty() and not "FORCE_ADD_APT_REPOSITORY" in os.environ:
            try:
                input(_("Press [ENTER] to continue or Ctrl-c to cancel."))
            except KeyboardInterrupt:
                print(_("Aborted."))
                sys.exit(1)

    def prompt_user_shortcut(self, shortcut):
        '''Display more information about the shortcut / ppa info'''
        print(_("Repository: '%s'") % shortcut.SourceEntry().line)
        if shortcut.description:
            # strip ANSI escape sequences
            description = re.sub(r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]",
                                 "", shortcut.description)
            print(_("Description:"))
            print(description)
        if shortcut.web_link:
            print(_("More info: %s") % shortcut.web_link)
        if self.options.remove:
            print(_("Removing repository."))
        else:
            print(_("Adding repository."))
        self.prompt_user()

    def _add_components(self, sources):
        components = {}
        for entry in sources:
            key = (entry.uri, entry.dist, entry.type)
            if key in components:
                components[key] += entry.comps
            else:
                components[key] = copy(entry.comps)

        for (uri, dist, type), comps in components.items():
            added = set(self.components) - set(comps)
            if added:
                comps = list(set(comps) | set(self.components))
                entry = self.sourceslist.add(
                    type=type,
                    dist=dist,
                    uri=uri,
                    orig_comps=comps,
                    file=SOURCESLIST
                )
                print(_("Added %s to: %s") % (' '.join(added), str(entry)))

    def _remove_components(self, sources):
        for entry in sources:
            removed = set(entry.comps) & set(self.components)
            if removed:
                entry.comps = list(set(entry.comps) - set(self.components))
                print(_("Removed %s from: %s") % (' '.join(removed), str(entry)))
                if not entry.comps:
                    self.sourceslist.remove(entry)

    def change_components(self):
        sources = [s for s in self.sourceslist if not s.invalid \
                   and not s.disabled and s.file == SOURCESLIST]

        if self.options.remove:
            self._remove_components(sources)
        else:
            self._add_components(sources)

        if not self.dry_run:
            self.sourceslist.save()

    def _add_pocket(self, sources):
        binary_entries = {}
        for s in sources:
            if s.invalid or s.disabled:
                continue
            if s.type != self.binary_type or s.file != SOURCESLIST:
                continue

            suite = get_source_entry_suite(s)
            if (s.uri, suite) in binary_entries:
                binary_entries[(s.uri, suite)].append(s)
            else:
                binary_entries[(s.uri, suite)] = [s]

        for (uri, suite), entries in binary_entries.items():
            pockets = []
            have_pocket = False
            have_release = False

            for e in entries:
                p = get_source_entry_pocket(e)
                if p == self.pocket:
                    print(_("Existing: %s") % str(e))
                    have_pocket = True
                elif p == 'release':
                    have_release = True
                pockets.append(p)

            if have_pocket:
                continue

            if have_release:
                comps = []
                for e in entries:
                    if get_source_entry_pocket(e) == 'release':
                        comps += e.comps
                comps = list(set(comps))
            else:
                comps = ['main', 'restricted']

            entry = sources.add(
                type=self.binary_type,
                uri=uri,
                dist='{}-{}'.format(suite, self.pocket),
                orig_comps=comps,
                file=SOURCESLIST
            )
            print(_("Adding: %s") % str(entry))

    def _remove_pocket(self, sources):
        for entry in sources:
            if entry.invalid or entry.disabled or entry.file != SOURCESLIST:
                continue

            if get_source_entry_pocket(entry) != self.pocket:
                continue

            entry.set_enabled(False)
            print(_("Disabled: %s") % str(entry))

    def change_pocket(self):
        if self.options.remove:
            self._remove_pocket(self.sourceslist)
        else:
            self._add_pocket(self.sourceslist)

        if not self.dry_run:
            self.sourceslist.save()

    def _enable_source(self):
        # Check each disabled deb-src line, enable if matching deb line exists
        for s in self.sourceslist:
            if s.invalid or not s.disabled or s.type != self.source_type:
                continue

            b_entry = None
            for b in self.sourceslist:
                if b.type != self.binary_type or b.disabled:
                    continue

                if (b.uri, b.dist, b.comps) != (s.uri, s.dist, s.comps):
                    continue

                b_entry = b
                break

            if not b_entry:
                # no matching binary lines, leave the source line disabled
                continue
            disabled_comps = list(set(s.comps) - set(b_entry.comps))
            enabled_comps = list(set(s.comps) & set(b_entry.comps))
            if not enabled_comps:
                # we can't enable any of the line
                continue
            if disabled_comps:
                index = sources.list.index(s)
                tmp = replace_source_entry(s, comps=disabled_comps)
                self.sourceslist.list.insert(index + 1, tmp)
            s.comps = enabled_comps
            s.set_enabled(True)
            print(_("Enabled: %s") % str(s).strip())

        # Check each enabled deb line, to warn about missing deb-src lines, or add one if -ss
        for b in self.sourceslist:
            if b.invalid or b.disabled or b.type != self.binary_type:
                continue

            s = replace_source_entry(b, type=self.source_type)
            s_entry = get_source_entry_from_list(self.sourceslist, s)

            scomps = set(s_entry.comps if s_entry else [])
            missing_comps = list(set(b.comps) - scomps)
            if not missing_comps:
                continue
            s.comps = missing_comps
            if self.options.enable_source > 1:
                # with multiple -s, add new deb-src entries if needed for all deb entries
                self.sourceslist.add(
                    type=s.type,
                    uri=s.uri,
                    dist=s.dist,
                    orig_comps=s.comps,
                    file=s.file,
                )
                print(_("Added: %s") % str(s).strip())
            else:
                # if only one -s used, don't add missing deb-src, just notify
                print(_("Warning, missing deb-src for: %s") % str(s).strip())

    def _disable_source(self):
        for s in self.sourceslist:
            if s.invalid or s.disabled or s.type != self.source_type:
                continue
            s.set_enabled(False)
            print(_("Disabled: %s") % str(s).strip())

    def change_source(self):
        if self.options.remove:
            self._disable_source()
        else:
            self._enable_source()

        if not self.dry_run:
            self.sourceslist.save()

    def global_change(self):
        if self.components:
            if self.options.remove:
                print(_("Removing component(s) '%s' from all repositories.") % ', '.join(self.components))
            else:
                print(_("Adding component(s) '%s' to all repositories.") % ', '.join(self.components))
        if self.pocket:
            if self.options.remove:
                print(_("Removing pocket %s for all repositories.") % self.pocket)
            else:
                print(_("Adding pocket %s for all repositories.") % self.pocket)
        if self.enable_source:
            if self.options.remove:
                print(_("Disabling %s for all repositories.") % self.source_type)
            else:
                print(_("Enabling %s for all repositories.") % self.source_type)
        self.prompt_user()
        if self.components:
            self.change_components()
        if self.pocket:
            self.change_pocket()
        if self.enable_source:
            self.change_source()

    def show_list(self):
        """Print the enabled repositories, merging entries that differ only by comps.

        Entries are read from a fresh ``SourcesList(deb822=True)``.  Entries
        of the distro's source type are shown only when ``-s`` was given.
        Entries sharing (type, uri, dist) are folded into the first one seen
        by appending the components it does not have yet.
        """
        merged = {}
        for src in SourcesList(deb822=True):
            if src.invalid or src.disabled:
                continue
            if not self.enable_source and src.type == self.source_type:
                continue

            key = (src.type, src.uri, src.dist)
            seen = merged.get(key)
            if seen is None:
                merged[key] = src
            else:
                seen.comps += list(set(src.comps) - set(seen.comps))

        for src in merged.values():
            print(src, '\n')

    def main(self, args=sys.argv[1:]):
        self.parse_args(args)

        if not any((self.dry_run, self.options.list, os.geteuid() == 0)):
            print(_("Error: must run as root"))
            return False

        line = ' '.join(self.options.line)
        if line == '-':
            line = sys.stdin.readline().strip()

        # if 'line' is only (valid) components, handle as if only -c was used with no line
        if self.is_components(line):
            self.options.component += line.split()
            line = ''

        if self.options.ppa:
            source = self.options.ppa
            if not ':' in source:
                source = 'ppa:' + source
            handler = PPAShortcutHandler
        elif self.options.cloud:
            source = self.options.cloud
            if not ':' in source:
                source = 'uca:' + source
            handler = CloudArchiveShortcutHandler
        elif self.options.uri:
            source = self.options.uri
            handler = URIShortcutHandler
        elif self.options.sourceslist:
            source = ' '.join(self.options.sourceslist)
            handler = SourcesListShortcutHandler
        elif line:
            source = line
            handler = shortcut_handler
        elif self.options.list:
            self.show_list()
            return True
        elif any((self.enable_source, self.components, self.pocket)):
            self.global_change()
            self.apt_update()
            return True
        else:
            print(_("Error: no actions requested."))
            self.parser.print_help()
            return False

        try:
            shortcut_params = {
                'login': self.options.login,
                'enable_source': self.enable_source,
                'dry_run': self.dry_run,
                'components': self.components,
                'pocket': self.pocket,
            }
            shortcut = handler(source, **shortcut_params)
        except ShortcutException as e:
            print(e)
            return False

        self.prompt_user_shortcut(shortcut)

        if self.options.remove:
            shortcut.remove()
        else:
            shortcut.add()

        self.apt_update()
        return True

if __name__ == '__main__':
    # Exit status is 0 when main() reports success and 1 otherwise.
    addaptrepo = AddAptRepository()
    succeeded = addaptrepo.main()
    sys.exit(0 if succeeded else 1)