Skip to content

Commit

Permalink
refactor: Pre-SDK v2 cleanup (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
janbuchar authored Jun 7, 2024
1 parent 5e507e8 commit aa5be7f
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 165 deletions.
32 changes: 0 additions & 32 deletions src/crawlee/_utils/data_processing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import json
from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Any, NoReturn

Expand Down Expand Up @@ -56,37 +55,6 @@ def maybe_parse_body(body: bytes, content_type: str) -> Any:
return body


def maybe_parse_bool(val: str | None) -> bool:
    """Parse a string value to a boolean.

    Args:
        val: The raw string value to interpret, or None.

    Returns:
        True only for the exact strings 'true', 'True' or '1'; False for
        anything else, including None and other capitalizations (e.g. 'TRUE').
    """
    # The membership test already yields the boolean; returning it directly
    # avoids the redundant `if ...: return True / return False` pattern.
    return val in {'true', 'True', '1'}


def maybe_parse_datetime(val: str) -> datetime | str:
    """Parse a string value to a datetime object.

    Args:
        val: A timestamp string, expected in the '%Y-%m-%dT%H:%M:%S.%fZ' format.

    Returns:
        A timezone-aware (UTC) datetime when the string matches the expected
        format, otherwise the original string unchanged.
    """
    api_datetime_format = '%Y-%m-%dT%H:%M:%S.%fZ'
    try:
        parsed = datetime.strptime(val, api_datetime_format)
    except ValueError:
        # Not a recognizable timestamp - hand the raw string back to the caller.
        return val
    # The trailing 'Z' denotes UTC, so attach the UTC timezone explicitly.
    return parsed.replace(tzinfo=timezone.utc)


def maybe_parse_float(val: str) -> float | None:
    """Parse a string value to a float.

    Args:
        val: The raw string value to convert.

    Returns:
        The parsed float, or None when the string is not a valid float literal.
    """
    try:
        parsed = float(val)
    except ValueError:
        # Conversion failed - signal that with None rather than raising.
        return None
    else:
        return parsed


def maybe_parse_int(val: str) -> int | None:
    """Parse a string value to an integer.

    Args:
        val: The raw string value to convert.

    Returns:
        The parsed integer, or None when the string is not a valid int literal.
    """
    result: int | None
    try:
        result = int(val)
    except ValueError:
        # Conversion failed - fall back to None instead of propagating.
        result = None
    return result


def raise_on_duplicate_storage(client_type: StorageTypes, key_name: str, value: str) -> NoReturn:
"""Raise an error indicating that a storage with the provided key name and value already exists."""
client_type = maybe_extract_enum_member_value(client_type)
Expand Down
57 changes: 0 additions & 57 deletions src/crawlee/_utils/env_vars.py

This file was deleted.

6 changes: 4 additions & 2 deletions src/crawlee/base_storage_client/base_dataset_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, AsyncIterator
from typing import TYPE_CHECKING, AsyncContextManager, AsyncIterator

if TYPE_CHECKING:
from httpx import Response

from crawlee.models import DatasetItemsListPage, DatasetMetadata
from crawlee.types import JSONSerializable

Expand Down Expand Up @@ -185,7 +187,7 @@ async def stream_items(
skip_hidden: bool = False,
xml_root: str | None = None,
xml_row: str | None = None,
) -> AsyncIterator[dict]:
) -> AsyncContextManager[Response | None]:
"""Retrieves dataset items as a streaming response.
Args:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, AsyncIterator
from typing import TYPE_CHECKING, Any, AsyncContextManager

if TYPE_CHECKING:
from httpx import Response

from crawlee.models import KeyValueStoreListKeysPage, KeyValueStoreMetadata, KeyValueStoreRecord


Expand Down Expand Up @@ -70,7 +72,7 @@ async def get_record(self, key: str) -> KeyValueStoreRecord | None:
"""

@abstractmethod
async def get_record_as_bytes(self, key: str) -> KeyValueStoreRecord | None:
async def get_record_as_bytes(self, key: str) -> KeyValueStoreRecord[bytes] | None:
"""Retrieve the given record from the key-value store, without parsing it.
Args:
Expand All @@ -81,7 +83,7 @@ async def get_record_as_bytes(self, key: str) -> KeyValueStoreRecord | None:
"""

@abstractmethod
async def stream_record(self, key: str) -> AsyncIterator[KeyValueStoreRecord | None]:
async def stream_record(self, key: str) -> AsyncContextManager[KeyValueStoreRecord[Response] | None]:
"""Retrieve the given record from the key-value store, as a stream.
Args:
Expand Down
21 changes: 15 additions & 6 deletions src/crawlee/base_storage_client/base_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from crawlee.models import Request, RequestQueueHead, RequestQueueMetadata, RequestQueueOperationInfo
from crawlee.models import (
BatchRequestsOperationResponse,
ProlongRequestLockResponse,
Request,
RequestListResponse,
RequestQueueHead,
RequestQueueHeadWithLocks,
RequestQueueMetadata,
RequestQueueOperationInfo,
)


class BaseRequestQueueClient(ABC):
Expand Down Expand Up @@ -53,7 +62,7 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:
"""

@abstractmethod
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dict:
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks:
"""Fetch and lock a specified number of requests from the start of the queue.
Retrieves and locks the first few requests of a queue for the specified duration. This prevents the requests
Expand Down Expand Up @@ -127,7 +136,7 @@ async def prolong_request_lock(
*,
forefront: bool = False,
lock_secs: int,
) -> dict:
) -> ProlongRequestLockResponse:
"""Prolong the lock on a specific request in the queue.
Args:
Expand Down Expand Up @@ -156,7 +165,7 @@ async def batch_add_requests(
requests: list[Request],
*,
forefront: bool = False,
) -> dict:
) -> BatchRequestsOperationResponse:
"""Add batch of requests to the queue.
Args:
Expand All @@ -165,7 +174,7 @@ async def batch_add_requests(
"""

@abstractmethod
async def batch_delete_requests(self, requests: list[Request]) -> dict:
async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsOperationResponse:
"""Delete given requests from the queue.
Args:
Expand All @@ -178,7 +187,7 @@ async def list_requests(
*,
limit: int | None = None,
exclusive_start_id: str | None = None,
) -> dict:
) -> RequestListResponse:
"""List requests from the queue.
Args:
Expand Down
6 changes: 4 additions & 2 deletions src/crawlee/memory_storage_client/dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from datetime import datetime, timezone
from logging import getLogger
from typing import TYPE_CHECKING, Any, AsyncIterator
from typing import TYPE_CHECKING, Any, AsyncContextManager, AsyncIterator

import aiofiles
import aioshutil
Expand All @@ -21,6 +21,8 @@
from crawlee.types import StorageTypes

if TYPE_CHECKING:
from httpx import Response

from crawlee.memory_storage_client import MemoryStorageClient
from crawlee.types import JSONSerializable

Expand Down Expand Up @@ -308,7 +310,7 @@ async def stream_items(
skip_hidden: bool = False,
xml_root: str | None = None,
xml_row: str | None = None,
) -> AsyncIterator:
) -> AsyncContextManager[Response | None]:
raise NotImplementedError('This method is not supported in memory storage.')

@override
Expand Down
8 changes: 5 additions & 3 deletions src/crawlee/memory_storage_client/key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from datetime import datetime, timezone
from logging import getLogger
from typing import TYPE_CHECKING, Any, AsyncIterator
from typing import TYPE_CHECKING, Any, AsyncContextManager

import aiofiles
import aioshutil
Expand All @@ -31,6 +31,8 @@
from crawlee.types import StorageTypes

if TYPE_CHECKING:
from httpx import Response

from crawlee.memory_storage_client import MemoryStorageClient

logger = getLogger(__name__)
Expand Down Expand Up @@ -240,11 +242,11 @@ async def get_record(self, key: str) -> KeyValueStoreRecord | None:
return await self._get_record_internal(key)

@override
async def get_record_as_bytes(self, key: str) -> KeyValueStoreRecord | None:
async def get_record_as_bytes(self, key: str) -> KeyValueStoreRecord[bytes] | None:
return await self._get_record_internal(key, as_bytes=True)

@override
async def stream_record(self, key: str) -> AsyncIterator[KeyValueStoreRecord | None]:
async def stream_record(self, key: str) -> AsyncContextManager[KeyValueStoreRecord[Response] | None]:
raise NotImplementedError('This method is not supported in memory storage.')

@override
Expand Down
23 changes: 16 additions & 7 deletions src/crawlee/memory_storage_client/request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
find_or_create_client_by_id_or_name_inner,
persist_metadata_if_enabled,
)
from crawlee.models import Request, RequestQueueHead, RequestQueueMetadata, RequestQueueOperationInfo
from crawlee.models import (
BatchRequestsOperationResponse,
ProlongRequestLockResponse,
Request,
RequestListResponse,
RequestQueueHead,
RequestQueueHeadWithLocks,
RequestQueueMetadata,
RequestQueueOperationInfo,
)
from crawlee.types import StorageTypes

if TYPE_CHECKING:
Expand Down Expand Up @@ -227,7 +236,7 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:
)

@override
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dict:
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks:
raise NotImplementedError('This method is not supported in memory storage.')

@override
Expand Down Expand Up @@ -388,7 +397,7 @@ async def prolong_request_lock(
*,
forefront: bool = False,
lock_secs: int,
) -> dict:
) -> ProlongRequestLockResponse:
raise NotImplementedError('This method is not supported in memory storage.')

@override
Expand All @@ -406,11 +415,11 @@ async def batch_add_requests(
requests: list[Request],
*,
forefront: bool = False,
) -> dict:
) -> BatchRequestsOperationResponse:
raise NotImplementedError('This method is not supported in memory storage.')

@override
async def batch_delete_requests(self, requests: list[Request]) -> dict:
async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsOperationResponse:
raise NotImplementedError('This method is not supported in memory storage.')

@override
Expand All @@ -419,7 +428,7 @@ async def list_requests(
*,
limit: int | None = None,
exclusive_start_id: str | None = None,
) -> dict:
) -> RequestListResponse:
raise NotImplementedError('This method is not supported in memory storage.')

async def update_timestamps(self, *, has_been_modified: bool) -> None:
Expand Down Expand Up @@ -489,7 +498,7 @@ def _json_to_request(self, request_json: str | None) -> Request | None:
request_dict = filter_out_none_values_recursively(json.loads(request_json))
if request_dict is None:
return None
return Request(**request_dict)
return Request.model_validate(request_dict)

async def _create_internal_request(self, request: Request, forefront: bool | None) -> Request:
order_no = self._calculate_order_no(request, forefront)
Expand Down
Loading

0 comments on commit aa5be7f

Please sign in to comment.