Skip to content

Commit

Permalink
Python topic publisher (#19)
Browse files Browse the repository at this point in the history
feat: publish and query support for Python API

- Added topic publishing to Python bindings.
- cleaned up Python typing and documentation.
- added more Python API documentation
- changed cargo deny to ignore private crates
  • Loading branch information
Dexter Duckworth authored Aug 4, 2023
1 parent b9fb2d5 commit baae5e8
Show file tree
Hide file tree
Showing 36 changed files with 1,119 additions and 132 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
# Rust
target

# pyenv overrides
# Python
*.so
__pycache__
.python-version
14 changes: 14 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,17 @@ This should:
- Create a tag for the new version

Once the new tag is pushed to the repo, a new GitHub release and Docker image will be generated automatically by GitHub Actions.

## Generating Python documentation

Generating Python documentation requires pdoc:

```shell
pip install pdoc
```

To view the documentation, run:

```shell
pdoc ella -d google
```
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[workspace]
exclude = ["ella-cli"]
members = [
"ella-common",
"ella-tensor",
Expand All @@ -9,6 +8,7 @@ members = [
"ella-cli",
"ella",
"pyella",
"pyella/generate_typing",
]
resolver = "2"

Expand Down
2 changes: 1 addition & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ exceptions = [
# published to private registries.
# To see how to mark a crate as unpublished (to the official registry),
# visit https://doc.rust-lang.org/cargo/reference/manifest.html#the-publish-field.
ignore = false
ignore = true
# One or more private registries that you might publish crates to, if a crate
# is only published to private registries, and ignore is true, the crate will
# not have its license(s) checked
Expand Down
16 changes: 16 additions & 0 deletions ella-engine/src/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ impl Lazy {
Ok(self.stream().await?.rows())
}

/// Caps the number of rows produced by this query at `limit`.
///
/// The current logical plan is wrapped in a limit node built with a skip
/// of `0` and a fetch of `Some(limit)`.
///
/// # Errors
///
/// Returns an error if applying the limit to the plan or building the
/// resulting plan fails.
pub fn limit(mut self, limit: usize) -> crate::Result<Self> {
    let fetch = Some(limit);
    let limited = self
        .plan
        .try_map(|plan| LogicalPlanBuilder::from(plan).limit(0, fetch)?.build())?;
    self.plan = limited;
    Ok(self)
}

/// Drops the first `skip` rows from the output of this query.
///
/// The current logical plan is wrapped in a limit node built with a skip
/// of `skip` and no fetch bound (`None`).
///
/// # Errors
///
/// Returns an error if applying the offset to the plan or building the
/// resulting plan fails.
pub fn skip(mut self, skip: usize) -> crate::Result<Self> {
    self.plan = self.plan.try_map(|plan| {
        let builder = LogicalPlanBuilder::from(plan);
        builder.limit(skip, None)?.build()
    })?;
    Ok(self)
}

pub fn col<T, S>(&self, col: &str) -> crate::Result<Column<T, S>>
where
T: TensorValue,
Expand Down
12 changes: 12 additions & 0 deletions ella-engine/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ impl Plan {
};
self
}

/// Applies a fallible transformation to the wrapped logical plan.
///
/// The closure receives the inner [`LogicalPlan`] by value; its result is
/// re-wrapped in the same variant (`Resolved` or `Stub`) the plan had
/// before the call.
///
/// # Errors
///
/// Returns the closure's error, converted into `crate::Error`, if `f` fails.
pub fn try_map<F, E>(mut self, f: F) -> crate::Result<Self>
where
    F: FnOnce(LogicalPlan) -> Result<LogicalPlan, E>,
    crate::Error: From<E>,
{
    let mapped = match self.inner {
        PlanInner::Resolved(plan) => f(plan).map(PlanInner::Resolved),
        PlanInner::Stub(plan) => f(plan).map(PlanInner::Stub),
    }?;
    self.inner = mapped;
    Ok(self)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion ella-engine/src/registry/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ fn encode_uuid_to_path(uuid: Uuid, root: &Path, prefix: Option<&str>, ext: &str)
}

// TODO: replace UUID logic once v7 is stabilized
// See: [BlackrockNeurotech/ella#11](https://github.com/BlackrockNeurotech/ella/issues/11)
// See: [CerebusOSS/ella#11](https://github.com/CerebusOSS/ella/issues/11)
// Current implementation is taken from here: https://github.com/uuid-rs/uuid/blob/main/src/timestamp.rs

fn new_uuid_now() -> Uuid {
Expand Down
22 changes: 14 additions & 8 deletions ella-engine/src/table/topic/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,7 @@ impl Clone for PublisherInner {

impl Drop for PublisherInner {
fn drop(&mut self) {
if self.is_active {
let active = self.active.fetch_sub(1, Ordering::Release) - 1;
if active == 0 {
self.stop.notify_one();
}
}
self.deactivate();
}
}

Expand All @@ -132,6 +127,16 @@ impl PublisherInner {
is_active,
}
}

/// Marks this publisher handle as inactive and releases its slot in the
/// shared `active` counter.
///
/// Does nothing if the handle is already inactive, so calling it more than
/// once decrements the counter at most once. When the decrement brings the
/// counter to zero, one waiter on `stop` is notified.
fn deactivate(&mut self) {
    if !self.is_active {
        return;
    }
    // Clear the flag before touching the counter so a repeated call is a no-op.
    self.is_active = false;
    let remaining = self.active.fetch_sub(1, Ordering::Release) - 1;
    if remaining == 0 {
        self.stop.notify_one();
    }
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -170,12 +175,13 @@ impl Sink<RecordBatch> for Publisher {
self.inner.rw.poll_flush_unpin(cx)
}

#[inline]
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
self.inner.rw.poll_close_unpin(cx)
let res = futures::ready!(self.inner.rw.poll_close_unpin(cx));
self.inner.deactivate();
Poll::Ready(res)
}
}

Expand Down
2 changes: 1 addition & 1 deletion ella-tensor/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub trait Column: Debug + Send + Sync {

fn row_shape(&self) -> Option<Dyn> {
let shape = self.shape();
if shape.ndim() > 0 {
if shape.ndim() > 1 {
Some(shape.remove_axis(Axis(0)))
} else {
None
Expand Down
43 changes: 38 additions & 5 deletions ella-tensor/src/frame/data_frame.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::{fmt::Display, ops::Deref, sync::Arc};

use arrow::record_batch::RecordBatch;
use arrow::{
datatypes::{Fields, Schema},
record_batch::RecordBatch,
};
use ella_common::row::RowFormat;

use crate::{NamedColumn, Shape, Tensor, TensorValue};
use crate::{tensor_schema, NamedColumn, Shape, Tensor, TensorValue};

use super::{batch_to_columns, frame_to_batch, print::print_frames, Frame};

Expand Down Expand Up @@ -40,22 +43,52 @@ impl DataFrame {
R::view(batch.num_rows(), &batch.schema().fields, batch.columns())
}

/// Returns the number of columns held by this frame.
pub fn ncols(&self) -> usize {
self.columns.len()
}

/// Returns the row count stored in this frame's `rows` field.
pub fn nrows(&self) -> usize {
self.rows
}

/// Returns a reference to the column at position `i`.
///
/// NOTE(review): this indexes `self.columns` directly, so an out-of-range
/// `i` presumably panics — confirm against the type of `columns`.
pub fn column(&self, i: usize) -> &NamedColumn {
&self.columns[i]
}

/// Returns a [`Display`] value that renders this frame via `print_frames`.
///
/// The returned value borrows from `self`.
pub fn pretty_print(&self) -> impl Display + '_ {
print_frames(&[self])
}

/// Builds an Arrow [`Schema`] describing the columns of this frame.
///
/// One field is produced per column, in column order, by passing the
/// column's name, tensor type, row shape and nullability to
/// `tensor_schema`.
pub fn arrow_schema(&self) -> Schema {
    let fields: Fields = self
        .columns()
        .map(|col| {
            let field = tensor_schema(
                col.name().to_string(),
                col.tensor_type(),
                col.row_shape(),
                col.nullable(),
            );
            Arc::new(field)
        })
        .collect();
    Schema::new(fields)
}
}

impl Frame for DataFrame {
#[inline]
fn ncols(&self) -> usize {
self.columns.len()
self.ncols()
}

#[inline]
fn nrows(&self) -> usize {
self.rows
self.nrows()
}

#[inline]
fn column(&self, i: usize) -> &NamedColumn {
&self.columns[i]
self.column(i)
}
}

Expand Down
4 changes: 4 additions & 0 deletions ella/src/table/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl Publisher {
let schema = self.arrow_schema.clone();
RowSink::try_new(self, schema, buffer)
}

/// Returns a reference to the Arrow schema stored on this publisher.
pub fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
}

impl Debug for Publisher {
Expand Down
3 changes: 3 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ generate_release_notes version:

generate_readme:
cd ella; cargo rdme

generate_python:
cargo run -p generate_typing --bin generate_data_types
2 changes: 0 additions & 2 deletions pyella/.gitignore

This file was deleted.

1 change: 1 addition & 0 deletions pyella/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ thiserror = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing-subscriber = { workspace = true }

mimalloc = { workspace = true, optional = true }

Expand Down
46 changes: 36 additions & 10 deletions pyella/ella/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,47 @@
# ruff: noqa: E402, F403
__all__ = [
"types",
"table",
"frame",
"open",
"connect",
"column",
"topic",
"Ella",
"data_types",
"now",
"bool_",
"int8",
"int16",
"int32",
"int64",
"uint8",
"uint16",
"uint32",
"uint64",
"float32",
"float64",
"timestamp",
"duration",
"string",
]

from maturin import import_hook as __import_hook
__import_hook.install()
from . import types, table, frame

def __add_submodule(path, src):
import sys
sys.modules[path] = src
from .types import (
bool_,
int8,
int16,
int32,
int64,
uint8,
uint16,
uint32,
uint64,
float32,
float64,
timestamp,
duration,
string,
)

from ella._internal import open, connect, column, topic, Ella, data_types
from ella._internal.type_defs import *

__add_submodule("ella.data_types", data_types)
from ella._internal import open, connect, column, topic, Ella, now
Loading

0 comments on commit baae5e8

Please sign in to comment.