Skip to content

Commit

Permalink
Merge branch 'main' into aug-21-continue-1
Browse files Browse the repository at this point in the history
  • Loading branch information
KeranYang authored Aug 23, 2023
2 parents a4225e0 + 7068590 commit 072b9ca
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docs/user-guide/reference/join-vertex.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Joins and Cycles

As of Numaflow v0.10, Pipeline Edges can be defined such that multiple Vertices send to a single vertex. This includes:

- UDF Map Vertices
- UDF Reduce Vertices
- Sink Vertices

Please see the following examples:

- [Join on Map Vertex](https://github.com/numaproj/numaflow/blob/main/examples/11-join-on-map.yaml)
- [Join on Reduce Vertex](https://github.com/numaproj/numaflow/blob/main/examples/11-join-on-reduce.yaml)
- [Join on Sink Vertex](https://github.com/numaproj/numaflow/blob/main/examples/11-join-on-sink.yaml)
Expand All @@ -17,5 +19,6 @@ A special case of a "Join" is a **Cycle** (a Vertex which can send either to its
Cycles are permitted, except in the case that there's a Reduce Vertex at or downstream of the cycle. (This is because a cycle inevitably produces late data, which would get dropped by the Reduce Vertex. For this reason, cycles should be used sparingly.)

The following examples are of Cycles:

- [Cycle to Self](https://github.com/numaproj/numaflow/blob/main/examples/10-cycle-to-self.yaml)
- [Cycle to Previous](https://github.com/numaproj/numaflow/blob/main/examples/10-cycle-to-prev.yaml)
2 changes: 1 addition & 1 deletion pkg/sinks/udsink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func (s *UserDefinedSink) Write(ctx context.Context, messages []isb.Message) ([]
msgs := make([]*sinkpb.SinkRequest, len(messages))
for i, m := range messages {
msgs[i] = &sinkpb.SinkRequest{
// NOTE: key is not used anywhere ATM
Id: m.ID,
Value: m.Payload,
Keys: m.Keys,
EventTime: timestamppb.New(m.EventTime),
// Watermark is only available in readmessage....
Watermark: timestamppb.New(time.Time{}), // TODO: insert the correct watermark
Expand Down

0 comments on commit 072b9ca

Please sign in to comment.