API Reference

API reference for DC/OS Apache Kafka

The DC/OS Apache Kafka Service implements a REST API that may be accessed from outside the cluster. The <dcos_url> parameter referenced below indicates the base URL of the DC/OS cluster on which the DC/OS Apache Kafka Service is deployed.

REST API Authentication

REST API requests must be authenticated. This authentication is only applicable for interacting with the Apache Kafka REST API directly. You do not need the token to access the Apache Kafka nodes themselves.

If you are using Enterprise DC/OS, follow these instructions to create a service account and an authentication token. You can then configure your service to automatically refresh the authentication token when it expires. To get started more quickly, you can also get the authentication token without a service account, but you will need to manually refresh the token.

If you are using open source DC/OS, follow these instructions to pass your HTTP API token to the DC/OS endpoint.

Once you have the authentication token, you can store it in an environment variable and reference it in your REST API calls:

export auth_token=uSeR_t0k3n

The curl examples in this document assume that an auth token has been stored in an environment variable named auth_token.

If you are using Enterprise DC/OS, the security mode of your installation may also require the --cacert flag when making REST calls. Refer to Obtaining and passing the DC/OS certificate in cURL requests for information on how to use the --cacert flag. If your security mode is disabled, do not use the --cacert flag.
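The curl calls in this document all share the same base path and authorization header, so they can be wrapped in a small helper. The following is a minimal sketch, not part of the service itself: the function names kafka_url and kafka_api and the DCOS_URL variable (standing in for <dcos_url>) are assumptions made for illustration.

```shell
# Assumed variable: DCOS_URL stands in for <dcos_url>; the default is a placeholder.
DCOS_URL="${DCOS_URL:-https://dcos.example.com}"

# Build the full URL for a path under the Kafka service API.
# Strips one trailing slash from the base and one leading slash from the path.
kafka_url() {
  printf '%s/service/kafka/v1/%s\n' "${DCOS_URL%/}" "${1#/}"
}

# Send an authenticated request: kafka_api METHOD PATH
# Hypothetical wrapper; needs $auth_token to be exported and network access.
kafka_api() {
  curl -sS -X "$1" -H "Authorization: token=$auth_token" "$(kafka_url "$2")"
}
```

With these definitions, kafka_api GET plans would issue the same request as the "List plans" curl example. Where the security mode requires it, the --cacert flag would need to be added to the curl invocation.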

Plan API

The Plan API provides endpoints for monitoring and controlling service installation and configuration updates.

List plans

You can list the configured plans for the service. By default, every service has at least a deploy plan and a recovery plan. Some services may define additional custom plans.

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans
dcos kafka --name=kafka plan list

View plan

You may view the current state of a listed plan:

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/<plan>

The CLI may be used to show a formatted tree of the plan (default), or the underlying JSON data as retrieved from the above HTTP endpoint:

dcos kafka --name=kafka plan show <plan>
dcos kafka --name=kafka plan show <plan> --json

Pause plan

Pausing the deploy plan stops the installation once the operation on the current node completes. The plan then waits for user input before proceeding further.

curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/deploy/interrupt
dcos kafka --name=kafka plan pause deploy

Resume plan

The REST API request below will resume the operation at the next pending node.

curl -X PUT -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/deploy/continue
dcos kafka --name=kafka plan continue deploy
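The CLI verbs and the REST path segments differ (pause maps to interrupt), which is easy to get wrong in scripts. The sketch below records that mapping as taken from the curl examples above. The function names plan_request and plan_control and the DCOS_URL variable are hypothetical and used only for illustration.

```shell
# Assumed variable: DCOS_URL stands in for <dcos_url>; the default is a placeholder.
DCOS_URL="${DCOS_URL:-https://dcos.example.com}"

# Print "METHOD URL" for a plan action, following the curl examples above:
#   pause    -> POST .../plans/<plan>/interrupt
#   continue -> PUT  .../plans/<plan>/continue
plan_request() {
  plan="$1"; action="$2"
  base="${DCOS_URL%/}/service/kafka/v1/plans/$plan"
  case "$action" in
    pause)    echo "POST $base/interrupt" ;;
    continue) echo "PUT $base/continue" ;;
    *)        echo "unsupported action: $action" >&2; return 1 ;;
  esac
}

# Send the request (hypothetical wrapper; needs $auth_token and network access).
plan_control() {
  req="$(plan_request "$1" "$2")" || return 1
  # ${req%% *} is the method, ${req#* } is the URL.
  curl -sS -X "${req%% *}" -H "Authorization: token=$auth_token" "${req#* }"
}
```

For example, plan_control deploy pause would send the same request as the "Pause plan" curl example.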

Nodes API

The Nodes API, exposed under the /v1/pod path, provides endpoints for retrieving information about nodes, restarting them, and replacing them.

List Nodes

A list of available node IDs can be retrieved by sending a GET request to /v1/pod:

CLI Example

dcos kafka --name=kafka pod list

HTTP Example

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod

Node Info

You can retrieve node information by sending a GET request to /v1/pod/<node-id>/info:

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/info

CLI Example

dcos kafka --name=kafka pod info kafka-0

HTTP Example

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/kafka-0/info

Replace a Node

The replace endpoint replaces a node with a new instance running on another agent node.

CLI Example

dcos kafka --name=kafka pod replace <node-id>

HTTP Example

curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/replace

If the operation succeeds, a 200 OK is returned.

Restart a Node

The restart endpoint restarts a node in place on the same agent node.

CLI Example

dcos kafka --name=kafka pod restart <node-id>

HTTP Example

curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/restart

If the operation succeeds, a 200 OK is returned.

Pause a Node

The pause endpoint relaunches a node in an idle command state for debugging purposes.

CLI Example

dcos kafka --name=kafka debug pod pause <node-id>

HTTP Example

curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/pause

Configuration API

The configuration API provides an endpoint to view current and previous configurations of the cluster.

View Target Config

You can view the current target configuration by sending a GET request to /v1/configurations/target.

CLI Example

dcos kafka --name=kafka config target

HTTP Example

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/target

List Configs

You can list all configuration IDs by sending a GET request to /v1/configurations.

CLI Example

dcos kafka --name=kafka config list
[
  "319ebe89-42e2-40e2-9169-8568e2421023",
  "294235f2-8504-4194-b43d-664443f2132b"
]

HTTP Example

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations
[
  "319ebe89-42e2-40e2-9169-8568e2421023",
  "294235f2-8504-4194-b43d-664443f2132b"
]
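The list endpoint returns only IDs, so inspecting every stored configuration takes one further request per ID. The following sketch assumes the response is a JSON array of UUID strings, as in the sample output above. The function names config_ids and dump_configs and the DCOS_URL variable are hypothetical.

```shell
# Assumed variable: DCOS_URL stands in for <dcos_url>; the default is a placeholder.
DCOS_URL="${DCOS_URL:-https://dcos.example.com}"

# Read the JSON array of configuration IDs on stdin; print one ID per line.
# Matches the UUID pattern directly, so no JSON parser is needed.
config_ids() {
  grep -Eo '[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}'
}

# Fetch every stored configuration in turn.
# Hypothetical wrapper; needs $auth_token and network access.
dump_configs() {
  base="${DCOS_URL%/}/service/kafka/v1/configurations"
  curl -sS -H "Authorization: token=$auth_token" "$base" | config_ids |
  while read -r id; do
    echo "== $id"
    curl -sS -H "Authorization: token=$auth_token" "$base/$id"
    echo
  done
}
```

The same config_ids filter can be applied to the CLI output, for example dcos kafka --name=kafka config list | config_ids.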

View Specified Config

You can view a specific configuration by sending a GET request to /v1/configurations/<config-id>.

CLI Example

dcos kafka --name=kafka config show 9a8d4308-ab9d-4121-b460-696ec3368ad6

HTTP Example

curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/9a8d4308-ab9d-4121-b460-696ec3368ad6

Topic Operations

These operations mirror what is available with bin/kafka-topics.sh.

List Topics

dcos kafka --name=kafka topic list
[
  "topic1",
  "topic0"
]
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics"
[
  "topic1",
  "topic0"
]

Describe Topic

dcos kafka --name=kafka topic describe topic1
{
  "partitions": [
  {
    "0": {
      "controller_epoch": 1,
      "isr": [
        0,
        1,
        2
      ],
      "leader": 0,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "1": {
      "controller_epoch": 1,
      "isr": [
        1,
        2,
        0
      ],
      "leader": 1,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "2": {
      "controller_epoch": 1,
      "isr": [
        2,
        0,
        1
      ],
      "leader": 2,
      "leader_epoch": 0,
      "version": 1
    }
  }
  ]
}
curl -X GET -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
  "partitions": [
  {
    "0": {
      "controller_epoch": 1,
      "isr": [
        0,
        1,
        2
      ],
      "leader": 0,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "1": {
      "controller_epoch": 1,
      "isr": [
        1,
        2,
        0
      ],
      "leader": 1,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "2": {
      "controller_epoch": 1,
      "isr": [
        2,
        0,
        1
      ],
      "leader": 2,
      "leader_epoch": 0,
      "version": 1
    }
  }
  ]
}

Create Topic

dcos kafka --name=kafka topic create topic1 --partitions=3 --replication=3
{
  "message": "Output: Created topic \"topic1\"\n"
}
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?partitions=3&replication=3"
{
  "message": "Output: Created topic \"topic1\"\n"
}
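In the REST form, the partition and replication counts travel as query parameters, and the URL must be quoted so the shell does not interpret the & character. The sketch below builds that URL and rejects non-numeric counts before any request is sent. The function name topic_create_url and the DCOS_URL variable are assumptions for illustration.

```shell
# Assumed variable: DCOS_URL stands in for <dcos_url>; the default is a placeholder.
DCOS_URL="${DCOS_URL:-https://dcos.example.com}"

# Print the topic-creation URL: topic_create_url NAME PARTITIONS REPLICATION
# Returns non-zero if either count is empty or contains a non-digit.
topic_create_url() {
  name="$1"; partitions="$2"; replication="$3"
  for n in "$partitions" "$replication"; do
    case "$n" in
      ''|*[!0-9]*) echo "expected a whole number, got '$n'" >&2; return 1 ;;
    esac
  done
  printf '%s/service/kafka/v1/topics/%s?partitions=%s&replication=%s\n' \
    "${DCOS_URL%/}" "$name" "$partitions" "$replication"
}
```

The result can be passed to curl in place of the literal URL, for example curl -X PUT -H "Authorization: token=$auth_token" "$(topic_create_url topic1 3 3)".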

View Topic Offsets

The optional --time parameter may be set to “first”, “last”, or a timestamp in milliseconds, as described in the Kafka documentation. The HTTP example below passes time=-1, the value Kafka uses for the latest offset.

dcos kafka --name=kafka topic offsets topic1 --time=last
[
  {
    "2": "334"
  },
  {
    "1": "333"
  },
  {
    "0": "333"
  }
]
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1/offsets?time=-1"
[
  {
    "2": "334"
  },
  {
    "1": "333"
  },
  {
    "0": "333"
  }
]
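The offsets come back as one single-key object per partition, with the offset encoded as a string. As a rough illustration of consuming that shape without a JSON parser, the sketch below sums the reported offsets across partitions. The function name total_offset is hypothetical, and the parsing assumes the response has the same layout as the sample output above.

```shell
# Read the offsets JSON on stdin; print the sum of all reported offsets.
# Each matching fragment looks like "2": "334"; splitting on double quotes
# puts the partition in field 2 and the offset in field 4.
total_offset() {
  grep -Eo '"[0-9]+": *"[0-9]+"' |
  awk -F'"' '{ sum += $4 } END { print sum + 0 }'
}
```

For the sample output above, dcos kafka --name=kafka topic offsets topic1 --time=last | total_offset would print 1000 (334 + 333 + 333).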

Alter Topic Partition Count

dcos kafka --name=kafka topic partitions topic1 2
{
  "message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected\nAdding partitions succeeded!\n"
}
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=partitions&partitions=2"
{
  "message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected\nAdding partitions succeeded!\n"
}

Run Producer Test on Topic

dcos kafka --name=kafka topic producer_test topic1 10
{
  "message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.\n"
}
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=producer-test&messages=10"
{
  "message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.\n"
}

The above commands run the equivalent of the following command from the machine running the Kafka Scheduler:

kafka-producer-perf-test.sh \
  --topic <topic> \
  --num-records <messages> \
  --throughput 100000 \
  --record-size 1024 \
  --producer-props bootstrap.servers=<current broker endpoints>

Delete Topic

dcos kafka --name=kafka topic delete topic1
{
  "message": "Topic topic1 is marked for deletion.\nNote: This will have no impact if delete.topic.enable is not set to true.\n"
}
curl -X DELETE -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
  "message": "Topic topic1 is marked for deletion.\nNote: This will have no impact if delete.topic.enable is not set to true.\n"
}

NOTE: The output of the commands above warns that deletion has no effect unless `delete.topic.enable` is set to true. You can change this value through a configuration change.

List Under Replicated Partitions

dcos kafka --name=kafka topic under_replicated_partitions
{
  "message": ""
}
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/under_replicated_partitions"
{
  "message": ""
}

List Unavailable Partitions

dcos kafka --name=kafka topic unavailable_partitions
{
  "message": ""
}
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/unavailable_partitions"
{
  "message": ""
}
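Both partition checks return a JSON object with a single message field, which is empty in the sample output above. Assuming that an empty message means no partitions were reported (an interpretation of the samples, not something this document states), a monitoring script could test for it as sketched below. The function name partitions_clean is hypothetical.

```shell
# Read the JSON response on stdin.
# Exit 0 if the "message" field is empty, non-zero otherwise.
# Assumption: an empty message means no partitions were reported.
partitions_clean() {
  grep -Eq '"message": *""'
}
```

For example, dcos kafka --name=kafka topic under_replicated_partitions | partitions_clean || echo "under-replicated partitions reported" would print the warning only when the message is not empty.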