Scan does not work as expected #495

Closed · ndrluis opened this issue Jul 28, 2024 · 16 comments

ndrluis (Contributor) commented Jul 28, 2024

I'm testing using the Iceberg REST image from Tabular as the catalog.

Here's the docker-compose.yml file:

version: '3.8'

services:
  rest:
    image: tabulario/iceberg-rest:0.10.0
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio
    ports:
      - "8181:8181"
    networks:
      iceberg_net:

  minio:
    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    expose:
      - 9001
      - 9000
    ports:
      - "9000:9000"
      - "9001:9001"
    command: [ "server", "/data", "--console-address", ":9001" ]

  mc:
    depends_on:
      - minio
    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
        until (/usr/bin/mc config host add minio http://minio:9000 admin password) do
          echo '...waiting...' && sleep 1;
        done;
        /usr/bin/mc mb minio/warehouse;
        /usr/bin/mc policy set public minio/warehouse;
        tail -f /dev/null
      "
    networks:
      iceberg_net:

networks:
  iceberg_net:

I created some data with PyIceberg:

from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

catalog.create_namespace_if_not_exists("default")

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table_if_not_exists("default.cities", schema=schema)

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

tbl.append(df)

And queried with PyIceberg to verify that the data is there:

from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181/",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

tbl: Table = catalog.load_table("default.cities")

res = tbl.scan().to_arrow()

print(len(res))

It returns 4.

And then with the Rust implementation:

use std::collections::HashMap;

use futures::TryStreamExt;
use iceberg::{
    io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY},
    Catalog, TableIdent,
};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};

#[tokio::main]
async fn main() {
    // Create catalog
    let config = RestCatalogConfig::builder()
        .uri("http://localhost:8181".to_string())
        .warehouse("demo".to_string())
        .props(HashMap::from([
            (S3_ENDPOINT.to_string(), "http://localhost:9000".to_string()),
            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
            (S3_REGION.to_string(), "us-east-1".to_string()),
        ]))
        .build();

    let catalog = RestCatalog::new(config);

    let table = catalog
        .load_table(&TableIdent::from_strs(["default", "cities"]).unwrap())
        .await
        .unwrap();

    let scan = table.scan().select_all().build().unwrap();
    let batch_stream = scan.to_arrow().await.unwrap();

    dbg!(scan);

    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

    dbg!(batches.len());
}

It returns nothing.

We have to define the S3 configuration because the Tabular image does not return the S3 credentials during the get-config process.
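
(For context: a REST client bootstraps from GET /v1/config, whose response carries "defaults" and "overrides" property maps that are merged with the client's own properties. A hedged sketch of what a catalog that does hand out S3 settings might return; the keys shown are illustrative, and the Tabular image simply omits the s3.* credential entries:)

{
  "defaults": {
    "warehouse": "s3://warehouse/"
  },
  "overrides": {
    "s3.endpoint": "http://minio:9000",
    "s3.access-key-id": "admin",
    "s3.secret-access-key": "password"
  }
}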

ndrluis (Contributor, Author) commented Jul 28, 2024

I performed another test using the Tabular catalog, attempting to scan the sandbox warehouse in the examples namespace, specifically targeting the nyc_taxi_yellow table, but it returned no results.

ndrluis (Contributor, Author) commented Jul 28, 2024

I found the problem. I don’t know how to solve it, but I will try.

The while let Some(Ok(task)) = tasks.next().await statement is silently discarding errors. In my previous attempt, I was running without the S3 credentials and was not receiving the access-denied error. This happens because tasks.next() returns the error but the pattern never exposes it to the user.

While testing with Tabular, I'm receiving a 403 error from S3. So, we have two issues to solve.

One is to expose the reading errors to the user, and the other is to understand why we are getting these access denied errors.
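
(For illustration, a minimal self-contained sketch of the swallowing behavior with a toy stream, not the actual scan code:)

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // A stream that yields one Ok item followed by an Err.
    let mut tasks = stream::iter(vec![Ok::<i32, &str>(1), Err("access denied")]);

    // The buggy pattern: when the stream yields Err, `Some(Ok(task))`
    // fails to match, the loop just ends, and the error is never surfaced.
    while let Some(Ok(task)) = tasks.next().await {
        println!("task: {task}");
    }

    // The fix: match every item and propagate or report the error.
    let mut tasks = stream::iter(vec![Ok::<i32, &str>(1), Err("access denied")]);
    while let Some(result) = tasks.next().await {
        match result {
            Ok(task) => println!("task: {task}"),
            Err(e) => eprintln!("scan failed: {e}"),
        }
    }
}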

ndrluis (Contributor, Author) commented Jul 28, 2024

For the Tabular example, I encountered an 'access denied' problem: the FileIO does not work with remote signing. For the MinIO example, the problem was solved once I added a match statement that returns the error when handling tasks.next().

Xuanwo (Member) commented Jul 29, 2024

Hi, does remote signing mean presign in S3?

Xuanwo (Member) commented Jul 29, 2024

I'm guessing #498 should close this issue. Would you like to verify it?

ndrluis (Contributor, Author) commented Jul 29, 2024

@Xuanwo

> Hi, does remote signing mean presign in S3?

Yes and no. I'm not sure if this is the flow, because I haven't found any documentation; this is based on my understanding from reading the Python implementation.

It's a presign process, but it's not the client's responsibility to presign. The get-config response returns s3.signer.uri, and the load-table response returns s3.remote-signing-enabled as true along with some other S3 configurations. With that, we need to "presign" using the token returned by load table.

This is the specification for the server responsible for the signing: s3-signer-open-api.yaml
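
(For reference, a hedged sketch of what that signing round-trip might look like, based only on a reading of s3-signer-open-api.yaml; the /v1/aws/s3/sign path, the field names, and the sign_s3_request helper are assumptions taken from that spec, not existing iceberg-rust API:)

use std::collections::HashMap;

use reqwest::Client;
use serde_json::{json, Value};

// Hypothetical helper: POST the request metadata to the signer service and
// get back the (possibly rewritten) URI plus the signed headers to attach
// to the actual S3 request. Path and field names follow s3-signer-open-api.yaml.
async fn sign_s3_request(
    signer_uri: &str, // s3.signer.uri from the get-config response
    token: &str,      // token returned by load table
    region: &str,
    method: &str, // e.g. "GET"
    uri: &str,    // the S3 URI being accessed
    headers: HashMap<String, Vec<String>>,
) -> reqwest::Result<Value> {
    Client::new()
        .post(format!("{signer_uri}/v1/aws/s3/sign"))
        .bearer_auth(token)
        .json(&json!({
            "region": region,
            "method": method,
            "uri": uri,
            "headers": headers,
        }))
        .send()
        .await?
        .json::<Value>()
        .await
}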

> I'm guessing #498 should close this issue. Would you like to verify it?

I'm not comfortable closing this issue without a regression test that guarantees the expected behavior.

liurenjie1024 (Collaborator) commented:

> I'm not comfortable closing this issue without a regression test that guarantees the expected behavior.

+1 on this. Currently we don't have regression tests for the whole read path, which involves integrating with external systems such as Spark.

Xuanwo (Member) commented Jul 29, 2024

> It's a presign process, but it's not the client's responsibility to presign. The get-config response returns s3.signer.uri, and the load-table response returns s3.remote-signing-enabled as true along with some other S3 configurations. With that, we need to "presign" using the token returned by load table.

Got it. So, we need to support presign in the REST catalog. Could you help by creating an issue for this? I'll review this section and draft a plan for its implementation.

> Currently we don't have regression tests for the whole read path, which involves integrating with external systems such as Spark.

I think we can start with very basic tests, like just scanning the whole table.

liurenjie1024 (Collaborator) commented:

> I think we can start with very basic tests, like just scanning the whole table.

The reason I haven't started this yet is that I want to do it after the integration with DataFusion. @ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

Xuanwo (Member) commented Jul 29, 2024

> @ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

I agree that we need a SQL engine to make testing easier.

However, maintaining basic unit tests based on fs or memory should be straightforward, right? We don't need separate test modules; just implement them as unit tests in the REST catalog. For example, it could be as simple as...

// catalog / file IO setup, etc.
let table = setup_table(); // placeholder

let scan = table.scan().select_all().build().unwrap();
let batch_stream = scan.to_arrow().await.unwrap();

dbg!(scan);

let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

ndrluis (Contributor, Author) commented Jul 29, 2024

> Got it. So, we need to support presign in the REST catalog. Could you help by creating an issue for this? I'll review this section and draft a plan for its implementation.

Issue #504 created

liurenjie1024 (Collaborator) commented:

> However, maintaining basic unit tests based on fs or memory should be straightforward, right? We don't need separate test modules; just implement them as unit tests in the REST catalog.

Correctly writing data into Iceberg is not supported yet, so we need external systems such as Spark to ingest data. Shipping pre-generated Parquet files may be an approach, but that requires maintaining binaries in the repo.
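
(On the binaries-in-repo point, one hedged alternative is generating the Parquet fixture at test time with the arrow and parquet crates; a sketch only, and it does not by itself solve the harder problem of producing correct Iceberg metadata for the file:)

use std::{fs::File, sync::Arc};

use arrow_array::{Float64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Same shape as the cities table used above.
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, true),
        Field::new("lat", DataType::Float64, true),
        Field::new("long", DataType::Float64, true),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["Amsterdam", "Drachten"])),
            Arc::new(Float64Array::from(vec![52.371807, 53.11254])),
            Arc::new(Float64Array::from(vec![4.896029, 6.0989])),
        ],
    )?;

    // Write the fixture at test time instead of checking a binary into the repo.
    let mut writer = ArrowWriter::try_new(File::create("cities.parquet")?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}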

sdd (Contributor) commented Jul 31, 2024

> Correctly writing data into Iceberg is not supported yet, so we need external systems such as Spark to ingest data. Shipping pre-generated Parquet files may be an approach, but that requires maintaining binaries in the repo.

I've got some code in the perf testing branch that might help. It downloads NYC taxi data and uses MinIO, the REST catalog, and a Spark container to create a table and insert that data into it.

#497

sdd (Contributor) commented Aug 10, 2024

I have fixed the issue where errors were not returned to the user in #535.

Xuanwo (Member) commented Aug 19, 2024

I believe this should have been fixed. Please feel free to open a new issue if the problem still exists.

Xuanwo closed this as completed on Aug 19, 2024.