API 参考

DC/OS Apache Kafka 的 API 参考

DC/OS Apache Kafka 服务实施可从群集外部访问的 REST API。下面引用的<dcos_url> 参数指示了部署 Apache Kafka 服务的 DC/OS 群集的基础 URL。

REST API 身份认证

REST API 请求必须经过身份认证。此身份认证仅适用于直接与 Apache Kafka REST API 的交互。访问 Apache Kafka 节点自身不需要令牌。

如果您正在使用 Enterprise DC/OS,请遵循以下说明 创建服务帐户和身份认证令牌。然后,您可以配置服务以在其到期时自动刷新身份认证令牌。要更快速地开始,您还可以在没有服务帐户的情况下获得认证令牌,但您需要手动刷新令牌。

如果您正在使用开源 DC/OS,请遵循以下说明 将您的 HTTP API 令牌传递到 DC/OS 端点

您拥有身份认证令牌之后,您可以将其存储在环境变量中,并在您的 REST API 调用中引用:

$ export auth_token=uSeR_t0k3n

本文中的curl示例假定身份认证令牌已存储在名为auth_token的环境变量中。

如果您正在使用 Enterprise DC/OS,安全模式安装在调用 REST 时也可能需要 --ca-cert 标记。请参阅 在 cURL 请求中获取并传递 DC/OS 证书 了解如何使用 --cacert 标记。如果您的安全模式是 disabled,不要使用 --ca-cert 标记。

Plan API

Plan API 提供用于监控和控制服务安装和配置更新的端点。

列表计划

您可以列出服务的已配置计划。默认情况下,所有服务至少有一个 deploy 计划和一个 recovery 计划。某些服务可能已定义额外的自定义计划。

$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans
$ dcos kafka --name=kafka plan list

查看计划

您可以查看所列计划的当前状态:

$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/<plan>

CLI 可用于显示计划的格式树(默认),或从上述 HTTP 端点检索的基础 JSON 数据:

$ dcos kafka --name=kafka plan show <plan>
$ dcos kafka --name=kafka plan show <plan> --json

暂停计划

完成当前节点的操作后,安装将暂停,并等待用户输入后再继续。

$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/deploy/interrupt
$ dcos kafka --name=kafka plan pause deploy

恢复计划

下面的 REST API 请求将恢复下一个挂起节点的操作。

$ curl -X PUT -H "Authorization:token=$auth_token" <dcos_surl>/service/kafka/v1/plans/deploy/continue
$ dcos kafka --name=kafka plan continue deploy

节点 API

pod API 提供用于检索节点信息、重新启动节点和更换节点的端点。

列表节点

可以通过发送 GET 请求到 /v1/pod来检索可用节点 ID 的列表:

CLI 示例

$ dcos kafka --name=kafka pod list

HTTP 示例

$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod

节点信息

您可以通过向 /v1/pod/<node-id>/info发送 GET 请求来检索节点信息:

$ curl  -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/info

CLI 示例

$ dcos kafka --name=kafka pod info journalnode-0

HTTP 示例

$ curl  -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/journalnode-0/info

替换节点

可以使用 replace endpoint 命令用其他代理节点上运行的实例来替换节点。

CLI 示例

$ dcos kafka --name=kafka pod replace <node-id>

HTTP 示例

$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/replace

如果操作成功,返回200 OK

重新启动节点

可以使用 restart endpoint 命令重新启动在同一代理节点上的节点。

CLI 示例

$ dcos kafka --name=kafka pod restart <node-id>

HTTP 示例

$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/restart

如果操作成功,返回 200 OK

暂停节点

可以使用 pause endpoint 命令重新启动在空闲命令状态下的节点,用于调试。

CLI 示例

dcos kafka --name=kafka debug pod pause <node-id>

HTTP 示例

$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/pause

配置 API

配置 API 提供查看群集的当前和先前配置的端点。

查看目标配置

您可以通过发送 GET 请求到 /v1/configurations/target 来查看当前目标配置。

CLI 示例

$ dcos kafka --name=kafka config target

HTTP 示例

$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/target

列表配置

您可以通过发送 GET 请求到 /v1/configurations来列出所有配置 ID。

CLI 示例

$ dcos kafka --name=kafka config list
[
  "319ebe89-42e2-40e2-9169-8568e2421023",
  "294235f2-8504-4194-b43d-664443f2132b"
]

HTTP 示例

$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations
[
  "319ebe89-42e2-40e2-9169-8568e2421023",
  "294235f2-8504-4194-b43d-664443f2132b"
]

查看指定的配置

您可以通过向 /v1/configurations/ 发送 GET 请求来查看指定配置<config-id>.

CLI 示例

$ dcos kafka --name=kafka config show 9a8d4308-ab9d-4121-b460-696ec3368ad6

HTTP 示例

$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/9a8d4308-ab9d-4121-b460-696ec3368ad6

主题操作

这些操作会镜像 bin/kafka-topics.sh 所提供的内容。

列表主题

$ dcos kafka --name=kafka topic list
[
  "topic1",
  "topic0"
]
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics"
[
  "topic1",
  "topic0"
]

描述主题

$ dcos kafka --name=kafka topic describe topic1
{
  "partitions": [
  {
    "0": {
      "controller_epoch": 1,
      "isr": [
        0,
        1,
        2
      ],
      "leader": 0,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "1": {
      "controller_epoch": 1,
      "isr": [
        1,
        2,
        0
      ],
      "leader": 1,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "2": {
      "controller_epoch": 1,
      "isr": [
        2,
        0,
        1
      ],
      "leader": 2,
      "leader_epoch": 0,
      "version": 1
    }
  }
  ]
}
$ curl -X POST -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
  "partitions": [
  {
    "0": {
      "controller_epoch": 1,
      "isr": [
        0,
        1,
        2
      ],
      "leader": 0,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "1": {
      "controller_epoch": 1,
      "isr": [
        1,
        2,
        0
      ],
      "leader": 1,
      "leader_epoch": 0,
      "version": 1
    }
  },
  {
    "2": {
      "controller_epoch": 1,
      "isr": [
        2,
        0,
        1
      ],
      "leader": 2,
      "leader_epoch": 0,
      "version": 1
    }
  }
  ]
}

创建主题

$ dcos kafka --name=kafka topic create topic1 --partitions=3 --replication=3
{
  "message": "Output: Created topic \"topic1\"\n"
}
$ curl -X POST -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?partitions=3&replication=3"
{
  "message": "Output: Created topic \"topic1\"\n"
}

查看主题偏移

可选的 --time 参数可以被设置为“first”、“last” 或时间戳(以毫秒为单位),如 Kafka 文档中所述

$ dcos kafka --name=kafka topic offsets topic1 --time=last
[
  {
    "2": "334"
  },
  {
    "1": "333"
  },
  {
    "0": "333"
  }
]
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1/offsets?time=-1"
[
  {
    "2": "334"
  },
  {
    "1": "333"
  },
  {
    "0": "333"
  }
]

修改主题分区计数

$ dcos kafka --name=kafka topic partitions topic1 2
{
  "message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectednAdding partitions succeeded!n"
}
$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=partitions&partitions=2"
{
  "message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectednAdding partitions succeeded!n"
}

在主题上运行生产者测试

$ dcos kafka --name=kafka topic producer_test topic1 10
{
  "message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.n"
}
$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=producer-test&messages=10"
{
  "message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.n"
}

这相当于运行计算机运行 Kafka 调度器时所用的以下命令:

$ kafka-producer-perf-test.sh \
  --topic <topic> \
  --num-records <messages> \
  --throughput 100000 \
  --record-size 1024 \
  --producer-props bootstrap.servers=<current broker endpoints>

删除主题

$ dcos kafka --name=kafka topic delete topic1
{
  "message": "Topic topic1 is marked for deletion.nNote: This will have no impact if delete.topic.enable is not set to true.n"
}
$ curl -X DELETE -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
  "message": "Topic topic1 is marked for deletion.nNote: This will have no impact if delete.topic.enable is not set to true.n"
}

请注意以上命令输出中的警告。您可以更改指示 delete.topic.enable 配置值,作为配置更改。

列出 Under Replicated 分区

$ dcos kafka --name=kafka topic under_replicated_partitions
{
  "message": ""
}
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/under_replicated_partitions"
{
  "message": ""
}

列出不可用分区

$ dcos kafka --name=kafka topic unavailable_partitions
{
  "message": ""
}
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/unavailable_partitions"
{
  "message": ""
}