Skip to content

Kafka Module

Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Install

npm install @testcontainers/kafka --save-dev

Examples

// Starts a KafkaContainer with port 9093 exposed and runs testPubSub against it.
it("should connect using in-built zoo-keeper", async () => {
  const kafkaContainer = await new KafkaContainer().withExposedPorts(9093).start();

  try {
    await testPubSub(kafkaContainer);
  } finally {
    // Stop the container even when testPubSub rejects, so a failing test
    // does not leave it running.
    await kafkaContainer.stop();
  }
});
// Starts a network, a ZooKeeper container attached to it under the alias
// "zookeeper", and a KafkaContainer configured via withZooKeeper to use that
// alias and port; then runs testPubSub against the Kafka container.
//
// Each resource is released in a `finally` block, in reverse order of
// acquisition (Kafka, then ZooKeeper, then the network), so a failure at any
// step still stops everything that was already started.
it("should connect using provided zoo-keeper and network", async () => {
  const network = await new Network().start();

  try {
    const zooKeeperHost = "zookeeper";
    const zooKeeperPort = 2181;
    const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
      .withNetwork(network)
      .withNetworkAliases(zooKeeperHost)
      .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
      .withExposedPorts(zooKeeperPort)
      .start();

    try {
      const kafkaContainer = await new KafkaContainer()
        .withNetwork(network)
        .withZooKeeper(zooKeeperHost, zooKeeperPort)
        .withExposedPorts(9093)
        .start();

      try {
        await testPubSub(kafkaContainer);
      } finally {
        await kafkaContainer.stop();
      }
    } finally {
      await zookeeperContainer.stop();
    }
  } finally {
    await network.stop();
  }
});
// Starts a KafkaContainer with a SASL_SSL listener on port 9094 (SCRAM-SHA-512,
// one user, keystore and truststore read from certificatesDir) and runs
// testPubSub with client options that target that listener.
it(`should connect locally`, async () => {
  const kafkaContainer = await new KafkaContainer()
    .withSaslSslListener({
      port: 9094,
      sasl: {
        mechanism: "SCRAM-SHA-512",
        user: {
          name: "app-user",
          password: "userPassword",
        },
      },
      keystore: {
        content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
        passphrase: "serverKeystorePassword",
      },
      truststore: {
        content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
        passphrase: "serverTruststorePassword",
      },
    })
    .start();

  try {
    await testPubSub(kafkaContainer, {
      // Connect through the host-mapped port of the 9094 listener configured above.
      brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9094)}`],
      // Same user name and password as passed to withSaslSslListener.
      sasl: {
        username: "app-user",
        password: "userPassword",
        mechanism: "scram-sha-512",
      },
      ssl: {
        ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
      },
    });
  } finally {
    // Stop the container even when testPubSub rejects, so a failing test
    // does not leave it running.
    await kafkaContainer.stop();
  }
});