Skip to content

Commit

Permalink
add MessageFilter & PostFilter; remove limit parameter; rename Aleph …
Browse files Browse the repository at this point in the history
…to aleph.im
  • Loading branch information
MHHukiewitz committed Oct 5, 2023
1 parent d95e5fa commit f2b4eb3
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 432 deletions.
164 changes: 20 additions & 144 deletions src/aleph/sdk/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Expand All @@ -26,42 +25,33 @@
from aleph_message.models.execution.program import Encoding
from aleph_message.status import MessageStatus

from aleph.sdk.models import PostsResponse
from aleph.sdk.types import GenericMessage, StorageEnum
from .models.message import MessageFilter
from .models.post import PostFilter, PostsResponse
from .types import GenericMessage, StorageEnum

DEFAULT_PAGE_SIZE = 200


class BaseAlephClient(ABC):
@abstractmethod
async def fetch_aggregate(
self,
address: str,
key: str,
limit: int = 100,
) -> Dict[str, Dict]:
async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]:
"""
Fetch a value from the aggregate store by owner address and item key.
:param address: Address of the owner of the aggregate
:param key: Key of the aggregate
:param limit: Maximum number of items to fetch (Default: 100)
"""
pass

@abstractmethod
async def fetch_aggregates(
self,
address: str,
keys: Optional[Iterable[str]] = None,
limit: int = 100,
self, address: str, keys: Optional[Iterable[str]] = None
) -> Dict[str, Dict]:
"""
Fetch key-value pairs from the aggregate store by owner address.
:param address: Address of the owner of the aggregate
:param keys: Keys of the aggregates to fetch (Default: all items)
:param limit: Maximum number of items to fetch (Default: 100)
"""
pass

Expand All @@ -70,15 +60,7 @@ async def get_posts(
self,
pagination: int = DEFAULT_PAGE_SIZE,
page: int = 1,
types: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
post_filter: Optional[PostFilter] = None,
ignore_invalid_messages: Optional[bool] = True,
invalid_messages_log_level: Optional[int] = logging.NOTSET,
) -> PostsResponse:
Expand All @@ -87,60 +69,28 @@ async def get_posts(
:param pagination: Number of items to fetch (Default: 200)
:param page: Page to fetch, begins at 1 (Default: 1)
:param types: Types of posts to fetch (Default: all types)
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
:param addresses: Addresses of the posts to fetch (Default: all addresses)
:param tags: Tags of the posts to fetch (Default: all tags)
:param hashes: Specific item_hashes to fetch
:param channels: Channels of the posts to fetch (Default: all channels)
:param chains: Chains of the posts to fetch (Default: all chains)
:param start_date: Earliest date to fetch messages from
:param end_date: Latest date to fetch messages from
:param post_filter: Filter to apply to the posts (Default: None)
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
"""
pass

async def get_posts_iterator(
self,
types: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
post_filter: Optional[PostFilter] = None,
) -> AsyncIterable[PostMessage]:
"""
Fetch all filtered posts, returning an async iterator and fetching them page by page. Might return duplicates
but will always return all posts.
:param types: Types of posts to fetch (Default: all types)
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
:param addresses: Addresses of the posts to fetch (Default: all addresses)
:param tags: Tags of the posts to fetch (Default: all tags)
:param hashes: Specific item_hashes to fetch
:param channels: Channels of the posts to fetch (Default: all channels)
:param chains: Chains of the posts to fetch (Default: all chains)
:param start_date: Earliest date to fetch messages from
:param end_date: Latest date to fetch messages from
:param post_filter: Filter to apply to the posts (Default: None)
"""
page = 1
resp = None
while resp is None or len(resp.posts) > 0:
resp = await self.get_posts(
page=page,
types=types,
refs=refs,
addresses=addresses,
tags=tags,
hashes=hashes,
channels=channels,
chains=chains,
start_date=start_date,
end_date=end_date,
post_filter=post_filter,
)
page += 1
for post in resp.posts:
Expand All @@ -165,18 +115,7 @@ async def get_messages(
self,
pagination: int = DEFAULT_PAGE_SIZE,
page: int = 1,
message_type: Optional[MessageType] = None,
message_types: Optional[Iterable[MessageType]] = None,
content_types: Optional[Iterable[str]] = None,
content_keys: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
message_filter: Optional[MessageFilter] = None,
ignore_invalid_messages: Optional[bool] = True,
invalid_messages_log_level: Optional[int] = logging.NOTSET,
) -> MessagesResponse:
Expand All @@ -185,69 +124,28 @@ async def get_messages(
:param pagination: Number of items to fetch (Default: 200)
:param page: Page to fetch, begins at 1 (Default: 1)
:param message_type: [DEPRECATED] Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET"
:param message_types: Filter by message types, can be any combination of "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET"
:param content_types: Filter by content type
:param content_keys: Filter by aggregate key
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
:param addresses: Addresses of the posts to fetch (Default: all addresses)
:param tags: Tags of the posts to fetch (Default: all tags)
:param hashes: Specific item_hashes to fetch
:param channels: Channels of the posts to fetch (Default: all channels)
:param chains: Filter by sender address chain
:param start_date: Earliest date to fetch messages from
:param end_date: Latest date to fetch messages from
:param message_filter: Filter to apply to the messages
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
"""
pass

async def get_messages_iterator(
self,
message_type: Optional[MessageType] = None,
content_types: Optional[Iterable[str]] = None,
content_keys: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
message_filter: Optional[MessageFilter] = None,
) -> AsyncIterable[AlephMessage]:
"""
Fetch all filtered messages, returning an async iterator and fetching them page by page. Might return duplicates
but will always return all messages.
:param message_type: Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET"
:param content_types: Filter by content type
:param content_keys: Filter by content key
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
:param addresses: Addresses of the posts to fetch (Default: all addresses)
:param tags: Tags of the posts to fetch (Default: all tags)
:param hashes: Specific item_hashes to fetch
:param channels: Channels of the posts to fetch (Default: all channels)
:param chains: Filter by sender address chain
:param start_date: Earliest date to fetch messages from
:param end_date: Latest date to fetch messages from
:param message_filter: Filter to apply to the messages
"""
page = 1
resp = None
while resp is None or len(resp.messages) > 0:
resp = await self.get_messages(
page=page,
message_type=message_type,
content_types=content_types,
content_keys=content_keys,
refs=refs,
addresses=addresses,
tags=tags,
hashes=hashes,
channels=channels,
chains=chains,
start_date=start_date,
end_date=end_date,
message_filter=message_filter,
)
page += 1
for message in resp.messages:
Expand All @@ -272,34 +170,12 @@ async def get_message(
@abstractmethod
def watch_messages(
self,
message_type: Optional[MessageType] = None,
message_types: Optional[Iterable[MessageType]] = None,
content_types: Optional[Iterable[str]] = None,
content_keys: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
message_filter: Optional[MessageFilter] = None,
) -> AsyncIterable[AlephMessage]:
"""
Iterate over current and future matching messages asynchronously.
:param message_type: [DEPRECATED] Type of message to watch
:param message_types: Types of messages to watch
:param content_types: Content types to watch
:param content_keys: Filter by aggregate key
:param refs: References to watch
:param addresses: Addresses to watch
:param tags: Tags to watch
:param hashes: Hashes to watch
:param channels: Channels to watch
:param chains: Chains to watch
:param start_date: Start date from when to watch
:param end_date: End date until when to watch
:param message_filter: Filter to apply to the messages
"""
pass

Expand All @@ -318,7 +194,7 @@ async def create_post(
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Create a POST message on the Aleph network. It is associated with a channel and owned by an account.
Create a POST message on the aleph.im network. It is associated with a channel and owned by an account.
:param post_content: The content of the message
:param post_type: An arbitrary content type that helps to describe the post_content
Expand Down Expand Up @@ -368,7 +244,7 @@ async def create_store(
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Create a STORE message to store a file on the Aleph network.
Create a STORE message to store a file on the aleph.im network.
Can be passed either a file path, an IPFS hash or the file's content as raw bytes.
Expand Down Expand Up @@ -422,7 +298,7 @@ async def create_program(
:param persistent: Whether the program should be persistent or not (Default: False)
:param encoding: Encoding to use (Default: Encoding.zip)
:param volumes: Volumes to mount
:param subscriptions: Patterns of Aleph messages to forward to the program's event receiver
:param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver
:param metadata: Metadata to attach to the message
"""
pass
Expand Down
Loading

0 comments on commit f2b4eb3

Please sign in to comment.