#!/usr/bin/env python
#
# Copyright 2021 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
`fsverity_metadata_generator` generates fsverity metadata and signature to a
container file

This actually is a simple wrapper around the `fsverity` program. A file is
signed by the program which produces the PKCS#7 signature file, the Merkle tree
file, and the fsverity_descriptor file. Then the files are packed into a single
output file so that the information about the signing stays together.

Currently, the output of this script is used by `fd_server` which is the host-
side backend of an authfs filesystem. `fd_server` uses this file when the
underlying filesystem (ext4, etc.) on the device doesn't support the fsverity
feature natively; when the feature is supported natively, the information is
read directly from the filesystem using ioctl instead.
"""

import argparse
import os
import re
import shutil
import subprocess
import sys
import tempfile
from struct import *


class TempDirectory(object):
    """Context manager that owns a freshly created temporary directory.

    Entering the context creates the directory and yields its path (a string,
    not the manager object); leaving the context removes the directory tree.
    """

    def __enter__(self):
        self.name = tempfile.mkdtemp()
        return self.name

    def __exit__(self, *unused):
        shutil.rmtree(self.name)


class FSVerityMetadataGenerator:
    """Wrapper around the `fsverity` command line tool.

    Configure the instance through the `set_*` methods, then call `digest()`
    or `generate()`.
    """

    def __init__(self, fsverity_path):
        """Remember the path of the `fsverity` executable and set defaults."""
        self._fsverity_path = fsverity_path

        # Default values for some properties
        self.set_hash_alg("sha256")
        self.set_signature('none')
        # Key material is optional. Initialise it so that a missing key or
        # cert is reported by generate() as a RuntimeError instead of
        # surfacing as an AttributeError. Any key format other than 'der' is
        # treated as PEM by the signing code, hence the 'pem' default.
        self.set_key_format('pem')
        self.set_key(None)
        self.set_cert(None)

    def set_key_format(self, key_format):
        """Set the private key format; 'der' keys are converted to PEM."""
        self._key_format = key_format

    def set_key(self, key):
        """Set the path of the private key used for signing."""
        self._key = key

    def set_cert(self, cert):
        """Set the path of the certificate used for signing."""
        self._cert = cert

    def set_hash_alg(self, hash_alg):
        """Set the hash algorithm name passed to `fsverity --hash-alg`."""
        self._hash_alg = hash_alg

    def set_signature(self, signature):
        """Set the signature type; 'none' disables signing."""
        self._signature = signature

    @staticmethod
    def _signature_span(asn1parse_output):
        """Locate the raw signature from `openssl asn1parse` output.

        The signature is the last element in the ASN.1 tree, so only the last
        line of the output is inspected; it must describe an OCTET STRING.

        Returns:
            A `(start, size)` tuple: the file offset of the first signature
            byte (element offset plus its header length) and the byte count.

        Raises:
            RuntimeError: if the output is empty or its last line cannot be
                parsed as an OCTET STRING entry.
        """
        lines = asn1parse_output.splitlines()
        m = None
        if lines:
            m = re.search(
                r'(\d+):.*hl=\s*(\d+)\s*l=\s*(\d+)\s*.*OCTET STRING',
                lines[-1])
        if not m:
            raise RuntimeError(
                "Failed to parse asn1parse output: " + asn1parse_output)
        offset = int(m.group(1))
        header_len = int(m.group(2))
        size = int(m.group(3))
        return offset + header_len, size

    @staticmethod
    def _raw_signature(pkcs7_sig_file):
        """ Extracts raw signature from DER formatted PKCS#7 detached signature file

        Do that by parsing the ASN.1 tree to get the location of the signature
        in the file and then read the portion.

        Raises:
            RuntimeError: if the `openssl asn1parse` output cannot be parsed.
            subprocess.CalledProcessError: if `openssl` exits with an error.
        """

        # Note: there seems to be no public python API (even in 3p modules) that
        # provides direct access to the raw signature at this moment. So, `openssl
        # asn1parse` commandline tool is used instead.
        cmd = ['openssl', 'asn1parse']
        cmd.extend(['-inform', 'DER'])
        cmd.extend(['-in', pkcs7_sig_file])
        out = subprocess.check_output(cmd, universal_newlines=True)

        start, size = FSVerityMetadataGenerator._signature_span(out)
        with open(pkcs7_sig_file, 'rb') as f:
            f.seek(start)
            return f.read(size)

    def digest(self, input_file):
        """Return the fsverity digest of `input_file` as raw bytes.

        Runs `fsverity digest --compact` with the configured hash algorithm
        and decodes the hexadecimal output.
        """
        cmd = [self._fsverity_path, 'digest', input_file]
        cmd.extend(['--compact'])
        cmd.extend(['--hash-alg', self._hash_alg])
        out = subprocess.check_output(cmd, universal_newlines=True).strip()
        return bytes(bytearray.fromhex(out))

    def generate(self, input_file, output_file=None):
        """Generate the metadata container for `input_file`.

        Args:
            input_file: path of the file to process.
            output_file: path of the container to write; defaults to
                `input_file` with `.fsv_meta` appended.

        Raises:
            RuntimeError: if a signature is requested but the key or the cert
                has not been set.
        """
        if self._signature != 'none':
            if not self._key:
                raise RuntimeError("key must be specified.")
            if not self._cert:
                raise RuntimeError("cert must be specified.")

        if not output_file:
            output_file = input_file + '.fsv_meta'

        # Intermediate files live in a scratch directory that is removed once
        # the container has been written.
        with TempDirectory() as temp_dir:
            self._do_generate(input_file, output_file, temp_dir)

    def _do_generate(self, input_file, output_file, work_dir):
        # temporary files
        desc_file = os.path.join(work_dir, 'desc')
        merkletree_file = os.path.join(work_dir, 'merkletree')
        sig_file = os.path.join(work_dir, 'signature')

        # run the fsverity util to create the temporary files
        cmd = [self._fsverity_path]
        if self._signature == 'none':
            cmd.append('digest')
            cmd.append(input_file)
        else:
            cmd.append('sign')
            cmd.append(input_file)
            cmd.append(sig_file)

            # If key is DER, convert DER private key to PEM
            if self._key_format == 'der':
                pem_key = os.path.join(work_dir, 'key.pem')
                key_cmd = ['openssl', 'pkcs8']
                key_cmd.extend(['-inform', 'DER'])
                key_cmd.extend(['-in', self._key])
                key_cmd.extend(['-nocrypt'])
                key_cmd.extend(['-out', pem_key])
                subprocess.check_call(key_cmd)
            else:
                pem_key = self._key

            cmd.extend(['--key', pem_key])
cmd.extend(['--cert', self._cert]) cmd.extend(['--hash-alg', self._hash_alg]) cmd.extend(['--block-size', '4096']) cmd.extend(['--out-merkle-tree', merkletree_file]) cmd.extend(['--out-descriptor', desc_file]) subprocess.check_call(cmd, stdout=open(os.devnull, 'w')) with open(output_file, 'wb') as out: # 1. version out.write(pack('