Skip to content

Commit

Permalink
match demonstrator to verified backend
Browse files Browse the repository at this point in the history
  • Loading branch information
arolle committed May 17, 2024
1 parent 3426619 commit d0bc44d
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 304 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.PHONY: default
default: demo

.PHONY: run_auth run_mixnet run_webserver demo
run_auth:
Expand Down
279 changes: 175 additions & 104 deletions webdemo/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
import base64
import io
import json
Expand All @@ -18,9 +19,11 @@

mimetypes.add_type("application/wasm", ".wasm")

# Items are tuple
# (signature reference/signature, vote, is freja online)
SIGNED_VOTES = []
# list of encrypted votes
# set of user ids that have voted (also obtainable from SIGNATURES)
VOTED_IDS = set()
# dictionary assigning session id to signature reference
SIGN_REFS = dict()

def get_auth_server_url():
parsed_url = urlparse(os.getenv('AUTH_SERVER_URL'))
Expand Down Expand Up @@ -76,52 +79,24 @@ def decorated_function(*args, **kwargs):
return decorated_function


def _check_for_signed_votes():
it = len(SIGNED_VOTES) - 1
votes_for_verified_backend = []
while (it >= 0):
signed_vote = SIGNED_VOTES[it]
signature_reference, encrypted_vote, freja_online = signed_vote
if freja_online:
signature, has_signed = _confirm_if_user_has_signed(signature_reference)
if _has_user_already_voted(signature):
del SIGNED_VOTES[it]
return render_template("poll.html", data=POLL_DATA, stats=STATS, vote=None)

if signature is not None and has_signed:
modified_response_object = {
'vote': encrypted_vote,
'signature': signature,
}
_append_vote_to_ciphertexts(encrypted_vote)
_record_signature(signature)
del SIGNED_VOTES[it]
print(modified_response_object)
votes_for_verified_backend.append(modified_response_object)
else:
# `signature_reference` is signature in case of offline votes
if _has_user_already_voted(signature_reference):
flash('You have already voted')
del SIGNED_VOTES[it]
return render_template("poll.html", data=POLL_DATA, stats=STATS, vote=None)

modified_response_object = {
'vote': encrypted_vote,
'signature': signature_reference,
}
_append_vote_to_ciphertexts(encrypted_vote)
_record_signature(signature_reference)
del SIGNED_VOTES[it]
print(modified_response_object)
votes_for_verified_backend.append(modified_response_object)

it -= 1
if len(votes_for_verified_backend) == 0:
return render_template("poll.html", data=POLL_DATA, stats=STATS, vote=None)

return render_template("poll.html", data=POLL_DATA, stats=STATS, show_success=True, vote=json.dumps(votes_for_verified_backend))


def _check_for_signed_votes(request):
sign_ref = get_user_sign_ref(request)
if sign_ref is None:
# ensure cookie is set
response = make_response(render_template("poll.html", data=POLL_DATA, stats=STATS))
userno = get_or_gen_userid(request)
set_userid(response,userno)
return response
signature = _confirm_if_user_has_signed(sign_ref)
if signature is None:
flash('Waiting for signing','signing_wait')
return render_template("poll.html", data=POLL_DATA, stats=STATS, signing_waiting=True)
del_user_sign_ref(request)
flash('The encrypted vote is signed and ready for submission','msg')
return render_template("poll.html", data=POLL_DATA, stats=STATS, show_success=True, signature=signature)

# format: json array with two ByteTree elements, which each are represented as
# byte arrays (list of integers < 256)
def _append_vote_to_ciphertexts(vote):
with open(FILENAME, "a") as f:
print(vote, file=f)
Expand All @@ -132,23 +107,10 @@ def _record_signature(signature):
with open(SIGNATURES, "a") as f:
f.write(f"{signature}\n")


def _has_user_already_voted(candidate_signature):
if candidate_signature is None:
return False

if not os.path.exists(SIGNATURES):
return False

with open(SIGNATURES) as f:
current_signatures = f.readlines()

for current_signature in current_signatures:
if _get_userInfo_from_signature(current_signature) \
== _get_userInfo_from_signature(candidate_signature):
return True
return False

return _get_userInfo_from_signature(candidate_signature) in VOTED_IDS

def _get_userInfo_from_signature(signature):
jws_payload = signature.split('.')[1]
Expand All @@ -165,29 +127,103 @@ def _confirm_if_user_has_signed(sign_ref):
)

if r.status_code == 200:
return (r.json()['signature'], True)

return (None, None)
return r.json()['signature']

return None


# TODO drop exemption
@csrf.exempt
@app.route("/vote_submission", methods=["POST"])
def vote_submission():
# check if this is a vote submission
pre_vote = request.form.get("ballot")
vote = _validate_vote(pre_vote)

if pre_vote and isinstance(vote, dict):
signature = vote["signature"]
# a list of byte arrays
enc_vote_ba_lst = vote["vote"]
# json encoding of above
enc_vote_str = json.dumps(list(map((lambda x: list(bytes(x))),enc_vote_ba_lst)))
# byte array of above
enc_vote_ba = ByteTree(list(map((lambda x: ByteTree.from_byte_array(x)),enc_vote_ba_lst))).to_byte_array()
if _validate_vote_auth(enc_vote_ba,signature):
user_id = _get_userInfo_from_signature(signature)
logging.error(user_id)
has_voted = _has_user_already_voted(signature)
if not user_id:
flash('Invalid signature')
return redirect(url_for('root'))
if has_voted:
logging.error(f"User {user_id} attempted to revote")
flash('You have already voted')
return redirect(url_for('root'))
# add to store of votes
_append_vote_to_ciphertexts(enc_vote_str)
_record_signature(signature)
VOTED_IDS.add(user_id)
logging.error(f"{user_id} just voted successfully" )
# confirm submission
flash('Successful submission of encrypted vote.','success')
return redirect(url_for('root'))
logging.error(f"Error verifying encrypted vote and signature. Could not submit your vote.")
flash('Error verifying encrypted vote and signature. Could not submit your vote.','error')
return redirect(url_for('root'))

def get_userid(request):
if 'userno' in request.cookies:
return request.cookies.get('userno')
else:
return None

def get_or_gen_userid(request):
if 'userno' in request.cookies:
return request.cookies.get('userno')
return str(uuid.uuid1())

def set_userid(response,userno):
response.set_cookie('userno', userno)

def get_user_sign_ref(request):
userno = get_userid(request)
if not (userno is None) and userno in SIGN_REFS:
return SIGN_REFS[userno]
return None

def set_user_sign_ref(request,sign_ref, userno):
SIGN_REFS[userno] = sign_ref

def del_user_sign_ref(request):
userno = get_userid(request)
del SIGN_REFS[userno]

# either checking for signed votes,
# or accepting hash signing requests or vote submissions
@app.route("/", methods=("GET", "POST"))
def root():
if POLL_DATA["publicKey"] is None:
return "Missing public key!"

if request.method == "GET":
return _check_for_signed_votes()
# flash('You have already voted','error')
return _check_for_signed_votes(request)

vote = request.form.get("field")
# assume this is a signing request
vote_hash = request.form.get("field")
user_email = request.form.get('email-for-signing')
error = _validate_vote(vote)
if error:
return error

encrypted_vote = str(vote).encode('utf-8')
hashed_encryption = sha256()
hashed_encryption.update(encrypted_vote)
hex_string = hashed_encryption.digest().hex()
beautified_hex_string = ' '.join([hex_string[i:i+4] for i in range(0, len(hex_string), 4)])
error = _validate_hash256(vote_hash)
if error:
return error

# ensure cookie later
userno = get_or_gen_userid(request)
# encrypted_vote = str(vote).encode('utf-8')
# hashed_encryption = sha256()
# hashed_encryption.update(encrypted_vote)
# hex_string = hashed_encryption.digest().hex()
beautified_hex_string = ' '.join([vote_hash[i:i+4] for i in range(0, len(vote_hash), 4)])
logging.error(f"Hex-string: {beautified_hex_string}")

sign_request = requests.post(
Expand All @@ -202,29 +238,81 @@ def root():
if sign_request.status_code == 200:
response_object = sign_request.json()
signature_reference = response_object['signRef']
SIGNED_VOTES.append((signature_reference, eval(vote), True))

return render_template("poll.html", data=POLL_DATA, stats=STATS, show_success=True, hash=beautified_hex_string)

# update outstanding signing request
set_user_sign_ref(request, signature_reference, userno)
response = make_response(render_template("poll.html", data=POLL_DATA, stats=STATS, show_success=True, hash=beautified_hex_string))
set_userid(response,userno)
return response

if sign_request.status_code == 418:
flash(sign_request.json()['message'])
flash(sign_request.json()['message'],'msg')
return redirect(url_for('root'))

flash('Could not cast your vote.')
flash('Could not cast your vote.','error')
return redirect(url_for('root'))



def base64urldec(string):
padlen = 4 - len(string) % 4
return base64.urlsafe_b64decode(string + '=' * padlen)


# get hash value from signature and validate
def _validate_vote_auth(enc_vote_ba,signature):
hashed_encryption = sha256()
hashed_encryption.update(enc_vote_ba)
hash_dgst = hashed_encryption.digest()

# with open('sample-signed-vote.json') as f:
# sample_signed_vote = json.loads(f.read())

parts = signature.split('.')
if len(parts) != 3:
return "malformed signature: expect three components in signature"
jws_payload = parts[1]
try:
jws_payload_decoded = base64urldec(jws_payload)
payload_json = json.loads(jws_payload_decoded)["signatureData"]["userSignature"]
signed = payload_json.split('.')
hash_val = ''.join(base64urldec(signed[1]).decode('ascii').split(' '))
return hash_dgst.hex() == hash_val
except Exception as e:
return None


def _validate_hash256(vote_hash_str):
len_hash = len(vote_hash_str)
if len_hash != 64:
return f"Expected hash of length 64, got {len_hash}: {vote_hash_str}"
try:
x = int(vote_hash_str, 16)
except ValueError:
return f"Expected vote hash, got {vote_hash_str}"
return None


# returns dict with byte tree and signature string
def _validate_vote(vote):
try:
x = json.loads(vote)
enc_vote = ByteTree.from_byte_array(bytes.fromhex(x["vote"]))
nodes = enc_vote.dest_node()
if len(nodes) != 2:
return f"Vote format error of encrypted vote"
enc_vote = list(map((lambda x: x.to_byte_array()), nodes))
except json.JSONDecodeError:
return "JSON Decode Error"
return "Vote format error (cannot decode JSON)"
except KeyError:
return "Vote format error (missing key: vote)"

len_x = len(x)
if len_x != 2:
return f"Expected 2 elements, got {len_x}"
newdict = {k: v for k, v in x.items() if k == "signature" or k == "vote"}
if len(newdict) != 2:
return f"Vote format error (missing key: signature)"
# TODO authenticate

return None
newdict["vote"] = enc_vote
return newdict


def _delete_file(file):
Expand Down Expand Up @@ -290,23 +378,6 @@ def _is_authenticated(user_identification):

return False

@app.route("/offline_vote")
def offline_vote():
"""
Endpoint for casting a vote when FrejaEID is offline.
"""

with open(os.path.join(app.static_folder, 'sample-signed-vote.json')) as f:
sample_signed_vote = json.loads(f.read())

encrypted_vote = sample_signed_vote['vote']
signature = sample_signed_vote['signature']

SIGNED_VOTES.append((signature, encrypted_vote, False))

return redirect(url_for('root'))


@csrf.exempt
@app.route("/publicKey", methods=("GET", "POST"))
def publickey():
Expand Down
5 changes: 5 additions & 0 deletions webdemo/bytetree.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def is_node(self) -> bool:
def is_leaf(self) -> bool:
return self.type == ByteTree.LEAF

def dest_node(self) -> ["ByteTree"]:
if not self.is_node:
return []
return self.value

@classmethod
def from_byte_array(cls, source: ByteString) -> "ByteTree":
"""
Expand Down
Loading

0 comments on commit d0bc44d

Please sign in to comment.