-
Notifications
You must be signed in to change notification settings - Fork 16
/
generator.py
142 lines (116 loc) · 4.66 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Desc: Jenkins update center generator
# depend:
# yum -y install make gcc automake autoconf python3-devel
# pip install pycrypto
import os
import json
import base64
import binascii
import http.client
import urllib.request
from Crypto.Hash import SHA512, SHA
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
class JenkinsUpdateCenter:
def __init__(self):
self.updateCenterVersion = "1"
self.core = None
self.warnings = None
self.plugins = None
self.id = "default"
self.connectionCheckUrl = None
self._private_key = None
self._cert = [None]
def _sha1_digest(self, body):
digest = base64.b64encode(SHA.new(body).digest()).decode("utf-8")
return digest
def _sha512_digest(self, body):
digest = binascii.hexlify(SHA512.new(body).digest()).decode("utf-8")
return digest
def _sign(self, body, algo = "SHA-1"):
signer = PKCS1_v1_5.new(self._private_key)
if algo == "SHA-1":
digest = SHA.new()
else:
digest = SHA512.new()
digest.update(body)
try:
signature = signer.sign(digest)
except Exception as err:
raise Exception("Could not make sign. "+str(err))
return signature
def _sha1_signature(self, body):
signature = base64.b64encode(self._sign(body, "SHA-1")).decode("utf-8")
return signature
def _sha512_signature(self, body):
signature = binascii.hexlify(self._sign(body, "SHA-512")).decode("utf-8")
return signature
def load_private(self, key_path):
try:
with open(key_path, "r") as fd:
self._private_key = RSA.importKey(fd.read())
except Exception as err:
raise Exception("Could not load private key "+key_path+". "+str(err))
def load_public(self, key_path):
try:
with open(key_path, "rb") as fd:
self._cert = base64.b64encode(fd.read()).decode("utf-8")
except Exception as err:
raise Exception("Could not load public key "+key_path+". "+str(err))
def out(self, fd):
output = {}
output["updateCenterVersion"] = self.updateCenterVersion
if self.core is not None:
output["core"] = self.core
if self.warnings is not None:
output["warnings"] = self.warnings
if self.plugins is not None:
output["plugins"] = self.plugins
output["id"] = self.id
if self.connectionCheckUrl is not None:
output["connectionCheckUrl"] = self.connectionCheckUrl
payload = (json.dumps(output, separators=(",", ":"), sort_keys=True, ensure_ascii=False).encode("utf-8"))
output["signature"] = {"certificates":[self._cert]}
output["signature"]["correct_digest"] = self._sha1_digest(payload)
output["signature"]["correct_digest512"] = self._sha512_digest(payload)
output["signature"]["correct_signature"] = self._sha1_signature(payload)
output["signature"]["correct_signature512"] = self._sha512_signature(payload)
try:
fd.write("updateCenter.post(\n"+json.dumps(output, separators=(",", ":"), sort_keys=True)+"\n);")
except Exception as err:
raise Exception("Could not write output. "+str(err))
def main():
mirrors_file = "mirrors.json"
private_key = "rootCA/update-center.key"
public_key = "rootCA/update-center.crt"
original_download_url = "https://updates.jenkins.io/download/"
original_update_center_url = "https://updates.jenkins-ci.org/current/update-center.json"
original_file = urllib.request.urlopen(original_update_center_url)
try:
original_context = original_file.read()
except http.client.IncompleteRead as e:
original_context = e.partial.decode('utf-8')
original = json.loads(original_context.replace(str.encode("updateCenter.post(\n"), str.encode("")).replace(str.encode("\n);"), str.encode("")))
uc = JenkinsUpdateCenter()
uc.load_private(private_key)
uc.load_public(public_key)
uc.warnings = original["warnings"]
try:
with open(mirrors_file, "r") as fd:
mirrors_url = json.loads(fd.read())
except Exception as err:
raise Exception("Could not load mirrors " + mirrors_file +". " + str(err))
for site,mirror_url in mirrors_url.items():
print("Generate:", mirror_url)
uc.plugins = json.loads(json.dumps(original["plugins"]).replace(original_download_url, mirror_url))
uc.core = json.loads(json.dumps(original["core"]).replace(original_download_url, mirror_url))
site_path = "updates/" + site
if not os.path.exists(site_path):
os.makedirs(site_path)
with open(site_path + "/update-center.json", "w") as fd:
uc.out(fd)
if __name__ == '__main__':
main()