-
Notifications
You must be signed in to change notification settings - Fork 9
/
reindex_batch_raw_experimental.py
221 lines (178 loc) · 7.83 KB
/
reindex_batch_raw_experimental.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# Import necessary modules here
import time
import os
import os.path
import shutil
from ops.data_ops import sort_list_dict, get_home, make_folder
from ops.account_ops import change_balance, increase_produced_count, get_totals, index_totals
from ops.block_ops import get_block_ends_info, get_block, set_latest_block_info, update_child_in_latest_block
from genesis import make_genesis, create_indexers
from ops.log_ops import get_logger, logging
from ops.sqlite_ops import DbHandler
import sqlite3
to_wipeout = ["index"]
def batch_insert_blocks(blocks_data):
db_path = f"{get_home()}/index/blocks.db"
# Prepare data for insertion
insert_data = [(block["block_hash"], block["block_number"]) for block in blocks_data]
with sqlite3.connect(db_path) as conn:
try:
cursor = conn.cursor()
cursor.executemany("INSERT INTO block_index (block_hash, block_number) VALUES (?, ?)", insert_data)
conn.commit()
except sqlite3.Error as e:
print(f"An error occurred during batch insert: {e}")
def delete(to_wipeout):
for folder in to_wipeout:
print(f"Removing {folder}")
path = f"{get_home()}/{folder}"
if os.path.exists(path):
shutil.rmtree(path)
print(f"Removed {path}")
delete(to_wipeout)
make_folder(f"{get_home()}/index")
make_folder(f"{get_home()}/index/producer_sets")
make_folder(f"{get_home()}/index/transactions")
create_indexers()
logger = get_logger(file="reindex.log", logger_name="reindex_logger")
make_genesis(
address="ndo18c3afa286439e7ebcb284710dbd4ae42bdaf21b80137b",
balance=1000000000000000000,
ip="78.102.98.72",
port=9173,
timestamp=1669852800,
logger=logger,
)
block_ends = get_block_ends_info(logger=logger)
print(block_ends["latest_block"])
update_child_in_latest_block(
child_hash="3abbfe409d446d997fbf65767c97e3f59ecb943d61a000240432e1627187966b",
logger=logger,
parent=block_ends["latest_block"]
)
first_block = block_ends["latest_block"]
block = first_block
blocks_data = []
transaction_data = []
totals_data = []
account_balances = {}
def batch_process_transactions(transaction_data):
txs_to_index = []
for transaction in transaction_data:
txs_to_index.append((transaction["data"]['txid'],
transaction["block_number"],
transaction["data"]['sender'],
transaction["data"]['recipient']))
# Database handling logic
height_db = 666 # Adjust this value as needed
db_path = f"{get_home()}/index/transactions/block_range_{height_db}.db"
if not os.path.exists(db_path):
tx_handler = DbHandler(db_file=db_path)
tx_handler.db_execute(
query="CREATE TABLE tx_index(txid TEXT, block_number INTEGER, sender TEXT, recipient TEXT)")
tx_handler.db_execute(query="CREATE INDEX seek_index ON tx_index(txid, sender, recipient)")
tx_handler = DbHandler(db_file=db_path)
tx_handler.db_executemany("INSERT INTO tx_index VALUES (?,?,?,?)", txs_to_index)
tx_handler.close()
def insert_aggregated_totals(account_balances):
total_produced = sum(details['produced'] for details in account_balances.values())
total_fees = sum(details['fees'] for details in account_balances.values())
total_burned = sum(details['burned'] for details in account_balances.values())
db_path = f"{get_home()}/index/accounts.db"
try:
conn = sqlite3.connect(db_path)
conn.execute('INSERT INTO totals_index (produced, fees, burned) VALUES (?, ?, ?)',
(total_produced, total_fees, total_burned))
conn.commit()
except sqlite3.Error as e:
print(f"Database error: {e}")
raise
finally:
if conn:
conn.close()
def update_account_details(account, amount, is_produced=False, is_burned=False, fee=0):
if account not in account_balances:
account_balances[account] = {'balance': 0, 'produced': 0, 'burned': 0, 'fees': 0}
# Update balance for all accounts
if not is_burned: # prevent doubling (stupid logic here, burns are called twice)
account_balances[account]['balance'] += amount
account_balances[account]['fees'] += fee
if is_produced:
account_balances[account]['produced'] += amount
if is_burned:
account_balances[account]['burned'] += amount
block_count = 0
while True: #["block_number"] < 4000:
block_ends = get_block_ends_info(logger=logger)
if not block["child_hash"]:
set_latest_block_info(latest_block=block,
logger=logger)
break
block = get_block(block=block["child_hash"])
if block["block_number"] > 0:
sorted_transactions = sort_list_dict(block["block_transactions"])
blocks_data.append(block)
if sorted_transactions:
for transaction in sorted_transactions:
sender = transaction['sender']
recipient = transaction['recipient']
amount = transaction['amount']
fee = transaction['fee'] if block["block_number"] > 111111 else 0
# Deduct fee from sender's account and amount
update_account_details(sender, -amount - fee, fee=fee)
# Credit amount to recipient's account
update_account_details(recipient, amount)
# If recipient is "burn", mark the amount as burned for the sender
if recipient == "burn":
update_account_details(sender, amount, is_burned=True)
transaction_data.append({"data": sorted_transactions[0],
"block_number": block["block_number"]})
totals = get_totals(block=block)
totals_data.extend(totals)
# Credit block reward to block creator
update_account_details(block["block_creator"], block["block_reward"], is_produced=True)
block_count += 1
if block_count % 5000 == 0:
# Perform batch operations here after every 5000 loops
batch_process_transactions(transaction_data)
batch_insert_blocks(blocks_data)
blocks_data.clear()
transaction_data.clear()
totals_data.clear()
print(f"Batch processing after {block_count} loops")
# Perform batch operations at the end
batch_process_transactions(transaction_data)
batch_insert_blocks(blocks_data)
blocks_data.clear()
transaction_data.clear()
totals_data.clear()
print(f"Batch processing until {block_count} loops")
# Function to save account details to the database
def save_account_details(account_balances):
db_path = f"{get_home()}/index/accounts.db"
try:
conn = sqlite3.connect(db_path)
conn.execute('''CREATE TABLE IF NOT EXISTS acc_index
(account TEXT PRIMARY KEY, balance INT, produced INT, burned INT)''')
for account, details in account_balances.items():
cur = conn.cursor()
cur.execute('SELECT balance, produced, burned FROM acc_index WHERE address = ?', (account,))
row = cur.fetchone()
if row:
new_balance = row[0] + details['balance']
new_produced = row[1] + details['produced']
new_burned = row[2] + details['burned']
cur.execute('UPDATE acc_index SET balance = ?, produced = ?, burned = ? WHERE address = ?',
(new_balance, new_produced, new_burned, account))
else:
cur.execute('INSERT INTO acc_index (address, balance, produced, burned) VALUES (?, ?, ?, ?)',
(account, details['balance'], details['produced'], details['burned']))
conn.commit()
except sqlite3.Error as e:
print(e)
raise
finally:
if conn:
conn.close()
save_account_details(account_balances)
insert_aggregated_totals(account_balances)