Merge "Merge "Support generating partial OTAs from extracted target_files" am: c3216a321b" into stage-aosp-master

This commit is contained in:
Automerger Merge Worker 2023-05-04 00:56:02 +00:00 committed by Android (Google) Code Review
commit abb6ca7ab3
3 changed files with 135 additions and 106 deletions

View file

@ -754,6 +754,33 @@ def ReadFromInputFile(input_file, fn):
return ReadBytesFromInputFile(input_file, fn).decode()
def WriteBytesToInputFile(input_file, fn, data):
"""Write bytes |data| contents to fn of input zipfile or directory."""
if isinstance(input_file, zipfile.ZipFile):
with input_file.open(fn, "w") as entry_fp:
return entry_fp.write(data)
elif zipfile.is_zipfile(input_file):
with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp:
with zfp.open(fn, "w") as entry_fp:
return entry_fp.write(data)
else:
if not os.path.isdir(input_file):
raise ValueError(
"Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file)
path = os.path.join(input_file, *fn.split("/"))
try:
with open(path, "wb") as f:
return f.write(data)
except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError(fn)
def WriteToInputFile(input_file, fn, str: str):
"""Write str content to fn of input file or directory"""
return WriteBytesToInputFile(input_file, fn, str.encode())
def ExtractFromInputFile(input_file, fn):
"""Extracts the contents of fn from input zipfile or directory into a file."""
if isinstance(input_file, zipfile.ZipFile):

View file

@ -270,7 +270,7 @@ import care_map_pb2
import common
import ota_utils
from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata,
PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, CopyTargetFilesDir)
PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, ExtractTargetFiles, CopyTargetFilesDir)
from common import DoesInputFileContain, IsSparseImage
import target_files_diff
from check_target_files_vintf import CheckVintfIfTrebleEnabled
@ -519,15 +519,10 @@ def GetTargetFilesZipWithoutPostinstallConfig(input_file):
Returns:
The filename of target-files.zip that doesn't contain postinstall config.
"""
# We should only make a copy if postinstall_config entry exists.
with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
if POSTINSTALL_CONFIG not in input_zip.namelist():
return input_file
target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
shutil.copyfile(input_file, target_file)
common.ZipDelete(target_file, POSTINSTALL_CONFIG)
return target_file
config_path = os.path.join(input_file, POSTINSTALL_CONFIG)
if os.path.exists(config_path):
os.unlink(config_path)
return input_file
def ParseInfoDict(target_file_path):
@ -544,6 +539,17 @@ def GetTargetFilesZipForCustomVABCCompression(input_file, vabc_compression_param
Returns:
The path to modified target-files.zip
"""
if os.path.isdir(input_file):
dynamic_partition_info_path = os.path.join(
input_file, "META", "dynamic_partitions_info.txt")
with open(dynamic_partition_info_path, "r") as fp:
dynamic_partition_info = fp.read()
dynamic_partition_info = ModifyVABCCompressionParam(
dynamic_partition_info, vabc_compression_param)
with open(dynamic_partition_info_path, "w") as fp:
fp.write(dynamic_partition_info)
return input_file
target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
shutil.copyfile(input_file, target_file)
common.ZipDelete(target_file, DYNAMIC_PARTITION_INFO)
@ -571,23 +577,7 @@ def GetTargetFilesZipForPartialUpdates(input_file, ab_partitions):
The filename of target-files.zip used for partial ota update.
"""
def AddImageForPartition(partition_name):
"""Add the archive name for a given partition to the copy list."""
for prefix in ['IMAGES', 'RADIO']:
image_path = '{}/{}.img'.format(prefix, partition_name)
if image_path in namelist:
copy_entries.append(image_path)
map_path = '{}/{}.map'.format(prefix, partition_name)
if map_path in namelist:
copy_entries.append(map_path)
return
raise ValueError("Cannot find {} in input zipfile".format(partition_name))
with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
original_ab_partitions = input_zip.read(
AB_PARTITIONS).decode().splitlines()
namelist = input_zip.namelist()
original_ab_partitions = common.ReadFromInputFile(input_file, AB_PARTITIONS)
unrecognized_partitions = [partition for partition in ab_partitions if
partition not in original_ab_partitions]
@ -596,50 +586,65 @@ def GetTargetFilesZipForPartialUpdates(input_file, ab_partitions):
unrecognized_partitions)
logger.info("Generating partial updates for %s", ab_partitions)
for subdir in ["IMAGES", "RADIO", "PREBUILT_IMAGES"]:
image_dir = os.path.join(subdir)
if not os.path.exists(image_dir):
continue
for filename in os.listdir(image_dir):
filepath = os.path.join(image_dir, filename)
if filename.endswith(".img"):
partition_name = filename.removesuffix(".img")
if partition_name not in ab_partitions:
os.unlink(filepath)
copy_entries = ['META/update_engine_config.txt']
for partition_name in ab_partitions:
AddImageForPartition(partition_name)
common.WriteToInputFile(input_file, 'META/ab_partitions.txt',
'\n'.join(ab_partitions))
CARE_MAP_ENTRY = "META/care_map.pb"
if DoesInputFileContain(input_file, CARE_MAP_ENTRY):
caremap = care_map_pb2.CareMap()
caremap.ParseFromString(
common.ReadBytesFromInputFile(input_file, CARE_MAP_ENTRY))
filtered = [
part for part in caremap.partitions if part.name in ab_partitions]
del caremap.partitions[:]
caremap.partitions.extend(filtered)
common.WriteBytesToInputFile(input_file, CARE_MAP_ENTRY,
caremap.SerializeToString())
# Use zip2zip to avoid extracting the zipfile.
partial_target_file = common.MakeTempFile(suffix='.zip')
cmd = ['zip2zip', '-i', input_file, '-o', partial_target_file]
cmd.extend(['{}:{}'.format(name, name) for name in copy_entries])
common.RunAndCheckOutput(cmd)
for info_file in ['META/misc_info.txt', DYNAMIC_PARTITION_INFO]:
if not DoesInputFileContain(input_file, info_file):
logger.warning('Cannot find %s in input zipfile', info_file)
continue
partial_target_zip = zipfile.ZipFile(partial_target_file, 'a',
allowZip64=True)
with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
common.ZipWriteStr(partial_target_zip, 'META/ab_partitions.txt',
'\n'.join(ab_partitions))
CARE_MAP_ENTRY = "META/care_map.pb"
if CARE_MAP_ENTRY in input_zip.namelist():
caremap = care_map_pb2.CareMap()
caremap.ParseFromString(input_zip.read(CARE_MAP_ENTRY))
filtered = [
part for part in caremap.partitions if part.name in ab_partitions]
del caremap.partitions[:]
caremap.partitions.extend(filtered)
common.ZipWriteStr(partial_target_zip, CARE_MAP_ENTRY,
caremap.SerializeToString())
content = common.ReadFromInputFile(input_file, info_file)
modified_info = UpdatesInfoForSpecialUpdates(
content, lambda p: p in ab_partitions)
if OPTIONS.vabc_compression_param and info_file == DYNAMIC_PARTITION_INFO:
modified_info = ModifyVABCCompressionParam(
modified_info, OPTIONS.vabc_compression_param)
common.WriteToInputFile(input_file, info_file, modified_info)
for info_file in ['META/misc_info.txt', DYNAMIC_PARTITION_INFO]:
if info_file not in input_zip.namelist():
logger.warning('Cannot find %s in input zipfile', info_file)
continue
content = input_zip.read(info_file).decode()
modified_info = UpdatesInfoForSpecialUpdates(
content, lambda p: p in ab_partitions)
if OPTIONS.vabc_compression_param and info_file == DYNAMIC_PARTITION_INFO:
modified_info = ModifyVABCCompressionParam(
modified_info, OPTIONS.vabc_compression_param)
common.ZipWriteStr(partial_target_zip, info_file, modified_info)
def IsInPartialList(postinstall_line: str):
idx = postinstall_line.find("=")
if idx < 0:
return False
key = postinstall_line[:idx]
logger.info("%s %s", key, ab_partitions)
for part in ab_partitions:
if key.endswith("_" + part):
return True
return False
# TODO(xunchang) handle META/postinstall_config.txt'
postinstall_config = common.ReadFromInputFile(input_file, POSTINSTALL_CONFIG)
postinstall_config = [
line for line in postinstall_config.splitlines() if IsInPartialList(line)]
if postinstall_config:
postinstall_config = "\n".join(postinstall_config)
common.WriteToInputFile(input_file, POSTINSTALL_CONFIG, postinstall_config)
else:
os.unlink(os.path.join(input_file, POSTINSTALL_CONFIG))
common.ZipClose(partial_target_zip)
return partial_target_file
return input_file
def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
@ -664,21 +669,12 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
replace = {'OTA/super_{}.img'.format(dev): 'IMAGES/{}.img'.format(dev)
for dev in super_block_devices}
target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
shutil.copyfile(input_file, target_file)
with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
namelist = input_zip.namelist()
input_tmp = common.UnzipTemp(input_file, RETROFIT_DAP_UNZIP_PATTERN)
# Remove partitions from META/ab_partitions.txt that is in
# dynamic_partition_list but not in super_block_devices so that
# brillo_update_payload won't generate update for those logical partitions.
ab_partitions_file = os.path.join(input_tmp, *AB_PARTITIONS.split('/'))
with open(ab_partitions_file) as f:
ab_partitions_lines = f.readlines()
ab_partitions = [line.strip() for line in ab_partitions_lines]
ab_partitions_lines = common.ReadFromInputFile(
input_file, AB_PARTITIONS).split("\n")
ab_partitions = [line.strip() for line in ab_partitions_lines]
# Assert that all super_block_devices are in ab_partitions
super_device_not_updated = [partition for partition in super_block_devices
if partition not in ab_partitions]
@ -686,15 +682,6 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
"{} is in super_block_devices but not in {}".format(
super_device_not_updated, AB_PARTITIONS)
# ab_partitions -= (dynamic_partition_list - super_block_devices)
new_ab_partitions = common.MakeTempFile(
prefix="ab_partitions", suffix=".txt")
with open(new_ab_partitions, 'w') as f:
for partition in ab_partitions:
if (partition in dynamic_partition_list and
partition not in super_block_devices):
logger.info("Dropping %s from ab_partitions.txt", partition)
continue
f.write(partition + "\n")
to_delete = [AB_PARTITIONS]
# Always skip postinstall for a retrofit update.
@ -707,24 +694,28 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
# Remove the existing partition images as well as the map files.
to_delete += list(replace.values())
to_delete += ['IMAGES/{}.map'.format(dev) for dev in super_block_devices]
common.ZipDelete(target_file, to_delete)
target_zip = zipfile.ZipFile(target_file, 'a', allowZip64=True)
for item in to_delete:
os.unlink(os.path.join(input_file, item))
# Write super_{foo}.img as {foo}.img.
for src, dst in replace.items():
assert src in namelist, \
assert DoesInputFileContain(input_file, src), \
'Missing {} in {}; {} cannot be written'.format(src, input_file, dst)
unzipped_file = os.path.join(input_tmp, *src.split('/'))
common.ZipWrite(target_zip, unzipped_file, arcname=dst)
source_path = os.path.join(input_file, *src.split("/"))
target_path = os.path.join(input_file, *dst.split("/"))
os.rename(source_path, target_path)
# Write new ab_partitions.txt file
common.ZipWrite(target_zip, new_ab_partitions, arcname=AB_PARTITIONS)
new_ab_partitions = os.paht.join(input_file, AB_PARTITIONS)
with open(new_ab_partitions, 'w') as f:
for partition in ab_partitions:
if (partition in dynamic_partition_list and
partition not in super_block_devices):
logger.info("Dropping %s from ab_partitions.txt", partition)
continue
f.write(partition + "\n")
common.ZipClose(target_zip)
return target_file
return input_file
def GetTargetFilesZipForCustomImagesUpdates(input_file, custom_images):
@ -833,14 +824,20 @@ def SupportsMainlineGkiUpdates(target_file):
return pattern.search(output) is not None
def ExtractOrCopyTargetFiles(target_file):
if os.path.isdir(target_file):
return CopyTargetFilesDir(target_file)
else:
return ExtractTargetFiles(target_file)
def GenerateAbOtaPackage(target_file, output_file, source_file=None):
"""Generates an Android OTA package that has A/B update payload."""
# If input target_files are directories, create a copy so that we can modify
# them directly
if os.path.isdir(target_file):
target_file = CopyTargetFilesDir(target_file)
if source_file is not None and os.path.isdir(source_file):
source_file = CopyTargetFilesDir(source_file)
target_file = ExtractOrCopyTargetFiles(target_file)
if source_file is not None:
source_file = ExtractOrCopyTargetFiles(source_file)
# Stage the output zip package for package signing.
if not OPTIONS.no_signing:
staging_file = common.MakeTempFile(suffix='.zip')
@ -851,7 +848,7 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
allowZip64=True)
if source_file is not None:
source_file = ota_utils.ExtractTargetFiles(source_file)
source_file = ExtractTargetFiles(source_file)
assert "ab_partitions" in OPTIONS.source_info_dict, \
"META/ab_partitions.txt is required for ab_update."
assert "ab_partitions" in OPTIONS.target_info_dict, \
@ -948,10 +945,10 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
elif OPTIONS.partial:
target_file = GetTargetFilesZipForPartialUpdates(target_file,
OPTIONS.partial)
elif OPTIONS.vabc_compression_param:
if OPTIONS.vabc_compression_param:
target_file = GetTargetFilesZipForCustomVABCCompression(
target_file, OPTIONS.vabc_compression_param)
elif OPTIONS.skip_postinstall:
if OPTIONS.skip_postinstall:
target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file)
# Target_file may have been modified, reparse ab_partitions
target_info.info_dict['ab_partitions'] = common.ReadFromInputFile(target_file,

View file

@ -1047,10 +1047,15 @@ def Fnmatch(filename, pattersn):
def CopyTargetFilesDir(input_dir):
output_dir = common.MakeTempDir("target_files")
shutil.copytree(os.path.join(input_dir, "IMAGES"), os.path.join(
output_dir, "IMAGES"), dirs_exist_ok=True)
IMAGES_DIR = ["IMAGES", "PREBUILT_IMAGES", "RADIO"]
for subdir in IMAGES_DIR:
if not os.path.exists(os.path.join(input_dir, subdir)):
continue
shutil.copytree(os.path.join(input_dir, subdir), os.path.join(
output_dir, subdir), dirs_exist_ok=True, copy_function=os.link)
shutil.copytree(os.path.join(input_dir, "META"), os.path.join(
output_dir, "META"), dirs_exist_ok=True)
for (dirpath, _, filenames) in os.walk(input_dir):
for filename in filenames:
path = os.path.join(dirpath, filename)