Merge pull request #141 from Chia-Network/develop
Release 1.0.36
TheLastCicada authored Nov 13, 2023
2 parents f0041f3 + f540df3 commit 1a2ba70
Showing 52 changed files with 479 additions and 465 deletions.
4 changes: 4 additions & 0 deletions .isort.cfg
@@ -0,0 +1,4 @@
[settings]
line_length = 120
profile=black
skip_gitignore=true
1 change: 1 addition & 0 deletions README.md
@@ -1,5 +1,6 @@
# Climate Token Driver Suite

![Minimum Chia Version](https://raw.githubusercontent.com/Chia-Network/core-registry-api/main/minimumChiaVersion.svg)
![Tested Up to Chia Version](https://raw.githubusercontent.com/Chia-Network/core-registry-api/main/testedChiaVersion.svg)

This application can run in 4 modes, each providing a separate application with a distinct use case:
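Below is a minimal, hypothetical sketch (not code from this repository) of how per-mode gating can work: endpoints in the diffs that follow are wrapped with `@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])` so that only the modes meant to serve them actually run them. The mode names, the module-level `CURRENT_MODE`, and the disabled-endpoint behavior here are placeholders; the real `ExecutionMode` and `disallow` live in the application's config and `app/utils.py` and may differ.

```python
# Hypothetical sketch only: mode names, CURRENT_MODE, and the disabled-endpoint
# behavior are placeholders, not this repository's actual implementation.
import enum
import functools
from typing import Any, Callable, List


class ExecutionMode(enum.Enum):
    DEV = "dev"
    REGISTRY = "registry"
    CLIENT = "client"
    EXPLORER = "explorer"


CURRENT_MODE = ExecutionMode.EXPLORER  # would normally come from settings


def disallow(modes: List[ExecutionMode]) -> Callable[[Callable], Callable]:
    """Return a decorator that turns an endpoint into a no-op in the given modes."""

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            if CURRENT_MODE in modes:
                return None  # endpoint disabled in this execution mode
            return await fn(*args, **kwargs)

        return wrapper

    return decorator
```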
22 changes: 17 additions & 5 deletions app/api/dependencies.py
@@ -1,19 +1,27 @@
from __future__ import annotations

import enum
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
from typing import Iterator
from typing import AsyncGenerator, AsyncIterator

from chia.rpc.full_node_rpc_client import FullNodeRpcClient
from chia.rpc.rpc_client import RpcClient
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.util.config import load_config
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ints import uint16
from sqlalchemy.orm import Session

from app.config import settings
from app.db.session import get_session_local_cls
from app.logger import logger


def get_db_session_context() -> AbstractAsyncContextManager[Session]:
return asynccontextmanager(get_db_session)()


async def get_db_session() -> Session:
SessionLocal = await get_session_local_cls()

@@ -37,17 +45,20 @@ async def _get_rpc_client(
self_hostname: str,
rpc_port: int,
root_path: Path = DEFAULT_ROOT_PATH,
) -> Iterator[RpcClient]:
) -> AsyncGenerator[RpcClient, None]:
rpc_client_cls = {
NodeType.FULL_NODE: FullNodeRpcClient,
NodeType.WALLET: WalletRpcClient,
}.get(node_type)

if rpc_client_cls is None:
raise ValueError(f"Invalid node_type: {node_type}")

config = load_config(root_path, "config.yaml")

client = await rpc_client_cls.create(
self_hostname=self_hostname,
port=rpc_port,
port=uint16(rpc_port),
root_path=root_path,
net_config=config,
)
@@ -61,7 +72,7 @@ async def _get_rpc_client(
await client.await_closed()


async def get_wallet_rpc_client() -> Iterator[WalletRpcClient]:
async def get_wallet_rpc_client() -> AsyncIterator[WalletRpcClient]:
async for _ in _get_rpc_client(
node_type=NodeType.WALLET,
self_hostname=settings.CHIA_HOSTNAME,
@@ -71,7 +82,8 @@ async def get_wallet_rpc_client():
yield _


async def get_full_node_rpc_client() -> Iterator[FullNodeRpcClient]:
@asynccontextmanager
async def get_full_node_rpc_client() -> AsyncIterator[FullNodeRpcClient]:
async for _ in _get_rpc_client(
node_type=NodeType.FULL_NODE,
self_hostname=settings.CHIA_HOSTNAME,
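A short usage sketch (assumed, not part of this commit) of what the new `@asynccontextmanager` on `get_full_node_rpc_client` buys callers: the dependency can now be entered directly with `async with`, which is exactly how `app/api/v1/cron.py` below uses it in place of the removed `as_async_contextmanager` helper. The import path and the `peak.height` access are assumptions based on the surrounding diffs.

```python
# Assumed usage sketch; import path and BlockRecord.height access are taken
# from the surrounding diffs, not verified against the full source tree.
from typing import Optional

from app.api import dependencies as deps


async def get_peak_height() -> Optional[int]:
    async with deps.get_full_node_rpc_client() as full_node_client:
        state = await full_node_client.get_blockchain_state()
        peak = state.get("peak")
        return None if peak is None else peak.height
```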
2 changes: 2 additions & 0 deletions app/api/v1/__init__.py
@@ -1 +1,3 @@
from __future__ import annotations

from app.api.v1.core import router
19 changes: 7 additions & 12 deletions app/api/v1/activities.py
@@ -1,7 +1,8 @@
from typing import Dict, List, Optional
from pydantic import ValidationError
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from pydantic import ValidationError
from sqlalchemy.orm import Session

from app import crud, models, schemas
@@ -11,7 +12,7 @@
from app.errors import ErrorCode
from app.logger import logger
from app.utils import disallow
import pprint

router = APIRouter()


@@ -26,15 +27,15 @@ async def get_activity(
limit: int = 10,
sort: str = "desc",
db: Session = Depends(deps.get_db_session),
):
) -> schemas.ActivitiesResponse:
"""Get activity.
This endpoint is to be called by the explorer.
"""

db_crud = crud.DBCrud(db=db)

activity_filters = {"or": [], "and": []}
activity_filters: Dict[str, Any] = {"or": [], "and": []}
cw_filters = {}
match search_by:
case schemas.ActivitySearchBy.ONCHAIN_METADATA:
@@ -91,17 +92,11 @@ async def get_activity(
limit=limit,
)
if len(activities) == 0:
logger.warning(
f"No data to get from activities. filters:{activity_filters} page:{page} limit:{limit}"
)
logger.warning(f"No data to get from activities. filters:{activity_filters} page:{page} limit:{limit}")
return schemas.ActivitiesResponse()

pp = pprint.PrettyPrinter(indent=4)

pp.pprint(f"Got {len(activities)} activities from activities table.")
activities_with_cw: List[schemas.ActivityWithCW] = []
for activity in activities:
pp.pprint(f"Checking activity: {activity}")
unit = units.get(activity.asset_id)
if unit is None:
continue
6 changes: 5 additions & 1 deletion app/api/v1/core.py
@@ -1,3 +1,7 @@
from __future__ import annotations

from typing import Dict

from fastapi import APIRouter

from app.api.v1 import activities, cron, keys, tokens, transactions
@@ -9,7 +13,7 @@


@router.get("/info")
async def get_info():
async def get_info() -> Dict[str, str]:
return {
"blockchain_name": "Chia Network",
"blockchain_name_short": "chia",
108 changes: 71 additions & 37 deletions app/api/v1/cron.py
@@ -1,5 +1,8 @@
from __future__ import annotations

import asyncio
from typing import Any, Dict, List, Optional
import json
from typing import List

from blspy import G1Element
from chia.consensus.block_record import BlockRecord
@@ -18,7 +21,7 @@
from app.errors import ErrorCode
from app.logger import logger
from app.models import State
from app.utils import as_async_contextmanager, disallow
from app.utils import disallow

router = APIRouter()
errorcode = ErrorCode()
@@ -27,7 +30,7 @@

@router.on_event("startup")
@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])
async def init_db():
async def init_db() -> None:
Engine = await get_engine_cls()

if not database_exists(Engine.url):
@@ -38,7 +41,7 @@ async def init_db():

Base.metadata.create_all(Engine)

async with as_async_contextmanager(deps.get_db_session) as db:
async with deps.get_db_session_context() as db:
state = State(id=1, current_height=settings.BLOCK_START, peak_height=None)
db_state = [jsonable_encoder(state)]

@@ -70,34 +73,67 @@ async def _scan_token_activity(

logger.info(f"Scanning blocks {start_height} - {end_height} for activity")

climate_units: Dict[
str, Any
] = climate_warehouse.combine_climate_units_and_metadata(search={})
for unit in climate_units:
token: Optional[Dict] = unit.get("token")

# is None or empty
if not token:
logger.warning(f"Can not get token in climate warehouse unit. unit:{unit}")
continue
# Check if SCAN_ALL_ORGANIZATIONS is defined and True, otherwise treat as False
scan_all = getattr(settings, "SCAN_ALL_ORGANIZATIONS", False)

public_key = G1Element.from_bytes(hexstr_to_bytes(token["public_key"]))
all_organizations = climate_warehouse.get_climate_organizations()
if not scan_all:
# Convert to a list of organizations where `isHome` is True
climate_organizations = [org for org in all_organizations.values() if org.get("isHome", False)]
else:
# Convert to a list of all organizations
climate_organizations = list(all_organizations.values())

activities: List[schemas.Activity] = await blockchain.get_activities(
org_uid=token["org_uid"],
warehouse_project_id=token["warehouse_project_id"],
vintage_year=token["vintage_year"],
sequence_num=token["sequence_num"],
public_key=public_key,
start_height=start_height,
end_height=end_height,
peak_height=state.peak_height,
)
for org in climate_organizations:
org_uid = org["orgUid"]
org_name = org["name"]

if len(activities) == 0:
org_metadata = climate_warehouse.get_climate_organizations_metadata(org_uid)
if not org_metadata:
logger.warning(f"Cannot get metadata in CADT organization: {org_name}")
continue

db_crud.batch_insert_ignore_activity(activities)
for key, value_str in org_metadata.items():
try:
tokenization_dict = json.loads(value_str)
required_fields = [
"org_uid",
"warehouse_project_id",
"vintage_year",
"sequence_num",
"public_key",
"index",
]
optional_fields = ["permissionless_retirement", "detokenization"]

if not all(field in tokenization_dict for field in required_fields) or not any(
field in tokenization_dict for field in optional_fields
):
# not a tokenization record
continue

public_key = G1Element.from_bytes(hexstr_to_bytes(tokenization_dict["public_key"]))
activities: List[schemas.Activity] = await blockchain.get_activities(
org_uid=tokenization_dict["org_uid"],
warehouse_project_id=tokenization_dict["warehouse_project_id"],
vintage_year=tokenization_dict["vintage_year"],
sequence_num=tokenization_dict["sequence_num"],
public_key=public_key,
start_height=state.current_height,
end_height=end_height,
peak_height=state.peak_height,
)

if len(activities) == 0:
continue

db_crud.batch_insert_ignore_activity(activities)
logger.info(f"Activities for {org_name} and asset id: {key} added to the database.")

except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON for key {key} in organization {org_name}: {str(e)}")
except Exception as e:
logger.error(f"An error occurred for organization {org_name} under key {key}: {str(e)}")

db_crud.update_block_state(current_height=target_start_height)
return True
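For illustration only, here is the shape of a CADT organization metadata entry that the reworked `_scan_token_activity` above treats as a tokenization record: all of the required keys must be present plus at least one of the optional markers. The key names mirror the check in the diff; every value is a made-up placeholder.

```python
import json

# Placeholder values; only the key names mirror the check in _scan_token_activity.
example_metadata_value = json.dumps(
    {
        "org_uid": "<org uid>",
        "warehouse_project_id": "<warehouse project id>",
        "vintage_year": 2020,
        "sequence_num": 0,
        "public_key": "<hex-encoded G1Element>",
        "index": "<token index>",
        "permissionless_retirement": {"<detail>": "<placeholder>"},
    }
)

REQUIRED = ["org_uid", "warehouse_project_id", "vintage_year", "sequence_num", "public_key", "index"]
OPTIONAL = ["permissionless_retirement", "detokenization"]

record = json.loads(example_metadata_value)
is_tokenization_record = all(f in record for f in REQUIRED) and any(f in record for f in OPTIONAL)
print(is_tokenization_record)  # True for this example
```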
@@ -112,13 +148,11 @@ async def scan_token_activity() -> None:

async with (
lock,
as_async_contextmanager(deps.get_db_session) as db,
as_async_contextmanager(deps.get_full_node_rpc_client) as full_node_client,
deps.get_db_session_context() as db,
deps.get_full_node_rpc_client() as full_node_client,
):
db_crud = crud.DBCrud(db=db)
climate_warehouse = crud.ClimateWareHouseCrud(
url=settings.CADT_API_SERVER_HOST, api_key=settings.CADT_API_KEY
)
climate_warehouse = crud.ClimateWareHouseCrud(url=settings.CADT_API_SERVER_HOST, api_key=settings.CADT_API_KEY)
blockchain = crud.BlockChainCrud(full_node_client=full_node_client)

try:
@@ -145,9 +179,9 @@ async def scan_token_activity() -> None:
async def _scan_blockchain_state(
db_crud: crud.DBCrud,
full_node_client: FullNodeRpcClient,
):
state: Dict = await full_node_client.get_blockchain_state()
peak: Dict = state.get("peak")
) -> None:
state = await full_node_client.get_blockchain_state()
peak = state.get("peak")

if peak is None:
logger.warning("Full node is not synced")
@@ -162,8 +196,8 @@ async def _scan_blockchain_state(
@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])
async def scan_blockchain_state() -> None:
async with (
as_async_contextmanager(deps.get_db_session) as db,
as_async_contextmanager(deps.get_full_node_rpc_client) as full_node_client,
deps.get_db_session_context() as db,
deps.get_full_node_rpc_client() as full_node_client,
):
db_crud = crud.DBCrud(db=db)

17 changes: 6 additions & 11 deletions app/api/v1/keys.py
@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Optional

from blspy import G1Element, PrivateKey
from chia.consensus.coinbase import create_puzzlehash_for_pk
@@ -7,10 +7,7 @@
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from chia.util.byte_types import hexstr_to_bytes
from chia.util.ints import uint32
from chia.wallet.derive_keys import (
master_sk_to_wallet_sk,
master_sk_to_wallet_sk_unhardened,
)
from chia.wallet.derive_keys import master_sk_to_wallet_sk, master_sk_to_wallet_sk_unhardened
from fastapi import APIRouter, Depends

from app import schemas
@@ -31,20 +28,18 @@ async def get_key(
derivation_index: int = 0,
prefix: str = "bls1238",
wallet_rpc_client: WalletRpcClient = Depends(deps.get_wallet_rpc_client),
):
) -> schemas.Key:
fingerprint: int = await wallet_rpc_client.get_logged_in_fingerprint()

result: Dict = await wallet_rpc_client.get_private_key(fingerprint)
result = await wallet_rpc_client.get_private_key(fingerprint)

secret_key = PrivateKey.from_bytes(hexstr_to_bytes(result["sk"]))

wallet_secret_key: PrivateKey
if hardened:
wallet_secret_key = master_sk_to_wallet_sk(secret_key, uint32(derivation_index))
else:
wallet_secret_key = master_sk_to_wallet_sk_unhardened(
secret_key, uint32(derivation_index)
)
wallet_secret_key = master_sk_to_wallet_sk_unhardened(secret_key, uint32(derivation_index))

wallet_public_key: G1Element = wallet_secret_key.get_g1()
puzzle_hash: bytes32 = create_puzzlehash_for_pk(wallet_public_key)
@@ -62,7 +57,7 @@ async def get_key(
)
async def parse_key(
address: str,
):
) -> Optional[schemas.Key]:
try:
puzzle_hash: bytes = decode_puzzle_hash(address)
except ValueError:
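A small round-trip sketch (assumed, not part of this diff) built from the same chia helpers that `keys.py` imports: encode a derived puzzle hash into a bech32m address and parse it back, which is roughly what `get_key` and `parse_key` do on either side of the API. The default prefix mirrors `get_key`'s signature above.

```python
# Assumed sketch using helpers imported by keys.py; not code from this commit.
from blspy import G1Element
from chia.consensus.coinbase import create_puzzlehash_for_pk
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash


def address_round_trip(wallet_public_key: G1Element, prefix: str = "bls1238") -> bool:
    puzzle_hash = create_puzzlehash_for_pk(wallet_public_key)
    address = encode_puzzle_hash(puzzle_hash, prefix)
    return decode_puzzle_hash(address) == puzzle_hash
```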