node_backup and state_exporter roles were added
Signed-off-by: kogeler <[email protected]>
kogeler committed Oct 6, 2023
1 parent a470aa4 commit 8498b76
Showing 31 changed files with 1,264 additions and 1 deletion.
18 changes: 18 additions & 0 deletions .github/workflows/pr-node-backup.yml
@@ -0,0 +1,18 @@
name: check PR (node_backup)

on:
pull_request:
paths:
- roles/node_backup/**
- .github/**

jobs:
run-molecule-tests:
strategy:
fail-fast: false
matrix:
molecule-driver: [lxd, docker]
uses: ./.github/workflows/reusable-molecule.yml
with:
      role-name: node_backup
molecule-driver: ${{ matrix.molecule-driver }}
2 changes: 1 addition & 1 deletion galaxy.yml
@@ -8,7 +8,7 @@ namespace: paritytech
name: chain

# The version of the collection. Must be compatible with semantic versioning
-version: 1.5.1
+version: 1.6.0

# The path to the Markdown (.md) readme file. This path is relative to the root of the collection
readme: README.md
33 changes: 33 additions & 0 deletions roles/node_backup/.yamllint
@@ -0,0 +1,33 @@
---
# Based on ansible-lint config
extends: default

rules:
braces:
max-spaces-inside: 1
level: error
brackets:
max-spaces-inside: 1
level: error
colons:
max-spaces-after: -1
level: error
commas:
max-spaces-after: -1
level: error
comments: disable
comments-indentation: disable
document-start: disable
empty-lines:
max: 3
level: error
hyphens:
level: error
indentation: disable
key-duplicates: enable
line-length: disable
new-line-at-end-of-file: disable
new-lines:
type: unix
trailing-spaces: disable
truthy: disable
8 changes: 8 additions & 0 deletions roles/node_backup/README.md
@@ -0,0 +1,8 @@
node_backup
=========
This role templates out the backup script and the backup Prometheus exporter, and creates the relevant systemd units.<br>
The nodes deployed on the same instance are ordinary Substrate nodes that sync the chain. The backup is made from the
local database, so these nodes don't have to do any work other than synchronization.<br>
A node is stopped while its chain is being backed up, because otherwise the database would keep changing during the
backup and corrupt it.
<br><br>
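For illustration, a minimal sketch of wiring the role into a playbook. The variable names come from `defaults/main.yml` (shown below); the host group, paths, RPC port, and bucket name are made-up values:

```yaml
# Hypothetical playbook: back up one pruned RocksDB Polkadot node to a GCP bucket via rclone.
- name: Configure node backups
  hosts: backup_nodes                    # assumed inventory group
  become: true
  roles:
    - role: node_backup
      vars:
        node_backup_user: "polkadot"
        node_backup_schedule:
          - "*-*-* 01:00:00"             # systemd OnCalendar format
        node_backup_targets:
          - service_name: polkadot-rocksdb-prune
            local_path: /opt/polkadot-rocksdb-prune/chains/polkadot/db
            rpc_port: 9944               # assumed RPC port of the local node
            type: 'gcp-rclone'
            bucket_name: "my-backup-bucket"   # assumed bucket name
```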
40 changes: 40 additions & 0 deletions roles/node_backup/defaults/main.yml
@@ -0,0 +1,40 @@
---


# R2 configuration
node_backup_r2_access_key_id: ""
node_backup_r2_secret_access_key: ""
node_backup_r2_api_url: ""

node_backup_max_concurrent_requests: 50

node_backup_schedule:
- "*-*-* 01:00:00"

node_backup_user: "polkadot"

node_backup_base_path: "/opt/node_backup"
node_backup_tmp_path: "/tmp"

# It wipes the local cache of the node-backup exporter.
# It's useful if you rename or remove some backups from the 'node_backup_targets' variable
node_backup_wipe_cache_enable: false

# List of the nodes deployed to the host
# service_name - is used to extract information about the DB type and must follow the pattern:
#   <node_chain>-[paritydb|rocksdb]-[prune|archive]
# where `node_chain` is the value of the `node_chain` variable from the `node` role.
node_backup_targets: []
# - service_name: polkadot-rocksdb-prune
# local_path: /opt/polkadot-rocksdb-prune/chains/polkadot/db
# rpc_port: 9934
# # the old way of making backups; it takes more time to back up and restore
# # it's true by default
# tar: false
# # type of backup. can be 'gcp-native', 'gcp-rclone' or 'r2-rclone'
# type: 'gcp-rclone'
# # name of the bucket
# bucket_name: "backup"
# # the public domain name of the bucket
# # it's empty by default
# bucket_domain: "backup.polkadot.io"
202 changes: 202 additions & 0 deletions roles/node_backup/files/exporter.py
@@ -0,0 +1,202 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import pickle
import json
import logging
import threading
import traceback
import io
from http.server import BaseHTTPRequestHandler, HTTPServer
from prometheus_client import start_http_server, Gauge


LOGGING_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'


cache_filename = os.path.dirname(__file__) + '/exporter.cache'

backup_labels = ['id', 'storage', 'bucket_name', 'service_name', 'version']
backup_metrics = {
"timestamp": Gauge('node_backup_timestamp',
'Time of the last backup (unix timestamp)',
backup_labels),
"size": Gauge('node_backup_size',
'Size of the last backup (byte)',
backup_labels),
"last_block": Gauge('node_backup_last_block',
                        'Last block number in the last backup',
backup_labels),
"last_backup": Gauge('node_backup_last_backup',
'Last backup',
backup_labels + ['backup_name', 'tar_backup_path', 'backup_path']),
"total_size": Gauge('node_backup_total_size',
'Size of all backups (byte)',
['storage', 'bucket_name'])
}


def update_cache(key, value):
if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 0:
with open(cache_filename, 'rb') as f:
data = pickle.load(f)
else:
data = {}
data[key] = value
with open(cache_filename, 'wb') as f:
pickle.dump(data, f)


def fetch_cache():
if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 0:
with open(cache_filename, 'rb') as f:
data = pickle.load(f)
logging.info(f"Fetched from cache: {data}")
return data
else:
return {}


def clean_metrics(id, backup_name, version):
"""
Purge records with old versions
"""

def check_record(key_value) -> bool:
return (
id in key_value['labels'] and
key_value['name'] != 'node_backup_total_size' and
(
(key_value['name'] == 'node_backup_last_backup' and backup_name not in key_value['labels']) or
version not in key_value['labels']
)
)

for metric in backup_metrics.items():
current_metrics=[{'name': i.name, 'labels': list(i.labels.values()), 'value': i.value} for i in metric[1].collect()[0].samples]
old_metrics = list(filter(check_record, current_metrics))
for old_metric in old_metrics:
logging.info(f"clean {old_metric['name']} metric with label set: {str(old_metric['labels'])}")
metric[1].remove(*old_metric['labels'])


def set_metrics(data):
id = f"{data['storage']}-{data['bucketName']}-{data['serviceName']}"
common_labels={'id': id,
'storage': data['storage'],
'bucket_name': data['bucketName'],
'service_name': data['serviceName'],
'version': data['version']}
if data['bucketDomain'] != '':
backup_path=f"https://{data['bucketDomain']}/{data['serviceName']}/{data['backupName']}"
tar_backup_path=f"https://{data['bucketDomain']}/tar/{data['serviceName']}/{data['backupName']}.tar"
elif data['bucketDomain'] == '' and data['storage'] == 'gcp':
backup_path=f"gs://{data['bucketName']}/{data['serviceName']}/{data['backupName']}"
tar_backup_path=f"https://storage.googleapis.com/{data['bucketName']}/tar/{data['serviceName']}/{data['backupName']}.tar"
else:
raise Exception("'bucketDomain' has to be defined")
clean_metrics(id, data['backupName'], data['version'])
backup_metrics['timestamp'].labels(**common_labels).set(int(data['timeStamp']))
backup_metrics['size'].labels(**common_labels).set(int(data['size']))
backup_metrics['last_block'].labels(**common_labels).set(int(data['lastBlock']))
backup_metrics['last_backup'].labels(**common_labels,
backup_name=data['backupName'],
backup_path=backup_path,
tar_backup_path=tar_backup_path).set(1)
backup_metrics['total_size'].labels(storage=data['storage'],
bucket_name=data['bucketName']).set(int(data['totalSize']))
update_cache((data['storage'], data['bucketName'], data['serviceName']), data)
logging.info(f"request was processed successfully. data: {data}")


class HttpProcessor(BaseHTTPRequestHandler):
"""
HTTP Server
"""
BaseHTTPRequestHandler.server_version = 'Python API'

def log_message(self, format, *args):
message = f"{self.address_string()} {format % args}"
logging.info(message)

def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json; charset=utf-8')
self.end_headers()


def do_POST(self):
if self.headers.get('Content-Type') != 'application/json':
self.send_error(400, "Only application/json supported")
self.end_headers()
return
data = ""
try:
# read the message and convert it into a python dictionary
length = int(self.headers['content-length'])
data = self.rfile.read(length)

set_metrics(json.loads(data))
self._set_headers()
self.wfile.write(json.dumps({"status": "OK"}).encode("utf8"))
except json.decoder.JSONDecodeError as e:
tb_output = io.StringIO()
traceback.print_tb(e.__traceback__, file=tb_output)
logging.error(f"JSON decoding error. error: '{e}', JSON: '{data}'")
logging.error(f"JSON decoding error. traceback:\n{tb_output.getvalue()}")
tb_output.close()
self.send_error(400, 'JSONDecodeError')
return
except Exception as e:
tb_output = io.StringIO()
traceback.print_tb(e.__traceback__, file=tb_output)
logging.error(f"request processing error. error: '{e}'")
logging.error(f"request processing error. traceback:\n{tb_output.getvalue()}")
tb_output.close()
self.send_error(500)
return


def start_servers():
"""
Start HTTP Servers
"""
# Start up the server to expose the metrics.
start_http_server(9109) # Metrics server
server_address = ('127.0.0.1', 60101) # Data reception server
server = HTTPServer(server_address, HttpProcessor)
server.serve_forever()


if __name__ == '__main__':

# set up console log handler
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter(LOGGING_FORMAT)
console.setFormatter(formatter)
# set up basic logging config
logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO, handlers=[console])


for backup in fetch_cache().items():
try:
set_metrics(backup[1])
except KeyError as e:
logging.error(f"cache fetching error. error: {e}, key: {backup[0]}, value: {backup[1]}")
except Exception as e:
tb_output = io.StringIO()
traceback.print_tb(e.__traceback__, file=tb_output)
logging.error(f"cache fetching error. error: '{e}'")
logging.error(f"cache fetching error. traceback:\n{tb_output.getvalue()}")
tb_output.close()
sys.exit(1)

thread = threading.Thread(target=start_servers, args=())
thread.daemon = True
thread.start()
thread.join()
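For reference, the exporter above listens on 127.0.0.1:60101 and expects a JSON body whose fields match those read in `set_metrics()`; the parsed metrics are then exposed on port 9109. A minimal sketch of reporting one finished backup to it with `ansible.builtin.uri` (all field values are illustrative, not taken from the role):

```yaml
# Hypothetical task: push one backup record to the local node-backup exporter.
- name: Report backup to node-backup exporter
  ansible.builtin.uri:
    url: "http://127.0.0.1:60101"
    method: POST
    body_format: json                    # sends Content-Type: application/json, as required by do_POST()
    body:
      storage: "gcp"
      bucketName: "my-backup-bucket"     # illustrative bucket
      bucketDomain: ""                   # may be empty only when storage is 'gcp'
      serviceName: "polkadot-rocksdb-prune"
      version: "1.0.0"                   # illustrative node version
      backupName: "20231006-010000"      # illustrative backup name
      timeStamp: 1696554000
      size: 123456789
      lastBlock: 17700000
      totalSize: 987654321
    status_code: 200
```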
15 changes: 15 additions & 0 deletions roles/node_backup/handlers/main.yml
@@ -0,0 +1,15 @@
---

- name: restart node-backup exporter
ansible.builtin.systemd:
name: "node-backup-exporter"
state: restarted
enabled: true
daemon_reload: true

- name: restart node-backup timer
ansible.builtin.systemd:
name: "node-backup.timer"
state: restarted
enabled: true
daemon_reload: true
25 changes: 25 additions & 0 deletions roles/node_backup/molecule/default/README.md
@@ -0,0 +1,25 @@
### Collection

Molecule should install the collection automatically. If it did not happen, run:
```commandline
mkdir molecule/default/collections
ansible-galaxy collection install -f -r molecule/default/collections.yml -p ./molecule/default/collections
```

### Molecule
#### Docker
Test the role with the Docker driver
```shell
molecule create
molecule converge
molecule destroy
```

#### LXD
Test the role with the LXD driver
```shell
DRIVER=lxd molecule create
DRIVER=lxd molecule converge
DRIVER=lxd molecule destroy
```

4 changes: 4 additions & 0 deletions roles/node_backup/molecule/default/collections.yml
@@ -0,0 +1,4 @@
collections:
- name: https://github.com/paritytech/ansible-galaxy.git
type: git
version: main
7 changes: 7 additions & 0 deletions roles/node_backup/molecule/default/converge.yml
@@ -0,0 +1,7 @@
---
- name: Converge
hosts: all
tasks:
- name: "Include node backup"
ansible.builtin.include_role:
name: "node_backup"