Commit 303fc31

docs: add documentations for udsource (numaproj#1142)

Signed-off-by: Keran Yang <[email protected]>

1 parent a7adee1

16 files changed, +82 -63 lines changed

docs/core-concepts/watermarks.md (+4 -4)

````diff
@@ -40,9 +40,9 @@ an API. Watermark API is supported in all our client SDKs.
 
 ```go
 // Go
-func handle(ctx context.Context, key string, data funcsdk.Datum) funcsdk.Messages {
-    _ = data.EventTime() // Event time
-    _ = data.Watermark() // Watermark
-    ... ...
+func mapFn(context context.Context, keys []string, d mapper.Datum) mapper.Messages {
+    _ = d.EventTime() // Event time
+    _ = d.Watermark() // Watermark
+    ... ...
 }
 ```
````
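For context on the new signature in this hunk, a complete handler in the restructured SDK might look like the sketch below. It assumes the numaflow-go `mapper` package bootstrap (`NewServer`, `MapperFunc`); verify the names against the SDK version you use.

```go
package main

import (
	"context"
	"log"

	"github.com/numaproj/numaflow-go/pkg/mapper"
)

// mapFn inspects the event time and watermark on the datum and forwards
// the payload unchanged, keeping the original keys.
func mapFn(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
	_ = d.EventTime() // Event time
	_ = d.Watermark() // Watermark
	return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys))
}

func main() {
	// Assumption: NewServer/MapperFunc is the mapper bootstrap in numaflow-go.
	if err := mapper.NewServer(mapper.MapperFunc(mapFn)).Start(context.Background()); err != nil {
		log.Panic(err)
	}
}
```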

docs/development/development.md (+1 -1)

````diff
@@ -47,7 +47,7 @@ kind export kubeconfig
   Build container image, and import it to `k3d`, `kind`, or `minikube` cluster if corresponding `KUBECONFIG` is sourced.
 
 - `make docs`
-  Convert the docs to Github pages, check if there's any error.
+  Convert the docs to GitHub pages, check if there's any error.
 
 - `make docs-serve`
   Start [an HTTP server](http://127.0.0.1:8000/) on your local to host the docs generated Github pages.
````

docs/development/releasing.md (+2 -2)

````diff
@@ -7,11 +7,11 @@ Always create a release branch for the releases, for example branch `release-0.5
 ## Release Steps
 
 1. Cherry-pick fixes to the release branch, skip this step if it's the first release in the branch.
-1. Run `make test` to make sure all test test cases pass locally.
+1. Run `make test` to make sure all test cases pass locally.
 1. Push to remote branch, and make sure all the CI jobs pass.
 1. Run `make prepare-release VERSION=v{x.y.z}` to update version in manifests, where `x.y.x` is the expected new version.
 1. Follow the output of last step, to confirm if all the changes are expected, and then run `make release VERSION=v{x.y.z}`.
-1. Follow the output, push a new tag to the release branch, Github actions will automatically build and publish the new release, this will take around 10 minutes.
+1. Follow the output, push a new tag to the release branch, GitHub actions will automatically build and publish the new release, this will take around 10 minutes.
 1. Test the new release, make sure everything is running as expected, and then recreate a `stable` tag against the latest release.
 ```shell
 git tag -d stable
````

docs/operations/installation.md (+1 -1)

````diff
@@ -74,7 +74,7 @@ To do managed namespace installation, besides `--namespaced`, add `--managed-nam
 
 By default, the Numaflow controller is installed with `Active-Passive` HA strategy enabled, which means you can run the controller with multiple replicas (defaults to 1 in the manifests).
 
-To turn off HA, add following environment variable to the deployment spec.
+To turn off HA, add the following environment variable to the deployment spec.
 
 ```
 name: NUMAFLOW_LEADER_ELECTION_DISABLED
````
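As a sketch of where that variable lands in the controller Deployment, the entry might look like the following; the container name and the `value` of `"true"` are assumptions, not taken from this diff.

```yaml
spec:
  template:
    spec:
      containers:
        - name: controller-manager   # assumed container name
          env:
            - name: NUMAFLOW_LEADER_ELECTION_DISABLED
              value: "true"          # assumed value to disable HA
```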

docs/specifications/edges-buffers-buckets.md (+3 -3)

````diff
@@ -6,7 +6,7 @@
 
 `Edge` is the connection between the vertices, specifically, `edge` is defined in the pipeline spec under `.spec.edges`. No matter if the `to` vertex is a Map, or a Reduce with multiple partitions, it is considered as one edge.
 
-In the following pipeline , there are 3 edges defined (`in` - `aoti`, `aoti` - `compute-sum`, `compute-sum` - `out`).
+In the following pipeline, there are 3 edges defined (`in` - `aoti`, `aoti` - `compute-sum`, `compute-sum` - `out`).
 
 ```yaml
 apiVersion: numaflow.numaproj.io/v1alpha1
@@ -54,9 +54,9 @@ Each `edge` could have a name for internal usage, the naming convention is `{pip
 
 `Buffer` is `InterStepBuffer`. Each buffer has an owner, which is the vertex who reads from it. Each `udf` and `sink` vertex in a pipeline owns a group of partitioned buffers. Each buffer has a name with the naming convention `{pipeline-name}-{vertex-name}-{index}`, where the `index` is the partition index, starting from 0. This naming convention applies to the buffers of both map and reduce udf vertices.
 
-When multiple vertices connecting to the same vertex, if the `to` vertex is a Map, the data from all the from vertices will be forwarded to the group of partitoned buffers round-robinly. If the `to` vertex is a Reduce, the data from all the from vertices will be forwarded to the group of partitoned buffers based on the partitioning key.
+When multiple vertices connecting to the same vertex, if the `to` vertex is a Map, the data from all the from vertices will be forwarded to the group of partitoned buffers round-robinly. If the `to` vertex is a Reduce, the data from all the from vertices will be forwarded to the group of partitioned buffers based on the partitioning key.
 
-A Source vertex does not have any owned buffers. But a pipeline may have multiple Source vertices, followed by one vertex. Same as above, if the following vertex is a map, the data from all the Source vertices will be forwarded to the group of partitoned buffers round-robinly. If it is a reduce, the data from all the Source vertices will be forwarded to the group of partitoned buffers based on the partitioning key.
+A Source vertex does not have any owned buffers. But a pipeline may have multiple Source vertices, followed by one vertex. Same as above, if the following vertex is a map, the data from all the Source vertices will be forwarded to the group of partitioned buffers round-robinly. If it is a reduce, the data from all the Source vertices will be forwarded to the group of partitioned buffers based on the partitioning key.
 
 ## Buckets
 
````
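To make the buffer naming convention above concrete, here is a small sketch based on the example pipeline in this diff; the pipeline name `my-pipeline` and the partition count of 2 are assumptions for illustration.

```yaml
# The three edges of the example pipeline, as defined under .spec.edges.
edges:
  - from: in
    to: aoti
  - from: aoti
    to: compute-sum
  - from: compute-sum
    to: out
# Per the buffer naming convention {pipeline-name}-{vertex-name}-{index},
# if this pipeline were named my-pipeline and compute-sum had 2 partitions,
# compute-sum would own my-pipeline-compute-sum-0 and my-pipeline-compute-sum-1.
```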

docs/specifications/overview.md (+7 -7)

```diff
@@ -111,7 +111,7 @@ Logic:
 **Matrix of Operations**
 
 | | Source | Processor | Sink |
-| -------------- |------------------| ------------ |---------------|
+|----------------|------------------|--------------|---------------|
 | ReadFromBuffer | Read From Source | Generic | Generic |
 | CallUDF | Void | User Defined | Void |
 | Forward | Generic | Generic | Write To Sink |
@@ -124,7 +124,7 @@ Logic:
 - Numaflow is restartable if aborted or steps fail while preserving
   exactly-once semantics.
 - Do not generate more output than can be used by the next stage in a
-  reasonable amount of time, i.e. the size of buffers between steps
+  reasonable amount of time, i.e., the size of buffers between steps
   should be limited, (aka backpressure).
 - User code should be isolated from offset management, restart, exactly once, backpressure, etc.
 - Streaming process systems inherently require a concept of time, this
@@ -144,7 +144,7 @@ Logic:
   ![Tree Dag](../assets/tree_dag.png)
 - Diamond (In Future)
   ![Diamond Dag](../assets/diamond_dag.png)
-- Multiple Sources with same schema (In Future)
+- Multiple Sources with the same schema (In Future)
   ![Multi Source Dag](../assets/multi_source_dag.png)
 
 ## Non-Requirements
@@ -160,7 +160,7 @@ Logic:
 
 - In order to be able to support various buffering technologies, we
   will persist and manage stream "offsets" rather than relying on
-  the buffering technology (e.g. Kafka)
+  the buffering technology (e.g., Kafka)
 - Each processor may persist state associated with their processing
   no distributed transactions are needed for checkpointing
 - If we have a tree DAG, how will we manage acknowledgments? We
@@ -217,14 +217,14 @@ To detect duplicates, make sure the delivery is Exactly-Once:
 ### Unique Identifier for Message
 
 To detect duplicates, we first need to uniquely identify each message.
-We will be relying on the "identifier" available (eg, "offset" in Kafka)
+We will be relying on the "identifier" available (e.g., "offset" in Kafka)
 in the buffer to uniquely identify each message. If such an identifier
-is not available, we will be creating an unique identifier (sequence
+is not available, we will be creating a unique identifier (sequence
 numbers are tough because there are multiple readers). We can use this
 unique identifier to ensure that we forward only if the message has not
 been forwarded yet. We will only look back for a fixed window of time
 since this is a stream processing application on an unbounded stream of
-data and we do not have infinite resources.
+data, and we do not have infinite resources.
 
 The same offset will not be used across all the steps in Numaflow, but
 we will be using the current offset only while forwarding to the next
```
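The fixed-lookback dedup idea in the last hunk can be pictured with a short sketch. This is illustrative only, not Numaflow's implementation; the type and method names are invented.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// dedup remembers message identifiers (e.g., Kafka offsets) for a fixed
// lookback window, mirroring the idea described above.
type dedup struct {
	mu       sync.Mutex
	seen     map[string]time.Time
	lookback time.Duration
}

func newDedup(lookback time.Duration) *dedup {
	return &dedup{seen: make(map[string]time.Time), lookback: lookback}
}

// alreadyForwarded reports whether id was forwarded within the window and
// records it otherwise; entries older than the window are evicted so the
// state stays bounded on an unbounded stream.
func (d *dedup) alreadyForwarded(id string, now time.Time) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	for k, t := range d.seen {
		if now.Sub(t) > d.lookback {
			delete(d.seen, k)
		}
	}
	if _, ok := d.seen[id]; ok {
		return true
	}
	d.seen[id] = now
	return false
}

func main() {
	d := newDedup(time.Minute)
	fmt.Println(d.alreadyForwarded("offset-42", time.Now())) // false, first sighting
	fmt.Println(d.alreadyForwarded("offset-42", time.Now())) // true, duplicate
}
```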

docs/user-guide/reference/conditional-forwarding.md (+1 -1)

```diff
@@ -1,7 +1,7 @@
 # Conditional Forwarding
 
 After processing the data, conditional forwarding is doable based on the `Tags` returned in the result.
-Below is list of different logic operations that can be done on tags.
+Below is a list of different logic operations that can be done on tags.
 - **and** - forwards the message if all the tags specified are present in Message's tags.
 - **or** - forwards the message if one of the tags specified is present in Message's tags.
 - **not** - forwards the message if all the tags specified are not present in Message's tags.
```
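For a sense of how tags get onto a message in the first place, a UDF might attach them as in this sketch. It assumes the numaflow-go `mapper` API (`WithTags` on a message); the `even`/`odd` tags are made up for illustration.

```go
package main

import (
	"context"
	"log"

	"github.com/numaproj/numaflow-go/pkg/mapper"
)

// mapFn tags each message "even" or "odd" by payload length so a
// downstream edge condition (and/or/not) can route on those tags.
func mapFn(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
	tag := "odd"
	if len(d.Value())%2 == 0 {
		tag = "even"
	}
	return mapper.MessagesBuilder().
		Append(mapper.NewMessage(d.Value()).WithKeys(keys).WithTags([]string{tag}))
}

func main() {
	if err := mapper.NewServer(mapper.MapperFunc(mapFn)).Start(context.Background()); err != nil {
		log.Panic(err)
	}
}
```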

docs/user-guide/reference/multi-partition.md (+1 -1)

````diff
@@ -7,7 +7,7 @@ that the JetStream is provisioned with more nodes to support higher throughput.
 Since partitions are owned by the vertex reading the data, to create a multi-partitioned edge
 we need to configure the vertex reading the data (to-vertex) to have multiple partitions.
 
-The following code snippet provides an example of how to configure a vertex (in this case, the `cat` vertex) to have multiple partitions, which enables it (`cat` vertex) to read at a higher throughput.
+The following code snippet provides an example of how to configure a vertex (in this case, the `cat` vertex) to have multiple partitions, which enables it (`cat` vertex) to read at a higher throughput.
 
 ```yaml
 - name: cat
````
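The snippet the doc refers to presumably continues along these lines; the partition count of 3 and the builtin `cat` UDF are assumptions for illustration.

```yaml
- name: cat
  partitions: 3   # number of partitions owned by this (to-)vertex; assumed value
  udf:
    builtin:
      name: cat   # assumed: the builtin pass-through UDF
```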

docs/user-guide/reference/side-inputs.md (+1 -3)

````diff
@@ -3,8 +3,6 @@
 For an unbounded pipeline in Numaflow that never terminates, there are many cases where users want to update a configuration of the UDF without restarting the pipeline. Numaflow enables it by the `Side Inputs` feature where we can broadcast changes to vertices automatically.
 The `Side Inputs` feature achieves this by allowing users to write custom UDFs to broadcast changes to the vertices that are listening in for updates.
 
-
-
 ### Using Side Inputs in Numaflow
 The Side Inputs are updated based on a cron-like schedule,
 specified in the pipeline spec with a trigger field.
@@ -74,7 +72,7 @@ func handle(_ context.Context) sideinputsdk.Message {
     return sideinputsdk.BroadcastMessage([]byte(val))
 }
 ```
-Similarly, this can be written in [Python](https://github.com/numaproj/numaflow-python/blob/main/examples/sideinput/simple-sideinput/example.py)
+Similarly, this can be written in [Python](https://github.com/numaproj/numaflow-python/blob/main/examples/sideinput/simple-sideinput/example.py)
 and [Java](https://github.com/numaproj/numaflow-java/blob/main/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/simple/SimpleSideInput.java) as well.
 
 After performing the retrieval/update, the side input value is then broadcasted to all vertices that use the side input.
````
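A fuller version of the Go handler shown in the second hunk might be bootstrapped as below; the `NewSideInputServer`/`RetrieveFunc` names are assumed from the numaflow-go side input package and should be checked against the SDK you use.

```go
package main

import (
	"context"
	"log"
	"time"

	sideinputsdk "github.com/numaproj/numaflow-go/pkg/sideinput"
)

// handle runs on every trigger tick and broadcasts the new side input
// value to the vertices that subscribe to it.
func handle(_ context.Context) sideinputsdk.Message {
	val := time.Now().Format(time.RFC3339) // e.g., a freshly fetched config value
	return sideinputsdk.BroadcastMessage([]byte(val))
}

func main() {
	// Assumption: this is the side input server bootstrap in numaflow-go.
	if err := sideinputsdk.NewSideInputServer(sideinputsdk.RetrieveFunc(handle)).Start(context.Background()); err != nil {
		log.Panic(err)
	}
}
```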

docs/user-guide/sinks/overview.md (+3 -3)

```diff
@@ -3,7 +3,7 @@
 The Sink serves as the endpoint for processed data that has been outputted from the platform,
 which is then sent to an external system or application. The purpose of the Sink is to deliver
 the processed data to its ultimate destination, such as a database, data warehouse, visualization
-tool, or alerting system. It's the opposite of the Source vettex, which receives input data into the platform.
+tool, or alerting system. It's the opposite of the Source vertex, which receives input data into the platform.
 Sink vertex may require transformation or formatting of data prior to sending it to the target system. Depending on the
 target system's needs, this transformation can be simple or complex.
 
@@ -18,7 +18,7 @@ Numaflow currently supports the following Sinks
 
 A user-defined sink is a custom Sink that a user can write using Numaflow SDK when
 the user needs to output the processed data to a system or using a certain transformation that is not
-supported by the platform's built-in sinks. As an example, once we have processed the input messages,
+supported by the platform's built-in sinks. As an example, once we have processed the input messages,
 we can use Elasticsearch as a User defined sink to store the processed data and enable search and
-analysis on the data.
+analysis on the data.
 
```

docs/user-guide/sinks/user-defined-sinks.md (+1 -1)

```diff
@@ -1,6 +1,6 @@
 # User Defined Sinks
 
-A `Pipeline` may have multiple Sinks, those sinks could either be a pre-defined sink such as `kafka`, `log`, etc, or a `User Defined Sink`.
+A `Pipeline` may have multiple Sinks, those sinks could either be a pre-defined sink such as `kafka`, `log`, etc., or a `User Defined Sink`.
 
 A pre-defined sink vertex runs single-container pods, a user defined sink runs two-container pods.
 
```
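For reference, the user-defined half of such a two-container pod could be a sink like the sketch below, assuming the numaflow-go `sinker` package (`Sink` over a datum channel, `ResponseOK`, `NewServer`); names may differ by SDK version.

```go
package main

import (
	"context"
	"log"

	sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
)

type logSink struct{}

// Sink consumes a batch of datums and acknowledges each one; a real sink
// would write to Elasticsearch, a database, etc. instead of logging.
func (s *logSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
	responses := sinksdk.ResponsesBuilder()
	for d := range datumStreamCh {
		log.Println(string(d.Value()))
		responses = responses.Append(sinksdk.ResponseOK(d.ID()))
	}
	return responses
}

func main() {
	if err := sinksdk.NewServer(&logSink{}).Start(context.Background()); err != nil {
		log.Panic(err)
	}
}
```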

docs/user-guide/sources/overview.md (+6 -2)

```diff
@@ -1,12 +1,16 @@
 # Sources
 
 Source vertex is responsible for reliable reading data from an unbounded source into Numaflow.
+Source vertex may require [transformation](./transformer/overview.md) or formatting of data prior to sending it to the output buffers.
+Source Vertex also does [Watermark](../../core-concepts/watermarks.md) tracking and late data detection.
 
-In Numaflow, we currently support the following builtin sources
+In Numaflow, we currently support the following sources
 
 * [Kafka](./kafka.md)
 * [HTTP](./http.md)
 * [Ticker](./generator.md)
 * [Nats](./nats.md)
+* [User Defined Source](./user-defined-sources.md)
 
-Source Vertex also does [Watermark](../../core-concepts/watermarks.md) tracking and late data detection.
+A user defined source is a custom source that a user can write using Numaflow SDK when
+the user needs to read data from a system that is not supported by the platform's built-in sources.
```

docs/user-guide/sources/user-defined-sources.md (new file, +36)

````diff
@@ -0,0 +1,36 @@
+# User Defined Sources
+
+A `Pipeline` may have multiple Sources, those sources could either be a pre-defined source such as `kafka`, `http`, etc., or a `User Defined Source`.
+
+With no source data transformer, A pre-defined source vertex runs single-container pods; a user-defined source runs two-container pods.
+
+## Build Your Own User Defined Sources
+
+You can build your own user defined sources in multiple languages.
+
+Check the links below to see the examples for different languages.
+
+- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/sourcer/examples/simple_source/)
+- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/)
+
+After building a docker image for the written user-defined source, specify the image as below in the vertex spec.
+
+```yaml
+spec:
+  vertices:
+    - name: input
+      source:
+        udsource:
+          container:
+            image: my-source:latest
+```
+
+## Available Environment Variables
+
+Some environment variables are available in the user defined source container:
+
+- `NUMAFLOW_NAMESPACE` - Namespace.
+- `NUMAFLOW_POD` - Pod name.
+- `NUMAFLOW_REPLICA` - Replica index.
+- `NUMAFLOW_PIPELINE_NAME` - Name of the pipeline.
+- `NUMAFLOW_VERTEX_NAME` - Name of the vertex.
````
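Those variables are plain environment variables, so a source container can read them directly; a trivial sketch:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// Injected into the user-defined source container per the list above.
	for _, k := range []string{
		"NUMAFLOW_NAMESPACE",
		"NUMAFLOW_POD",
		"NUMAFLOW_REPLICA",
		"NUMAFLOW_PIPELINE_NAME",
		"NUMAFLOW_VERTEX_NAME",
	} {
		fmt.Printf("%s=%q\n", k, os.Getenv(k))
	}
}
```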

docs/user-guide/user-defined-functions/map/map.md (+10 -30)

````diff
@@ -8,33 +8,13 @@ There are some [Built-in Functions](builtin-functions/README.md) that can be use
 
 ## Build Your Own UDF
 
-You can build your own UDF in multiple languages. A User Defined Function could be as simple as below in Golang.
-
-```golang
-package main
-
-import (
-    "context"
-
-    functionsdk "github.com/numaproj/numaflow-go/pkg/function"
-    "github.com/numaproj/numaflow-go/pkg/function/server"
-)
-
-func mapHandle(_ context.Context, keys []string, d functionsdk.Datum) functionsdk.Messages {
-    // Directly forward the input to the output
-    return functionsdk.MessagesBuilder().Append(functionsdk.NewMessage(d.Value()).WithKeys(keys))
-}
-
-func main() {
-    server.New().RegisterMapper(functionsdk.MapFunc(mapHandle)).Start(context.Background())
-}
-```
+You can build your own UDF in multiple languages.
 
 Check the links below to see the UDF examples for different languages.
 
-- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/function)
-- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples)
-- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/function)
+- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/map/)
+- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/)
+- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/map/)
 
 After building a docker image for the written UDF, specify the image as below in the vertex spec.
 
@@ -49,8 +29,8 @@ spec:
 
 ### Streaming Mode
 
-In cases the map function generates more than one outputs (e.g. flat map), the UDF can be
-configured to run in a streaming mode instead of batching which is the default mode.
+In cases the map function generates more than one output (e.g., flat map), the UDF can be
+configured to run in a streaming mode instead of batching, which is the default mode.
 In streaming mode, the messages will be pushed to the downstream vertices once generated
 instead of in a batch at the end. The streaming mode can be enabled by setting the annotation
 `numaflow.numaproj.io/map-stream` to `true` in the vertex spec.
@@ -68,13 +48,13 @@ spec:
 
 Check the links below to see the UDF examples in streaming mode for different languages.
 
-- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/function/flatmap_stream)
-- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/flatmap_stream)
-- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmapstream)
+- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/mapstream/flatmap_stream/)
+- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/mapstreamer/examples/flatmap_stream/)
+- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/)
 
 ### Available Environment Variables
 
-Some environment variables are available in the user defined function container, they might be useful in you own UDF implementation.
+Some environment variables are available in the user defined function container, they might be useful in your own UDF implementation.
 
 - `NUMAFLOW_NAMESPACE` - Namespace.
 - `NUMAFLOW_POD` - Pod name.
````
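To illustrate the streaming (flat map) mode the new links point at, a handler might emit several messages per input as in this sketch; it assumes the numaflow-go `mapstreamer` package (`MapStreamerFunc`, a `chan<- Message` parameter, `NewServer`), so verify against the SDK version you use.

```go
package main

import (
	"context"
	"log"
	"strings"

	"github.com/numaproj/numaflow-go/pkg/mapstreamer"
)

// mapStreamFn splits the payload on commas and pushes each part
// downstream as soon as it is produced, instead of batching.
func mapStreamFn(ctx context.Context, keys []string, d mapstreamer.Datum, messageCh chan<- mapstreamer.Message) {
	defer close(messageCh) // signal that this datum produced all its outputs
	for _, s := range strings.Split(string(d.Value()), ",") {
		messageCh <- mapstreamer.NewMessage([]byte(s)).WithKeys(keys)
	}
}

func main() {
	if err := mapstreamer.NewServer(mapstreamer.MapStreamerFunc(mapStreamFn)).Start(context.Background()); err != nil {
		log.Panic(err)
	}
}
```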

docs/user-guide/user-defined-functions/reduce/reduce.md (+4 -4)

```diff
@@ -4,12 +4,12 @@
 
 Reduce is one of the most commonly used abstractions in a stream processing pipeline to define
 aggregation functions on a stream of data. It is the reduce feature that helps us solve problems like
-"performs a summary operation(such as counting the number of occurrence of a key, yielding user login
-frequencies), etc."Since the input an unbounded stream (with infinite entries), we need an additional
+"performs a summary operation(such as counting the number of occurrences of a key, yielding user login
+frequencies), etc. "Since the input is an unbounded stream (with infinite entries), we need an additional
 parameter to convert the unbounded problem to a bounded problem and provide results on that. That
 bounding condition is "time", eg, "number of users logged in per minute". So while processing an
 unbounded stream of data, we need a way to group elements into finite chunks using time. To build these
-chunks the reduce function is applied to the set of records produced using the concept of [windowing](./windowing/windowing.md).
+chunks, the reduce function is applied to the set of records produced using the concept of [windowing](./windowing/windowing.md).
 
 ## Reduce Pseudo code
 
@@ -63,7 +63,7 @@ The reduce supports parallelism processing by defining a `partitions` in the ver
 
 It is wrong to give a `partitions` > `1` if it is a _non-keyed_ vertex (`keyed: false`).
 
-There are a couple of [examples](examples.md) that demonstrates Fixed windows, Sliding windows,
+There are a couple of [examples](examples.md) that demonstrate Fixed windows, Sliding windows,
 chaining of windows, keyed streams, etc.
 
 ## Time Characteristics
```
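The "number of users logged in per minute" example reads naturally as a worked sketch. This is plain Go illustrating the fixed-window idea only, not the Numaflow reduce SDK; in Numaflow the window comes from the vertex spec and the reduce UDF sees one window's records at a time.

```go
package main

import (
	"fmt"
	"time"
)

// event is one element of the unbounded stream.
type event struct {
	key string
	t   time.Time
}

// countPerWindow buckets events into fixed windows by event time and
// counts occurrences per key, e.g. logins per user per minute.
func countPerWindow(events []event, window time.Duration) map[time.Time]map[string]int {
	out := make(map[time.Time]map[string]int)
	for _, e := range events {
		start := e.t.Truncate(window) // start of the window this event falls into
		if out[start] == nil {
			out[start] = make(map[string]int)
		}
		out[start][e.key]++
	}
	return out
}

func main() {
	now := time.Now().Truncate(time.Minute)
	evs := []event{
		{"alice", now}, {"bob", now.Add(10 * time.Second)},
		{"alice", now.Add(90 * time.Second)}, // lands in the next window
	}
	for start, counts := range countPerWindow(evs, time.Minute) {
		fmt.Println(start.Format("15:04"), counts)
	}
}
```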

mkdocs.yml (+1 -0)

```diff
@@ -51,6 +51,7 @@ nav:
       - user-guide/sources/http.md
       - user-guide/sources/kafka.md
       - user-guide/sources/nats.md
+      - user-guide/sources/user-defined-sources.md
       - Data Transformer:
          - Overview: "user-guide/sources/transformer/overview.md"
          - Built-in Transformers:
```
