The goal of Nakadi (ნაკადი means “stream” in Georgian) is to provide an event broker infrastructure to:
Abstract event delivery via a secured RESTful API.
This allows microservices teams to maintain service boundaries, and not directly depend on any specific message broker technology. Access can be managed individually for every Event Type and secured using OAuth and custom authorization plugins.
Enable convenient development of event-driven applications and asynchronous microservices.
Event types can be defined with event type schemas and managed via a registry. All events are validated against the schema of their event type before being published, which guarantees data quality and data consistency for data consumers.
Efficient low latency event delivery.
Once a publisher sends an event using a simple HTTP POST, consumers can be pushed to via a streaming HTTP connection, allowing near real-time event processing. The consumer connection has keepalive controls and support for managing stream offsets using subscriptions.

Read more to understand the big picture: Architecture for data integration
Watch the talk Data Integration in the World of Microservices
Nakadi is high-load production ready. Zalando uses Nakadi as its central Event Bus Service. Nakadi reliably handles traffic from thousands of event types with a throughput of more than hundreds of gigabytes per second. The project is in active development.
Read more about the latest developments in our Changelog
The zalando-nakadi organisation contains many useful related projects.
An event type can be created by posting to the event-types resource.
curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/json" -d '{
"name": "order.ORDER_RECEIVED",
"owning_application": "order-service",
"category": "undefined",
"partition_strategy": "random",
"schema": {
"type": "json_schema",
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
}
}'
Events for an event type can be published by posting to its “events” collection:
curl -v -XPOST http://localhost:8080/event-types/order.ORDER_RECEIVED/events -H "Content-type: application/json" -d '[
{
"order_number": "24873243241",
"metadata": {
"eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
"occurred_at": "2016-03-15T23:47:15+01:00"
}
}, {
"order_number": "24873243242",
"metadata": {
"eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
"occurred_at": "2016-03-15T23:47:16+01:00"
}
}]'
HTTP/1.1 200 OK
You can open a stream for an Event Type via the events sub-resource:
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events
HTTP/1.1 200 OK
{"cursor":{"partition":"0","offset":"4"},"events":[{"order_number": "ORDER_001", "metadata": {"eid": "4ae5011e-eb01-11e5-8b4a-1c6f65464fc6", "occurred_at": "2016-03-15T23:56:11+01:00"}}]}
{"cursor":{"partition":"0","offset":"5"},"events":[{"order_number": "ORDER_002", "metadata": {"eid": "4bea74a4-eb01-11e5-9efa-1c6f65464fc6", "occurred_at": "2016-03-15T23:57:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}
There is a large ecosystem of projects around Nakadi. Check them out in the zalando-nakadi organisation.
In this section we’ll walk through running a Nakadi service on your machine. Once you have the service up and running, you can jump to Using Nakadi to see how to produce and consume messages.
You can run a Nakadi service locally using Docker. If you don’t have Docker installed, there are great instructions available on the Docker website.
From the project’s home directory you can install and start a Nakadi container
via the gradlew command:
./gradlew startNakadi
This will start a docker container for the Nakadi server and another container
with its PostgreSQL, Kafka and Zookeeper dependencies. You can read more about
the gradlew script in the Building and Developing section
To stop the running Nakadi:
./gradlew stopNakadi
If you’re having trouble getting started, you might find an answer in the Frequently Asked Questions (FAQ) section of the documentation.
Some ports need to be available to run the service: 8080 (Nakadi), 5432 (PostgreSQL), 9092 (Kafka) and 2181 (Zookeeper).
They allow the services to communicate with each other and should not be used by other applications.
Since Docker for Mac OS runs inside VirtualBox, you will want to expose some ports first to allow Nakadi to access its dependencies:
docker-machine ssh default \
-L 9092:localhost:9092 \
-L 8080:localhost:8080 \
-L 5432:localhost:5432 \
-L 2181:localhost:2181
Alternatively you can set up port forwarding on the “default” machine through its network settings in the VirtualBox UI.

If you get the message “Is the docker daemon running on this host?” but you know Docker and VirtualBox are running, you might want to run this command:
eval "$(docker-machine env default)"
Note: Docker for Mac OS (previously in beta) version 1.12 (1.12.0 or 1.12.1) is currently not supported due to a bug in the host networking configuration.
The Nakadi API allows the publishing and consuming of events over HTTP.
A good way to think of events is that they are like messages in a stream processing or queuing system, but have a defined structure that can be understood and validated. The object containing the information describing an event is called an event type.
To publish and consume events, an owning application must first register a new event type with Nakadi. The event type contains information such as its name, the aforementioned owning application, strategies for partitioning and enriching data, and a JSON Schema. Nakadi supports an event type registry API that lists all the available event types.
Once the event type is created, a resource called a stream becomes available for that event type. The stream will accept events for the type from a producer and can be read from by one or more consumers. Nakadi can validate each event that is sent to the stream.
An event type’s stream can be divided into one or more partitions. Each event is placed into exactly one partition. Each partition represents an ordered log - once an event is added to a partition its position is never changed, but there is no global ordering across partitions [1].
Consumers can read events and track their position in the stream using a cursor that is given to each partition. Consumers can also use a cursor to read from a stream at a particular position. Multiple consumers can read from the same stream, allowing different applications to read the stream simultaneously.
In summary, applications using Nakadi can be grouped as follows:
Event Type Owners: Event type owners interact with Nakadi via the event type registry to define event types based on a schema and create event streams.
Event Producers: Producers publish events that conform to the event type’s schema to the event type’s stream.
Event Consumers: Consumers read events from the event stream. Multiple consumers can read from the same stream.
[1] For more detail on partitions and the design of streams see “The Log” by Jay Kreps.
By default the events resource will consume from all partitions of an event
type and from the end (or “tail”) of the stream. To select only particular
partitions and a position where in the stream to start, you can supply
an X-Nakadi-Cursors header in the request:
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
-H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'
The header value is a JSON array of cursors. Each cursor in the array describes its partition for the stream and an offset to stream from. Note that events within the same partition maintain their overall order.
The offset value of the cursor allows you to select where in the stream you
want to consume from. This can be any known offset value, or the dedicated value
BEGIN which will start the stream from the beginning. For example, to read
from partition 0 from the beginning:
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
-H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"BEGIN"}]'
The details of the partitions and their offsets for an event type are
available via its partitions resource.
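For example, a quick sketch of querying the partitions resource for the event type above (the response shown here is indicative; the exact field names are described in the “API Reference”):
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/partitions
HTTP/1.1 200 OK
[{"partition":"0","oldest_available_offset":"0","newest_available_offset":"6"}]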
If there are no events to be delivered Nakadi will keep a streaming connection open by
periodically sending a batch with no events but which contains a cursor pointing to
the current offset. For example:
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events
HTTP/1.1 200 OK
{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
This can be treated as a keep-alive control for some load balancers.
The object containing the information describing an event is called an event type.
To publish events an event type must exist in Nakadi. The event type contains information such as its name, a category, the owning application, strategies for partitioning and enriching data, and a JSON Schema. Nakadi has an event type registry API that lists all the available event types.
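For example, a sketch of listing the registry (the response is a JSON array of event type definitions, abbreviated here):
curl -v http://localhost:8080/event-types
HTTP/1.1 200 OK
[{"name": "order.ORDER_RECEIVED", "owning_application": "order-service", "category": "undefined", ...}]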
There are three main categories of event type defined by Nakadi -
Business Event: An event that is part of, or drives a business process, such as a state transition in a customer order.
Data Change Event: An event that represents a change to a record or other item, or a new item. Change events are associated with a create, update, delete, or snapshot operation.
Undefined Event: A free form category suitable for events that are entirely custom to the producer.
Each event category enables different capabilities for an event type, notably their schema and validation rules, which we’ll describe next.
The events for the ‘business’ and ‘data’ categories have their own pre-defined schema structures, based on JSON Schema, as well as a schema that is defined custom to the event type when it is created. The pre-defined structures describe common fields for an event and the custom schema for the event is defined when the event type is created.
The schema for an event type is submitted as a JSON Schema and will only declare the custom part of the event. This means the pre-defined schema for the ‘business’ and ‘data’ categories don’t need to be declared (and should not be declared). The ‘undefined’ category has no pre-defined schema.
When an event for one of these categories is posted to the server, it is expected to conform to the combination of the pre-defined schema and to the custom schema defined for the event type, and not just the custom part of the event. This combination is called the effective schema and is validated by Nakadi for the ‘business’ and ‘data’ types.
The ‘undefined’ category behaves slightly differently to the other categories. Its effective schema is exactly the same as the one created with its event type definition (it has no extra structure), but it is not validated by Nakadi. Instead an ‘undefined’ event type’s schema is simply made available in the event type registry for consumers to use if they wish.
The custom schema for an event type can be as simple as
{ "additionalProperties": true } to allow arbitrary JSON, but will usually
have a more specific definition :)
Compatibility modes are used to control schema changes. Each mode solves a specific problem and thus presents different constraints.
Nakadi supports different compatibility modes for event types, used to control valid schema changes and to ensure schema consistency of published event data. Compatibility modes define restrictions for event schema evolution, i.e. a cascade of allowed schema changes, and thereby different compatibility guarantees for event consumers.
The default compatibility mode is forward compatible, but using full
compatibility is definitely encouraged, and the default mode
will change in the near future.
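As a sketch, the mode is selected with the compatibility_mode field when an event type is created or updated; if omitted, the default described above applies (see the “API Reference” for the authoritative field definition):
curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/json" -d '{
  "name": "order.ORDER_RECEIVED",
  "owning_application": "order-service",
  "category": "undefined",
  "compatibility_mode": "compatible",
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  }
}'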
The compatible compatibility mode is the safest mode. It’s both
forward compatible and backward compatible: consumers using older schemas can safely read events validated by newer schemas, and consumers using newer schemas can safely read events validated by older schemas.
It guarantees high data quality, which is crucial for a variety of applications. At Zalando, it’s required to use this compatibility mode to have data processed by business intelligence and stored long term by the data lake.
Supported changes:
The following json-schema attributes are not supported:
Removing the support for these attributes is necessary to avoid the introduction of incompatible changes.
Under this compatibility mode, it’s necessary to fully specify event properties in order for validation to succeed; events containing properties that are not declared in the schema will be rejected. For this reason, producers should first update their schemas with new attributes and only after that start producing events with such attributes.
The forward mode has been designed to allow event type owners to
expand their schemas without breaking existing consumers.
It’s called forward because consumers using older schemas can safely read events generated by newer schemas. However, it’s not backward compatible, in the sense that reading older events using newer schemas is not safe.
Supported changes:
Under this mode event validation accepts events with fields not
declared in the schema. In other words, it’s not necessary to specify
additionalProperties: true in the schema, since this is the default
behaviour of the json-schema validator.
We discourage the usage of additionalProperties entirely. The more
complete the schema definition, the more stable and valuable the
events. The ability to not specify every attribute is there only for
some very specific situations where it’s not possible to define them
all.
Consumers reading events from forward compatible event types SHOULD ignore event attributes not declared in the event type schema. This is aligned with API guidelines for compatibility.
Under compatibility mode none schemas can be changed
arbitrarily. This mode is not recommended unless there is a very good
reason not to provide any compatibility guarantee.
It’s possible to change the compatibility mode from none to
forward and from forward to compatible, e.g. it’s possible to
make the schema validation more strict but never more relaxed.
It’s not possible to upgrade directly from none to
compatible. It’s necessary to go first through forward for later
upgrading to compatible.
Users should be aware of changes in validation behaviour when
upgrading to compatible. Please be sure to read the section on
compatible mode above.
An event type can be created by posting to the /event-types resource.
This example shows a business category event type called order_received:
curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types -d '{
"name": "order_received",
"owning_application": "acme-order-service",
"category": "business",
"partition_strategy": "hash",
"partition_key_fields": ["order_number"],
"enrichment_strategies": ["metadata_enrichment"],
"default_statistic": {
"messages_per_minute": 1000,
"message_size": 5,
"read_parallelism": 1,
"write_parallelism": 1
},
"schema": {
"type": "json_schema",
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
}
}'
The event type has a simple JSON Schema submitted as an escaped JSON string describing the order number, and thus only declares the custom part of the schema. The partition_strategy
says events will be allocated to partitions according to the hash of the order_number field, defined in partition_key_fields, and the owner’s name is
"acme-order-service". The enrichment_strategies array says to apply metadata_enrichment to submitted events (common metadata is a feature of some categories).
The event type has an optional default_statistic object, which controls the number of partitions. Nakadi will use a sensible default if no value is provided. The values provided here
cannot be changed later, so choose them wisely.
A successful request will result in a 201 Created response.
Once an event type is created, it is added to the event type registry and its details are visible from its URI in the registry. Events can then be posted to its stream and consumed by multiple clients.
The exact required fields depend on the event type’s category, but name, owning_application and schema are always expected. The
“API Reference” contains more details on event types.
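Once created, an event type’s definition can be fetched back from the registry via its URI. A sketch (response abbreviated):
curl -v -XGET http://localhost:8080/event-types/order_received
HTTP/1.1 200 OK
{"name": "order_received", "owning_application": "acme-order-service", "category": "business", "partition_strategy": "hash", ...}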
An event type’s stream is divided into one or more partitions and each event is placed into exactly one partition. Partitions preserve the order of events - once an event is added to a partition its position relative to other events in the partition is never changed. The details of the partitions and their offsets for an event type are available via its /partitions resource.
Each partition is a fully ordered log, and there is no global ordering across partitions. Clients can consume a stream’s partitions independently and track their position across the stream.

Dividing a stream this way allows the overall system to be scaled and provide good throughput for producers and consumers. It’s similar to how systems such as Apache Kafka and AWS Kinesis work.
The assignment of events to a partition is controllable by the producer. The
partition_strategy field determines how events are mapped to partitions. Nakadi offers the following strategies:
random: the partition is selected randomly and events will be evenly distributed across partitions. Random is the default option used by Nakadi.
hash: the partition is selected by hashing the value of the fields
defined in the event type’s partition_key_fields. In practice this means events that are about the same logical entity and which have the same values for the partition key will be sent to the same partition.
user_defined: the partition is set by the producer when sending an event. This option is only available for the ‘business’ and ‘data’ categories.
Which option to use depends on your requirements. When order matters, hash is usually the right choice. For very high volume streams where order doesn’t matter, random can be a good choice as it load balances data well. The user defined option is a power tool; unless you know you need it, use hash or random. Hash is the preferred strategy, as it ensures that duplicated events will end up in the same partition.
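For example, with the user_defined strategy the producer picks the target partition itself by setting the partition field in the event metadata. A sketch with illustrative values, assuming an event type created with partition_strategy set to user_defined (the metadata fields are described in the “API Reference”):
curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types/order_received/events -d '[
  {
    "order_number": "24873243243",
    "metadata": {
      "eid": "0f4bd5f9-0a32-4aee-9eb9-ea5e1a7b8b9d",
      "occurred_at": "2016-03-15T23:47:17+01:00",
      "partition": "0"
    }
  }
]'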
Nakadi allows users to restrict access to resources they own - currently, event types are the only resources supported, but we plan to extend this feature to subscriptions in the near future.
The authorization model is simple: policies can be attached to resources. A policy P defines which subjects can
perform which operations on a resource R. To do so, the policy contains, for each operation, a list of attributes that
represent subjects who are authorized to perform the operation on the resource. For a subject to be authorized, it needs
to match at least one of these attributes (not necessarily all of them).
There are three kinds of operation: admin, to update the resource and delete it; read, to read events from the
resource; and write, to write events to the resource.
An authorization request is represented by the tuple
R(subject, operation, resource)
The request will be approved iff the resource policy has at least one attribute for operation that matches the
subject.
Protecting an event type can be done either during the creation of the event type, or later, as an update to the event type. Users simply need to add an authorization section to their event type description, which looks like this:
"authorization": {
"admins": [{"data_type": "user", "value": "bfawlty"}],
"readers": [{"data_type": "user", "value": "bfawlty"}],
"writers": [{"data_type": "user", "value": "bfawlty"}]
}
In this section, the admins list includes the attributes that authorize a subject to perform the admin operation;
the readers list includes the attributes that authorize a subject to perform the read operation; and the writers
list includes the attributes that authorize a subject to perform the write operation.
Whenever an event type is created, or its authorization section is updated, all attributes are validated. The exact nature of the validation depends on the plugin implementation.
Here is a sample request with an authorization section. It gives read, write, and admin access to a single attribute,
of type service:
curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types -d '{
"name": "order_received",
"owning_application": "acme-order-service",
"category": "business",
"partition_strategy": "hash",
"partition_key_fields": ["order_number"],
"enrichment_strategies": ["metadata_enrichment"],
"default_statistic": {
"messages_per_minute": 1000,
"message_size": 5,
"read_parallelism": 1,
"write_parallelism": 1
},
"schema": {
"type": "json_schema",
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
},
"authorization": {
"admins": [{"data_type": "user", "value": "bfawlty"}],
"readers": [{"data_type": "user", "value": "bfawlty"}],
"writers": [{"data_type": "user", "value": "bfawlty"}]
}
}'
Updating an event type is similar to creating one. Here is a sample request, that gives read, write, and admin access to the same application:
curl -v -XPUT -H "Content-Type: application/json" http://localhost:8080/event-types/order_received -d '{
"name": "order_received",
"owning_application": "acme-order-service",
"category": "business",
"partition_strategy": "hash",
"partition_key_fields": ["order_number"],
"enrichment_strategies": ["metadata_enrichment"],
"default_statistic": {
"messages_per_minute": 1000,
"message_size": 5,
"read_parallelism": 1,
"write_parallelism": 1
},
"schema": {
"type": "json_schema",
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
},
"authorization": {
"admins": [{"data_type": "user", "value": "bfawlty"}],
"readers": [{"data_type": "user", "value": "bfawlty"}],
"writers": [{"data_type": "user", "value": "bfawlty"}]
}
}'
When updating an event type, users should keep in mind the following caveats:
WARNING: this also applies to consumers using subscriptions; if a subscription includes multiple event types, and as a result of the update, a consumer loses read access to one of them, then the consumer will not be able to consume from the subscription anymore.
One or more events can be published by posting to an event type’s stream.
The URI for a stream is a nested resource and based on the event type’s name - for example the “widgets” event type will have a relative resource path called /event-types/widgets/events.
This example posts two events to the order_received stream:
curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types/order_received/events -d '[
{
"order_number": "24873243241",
"metadata": {
"eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
"occurred_at": "2016-03-15T23:47:15+01:00"
}
}, {
"order_number": "24873243242",
"metadata": {
"eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
"occurred_at": "2016-03-15T23:47:16+01:00"
}
}]'
HTTP/1.1 200 OK
As shown above, the event stream accepts an array of events.
Each event sent to the stream will be validated relative to the effective schema for the event type’s category.
The validation behavior and the effective schema vary based on the event type’s category. For example, because the example above is a ‘business’ category event type, in addition to the fields defined in the event type’s original schema the events must also contain a metadata object with eid and occurred_at fields in order to conform to the standard structure for that category.
Once the event is validated, it is placed into a partition and made available to consumers. If the event is invalid, it is rejected by Nakadi.
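For illustration, here is a sketch of publishing an invalid event to the order_received event type above - the order_number has the wrong type, so validation fails. The response status and body shown are indicative; see the “API Reference” for the batch item response format:
curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types/order_received/events -d '[
  {
    "order_number": 24873243241,
    "metadata": {
      "eid": "9c2b5f6a-3f0e-4e44-bb1d-8f2c7a1d9e10",
      "occurred_at": "2016-03-15T23:47:18+01:00"
    }
  }
]'
HTTP/1.1 422 Unprocessable Entity
[{"eid": "9c2b5f6a-3f0e-4e44-bb1d-8f2c7a1d9e10", "publishing_status": "aborted", "step": "validating", "detail": "#/order_number: expected type: String, found: Integer"}]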
The order of events in the posted array will be the order they are published onto the event stream and seen by consumers. They are not re-ordered based on any values or properties of the data.
Applications that need to order events for a particular entity or based on an identifiable key in the data should configure their event type with the hash partitioning strategy and name the fields that can be used to construct the key. This allows partial ordering for a given entity.
Total ordering is not generally achievable with Nakadi (or Kafka) unless the event type is configured with a single partition. In most cases, total ordering is not needed, and in many cases it is not desirable as it can severely limit system scalability and result in cluster hot spotting.
Nakadi preserves the order of events sent to it (the “arrival order”), but has no control over the network between it and the producer. In some cases it may be possible for events to leave the producer but arrive at Nakadi in a different order (the “delivery order”).
Not all events need ordering guarantees but producers that do need end to end ordering have a few options they can take:
Wait for a response from the Nakadi server before posting the next event. This trades off overall producer throughput for ordering.
Use the parent_eids field in the ‘business’ and ‘data’ categories. This acts as a causality mechanism by allowing events to have “parent” events; a sketch is shown after this list. Note the parent_eids option is not available in the ‘undefined’ category.
Define and document the ordering semantics as part of the event type’s schema definition such that a consumer could use the information to sequence events at their end.
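As a sketch of the parent_eids option mentioned above, with illustrative values (the field is part of the metadata for the ‘business’ and ‘data’ categories; see the “API Reference”):
curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types/order_received/events -d '[
  {
    "order_number": "24873243244",
    "metadata": {
      "eid": "b2a9d1c4-6a2e-4f3b-9c0d-5e6f7a8b9c0d",
      "occurred_at": "2016-03-15T23:47:19+01:00",
      "parent_eids": ["d765de34-09c0-4bbb-8b1e-7160a33a0791"]
    }
  }
]'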
The Low-level API is deprecated, and will be removed from a future version of Nakadi. Please consider using the High-level API instead.
A consumer can open the stream for an Event Type via the /events sub-resource. For example to connect to the order_received stream send a GET request to its stream as follows:
curl -v http://localhost:8080/event-types/order_received/events
The stream accepts various parameters from the consumer, which you can read about in the “API Reference”. In this section we’ll just describe the response format, along with how cursors and keepalives work.
The HTTP response on the wire will look something like this (the newline is shown as \n for clarity):
curl -v http://localhost:8080/event-types/order_received/events
HTTP/1.1 200 OK
Content-Type: application/x-json-stream
{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"5"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"4"},"events":[...]}\n
Nakadi groups events into batch responses (see the next section, “Batch Responses” for some more details). Batches are separated by a newline and each available batch will be emitted on a single line. If there are no new batches the server will occasionally emit an empty batch (see the section “Event Stream Keepalives” further down).
Technically, while each batch is a JSON document, the overall response is not valid JSON. For this reason it is served as the media type application/x-json-stream rather than application/json. Consumers can use the single line delimited structure to frame data for JSON parsing.
A pretty-printed batch object looks like this -
{
"cursor": {
"partition": "0",
"offset": "4"
},
"events": [...]
}
Each batch belongs to a single partition. The cursor object describes the partition and the offset for this batch of events. The cursor allows clients to checkpoint their position in the stream’s partition. Note that individual events in the stream don’t have cursors; they live at the level of a batch.
The events array contains a list of events that were published in the order they arrived from the producer. Note that while the producer can also send batches of events, there is no strict correlation between the batches the consumer is given and the ones the producer sends. Nakadi will regroup events sent by the producer and distribute them across partitions as needed.
By default the /events resource will return data from all partitions of an event type stream and will do so from the end (or “tail”) of the stream. To select only particular partitions and a position in the stream to start, you can supply an X-Nakadi-Cursors header in the request:
curl -v http://localhost:8080/event-types/order_received/events \
-H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'
The X-Nakadi-Cursors header value is a JSON array of cursors. Each cursor in the array describes its partition for the stream and an offset to stream from.
The offset value of the cursor allows you to select where in the stream partition you want to consume from. This can be any known offset value, or the dedicated value begin which will start the stream from the beginning. For example, to read from partition 0 from the beginning:
curl -v http://localhost:8080/event-types/order_received/events \
-H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"begin"}]'
If there are no events to be delivered the server will keep a streaming connection open by periodically sending a batch with no events but which contains a cursor pointing to the current offset. For example:
curl -v http://localhost:8080/event-types/order_received/events
HTTP/1.1 200 OK
Content-Type: application/x-json-stream
{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"6"}}\n
{"cursor":{"partition":"0","offset":"6"}}\n
{"cursor":{"partition":"0","offset":"6"}}\n
This can be treated as a keep-alive control.
Subscriptions allow clients to consume events, with the Nakadi server storing offsets and automatically managing the rebalancing of partitions across consumer clients. This allows clients to avoid managing stream state locally.
The typical workflow when using subscriptions is:
Create a Subscription specifying the event-types you want to read.
Start reading batches of events from the subscription.
Commit the cursors found in the event batches back to Nakadi, which will store the offsets.
If the connection is closed, and later restarted, clients will get events from the point of your last cursor commit. If you need more than one client for your subscription to distribute the load you can read the subscription with multiple clients and Nakadi will balance the load across them.
The following sections provide more detail on the Subscription API and basic examples of Subscription API creation and usage:
For a more detailed description and advanced configuration options please take a look at the Nakadi swagger file.
A Subscription can be created by posting to the /subscriptions collection resource:
curl -v -XPOST "http://localhost:8080/subscriptions" -H "Content-type: application/json" -d '{
"owning_application": "order-service",
"event_types": ["order.ORDER_RECEIVED"]
}'
The response returns the whole Subscription object that was created, including the server generated id field:
HTTP/1.1 201 Created
Content-Type: application/json;charset=UTF-8
{
"owning_application": "order-service",
"event_types": [
"order.ORDER_RECEIVED"
],
"consumer_group": "default",
"read_from": "end",
"id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
"created_at": "2016-09-23T16:35:13.273Z"
}
Consuming events is done by sending a GET request to the Subscription’s events resource (/subscriptions/{subscription-id}/events):
curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/events"
The response is a stream that groups events into JSON batches separated by a newline (\n) character. The output looks like this:
HTTP/1.1 200 OK
X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f
Transfer-Encoding: chunked
{"cursor":{"partition":"5","offset":"543","event_type":"order.ORDER_RECEIVED","cursor_token":"b75c3102-98a4-4385-a5fd-b96f1d7872f2"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.525Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"},"info":{"debug":"Stream started"}]}
{"cursor":{"partition":"5","offset":"544","event_type":"order.ORDER_RECEIVED","cursor_token":"a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.741Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"}]}
{"cursor":{"partition":"5","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"a241c147-c186-49ad-a96e-f1e8566de738"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.741Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"}]}
{"cursor":{"partition":"0","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"bf6ee7a9-0fe5-4946-b6d6-30895baf0599"}}
{"cursor":{"partition":"1","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"9ed8058a-95be-4611-a33d-f862d6dc4af5"}}
Each batch contains the following fields:
cursor: The cursor of the batch which should be used for committing the batch.
events: The array of events of this batch.
info: An optional field that can hold useful information (e.g. the reason why the stream was closed by Nakadi).
Please also note that when a stream is started, the client receives a header X-Nakadi-StreamId which must be used when committing cursors.
To see a full list of parameters that can be used to control a stream of events, please see the API specification in the swagger file.
If you need more than one client for your subscription to distribute load or increase throughput - you can read the subscription with multiple clients and Nakadi will automatically balance the load across them.
The balancing unit is the partition, so the number of clients of your subscription can’t be higher than the total number of all partitions of the event-types of your subscription.
For example, suppose you had a subscription for two event-types A and B, with 2 and 4 partitions respectively. If you start reading events with a single client, then the client will get events from all 6 partitions. If a second client connects, then 3 partitions will be transferred from first client to a second client, resulting in each client consuming 3 partitions. In this case, the maximum possible number of clients for the subscription is 6, where each client will be allocated 1 partition to consume.
The Subscription API provides a guarantee of at-least-once delivery. In practice this means clients can see a duplicate event in the case where there are errors committing events. However the events which were successfully committed will not be resent.
A useful technique to detect and handle duplicate events on the consumer side is to be idempotent and to check the eid field of the event metadata. Note: eid checking is not possible with the “undefined” category, as the field is only supplied in the “business” and “data” categories.
The cursors in the Subscription API have the following structure:
{
"partition": "5",
"offset": "543",
"event_type": "order.ORDER_RECEIVED",
"cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
}
The fields are:
partition: The partition this batch belongs to. A batch can only have one partition.
offset: The offset of this batch. The offset is server defined and opaque to the client - clients should not try to infer or assume a structure.
event_type: Specifies the event-type of the cursor (as in one stream there can be events of different event-types);
cursor_token: The cursor token generated by Nakadi.
Cursors can be committed by posting to Subscription’s cursor resource (/subscriptions/{subscriptionId}/cursors), for example:
curl -v -XPOST "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors"\
-H "X-Nakadi-StreamId: ae1e39c3-219d-49a9-b444-777b4b03e84c" \
-H "Content-type: application/json" \
-d '{
"items": [
{
"partition": "0",
"offset": "543",
"event_type": "order.ORDER_RECEIVED",
"cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
},
{
"partition": "1",
"offset": "923",
"event_type": "order.ORDER_RECEIVED",
"cursor_token": "a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"
}
]
}'
Please be aware that the X-Nakadi-StreamId header is required when doing a commit. The value should be the same as the X-Nakadi-StreamId header you received when opening the stream of events. Also, each client can commit only the batches that were sent to it.
The possible successful responses for a commit are:
204: cursors were successfully committed and the offset was increased.
200: cursors were committed, but at least one of the cursors didn’t increase the offset because it was less than or equal to an already committed one. In this case the response body contains a JSON list of the cursors and the results of their commits (see the example below).
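An indicative sketch of such a 200 response (the exact structure is defined in the API specification):
HTTP/1.1 200 OK
{
  "items": [
    {
      "cursor": {
        "partition": "0",
        "offset": "543",
        "event_type": "order.ORDER_RECEIVED",
        "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
      },
      "result": "outdated"
    }
  ]
}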
The timeout for a commit is 60 seconds. If you open the stream, read data and don’t commit anything for 60 seconds, the stream connection will be closed by Nakadi. Please note that if there are no events available to send and you get only empty batches, there is no need to commit; Nakadi will only close the connection if there is uncommitted data and no commits have happened for 60 seconds.
If the connection is closed for some reason, the client still has 60 seconds from the moment the events were sent to commit them. After that the session
is considered closed and it will no longer be possible to commit with that X-Nakadi-StreamId.
If the commit was not done - then the next time you start reading from a subscription you
will get data from the last point of your commit, and you will again receive the events you
haven’t committed.
When a rebalance happens and a partition is transferred to another client, the 60-second commit timeout saves the day again. The first client has 60 seconds to commit for that partition; after that the partition starts streaming to the new client. So if the commit wasn’t done within 60 seconds, streaming to the new client starts from the point of the last successful commit. If the first client did commit within the timeout, the data from this partition is streamed to the second client immediately (because there is no uncommitted data left and there is no need to wait any longer).
It is not necessary to commit each batch. When the cursor is committed, all events that
are before this cursor in the partition will also be considered committed. For example suppose the offset was at e0 in the stream below,
partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
     offset--^
and the stream sent back three batches to the client, where the client committed batch 3 but not batch 1 or batch 2,
partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
     offset--^
                |--- batch1 ---|--- batch2 ---|--- batch3 ---|
                       |              |              |
                       v              |              |
                [ e1 | e2 | e3 ]      |              |
                                      v              |
                               [ e4 | e5 | e6 ]      |
                                                     v
                                              [ e7 | e8 | e9 ]
client: cursor commit -->                     |--- batch3 ---|
then the offset will be moved all the way up to e9 implicitly committing all the events that were in the previous batches 1 and 2,
partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
                                                          ^-- offset
You can also check the current position of your subscription:
curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors"
The response will be a list of current cursors that reflect the last committed offsets:
HTTP/1.1 200 OK
{
"items": [
{
"partition": "0",
"offset": "8361",
"event_type": "order.ORDER_RECEIVED",
"cursor_token": "35e7480a-ecd3-488a-8973-3aecd3b678ad"
},
{
"partition": "1",
"offset": "6214",
"event_type": "order.ORDER_RECEIVED",
"cursor_token": "d1e5d85e-1d8d-4a22-815d-1be1c8c65c84"
}
]
}
The API also provides statistics on your subscription:
curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/stats"
The output will contain the statistics for all partitions of the stream:
HTTP/1.1 200 OK
{
"items": [
{
"event_type": "order.ORDER_RECEIVED",
"partitions": [
{
"partition": "0",
"state": "reassigning",
"unconsumed_events": 2115,
"stream_id": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
},
{
"partition": "1",
"state": "assigned",
"unconsumed_events": 1029,
"stream_id": "ae1e39c3-219d-49a9-b444-777b4b03e84c"
}
]
}
]
}
To delete a Subscription, send a DELETE request to the Subscription resource using its id field (/subscriptions/{id}):
curl -v -X DELETE "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f"
Successful response:
HTTP/1.1 204 No Content
To view a Subscription send a GET request to the Subscription resource using its id field (/subscriptions/{id}):
curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f"
Successful response:
HTTP/1.1 200 OK
{
"owning_application": "order-service",
"event_types": [
"order.ORDER_RECEIVED"
],
"consumer_group": "default",
"read_from": "end",
"id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
"created_at": "2016-09-23T16:35:13.273Z"
}
To get a list of subscriptions send a GET request to the Subscription collection resource:
curl -v -XGET "http://localhost:8080/subscriptions"
Example answer:
HTTP/1.1 200 OK
{
"items": [
{
"owning_application": "order-service",
"event_types": [
"order.ORDER_RECEIVED"
],
"consumer_group": "default",
"read_from": "end",
"id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
"created_at": "2016-09-23T16:35:13.273Z"
}
],
"_links": {
"next": {
"href": "/subscriptions?offset=20&limit=20"
}
}
}
It’s possible to filter the list with the following parameters: event_type, owning_application.
Also, the following pagination parameters are available: offset, limit.
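For example, to list the subscriptions owned by a particular application for a given event type, one page at a time:
curl -v -XGET "http://localhost:8080/subscriptions?owning_application=order-service&event_type=order.ORDER_RECEIVED&offset=0&limit=20"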
Nakadi does not ship with a client, but there are some open source clients available that you can try:
| Name | Language/Framework | GitHub |
|---|---|---|
| Nakadi Java | Java | https://github.com/dehora/nakadi-java |
| Fahrschein | Java | https://github.com/zalando-nakadi/fahrschein |
| Riptide: Stream | Java/Spring | https://github.com/zalando/riptide/tree/master/riptide-stream |
| Kanadi | Scala | https://github.com/zalando-incubator/kanadi |
| Nakadion | Rust | https://crates.io/crates/nakadion |
| nakadi-client | Haskell | https://nakadi-client.haskell.silverratio.net |
| go-nakadi | Go | https://github.com/stoewer/go-nakadi |
| nakacli | CLI | https://github.com/amrhassan/nakacli |
More Nakadi-related projects can be found at https://github.com/zalando-nakadi
We’ll add more clients to this section as they appear. Nakadi doesn’t support these clients; issues and pull requests should be filed with the client project.
In this section, we’ll look at how Nakadi fits in with the stream broker/processing ecosystems. Notably we’ll compare it to Apache Kafka, as that’s a common question, but also look briefly at some of the main cloud offerings in this area.
Relative to Apache Kafka, Nakadi provides a number of benefits while still leveraging the raw power of Kafka as its internal broker.
Nakadi has some characteristics in common with Kafka, which is to be expected as the Kafka community has done an excellent job in defining the space. The logical model is basically the same - streams have partitions, messages in a partition maintain their order, and there’s no order across partitions. One producer can send an event to be read by multiple consumers and consumers have access to offset data that they can checkpoint. There are also some differences. For example Nakadi doesn’t expose Topics as a concept in its API. Instead there are Event Types that define structure and ownership details as well as the stream. Also consumers receive messages in batches and each batch is checkpointed rather than an individual message.
Nakadi uses HTTP for communications. This lets microservices maintain their boundaries and avoids forcing a shared technology dependency on producers and consumers - if you can speak HTTP you can use Nakadi and communicate with other services. This is a fairly subtle point, but Nakadi is optimised for general microservices integration and message passing, and not just handing off data to analytics subsystems. This means it needs to be available to as many different runtimes and stacks as possible, hence HTTP becomes the de-facto choice.
Nakadi is designed to support autonomous service teams. In Zalando, where Nakadi originated, each team has autonomy and control of their microservices stack to let them move quickly and take ownership. When running on AWS, this extends all the way down - every team has their own account structure, and to ensure a level of security and compliance teams run standard AMIs and constrain how they interact to HTTPS using OAuth2 access controls. This means we tend to want to run any shared infrastructure as a service with a HTTP based interface. Granted, not everyone has this need - many shops on AWS won’t have per-team account structures and will tend to use a smaller number of shared environments, but we’ve found it valuable to be able to leverage the power of systems like Kafka in a way that fits in with this service architecture.
An event type registry with schema validation. Producers can define event types using JSON Schema. Having events validated against a published schema allows consumers to know they will conform to a known structure. There are projects in the Kafka ecosystem from Confluent that provide similar features such as the rest-proxy and the schema-registry, but they’re slightly optimised for analytics, and not quite ideal for microservices where it’s more common to use regular JSON rather than Avro. The schema registry in particular is dependent on Avro. Also the consumer connection model for the rest-proxy requires clients to be pinned to servers which complicates clients - the hope for Nakadi is that its managed subscription API, when that’s available, will not require session affinity in this way.
Inbuilt event types. Nakadi also has optional support for events that describe business processes and data changes. These provide common primitives for event identity, timestamps, causality, operations on data and header propagation. Teams could define their own structures, but there’s value in having some basic things that consumers and producers can coordinate on independent of the payload, and which are checked before being propagated to multiple consumers.
Operations is also a factor in Nakadi’s design. Managing upgrades to systems like Kafka becomes easier when the technology sits behind an API and isn’t a shared dependency between microservices. Asynchronous event delivery can be a simpler overall option for a microservice architecture compared to synchronous and deep call paths that have to be mitigated with caches, bulkheads and circuit breakers.
In short, Nakadi is best seen as a complement to Kafka. It allows teams to use Kafka within their own boundaries but not be forced into sharing it as a global dependency.
Like Nakadi, Pub/Sub has a HTTP API which hides details from producers and consumers and makes it suitable for use as a microservices backplane. There are some differences worth noting:
Pub/Sub lets you acknowledge every message individually rather than checkpointing a position in a logical log. This approach makes its model fairly different to the other systems mentioned here. While it implies that there are no inbuilt ordering assurances it does allow consumers to be very precise about what they have received.
Pub/Sub requires a subscription to be set up before messages can be consumed, which can then be used to manage delivery state for messages. In that sense it’s not unlike a traditional queuing system where the server (or “broker”) manages state for the consumer, with the slight twist that messages have a sort of random access for acknowledgements instead of competing for work at the top of the queue. Nakadi may offer a similar subscription option in the future via a managed API, but today consumers are expected to manage their own offsets.
Pub/Sub uses a polling model for consumers. Consumers grab a page of messages to process and acknowledge, and then make a new HTTP request to grab another page. Nakadi maintains a streaming connection to consumers, and will push events as they arrive.
Pub/Sub uses a common envelope structure for producing and consuming messages, and does not define any higher level structures beyond that.
Like Nakadi and Pub/Sub, AWS Kinesis has a HTTP API to hide its details. Kinesis and Nakadi are more similar to each other than Pub/Sub, but there are some differences.
Kinesis exposes shards (partitions) for a stream and supplies enough information to support per message checkpointing with semantics much like Kafka and Nakadi. Nakadi only supplies checkpointing information per batch of messages. Kinesis allows setting the partition hash key directly, whereas Nakadi computes the key based on the data.
Kinesis uses a polling model for consumers, whereas Nakadi maintains a streaming connection. Kinesis consumers use a “shard iterator” to grab pages of messages, and then make a new HTTP request to grab another page. Kinesis limits the rate at which this can be done across all consumers (typically 5 transactions per second per open shard), which places an upper bound on consumer throughput. Kinesis has a broad range of choices for resuming from a position in the stream, whereas Nakadi allows access only from the beginning or a named offset.
Kinesis uses a common envelope structure for producing and consuming messages, and does not define any higher level structures beyond that. Payload data is submitted as an opaque base64 blob.
AWS restricts the number of streams available to an account to quite a low starting number, and messages can be stored for a maximum of 7 days, whereas Nakadi can support a large number of event types and the expiration time for events is configurable.
Kinesis supports resizing the number of shards in a stream, whereas partition counts in Nakadi are fixed once set for an event type.
The basic abstraction in SQS is a queue, which is quite different from a Nakadi / Kafka stream.
SQS queues are durable and highly available. A queue can hold an unlimited number of messages, with a maximum message retention of 2 weeks. Each message carries an opaque text payload (max. 256KB). In addition to that, messages can have up to 10 message attributes, which can be read without inspecting the payload.
Each message in an SQS queue can only be consumed once. In the case of multiple consumers, each one would typically use a dedicated SQS queue, which are all hooked up to a shared Amazon SNS topic that provides the fanout. When a new consumer is later added to this setup, its queue will initially be empty. An SQS queue does not have any history, and cannot be “replayed” again like a Kafka stream.
SQS has “work queue” semantics. This means that delivered messages have to be removed from the queue explicitly by a separate call. If this call is not received within a configured timeframe, the message is delivered again (“automatic retry”). After a configurable number of unsuccessful deliveries, the message is moved to a dead letter queue.
In contrast to moving a single cursor in the data stream (as in Nakadi, Kinesis or Kafka), the SQS semantics of confirming individual messages have advantages if a single message is unprocessable (i.e. its format is not parseable). In SQS only the problematic message is delayed. With cursor semantics the client has to decide: either stop all further message processing until the problem is fixed, or skip the message and move the cursor.
Hermes, like Nakadi, is an API-based broker built on Apache Kafka. There are some differences worth noting:
Hermes uses webhooks to deliver messages to consumers. Consumers register a subscription with a callback url and a subscription policy that defines behaviors such as retries and delivery rates. Nakadi maintains a streaming connection to consumers, and will push events as they arrive. Whether messages are delivered in order to consumers does not appear to be a defined behaviour in the API. Similar to Kafka, Nakadi will deliver messages to consumers in arrival order for each partition. Hermes does not appear to support partitioning in its API. Hermes has good support for tracking delivered and undelivered messages to subscribers.
Hermes supports JSON Schema and Avro validation in its schema registry. Nakadi’s registry currently only supports JSON Schema, but may support Avro in the future. Hermes does not provide inbuilt event types, whereas Nakadi defines optional types to support data change and business process events, with some uniform fields producers and consumers can coordinate on.
Hermes allows topics (event types in Nakadi) to be collated into groups that are administered by a single publisher. Consumers access data at a per topic level, the same as Nakadi currently; Nakadi may support multi-topic subscriptions in the future via a subscription API.
The Hermes project supports a Java client driver for publishing messages. Nakadi does not ship with a client.
Hermes claims resilience when it comes to issues with its internal Kafka broker, such that it will continue to accept messages when Kafka is down. It does this by buffering messages in memory with an optional means to spill to local disk; this will help with crashing brokers or Hermes nodes, but not with the loss of an instance (e.g. an EC2 instance). Nakadi does not accept messages if its Kafka brokers are down or unavailable.
This section features patterns on how to use Nakadi and event stream processing in general.
Nakadi throughput scales with the number of partitions in an event type. The number of partitions in an event type is fixed — it can only be configured on create. Scaling throughput by creating a new event type can be tricky though, because the switch-over has to be coordinated between producers and consumers.
You expect a significant increase in throughput over time. How many partitions should you create?
Create more partitions than you currently need. Each consumer initially reads from multiple partitions. Increase the number of consumers as throughput increases, until the number of consumers is equal to the number of partitions.
To distribute the workload evenly, make sure that each consumer reads from the same number of partitions. This strategy works best if the number of partitions is a product of small primes; for example, 12 partitions (2 x 2 x 3) can be shared evenly between 1, 2, 3, 4, 6 or 12 consumers.
The total number of partitions in a Nakadi cluster is limited. Start with a single partition, and employ this pattern only once you are forced to use multiple partitions. Don’t over-partition; use the lowest sensible number that works. You can always fall back on creating a new event type with more partitions later, if necessary.
You want to process all events in a given event type, but you have to preserve local (per-partition) ordering of the events.
Create a processing pipeline with multiple stages. Each stage consists of a single worker thread, and an inbox (small bounded in-memory list).
Each stage reads events one by one from its inbox, processes them, and puts them in the inbox of the next stage. The first stage reads events from Nakadi instead.
If you want to publish the events to Nakadi after processing, then the last stage can collect them in an internal buffer and post them in batches.
To keep track of Nakadi cursors, you can push them as pseudo-events through the pipeline. Once the cursor has reached the last stage, all events in the batch must have been processed, so the cursor can be saved.
Using bounded inboxes decouples the stages from each other, creates backpressure between them, and puts an upper limit on the total amount of work-in-progress in the pipeline.
Overall throughput of the pipeline is limited by the stage with the largest average processing time per event. By optimizing this bottleneck, you can optimize the overall throughput. Example: if the slowest stage needs 20ms to process each event, throughput will be lower than 50 events per second.
Each pipeline can consume events from one or more partitions. This setup can be scaled by increasing the number of pipelines running in parallel, up to the number of partitions in the event type.
Nakadi is hosted on Github - zalando/nakadi and you can clone or fork it from there.
The project is built with Gradle.
The gradlew wrapper script is available in the project’s root and will bootstrap the right Gradle version if it’s not already installed.
The gradle setup is fairly standard, the main dev tasks are:
./gradlew build: run a build and test
./gradlew clean: clean down the build
Pull requests and master are built using Travis CI and you can see the build history here.
There are a few build commands for testing -
./gradlew build: will run a build along with the unit tests
./gradlew acceptanceTest: will run the acceptance tests
./gradlew fullAcceptanceTest: will run the ATs in the context of Docker
There are a few build commands for running Docker -
./gradlew startDockerContainer: start the docker containers and download images if needed.
./gradlew stopAndRemoveDockerContainer: shutdown the docker processes.
./gradlew startStoragesInDocker: start the storage container that runs Kafka and PostgreSQL. This is handy for running Nakadi directly or in your IDE.
For working with an IDE, the ./gradlew eclipse IDE task is available and you’ll be able to import the build.gradle into IntelliJ IDEA directly.

The default retention time in the project is set by the retentionMs value in application.yml, which is currently 2 days.
The service installation you’re working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.
At the moment, retention can’t be defined via the API per event type. It may be added as an option in the future. The best option for now would be to configure the underlying Kafka topic directly.
If you want to change the default for a server installation, you can set the retentionMs value in application.yml to a new value.
The default partition count in the project is set by the partitionNum value in application.yml, which is currently 1.
The service installation you’re working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.
At the moment, the partition count can’t be defined via the API per event type. It may be added as an option in the future. The best option for now would be to configure the underlying Kafka topic directly.
If you want to change the default for a server installation, you can set the partitionNum value in application.yml to a new value.
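A sketch of what that could look like in application.yml — note that the exact property nesting varies between Nakadi versions, so check the file shipped with your version rather than copying this verbatim:

# application.yml (fragment); the key nesting here is an assumption, the property names are the ones referenced above
nakadi:
  topic:
    default:
      partitionNum: 2          # default number of partitions for new event types
      retentionMs: 172800000   # default retention of 2 days, in milliseconds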
See the section “Partition Strategies”, which goes into more detail on the available options and what they’re good for.
Clients can track offset information sent in the Cursor on a per-partition basis - each batch of events sent to a consumer will contain such a Cursor that details the partition id and an offset (see “Cursors and Offsets” for more information). This allows a client to track how far in the partition it has consumed events, and also allows it to submit a cursor with an appropriate value as described in the “Cursors and Offsets” section. One approach would be to use local storage (e.g. a datastore like PostgreSQL or DynamoDB) to record the position to date outside the client application, making it available in the event of restarts.
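For illustration only, a minimal Java/JDBC sketch of the “record the position outside the client application” idea, assuming a PostgreSQL table consumed_cursors with a unique constraint on (event_type, partition) — the table, columns and connection details are invented for the example:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public final class CursorStore {
    // Upsert the last consumed offset for one partition of one event type.
    public static void saveCursor(String eventType, String partition, String offset) throws SQLException {
        String sql = "INSERT INTO consumed_cursors (event_type, partition, \"offset\") VALUES (?, ?, ?) "
                   + "ON CONFLICT (event_type, partition) DO UPDATE SET \"offset\" = EXCLUDED.\"offset\"";
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost/consumer_db", "consumer", "secret");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, eventType);
            ps.setString(2, partition);
            ps.setString(3, offset);
            ps.executeUpdate();
        }
    }
}

On restart, the client reads this row back and uses the stored offset as its starting cursor when opening the stream.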
Note that a managed API is being developed which will support storing offsets for consumer clients in the future.
The effective schema is the combination of the schema structure defined for a particular category, such as ‘business’ or ‘data’ and the custom schema submitted when creating an event type. When an event is posted to Nakadi, the effective schema is used to validate the event and not the separate category level and custom level schemas.
You can read more in the section “Effective Schema”.
It’s possible you are working with an ‘undefined’ event type. The ‘undefined’ category doesn’t support metadata validation or enrichment. In more technical terms, the effective schema for an undefined event is exactly the same as the schema that was submitted when the event type was created.
The project doesn’t ship with a client, but there are a number of open source clients described in the “Clients” section.
If you have an open source client not listed there, we’d love to hear from you :) Let us know via GitHub and we’ll add it to the list.
The default behavior when running the docker containers locally will be for OAuth to be disabled.
If you are running a Nakadi server locally outside docker, you can disable token checks by setting the environment variable NAKADI_OAUTH2_MODE to OFF before starting the server.
Note that, even if OAuth is disabled using the NAKADI_OAUTH2_MODE environment variable, the current behavior will be to check a token if one is sent by a client so you might need to configure the client to also not send tokens.
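For example, for a locally built server (a sketch; how you start the server depends on your setup):

export NAKADI_OAUTH2_MODE=OFF
# start the server as you normally would; requests then work without an Authorization header:
curl -v http://localhost:8080/event-types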
The standard workaround is to define an event type with the following category and schema:
Category: undefined
Schema: {"additionalProperties": true}
Note that sending a schema of {} means nothing will validate, not that anything will be allowed.
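Putting that together, a create request for such a catch-all event type could look like this (the event type and application names are placeholders):

curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/json" -d '{
  "name": "acme.freeform",
  "owning_application": "acme-service",
  "category": "undefined",
  "partition_strategy": "random",
  "schema": {
    "type": "json_schema",
    "schema": "{ \"additionalProperties\": true }"
  }
}'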
It’s not a configuration the project directly supports or is designed for. But if you are willing to use JSON as a wrapper, one option is to define a JSON Schema with a property whose type is a string, and send the non-JSON content as a Base64 encoded value for that string. It’s worth pointing out this is entirely opaque to Nakadi and you won’t get the benefits of schema checking (or even a check that the submitted string is properly encoded Base64). If you try this, you’ll need to be careful to encode the Base64 as URL/file safe to avoid issues with the line delimited stream format Nakadi uses to send messages to consumers. As mentioned, this is an option the server doesn’t directly support.
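A minimal Java sketch of that wrapper idea (the field name and payload are invented; this is nothing Nakadi itself provides):

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public final class OpaquePayloadExample {
    public static void main(String[] args) {
        byte[] raw = "any non-JSON content, e.g. a protobuf or CSV blob".getBytes(StandardCharsets.UTF_8);

        // The URL/file-safe alphabet avoids characters that could interfere with the
        // newline-delimited stream format mentioned above.
        String wrapped = Base64.getUrlEncoder().encodeToString(raw);

        // The published event body would then be something like {"payload": "<wrapped>"}
        System.out.println("{\"payload\": \"" + wrapped + "\"}");

        // The consumer decodes it back:
        byte[] roundTripped = Base64.getUrlDecoder().decode(wrapped);
        System.out.println(new String(roundTripped, StandardCharsets.UTF_8));
    }
}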
If you get the message “Is the docker daemon running on this host?” first check that Docker and VirtualBox are running. If you know they are running, you might want to run this command -
eval "$(docker-machine env default)"
When there are no events available for an event-type because they’ve expired, then newest_available_offset will be smaller than oldest_available_offset. Because Nakadi has exclusive offset handling, it shows the offset of the last message in newest_available_offset.
Not at the moment. If the events are for different event types, or the events will be distributed across different partitions for a single event type, then there’s no way to achieve atomicity in the sense of “all events or no events will be published” in the general case. If the events belong to the same partition, the server does not have compensating behavior to ensure they will all be written.
Producers that need atomicity will want to create an event type structure that allows all the needed information to be contained within an event. This is a general distributed systems observation around message queuing and stream broker systems rather than anything specific to Nakadi.
The server will accept gzip encoded events when posted. On the consumer side, if the client asks for compression the server will honor the request.
Nakadi accepts contributions from the open-source community. Please see CONTRIBUTE.md.
This document covers Timelines internals. It’s meant to explain how timelines work, to help you understand the code and what each part of it contributes to the overall picture.
Timeline creation is coordinated through a series of locks and barriers using Zookeeper. Following we depict an example of what the ZK datastructure looks like at each step.
Every time a Nakadi application is launched, it tries to create the following ZK structure:
timelines:
  lock: -              # lock for timeline versions synchronization
  version: {version}   # monotonically incremented long value (version of timelines configuration)
  locked_et: -
  nodes:               # nakadi nodes
    node1: {version}   # each nakadi node exposes the version used on this node
    node2: {version}
In order to not override the initial structure, due to concurrency,
each instance needs to take the lock /nakadi/timelines/lock before
executing.
When a new timeline creation is initiated, the first step is to
acquire a lock to update timelines for et_1 by creating an ephemeral
node at /timelines/locked_et/et_1.
timelines:
  lock: -
  version: 0
  locked_et:
    et_1: -
  nodes:
    node1: 0
    node2: 0
Next, the instance coordinating the timeline creation bumps the version node, which all Nakadi instances watch, so they are notified that something has changed.
timelines:
  lock: -
  version: 1   # this is incremented by 1
  locked_et:
    et_1: -
  nodes:
    node1: 0
    node2: 0
Each Nakadi instance watches the value of the
/nakadi/timelines/version/ node. When it changes, each instance
checks all locked event types and reacts accordingly, by either
releasing or blocking publishers locally.
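As an illustration of the mechanism (this is not Nakadi's actual code), a minimal Apache Curator sketch that watches the version znode and reacts to bumps could look like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public final class TimelineVersionWatcher {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Watch the version znode; every bump means the locked event types must be re-checked.
        NodeCache versionNode = new NodeCache(client, "/nakadi/timelines/version");
        versionNode.getListenable().addListener(() -> {
            if (versionNode.getCurrentData() == null) {
                return; // node not created yet
            }
            String version = new String(versionNode.getCurrentData().getData());
            // 1. re-read /nakadi/timelines/locked_et and block/release local publishers accordingly
            // 2. write the new version to /nakadi/timelines/nodes/<this-node> to signal completion
            System.out.println("timelines version changed to " + version);
        });
        versionNode.start();

        Thread.currentThread().join(); // keep watching
    }
}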
Once each instance has updated its local list of locked event types, it bumps its own version to let the initiator of the timeline creation know that it can proceed.
timelines:
  lock: -
  version: 1
  locked_et:
    et_1: -
  nodes:
    node1: 1   # each instance updates its own version
    node2: 1
Once all instances reacted, the creation proceeds with the initiator inserting the necessary database entries in the timelines table, and by snapshotting the latest available offset for the existing storage. It also creates a topic in the new storage. Be aware that if a timeline partition has never been used, the offset stored is -1. If it has a single event, the offset is zero and so on.
Following the same logic for initiating the creation of a timeline, locks are deleted and version is bumped. All Nakadi instances react by removing their local locks and switching timeline if necessary.
timelines:
  lock: -
  version: 2
  locked_et:
  nodes:
    node1: 1
    node2: 1
After every instance has reacted, the structure should look like this:
timelines:
  lock: -
  version: 2
  locked_et:
  nodes:
    node1: 2   # each instance updates its own version
    node2: 2
All done here. A new timeline has been created successfully. All operations are logged so in case you need to debug things, just take a look at INFO level logs.
Nakadi at its core aims at being a generic and content-agnostic event broker with a convenient
API. In doing this, Nakadi abstracts away, as much as possible, details of the backing
messaging infrastructure. The single currently supported messaging infrastructure is Kafka
(Kinesis is planned for the future).
In Nakadi every Event has an EventType, and a stream of Events is exposed for each
registered EventType.
An EventType defines properties relevant for the operation of its associated stream, namely:
Returns a list of all registered EventTypes
| Name | Located in | Description |
|---|---|---|
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
Creates a new EventType.
The fields enrichment-strategies and partition-resolution-strategy both have an effect on the incoming Events of this EventType. For their impact on the reception of events please consult the Event submission API methods.
Validation strategies are evaluated on reception of an Event of this EventType. Details of usage can be found in this external document.
The schema of an EventType is defined as an EventTypeSchema. Currently only json_schema is supported, representing JSON Schema draft 04. Creating an EventType with an EventTypeSchema.type other than json_schema, or passing an invalid EventTypeSchema.schema, will be rejected.
| Name | Located in | Description |
|---|---|---|
| event-type | body |
EventType
EventType to be created |
Returns the EventType identified by its name.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType to load. |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
Updates the EventType identified by its name. Behaviour is the same as creation of
EventType (See POST /event-type) except where noted below.
The name field cannot be changed. Attempting to do so will result in a 422 failure.
Modifications to the schema are constrained by the specified compatibility_mode.
Updating the EventType is only allowed for clients that satisfy the authorization admin requirements,
if it exists.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType to update. |
| event-type | body |
EventType
EventType to be updated. |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
Deletes an EventType identified by its name. All events in the EventType’s stream will also be removed. Note: deletion happens asynchronously, which has the following
consequences:
Creating an equally named EventType before the underlying topic deletion is complete may not succeed.
Deleting an EventType is only allowed for clients that satisfy the authorization admin requirements, if it exists.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType to delete. |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
GET with payload.
Calculate the distance between two offsets. This is useful for performing checks for data completeness, when
a client has read some batches and wants to be sure that all delivered events have been correctly processed.
If the event type uses ‘compact’ cleanup policy - then the actual number of events for consumption can be lower
than the distance reported by this endpoint.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType |
| cursors-distances-query | body |
Array of
CursorDistanceQuery
List of pairs of cursors: |
An array of CursorDistanceResults with the distance between two offsets.
OK
Unprocessable Entity
Access forbidden because of missing scope or EventType authorization failure.
GET with payload.
This endpoint is mostly interesting for monitoring purposes. Used when a consumer wants to know how far behind
in the stream its application is lagging.
It provides the number of unconsumed events for each cursor’s partition.
If the event type uses ‘compact’ cleanup policy - then the actual number of unconsumed events can be lower than
the one reported by this endpoint.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
| cursors | body |
Array of
Cursor
Each cursor indicates the partition and offset consumed by the client. When a cursor is provided, |
The Low Level API is deprecated. It will be removed in a future version.
Starts a stream delivery for the specified partitions of the given EventType.
The event stream is formatted as a sequence of EventStreamBatches separated by \n. Each
EventStreamBatch contains a chunk of Events and a Cursor pointing to the end of the
chunk (i.e. last delivered Event). The cursor might specify the offset with the symbolic
value BEGIN, which will open the stream starting from the oldest available offset in the
partition.
Currently the application/x-json-stream format is the only one supported by the system,
but in the future other media types may be supported.
If streaming for several distinct partitions, each one is an independent EventStreamBatch.
The initialization of a stream can be parameterized in terms of size of each chunk, timeout
for flushing each chunk, total amount of delivered Events and total time for the duration of
the stream.
Nakadi will keep a streaming connection open even if there are no events to be delivered. In
this case the timeout for the flushing of each chunk will still apply and the
EventStreamBatch will contain only the Cursor pointing to the same offset. This can be
treated as a keep-alive control for some load balancers.
Tracking the current offset in the partitions, and which partitions are being read, is the responsibility of the client. No commits are needed.
The HTTP response on the wire will look something like this (the newline is shown as \n for clarity):
curl -v http://localhost:8080/event-types/order_received/events
HTTP/1.1 200 OK
Content-Type: application/x-json-stream
{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"5"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"4"},"events":[...]}\n
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name to get events about |
| X-nakadi-cursors | header |
Array of
String
format:
#/definitions/Cursor
Cursors indicating the partitions to read from and respective starting offsets. |
| batch_limit | query |
Integer
format:
int32
default:
1
Maximum number of |
| stream_limit | query |
Integer
format:
int32
default:
0
Maximum number of
|
| batch_flush_timeout | query |
Number
format:
int32
default:
30
Maximum time in seconds to wait for the flushing of each chunk (per partition).
|
| stream_timeout | query |
Number
format:
int32
default:
0
minimum:
0
maximum:
4200
Maximum time in seconds a stream will live before connection is closed by the server. |
| stream_keep_alive_limit | query |
Integer
format:
int32
default:
0
Maximum number of empty keep alive batches to get in a row before closing the connection. |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
Starts streaming to the client.
Stream format is a continuous series of EventStreamBatches separated by \n
Not authenticated
Unprocessable entity
Too Many Requests. The client reached the maximum amount of simultaneous connections to a single partition
Access is forbidden for the client or event type
Publishes a batch of Events of this EventType. All items must be of the EventType
identified by name.
Reception of Events will always respect the configuration of its EventType with respect to
validation, enrichment and partition. The steps performed on reception of incoming message
are:
Validation: every validation rule specified for the EventType will be checked in order against the incoming Events; information about violations is included in the BatchItemResponse. If the EventType defines schema validation it will be performed at this moment. The size of each Event is also validated.
Enrichment: the content of the Event is updated according to the enrichment rules of the EventType. No preexisting value may be overwritten; violations are reported in the item’s BatchItemResponse object.
Partitioning: the target partition is resolved according to the partition strategy of the EventType. Failure to evaluate the rule will reject the Event.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
| event | body |
Array of
Event
The Event being published |
All events in the batch have been successfully published.
At least one event has failed to be submitted. The batch might be partially submitted.
Client is not authenticated
At least one event failed to be validated, enriched or partitioned. None were submitted.
Access is forbidden for the client or event type
Lists the Partitions for the given event-type.
This endpoint is mostly interesting for monitoring purposes or in cases when consumer wants
to start consuming older messages.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
| cursors | query |
Array of
String
format:
#/definitions/Cursor
Each cursor indicates the partition and offset consumed by the client. When this parameter is provided, |
Returns the given Partition of this EventType. If per-EventType authorization is enabled, the caller must
be authorized to read from the EventType.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name |
| partition | path |
String
Partition id |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
| consumed_offset | query |
String
Offset to query for unconsumed events. Depends on |
List of schemas ordered from most recent to oldest.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name |
| limit | query |
Integer
format:
int64
default:
20
minimum:
1
maximum:
1000
maximum number of schemas returned in one page |
| offset | query |
Integer
format:
int64
default:
0
minimum:
0
page offset |
list of schemas.
OK
Retrieves a given schema version. A special {version} key named ‘latest’ is provided for
convenience.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name |
| version | path |
String
EventType schema version |
Schema object.
Transforms a list of Cursors with shift into a list without shifts. This is useful when there is a need
to randomly access events in the stream.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
EventType name |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
| cursors | body |
Array of
ShiftedCursor
GET with payload. |
An array of Cursors with shift applied to the offset. The response should contain cursors in the same
order as the request.
OK
It’s only possible to navigate from a valid cursor to another valid cursor, i.e. partitions and
offsets must exist and not be expired. Any combination of parameters that might break this rule will
result in 422. For example:
The cursor’s partition and offset are expired.
The shift provided leads to an already expired cursor.
The shift provided leads to a cursor that does not exist yet, i.e. it is pointing to an event in the future.
Access forbidden because of missing scope or EventType authorization failure.
List timelines for a given event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType to list timelines for. |
list of timelines.
OK
No such event type
Access forbidden
Creates a new timeline for an event type and makes it active.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| name | path |
String
Name of the EventType |
| timeline_request | body |
Object
Object properties:
storage_id
:
String
Storage id to be used for timeline creation |
Get monitoring metrics
| Name | Located in | Description |
|---|---|---|
Lists all of the enrichment strategies supported by this Nakadi installation. Special or
custom strategies besides the defaults will be listed here.
| Name | Located in | Description |
|---|---|---|
Returns a list of all enrichment strategies known to Nakadi
Client is not authenticated
Lists all of the partition resolution strategies supported by this installation of Nakadi.
Special or custom strategies besides the defaults will be listed here.
Nakadi currently offers these inbuilt strategies:
random: Resolution of the target partition happens randomly (events are evenly distributed across the partitions).
user_defined: Target partition is defined by the client on metadata.partition (See EventMetadata), as long as the indicated partition exists. Failure to provide it will reject the Event.
hash: Resolution of the partition follows the computation of a hash from the value of the partition_key_fields, guaranteeing that Events with the same values on those fields end up in the same partition.
| Name | Located in | Description |
|---|---|---|
Returns a list of all partitioning strategies known to Nakadi
Client is not authenticated
Lists all administrator permissions. This endpoint is restricted to administrators with the ‘read’ permission.
| Name | Located in | Description |
|---|---|---|
List all administrator permissions.
Client is not authenticated
Access forbidden
Updates the list of administrators. This endpoint is restricted to administrators with the ‘admin’ permission.
| Name | Located in | Description |
|---|---|---|
| authorization | body |
AdminAuthorization
Lists of administrators |
List all administrator permissions.
Client is not authenticated
Access forbidden
Unprocessable entity due to not enough administrators in a list
Lists all blocked producers/consumers divided by app and event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
a list of all blocked event types for publishing events.
a list of all blocked apps for publishing events.
a list of all blocked producers.
a list of all blocked event types for consuming events.
a list of all blocked apps for consuming events.
a list of all blocked consumers.
Lists all blocked producers/consumers.
Blocks publication/consumption for particular app or event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| blacklist_type | path |
String
Type of the blacklist to put client into.
|
| name | path |
String
Name of the client to block. |
Client or event type was successfully blocked.
Unblocks publication/consumption for particular app or event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| blacklist_type | path |
String
Type of the blacklist to put client into.
|
| name | path |
String
Name of the client to unblock. |
Client was successfully unblocked.
Lists all available features.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
list of features.
A list of all available features.
Enables or disables a feature depending on the payload
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| feature | body | Feature |
Feature was successfully accepted.
Lists all available storage backends.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
Creates a new storage backend.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| storage | body |
Storage
Storage description |
Sets default storage to use in Nakadi.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| id | path |
String
storage backend ID |
Retrieves a storage backend by its ID.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| id | path |
String
storage backend ID |
Deletes a storage backend by its ID, if it is not in use.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.
| Name | Located in | Description |
|---|---|---|
| id | path |
String
storage backend ID |
Lists all subscriptions that exist in a system. List is ordered by creation date/time descending (newest
subscriptions come first).
| Name | Located in | Description |
|---|---|---|
| owning_application | query |
String
Parameter to filter subscriptions list by owning application. If not specified - the result list will |
| event_type | query |
Array of
String
collectionFormat:
multi
Parameter to filter subscriptions list by event types. If not specified - the result list will contain |
| limit | query |
Integer
format:
int64
default:
20
minimum:
1
maximum:
1000
maximum number of subscriptions returned in one page |
| offset | query |
Integer
format:
int64
default:
0
minimum:
0
page offset |
| show_status | query |
Boolean
default:
false
show subscription status |
list of subscriptions
OK
Bad Request
This endpoint creates a subscription for EventTypes. The subscription is needed to be able to
consume events from EventTypes in a high level way when Nakadi stores the offsets and manages the
rebalancing of consuming clients.
The subscription is identified by its key parameters (owning_application, event_types, consumer_group). If
this endpoint is invoked several times with the same key subscription properties in the body (the order of event_types is
not important), the subscription will be created only once, and all subsequent calls will just return
the subscription that was already created.
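A sketch of creating such a subscription against a local instance (the field values are placeholders):

curl -v -XPOST http://localhost:8080/subscriptions -H "Content-type: application/json" -d '{
  "owning_application": "order-service",
  "event_types": ["order.ORDER_RECEIVED"],
  "consumer_group": "order-processors",
  "read_from": "end"
}'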
| Name | Located in | Description |
|---|---|---|
| subscription | body |
Subscription
Subscription to create |
Subscription for such parameters already exists. Returns subscription object that already
existed.
Subscription was successfully created. Returns the subscription object that was created.
Bad Request
Unprocessable Entity
Returns a subscription identified by id.
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
format:
uuid
Id of subscription. |
OK
Subscription not found
This endpoint only allows updating the authorization section of a subscription. All other properties are
immutable. This operation is restricted to subjects with the administrative role.
| Name | Located in | Description |
|---|---|---|
| subscription | body |
Subscription
Subscription with modified authorization section. |
| subscription_id | path |
String
Id of subscription |
Deletes a subscription.
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
format:
uuid
Id of subscription. |
Exposes the currently committed offsets of a subscription.
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
format:
uuid
Id of subscription. |
Subscription not found
Endpoint for committing offsets of the subscription. If there is uncommitted data, and no commits happen
for 60 seconds, then Nakadi will consider the client to be gone, and will close the connection. As long
as no events are sent, the client does not need to commit.
If the connection is closed, the client has 60 seconds to commit the events it received, from the moment
they were sent. After that, the connection will be considered closed, and it will not be possible to
commit with that X-Nakadi-StreamId anymore.
When a batch is committed that also automatically commits all previous batches that were
sent in a stream for this partition.
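A sketch of a commit call (ids and offset are placeholders; the cursors must be echoed exactly as received from the stream, the X-Nakadi-StreamId must be the id of the stream they came from, and the path shown assumes the usual /subscriptions/{subscription_id}/cursors layout):

curl -v -XPOST http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors \
  -H "Content-type: application/json" \
  -H "X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f" \
  -d '{
  "items": [{
    "partition": "0",
    "offset": "001-0001-000000000000000042",
    "event_type": "order.ORDER_RECEIVED",
    "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
  }]
}'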
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
Id of subscription |
| X-Nakadi-StreamId | header |
String
Id of stream which client uses to read events. It is not possible to make a commit for a terminated or |
| cursors | body |
Object
Object properties:
items
:
Array of
SubscriptionCursor
List of cursors that the consumer acknowledges to have successfully processed. |
Offsets were committed
list of items which describe commit result for each cursor
At least one of the cursors submitted for commit is older than or equal to an already committed one. An array
of commit results is returned for this status code.
Access forbidden
Subscription not found
Unprocessable Entity
Reset subscription offsets to specified values.
Client connected after this operation will get events starting from next offset position.
During this operation the subscription’s consumers will be disconnected. The request can hang up until
subscription commit timeout. During that time requests to subscription streaming endpoint
will be rejected with 409. The clients should reconnect once the request is finished with 204.
If the subscription has never been streamed, and therefore does not have initialized cursors, this call
will first initialize starting offsets, and then perform the actual patch.
In order to provide explicit cursor initialization functionality, this method supports an empty cursors list,
allowing subscription cursors to be initialized without side effects.
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
Id of subscription |
| cursors | body |
Object
Object properties:
items
:
Array of
SubscriptionCursorWithoutToken
List of cursors to reset subscription to. |
Starts a new stream for reading events from this subscription. The data will be automatically rebalanced
between streams of one subscription. The minimal consumption unit is a partition, so it is possible to start as
many streams as the total number of partitions in event-types of this subscription. The rebalance currently
only operates with the number of partitions so the amount of data in event-types/partitions is not considered
during autorebalance.
The position of the consumption is managed by Nakadi. The client is required to commit the cursors it receives in
the stream.
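A sketch of opening a stream (the subscription id is a placeholder; the path assumes the usual /subscriptions/{subscription_id}/events layout). The response carries the stream id that later commits must reference:

curl -v "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/events?batch_limit=10&max_uncommitted_events=100"

HTTP/1.1 200 OK
X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f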
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
format:
uuid
Id of subscription. |
| max_uncommitted_events | query |
Integer
format:
int32
default:
10
minimum:
1
The maximum number of uncommitted events that Nakadi will stream before pausing the stream. When in |
| batch_limit | query |
Integer
format:
int32
default:
1
Maximum number of |
| stream_limit | query |
Integer
format:
int32
default:
0
Maximum number of
|
| batch_flush_timeout | query |
Number
format:
int32
default:
30
Maximum time in seconds to wait for the flushing of each chunk (per partition).
|
| stream_timeout | query |
Number
format:
int32
default:
0
minimum:
0
maximum:
4200
Maximum time in seconds a stream will live before connection is closed by the server. |
| stream_keep_alive_limit | query |
Integer
format:
int32
default:
0
Maximum number of empty keep alive batches to get in a row before closing the connection. |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
Ok. Stream started.
Stream format is a continuous series of SubscriptionEventStreamBatches separated by \n
Bad Request
Access forbidden
Subscription not found.
Conflict. There are several possible reasons for receiving this status code:
GET with body.
Starts a new stream for reading events from this subscription. The minimal consumption unit is a partition, so
it is possible to start as many streams as the total number of partitions in event-types of this subscription.
The position of the consumption is managed by Nakadi. The client is required to commit the cursors it receives in
the stream.
If you create a stream without specifying the partitions to read from - Nakadi will automatically assign
partitions to this new stream. By default Nakadi distributes partitions among clients trying to give an equal
number of partitions to each client (the amount of data is not considered). This is default and the most common
way to use streaming endpoint.
It is also possible to directly request specific partitions to be delivered within the stream. If these
partitions are already consumed by another stream of this subscription - Nakadi will trigger a rebalance that
will assign these partitions to the new stream. The request will fail if the user directly requests partitions that
are already requested directly by another active stream of this subscription. The overall picture is the
following: streams which directly requested specific partitions will consume from them; streams that didn’t
specify which partitions to consume will consume the partitions that are left over - Nakadi will autobalance the free partitions
among these streams (balancing happens by number of partitions).
Specifying partitions to consume is not a trivial way to consume, as it requires additional coordination
effort from the client application; it should therefore only be used when specific requirements make this
way of consumption necessary.
Also, when using streams with directly assigned partitions, it is the user’s responsibility to detect, and react
to, changes in the number of partitions in the subscription (following the re-partitioning of an event type).
Using the GET /subscriptions/{subscription_id}/stats endpoint can be helpful.
| Name | Located in | Description |
|---|---|---|
| streamParameters | body |
Object
Object properties:
partitions
:
Array of
EventTypePartition
List of partitions to read from in this stream. If absent or empty - then the partitions will be
max_uncommitted_events
:
Integer
format:
int32
default:
10
minimum:
1
The maximum number of uncommitted events that Nakadi will stream before pausing the stream. When in
batch_limit
:
Integer
format:
int32
default:
1
Maximum number of
stream_limit
:
Integer
format:
int32
default:
0
Maximum number of
batch_flush_timeout
:
Number
format:
int32
default:
30
Maximum time in seconds to wait for the flushing of each chunk (per partition).
stream_timeout
:
Number
format:
int32
default:
0
minimum:
0
maximum:
4200
Maximum time in seconds a stream will live before connection is closed by the server. |
| subscription_id | path |
String
format:
uuid
Id of subscription. |
| X-Flow-Id | header |
String
The flow id of the request, which is written into the logs and passed to called services. Helpful |
Ok. Stream started.
Stream format is a continuous series of SubscriptionEventStreamBatches separated by \n
Bad Request
Access forbidden
Subscription not found.
Conflict. There are several possible reasons for receiving this status code:
At least one of specified partitions doesn’t belong to this subscription.
Exposes statistics of the specified subscription.
| Name | Located in | Description |
|---|---|---|
| subscription_id | path |
String
format:
uuid
Id of subscription. |
| show_time_lag | query |
Boolean
default:
false
show consumer time lag |
statistics list for specified subscription
Ok
Subscription not found
Authorization section for admin operations. This section defines three access control lists: one for writing to admin endpoints and producing events (‘writers’), one for reading from admin endpoints and consuming events (‘readers’), and one for updating the list of administrators (‘admins’).
| Name | Description |
|---|---|
| admins |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for updating the list of administrators. Any one of the |
| readers |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for reading from admin endpoints. Any one of the |
| writers |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for writing to admin endpoints. Any one of the |
An attribute for authorization. This object includes a data type, which represents the type of the attribute (which data types are allowed depends on which authorization plugin is deployed, and how it is configured), and a value. A wildcard can be represented with data type ‘*’ and value ‘*’. It means that all authenticated users are allowed to perform an operation.
| Name | Description |
|---|---|
| data_type |
String
the type of attribute (e.g., ‘team’, or ‘permission’, depending on the Nakadi configuration) |
| value |
String
the value of the attribute |
A status corresponding to one individual Event’s publishing attempt.
| Name | Description |
|---|---|
| eid |
String
format:
uuid
eid of the corresponding item. Will be absent if missing on the incoming Event. |
| publishing_status |
String
enum:
submitted, failed, aborted
Indicator of the submission of the Event within a Batch.
|
| step |
String
enum:
none, validating, partitioning, enriching, publishing
Indicator of the step in the publishing process this Event reached.
|
| detail |
String
Human readable information about the failure on this item. Items that are not “submitted” |
A Business Event.
Usually represents a status transition in a Business process.
| Name | Description |
|---|---|
| metadata | EventMetadata |
| Name | Description |
|---|---|
| partition |
String
Id of the partition pointed to by this cursor. |
| offset |
String
Offset of the event being pointed to.
|
The result of single cursor commit. Holds a cursor itself and a result value.
| Name | Description |
|---|---|
| cursor | SubscriptionCursor |
| result |
String
The result of cursor commit.
|
Number of events between two offsets. Initial offset is exclusive. It’s only zero when both provided offsets
are equal.
A Data change Event.
Represents a change on a resource. Also contains indicators for the data type and the type of operation performed.
| Name | Description |
|---|---|
| data_type |
String
example:
pennybags:order
|
| data_op |
String
enum:
C, U, D, S
The type of operation executed on the entity.
|
| metadata | EventMetadata |
| data |
Object
The payload of the type |
Note: The Event definition will be externalized in future versions of this document.
A basic payload of an Event. The actual schema is dependent on the information configured for the EventType, as is its enforcement (see POST /event-types). Setting of metadata properties is dependent on the configured enrichment as well.
For explanation on default configurations of validation and enrichment, see documentation of
EventType.category.
For concrete examples of what will be enforced by Nakadi see the objects BusinessEvent and DataChangeEvent below.
Metadata for this Event.
Contains common fields for both Business and DataChange Events. Most are enriched by Nakadi upon reception, but in general they MIGHT be set by the client.
| Name | Description |
|---|---|
| eid |
String
format:
uuid
example:
105a76d8-db49-4144-ace7-e683e8f4ba46
Identifier of this Event. |
| event_type |
String
example:
pennybags.payment-business-event
The EventType of this Event. This is enriched by Nakadi on reception of the Event |
| occurred_at |
String
format:
RFC 3339 date-time
example:
1996-12-19T16:39:57-08:00
Timestamp of creation of the Event generated by the producer. |
| received_at |
String
readOnly:
true
format:
RFC 3339 date-time
example:
1996-12-19T16:39:57-08:00
Timestamp of the reception of the Event by Nakadi. This is enriched upon reception of |
| version |
String
readOnly:
true
Version of the schema used for validating this event. This is enriched upon reception. |
| parent_eids |
Array of
String
format:
uuid
example:
105a76d8-db49-4144-ace7-e683e8f4ba46
Event identifier of the Event that caused the generation of this Event. |
| flow_id |
String
example:
JAh6xH4OQhCJ9PutIV_RYw
The flow-id of the producer of this Event. As this is usually a HTTP header, this is |
| partition |
String
example:
0
Indicates the partition assigned to this Event. |
| partition_compaction_key |
String
example:
329ed3d2-8366-11e8-adc0-fa7ae01bbebc
Value used for per-partition compaction of the event type. Given two events with the same |
One chunk of events in a stream. A batch consists of an array of Events plus a Cursor
pointing to the offset of the last Event in the stream.
The size of the array of Event is limited by the parameters used to initialize a Stream.
If acting as a keep alive message (see GET /event-type/{name}/events) the events array will
be omitted.
Sequential batches might present repeated cursors if no new events have arrived.
| Name | Description |
|---|---|
| cursor | Cursor |
| info | StreamInfo |
| events |
Array of
Event
|
An event type defines the schema and its runtime properties.
| Name | Description |
|---|---|
| name |
String
pattern:
[a-zA-Z][-0-9a-zA-Z_]*(\.[0-9a-zA-Z][-0-9a-zA-Z_]*)*
example:
order.order_cancelled, acme-platform.users
Name of this EventType. The name is constrained by a regular expression. |
| owning_application |
String
example:
price-service
Indicator of the (Stups) Application owning this |
| category |
String
enum:
undefined, data, business
Defines the category of this EventType.
|
| enrichment_strategies |
Array of
String
enum:
metadata_enrichment
Determines the enrichment to be performed on an Event upon reception. Enrichment is |
| partition_strategy |
String
default:
random
Determines how the assignment of the event to a partition should be handled. |
| compatibility_mode |
String
default:
forward
Compatibility mode provides a mean for event owners to evolve their schema, given changes respect the
|
| schema | EventTypeSchema |
| partition_key_fields |
Array of
String
Required when ‘partition_resolution_strategy’ is set to ‘hash’. Must be absent otherwise. |
| cleanup_policy |
String
x-extensible-enum:
delete
compact
default:
delete
Event type cleanup policy. There are two possible values:
|
| default_statistic | EventTypeStatistics |
| options | EventTypeOptions |
| authorization | EventTypeAuthorization |
| ordering_key_fields |
Array of
String
example:
data.incremental_counter
Indicates a single ordering field. This is a dot separated string path applied to the event object. This is only an informational field: the events are delivered to consumers in the order they were published. |
| audience |
String
x-extensible-enum:
component-internal
business-unit-internal
company-internal
external-partner
external-public
Intended target audience of the event type. Relevant for standards around quality of design and documentation, |
| created_at |
String
pattern:
RFC 3339 date-time
Date and time when this event type was created. |
| updated_at |
String
pattern:
RFC 3339 date-time
Date and time when this event type was last updated. |
Authorization section for an event type. This section defines three access control lists: one for producing events (‘writers’), one for consuming events (‘readers’), and one for administering an event type (‘admins’). Regardless of the values of the authorization properties, administrator accounts will always be authorized.
| Name | Description |
|---|---|
| admins |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for updating the event type. Any one of the attributes |
| readers |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for reading events from the event type. Any one of the |
| writers |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for writing events to the event type. Any one of the |
Additional parameters for tuning internal behavior of Nakadi.
| Name | Description |
|---|---|
| retention_time |
Integer
format:
int64
default:
345600000
Number of milliseconds that Nakadi stores events published to this event type. |
Represents an event-type:partition pair.
| Name | Description |
|---|---|
| event_type |
String
event-type name. |
| partition |
String
partition of the event-type. |
The most recent schema for this EventType. Submitted events will be validated against it.
| Name | Description |
|---|---|
| version |
String
readOnly:
true
default:
1.0.0
This field is automatically generated by Nakadi. Values are based on semantic versioning. Changes to |
| created_at |
String
readOnly:
true
format:
RFC 3339 date-time
example:
1996-12-19T16:39:57-08:00
Creation timestamp of the schema. This is generated by Nakadi. It should not be |
| type |
String
enum:
json_schema
The type of schema definition. Currently only json_schema (JSON Schema v04) is supported, but in the |
| schema |
String
The schema as string in the syntax defined in the field type. Failure to respect the |
Operational statistics for an EventType. This data may be provided by users on Event Type creation. Nakadi uses this object in order to provide an optimal number of partitions from a throughput perspective.
| Name | Description |
|---|---|
| messages_per_minute |
Integer
Write rate for events of this EventType. This rate encompasses all producers of this |
| message_size |
Integer
Average message size for each Event of this EventType. Includes in the count the whole serialized |
| read_parallelism |
Integer
Amount of parallel readers (consumers) to this EventType. |
| write_parallelism |
Integer
Amount of parallel writers (producers) to this EventType. |
URI identifying another page of items
| Name | Description |
|---|---|
| href |
String
format:
uri
example:
/subscriptions?offset=20&limit=10
|
contains links to previous and next pages of items
| Name | Description |
|---|---|
| prev | PaginationLink |
| next | PaginationLink |
Partition information. Can be helpful when trying to start a stream using an unmanaged API.
This information is not related to the state of the consumer clients.
| Name | Description |
|---|---|
| partition | String |
| oldest_available_offset |
String
An offset of the oldest available Event in that partition. This value will be changing |
| newest_available_offset |
String
An offset of the newest available Event in that partition. This value will be changing |
| unconsumed_events |
Number
format:
int64
Approximate number of events unconsumed by the client. This is also known as consumer lag and is used for |
| Name | Description |
|---|---|
| type |
String
format:
uri
example:
http://httpstatus.es/503
An absolute URI that identifies the problem type. When dereferenced, it SHOULD provide |
| title |
String
example:
Service Unavailable
A short, summary of the problem type. Written in English and readable for engineers |
| status |
Integer
format:
int32
example:
503
The HTTP status code generated by the origin server for this occurrence of the problem. |
| detail |
String
example:
Connection to database timed out
A human readable explanation specific to this occurrence of the problem. |
| instance |
String
format:
uri
An absolute URI that identifies the specific occurrence of the problem. |
This number is a modifier for the offset. It moves the cursor forward or backwards by the number of events
provided.
For example, suppose a user wants to read events starting 100 positions before offset
“001-000D-0000000000000009A8”, it’s possible to specify shift with -100 and Nakadi will make the
necessary calculations to move the cursor backwards relatively to the given offset.
Users should favor using the cursors provided in batches when streaming from Nakadi; navigating the stream
using shifts is provided for debugging purposes only.
A storage backend.
| Name | Description |
|---|---|
| id |
String
maxLength:
36
The ID of the storage backend. |
| storage_type |
String
the type of storage. Possible values: [‘kafka’] |
| kafka_configuration |
Object
Object properties:
exhibitor_address
:
String
the Zookeeper address
exhibitor_port
:
String
the Zookeeper path
zk_address
:
String
the Zookeeper address
zk_path
:
String
the Zookeeper path
Configuration settings for kafka storage. Only necessary if the storage type is ‘kafka’ |
This object contains general information about the stream. Used only for debugging purposes. We recommend logging this object in order to solve connection issues. Clients should not parse this structure.
Subscription is a high level consumption unit. Subscriptions allow applications to easily scale the number of clients by managing consumed event offsets and distributing load between instances. The key properties that identify subscription are ‘owning_application’, ‘event_types’ and ‘consumer_group’. It’s not possible to have two different subscriptions with these properties being the same.
| Name | Description |
|---|---|
| id |
String
readOnly:
true
Id of subscription that was created. Is generated by Nakadi, should not be specified when creating |
| owning_application |
String
example:
gizig
minLength:
1
The id of application owning the subscription. |
| event_types |
Array of
String
EventTypes to subscribe to. |
| consumer_group |
String
example:
read-product-updates
minLength:
1
default:
default
The value describing the use case of this subscription. |
| created_at |
String
readOnly:
true
format:
RFC 3339 date-time
example:
1996-12-19T16:39:57-08:00
Timestamp of creation of the subscription. This is generated by Nakadi. It should not be |
| read_from |
String
default:
end
Position to start reading events from. Currently supported values:
|
| initial_cursors |
Array of
SubscriptionCursorWithoutToken
List of cursors to start reading from. This property is required when |
| status |
Array of
SubscriptionEventTypeStatus
Subscription status. This data is only available when querying the subscriptions endpoint for |
| authorization | SubscriptionAuthorization |
Authorization section of a Subscription. This section defines two access control lists: one for consuming events and committing cursors (‘readers’), and one for administering a subscription (‘admins’). Regardless of the values of the authorization properties, administrator accounts will always be authorized.
| Name | Description |
|---|---|
| admins |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for updating the subscription. Any one of the attributes |
| readers |
Array of
AuthorizationAttribute
minItems:
1
An array of subject attributes that are required for reading events and committing cursors to this |
The name of the event type this partition’s events belong to.
An opaque value defined by the server.
The name of the event type this partition’s events belong to.
Analogous to EventStreamBatch but used for high level streaming. It includes specific cursors for committing in the high level API.
| Name | Description |
|---|---|
| cursor | SubscriptionCursor |
| info | StreamInfo |
| events |
Array of
Event
|
Statistics of one event-type within the context of a subscription.
| Name | Description |
|---|---|
| event_type |
String
event-type name |
| partitions |
Array of
Object
Object properties:
partition
:
String
the partition id
state
:
String
The state of this partition in current subscription. Currently following values are possible:
unconsumed_events
:
Number
The amount of events in this partition that are not yet consumed within this subscription.
consumer_lag_seconds
:
Number
Subscription consumer lag for this partition in seconds. Measured as the age of the oldest event of
stream_id
:
String
the id of the stream that consumes data from this partition
assignment_type
:
String
statistics of a partition within a subscription context
Statistics of the partitions of this event-type |
Status of one event-type within the context of a subscription.
| Name | Description |
|---|---|
| event_type |
String
event-type name |
| partitions |
Array of
Object
Object properties:
partition
:
String
The partition id
state
:
String
The state of this partition in current subscription. Currently following values are possible:
stream_id
:
String
the id of the stream that consumes data from this partition
assignment_type
:
String
status of a partition within a subscription context
Status of the partitions of this event-type |