Skip to content

Commit

Permalink
xxx-delay
Browse files Browse the repository at this point in the history
  • Loading branch information
ingomueller-net committed Feb 22, 2023
1 parent 634ed20 commit 9680a57
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions experimental/iterators/test/python/dialects/iterators/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import ctypes
import os
import sys
from tempfile import NamedTemporaryFile
import time

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -249,3 +251,46 @@ def testArrowCSVInput():
# CHECK-NEXT: (10, 510, 1010, 1510, 2510, 3010)
# CHECK-NEXT: (35, 535, 1035, 1535, 2535, 3035)
sum_batches_elementwise_with_iterators(reader)


# Test case: Read from a sequence of Arrow arrays/record batches (produced by a
# Python generator).


# Create a generator that produces single-row record batches with increasing
# numbers with an artificial delay of one second after each of them. Since each
# generated record batch immediately produces output, this visually demonstrate
# that the consumption by the MLIR-based iterators interleaves with the
# Python-based production of the record batches in the stream.
def generate_batches_with_delay(schema: pa.Schema) -> None:
for i in range(5):
arrays = [
pa.array(np.array([i], field.type.to_pandas_dtype()))
for field in schema
]
batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
yield batch
# Sleep only when a TTY is attached (in order not to delay unit tests).
if sys.stdout.isatty():
time.sleep(1)


# CHECK-LABEL: TEST: testGeneratorInput
@run
def testGeneratorInput():
# Use pyarrow to create an Arrow table in memory.
table = create_test_input()

# Make physically separate batches from the table. (This ensures offset=0).
generator = generate_batches_with_delay(table.schema)

# Create a RecordBatchReader and export it as a C struct.
reader = pa.RecordBatchReader.from_batches(table.schema, generator)

# Hand the reader as an Arrow array stream to the Iterators test program.
# CHECK-NEXT: (0, 0, 0, 0, 0, 0, 0)
# CHECK-NEXT: (1, 1, 1, 1, 1, 1, 1)
# CHECK-NEXT: (2, 2, 2, 2, 2, 2, 2)
# CHECK-NEXT: (3, 3, 3, 3, 3, 3, 3)
# CHECK-NEXT: (4, 4, 4, 4, 4, 4, 4)
sum_batches_elementwise_with_iterators(reader)

0 comments on commit 9680a57

Please sign in to comment.