-
Notifications
You must be signed in to change notification settings - Fork 29
/
cosign.py
352 lines (293 loc) · 12.1 KB
/
cosign.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
#
# SPDX-License-Identifier: Apache-2.0
import enum
import hashlib
import json
import logging
import ci.log
import ci.util
import oci.model as om
import oci.client as oc
# NOTE(review): module-level side effect at import time; presumably installs the project's
# default logging configuration - confirm against ci.log
ci.log.configure_default_logging()
logger = logging.getLogger(__name__)

# unique marker used as default for `overwrite_docker_reference` in `payload_bytes`; compared
# by identity, so any explicitly passed value can be told apart from "argument not passed"
_no_overwrite_sentinel = object()
class OnExist(enum.StrEnum):
SKIP = 'skip'
APPEND = 'append'
OVERWRITE = 'overwrite'
def payload_bytes(
    image_reference: om.OciImageReference | str,
    annotations: dict | None=None,
    overwrite_docker_reference: str=_no_overwrite_sentinel,
) -> bytes:
    '''
    returns the signing payload for the given OCI Image Reference (+ optional annotations), as
    output by `cosign generate`: compact JSON with sorted keys, encoded as UTF-8.

    By default, `docker-reference` is the passed image-reference without its tag. It can be
    replaced using `overwrite_docker_reference`, which allows replication of artefacts along
    with their signature artefacts (the payload digest has to stay the same).

    Passed image-reference must have digest-tag; otherwise, ValueError is raised.
    '''
    image_reference = om.OciImageReference.to_image_ref(image_reference)

    if not image_reference.has_digest_tag:
        raise ValueError('image-reference must have digest tag', image_reference)

    # sentinel is compared by identity so that any explicitly passed value wins
    docker_reference = (
        image_reference.ref_without_tag
        if overwrite_docker_reference is _no_overwrite_sentinel
        else overwrite_docker_reference
    )

    critical = {
        'identity': {'docker-reference': docker_reference},
        'image': {'docker-manifest-digest': image_reference.tag},
        'type': 'gardener.vnd/oci/cosign-signature',
    }

    # key order is irrelevant here: output is normalised through `sort_keys`
    serialised = json.dumps(
        obj={'critical': critical, 'optional': annotations},
        separators=(',', ':'),
        sort_keys=True,
    )
    return serialised.encode('utf-8')
def cfg_blob_bytes(
    payload_digest: str,
) -> bytes:
    '''
    returns a dummy cfg-blob as generated by cosign: compact JSON with sorted keys (UTF-8
    encoded), whose only variable part is the passed payload-digest (sole `diff_ids` entry).
    '''
    zero_timestamp = '0001-01-01T00:00:00Z'

    rootfs = {
        'diff_ids': [payload_digest],
        'type': 'layers',
    }
    cfg_blob = dict(
        architecture='',
        config={},
        created=zero_timestamp,
        history=[{'created': zero_timestamp}],
        os='',
        rootfs=rootfs,
    )

    serialised = json.dumps(cfg_blob, separators=(',', ':'), sort_keys=True)
    return serialised.encode('utf-8')
def default_signature_image_reference(
    image_ref: str | om.OciImageReference,
) -> om.OciImageReference:
    '''
    calculate the (default) image reference of the cosign signature for a specific image.

    This image-reference is by default used/expected by cosign if no alternative signature
    repository is specified. It has the form `<ref-without-tag>:<alg>-<digest>.sig`.

    Raises ValueError if the passed image reference does not have a digest-tag.
    '''
    parsed_image_ref = om.OciImageReference.to_image_ref(image_ref)

    if not parsed_image_ref.has_digest_tag:
        # the exception was previously only instantiated, but never raised
        raise ValueError('only images that are referenced via digest are allowed')

    alg, val = parsed_image_ref.parsed_digest_tag
    cosign_sig_ref = f'{parsed_image_ref.ref_without_tag}:{alg}-{val}.sig'

    return om.OciImageReference(cosign_sig_ref)
def image_reference_from_signature_reference(
    signature_image_reference: str | om.OciImageReference,
) -> om.OciImageReference:
    '''
    derives the reference of the signed image from the reference of its cosign signature
    artefact (tag of the form `sha256-<image-digest>.sig`). The resulting reference always
    uses `sha256` as digest algorithm.
    '''
    sig_ref = om.OciImageReference.to_image_ref(signature_image_reference)

    # "sha256-<image-digest>.sig" -> "<image-digest>"
    # (text after the last "-", cut off at the first ".")
    after_last_dash = sig_ref.tag.rpartition('-')[2]
    image_digest = after_last_dash.partition('.')[0]

    return om.OciImageReference.to_image_ref(
        f'{sig_ref.ref_without_tag}@sha256:{image_digest}'
    )
'''
annotation name used to store public-key along cosign signatures within cosign signature artefacts
storing public-key in addition to signature is a preparation for using signature algorithms that
yield different signatures even if signing the same content using same private key (e.g. RSSA-PSS)
'''
# NOTE(review): "RSSA-PSS" above presumably refers to RSASSA-PSS - confirm
_public_key_annotation_name = 'gardener.cloud/cosign-public-key'
# annotation name under which `sign_manifest` stores the signing algorithm on the signature layer
_signing_algorithm_annotation_name = 'gardener.cloud/cosign-signing-algorithm'
'''
annotation name used by cosign to store signatures for referenced layer-blob
'''
# `sign_manifest` compares the value stored under this annotation to detect present signatures
_cosign_signature_annotation_name = 'dev.cosignproject.cosign/signature'
def sign_manifest(
    manifest: om.OciImageManifest,
    payload_size: int,
    payload_digest: str,
    signature: str,
    signing_algorithm: str=None,
    public_key: str=None,
    on_exist: OnExist | str=OnExist.APPEND,
) -> om.OciImageManifest:
    '''
    adds a cosign signature layer to the passed manifest and returns the (modified in-place)
    manifest.

    The signature layer references the payload (`payload_digest`, `payload_size`) and carries
    the signature as annotation; public-key and signing-algorithm are added as further
    annotations if passed.

    Behaviour depends on `on_exist`:
    - skip: manifest is returned unchanged if it already has any layer
    - append: signature layer is appended, unless an existing layer bears either the same
      signature, or the same public-key for the same payload-digest (and no differing
      signing-algorithm)
    - overwrite: existing layers are replaced by the new signature layer
    '''
    on_exist = OnExist(on_exist)

    if on_exist is OnExist.SKIP and manifest.layers:
        logger.info('manifest already contains a signature - skipping')
        return manifest

    appending = on_exist is OnExist.APPEND

    if appending:
        for layer_idx, layer in enumerate(manifest.layers):
            layer_annotations = layer.annotations

            # todo: ideally, signatures should be parsed and compared independent of format
            #       cosign seems to expect base64-part w/o PEM-headers (BEGIN/END), though
            if layer_annotations.get(_cosign_signature_annotation_name) == signature:
                logger.info(f'found signature in {layer_idx=}')
                logger.info('skipping (will not redundantly add signature again)')
                return manifest

            # comparison by public-key requires both keys to be known
            known_public_key = layer_annotations.get(_public_key_annotation_name)
            if not (known_public_key and public_key):
                continue

            known_algorithm = layer_annotations.get(_signing_algorithm_annotation_name)

            # do not skip if the existing signature was created using a different signing
            # algorithm or for a different payload: resigning is likely to be caller's intent
            # in both cases
            if (
                signing_algorithm
                and known_algorithm
                and signing_algorithm != known_algorithm
            ) or payload_digest != layer.digest:
                continue

            if known_public_key == public_key:
                logger.info(f'found matching public key in {layer_idx=}')
                logger.info('skipping (will not redundantly add signature again)')
                return manifest

    # if this line is reached, we did not find the signature we are about to append
    signature_layer = om.OciBlobRef(
        digest=payload_digest,
        size=payload_size,
        mediaType='application/vnd.dev.cosign.simplesigning.v1+json',
        annotations={_cosign_signature_annotation_name: signature},
    )

    optional_annotations = (
        (_public_key_annotation_name, public_key),
        (_signing_algorithm_annotation_name, signing_algorithm),
    )
    for annotation_name, annotation_value in optional_annotations:
        if annotation_value:
            signature_layer.annotations[annotation_name] = annotation_value

    if appending:
        manifest.layers.append(signature_layer)
    else:
        # skip (w/o existing layers) and overwrite: new signature becomes the only layer
        manifest.layers = [signature_layer]

    return manifest
def sign_image(
    image_reference: om.OciImageReference | str,
    signature: str,
    signing_algorithm: str=None,
    public_key: str=None,
    on_exist: OnExist | str=OnExist.APPEND,
    signature_image_reference: om.OciImageReference | str=None,
    oci_client: oc.Client=None,
    payload: bytes | None=None,
    annotations: dict[str, str] | None=None,
) -> tuple[str | om.OciImageReference, om.OciImageManifest] | None:
    '''
    creates an OCI Image signature as understood by cosign. if passed, public-key is added
    as additional annotation `gardener.cloud/cosign-public-key`

    If on_exist is set to `append`, existing signatures (dev.cosignproject.cosign/signature
    annotation) will be inspected. If given signature is already present, it will not be added
    again, thus making this function idempotent in this mode.

    In addition, if public-key is passed, existing annotations bearing
    public-key (gardener.cloud/cosign-public-key) will be compared to passed
    public_key and signature algorithm (stored in annotation
    gardener.cloud/cosign-signing-algorithm). If present, given signature will
    not be added, even if it differs from existing one. This behaviour is a
    preparation for different signature methods yielding different signatures
    even if private key and signed payload did not change (such as RSASSA-PSS).

    If on_exist is set to `skip` and a signature artefact already exists, nothing is changed.
    If on_exist is set to `overwrite`, a new signature artefact containing only the given
    signature is created without inspecting a possibly existing one.

    @param image_reference:           image to sign
    @param signature:                 signature to store
    @param signing_algorithm:         optional; stored as annotation along signature
    @param public_key:                optional; stored as annotation along signature
    @param on_exist:                  see above
    @param signature_image_reference: where to store signature artefact; defaults to cosign's
                                      default signature reference for the signed image
    @param oci_client:                optional; a default client is created if not passed
    @param payload:                   optional; defaults to output of `payload_bytes`
    @param annotations:               optional; annotations to set on signature manifest

    @returns: tuple of signature-image-reference and signed manifest, or None if nothing was
              done due to on_exist being set to `skip` and the signature artefact existing
    '''
    on_exist = OnExist(on_exist)

    if not oci_client:
        # imported lazily, as it is only needed if caller did not pass a client
        import ccc.oci
        oci_client = ccc.oci.oci_client()

    # resolve digest first: both default signature reference and payload require a digest-tag
    # (presumably, a reference already bearing a digest is returned as-is)
    image_reference = oci_client.to_digest_hash(image_reference)

    if not signature_image_reference:
        signature_image_reference = default_signature_image_reference(image_reference)
    else:
        signature_image_reference = om.OciImageReference.to_image_ref(signature_image_reference)

    # for `overwrite`, an existing signature artefact is not of interest (it is replaced)
    exists = False
    if on_exist in (OnExist.SKIP, OnExist.APPEND):
        exists = bool(oci_client.head_manifest(
            image_reference=signature_image_reference,
            absent_ok=True,
        ))

    if on_exist is OnExist.SKIP and exists:
        logger.info(f'signature artefact exists: {signature_image_reference} - skipping')
        return None

    if exists:
        manifest = oci_client.manifest(
            image_reference=signature_image_reference,
        )
    else:
        manifest = None

    if not payload:
        # payload is normalised JSON w/ reference to signed image. It is expected as (only)
        # layer-blob for signature artefact
        payload = payload_bytes(
            image_reference=image_reference,
        )

    payload_size = len(payload)
    payload_digest = f'sha256:{hashlib.sha256(payload).hexdigest()}'

    # only upload payload if it is not already referenced by existing signature artefact
    if not manifest or not any(l.digest == payload_digest for l in manifest.layers):
        oci_client.put_blob(
            image_reference=signature_image_reference,
            digest=payload_digest,
            octets_count=payload_size,
            data=payload,
        )

    cfg_blob = cfg_blob_bytes(
        payload_digest=payload_digest,
    )
    cfg_blob_size = len(cfg_blob)
    cfg_blob_digest = f'sha256:{hashlib.sha256(cfg_blob).hexdigest()}'

    if not manifest or manifest.config.digest != cfg_blob_digest:
        # cfg-blob is referenced by the signature manifest, hence it must be uploaded to the
        # signature artefact's repository (which may differ from the signed image's one)
        oci_client.put_blob(
            image_reference=signature_image_reference,
            digest=cfg_blob_digest,
            octets_count=cfg_blob_size,
            data=cfg_blob,
        )

    if manifest:
        # update manifest's cfg blob to the one we just created, it may be different because
        # we used to create those cfg blobs using the cosign command itself (which has slightly
        # different semantics) in the past or because the contained payload digest is different
        manifest.config = om.OciBlobRef(
            digest=cfg_blob_digest,
            mediaType='application/vnd.oci.image.config.v1+json',
            size=cfg_blob_size,
        )
        if annotations:
            # passed annotations take precedence over existing ones
            manifest.annotations = manifest.annotations | annotations
    else:
        manifest = om.OciImageManifest(
            config=om.OciBlobRef(
                digest=cfg_blob_digest,
                mediaType='application/vnd.oci.image.config.v1+json',
                size=cfg_blob_size,
            ),
            mediaType='application/vnd.oci.image.manifest.v1+json',
            layers=[],
            annotations=annotations or {},
        )

    signed_manifest = sign_manifest(
        manifest=manifest,
        payload_size=payload_size,
        payload_digest=payload_digest,
        signature=signature,
        signing_algorithm=signing_algorithm,
        public_key=public_key,
        on_exist=on_exist,
    )

    manifest_bytes = json.dumps(signed_manifest.as_dict()).encode('utf-8')
    manifest_digest = f'sha256:{hashlib.sha256(manifest_bytes).hexdigest()}'
    manifest_digest_ref = f'{signature_image_reference.ref_without_tag}@{manifest_digest}'

    # re-uploading manifest with same digest is a no-op
    if not oci_client.head_manifest(
        image_reference=manifest_digest_ref,
        absent_ok=True,
    ):
        oci_client.put_manifest(
            image_reference=signature_image_reference,
            manifest=manifest_bytes,
        )

    return signature_image_reference, signed_manifest