Skip to main content

Display Real-time Data from Kafka (Confluent Cloud)

This page shows how to display and stream real-time data from Kafka - Confluent Cloud using WebSockets.

Image

Prerequisites

Retrieve Kafka Credentials

Follow these steps to fetch cluster and API credentials from Kafka:

  1. Go to Confluent Cloud, navigate to the dashboard, and open your Cluster.

  2. Click on Cluster settings and copy the server endpoint. Save it for future use.

Example:

pkc-11ab.us-east-2.aws.confluent.cloud:1010
  1. Navigate to the Connector page, and select the connector associated with your desired data streaming topic.

  2. Inside the Connector settings, find the API Key, Kafka API Secret, and Topic Name.

Image

Configure WebSocket Server

Follow these steps to set up a WebSocket server, either locally or on your preferred platform. This server will act as a bridge to stream real-time data from your Kafka topic to your Appsmith application.

  1. Select a WebSocket library or framework based on your preferred programming language, and decide whether to set up the WebSocket server locally or on a cloud platform (e.g., AWS, Azure, Google Cloud).

  2. Install a Kafka client library and a WebSocket server library in your local development environment or on your cloud server.

Example: If you want to set up locally using Node.js, install:

npm install kafkajs ws
  1. Write a WebSocket server script that listens for connections and handles data streaming. Ensure the server can communicate with your Kafka cluster using the credentials and broker URL you retrieved earlier.

Example: If you want to create a script locally using Node.js, use the following code.

const { Kafka } = require('kafkajs');
const WebSocket = require('ws');

// Kafka setup
const Kafka = new Kafka({
clientId: 'Kafka-websocket-bridge',
brokers: ['pkc-11ab.us-east-2.aws.confluent.cloud:1010'], // Replace with your Kafka broker URL
ssl: true,
sasl: {
mechanism: 'plain',
username: 'your-Kafka-api-key', // Replace with your Kafka API key
password: 'your-Kafka-api-secret', // Replace with your Kafka API secret
},
});

const consumer = Kafka.consumer({ groupId: 'websocket-group' });

// WebSocket server setup
const wss = new WebSocket.Server({ port: 8080 });

async function run() {
await consumer.connect();
await consumer.subscribe({ topic: 'your-topic-name', fromBeginning: true }); // Replace with your Kafka topic name

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const messageValue = message.value.toString();
console.log(`Received message: ${messageValue}`);

// Send message to all connected WebSocket clients
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageValue);
}
});
},
});
}

wss.on('connection', (ws) => {
console.log('New WebSocket client connected');

ws.on('close', () => {
console.log('WebSocket client disconnected');
});
});

run().catch(console.error);

console.log('Kafka to WebSocket bridge is running on port 8080');

This Node.js script sets up a WebSocket server that listens on port 8080 and a Kafka consumer to read messages from a specified Kafka topic. It streams incoming Kafka messages to all connected WebSocket clients in real-time.

info

If you're setting up a secure WebSocket server (WSS), you'll need to generate a self-signed certificate or obtain one from a Certificate Authority. You can use OpenSSL to generate a self-signed certificate.

  1. Run and test the WebSocket server script.

If you are using a local server setup and want to make your WebSocket server public, you can consider using platforms like Ngrok, Heroku, DigitalOcean, or AWS EC2.

Set Up WebSocket in Appsmith

Follow these steps to integrate and use the WebSocket server you set up with your Appsmith application:

  1. In Appsmith, create a new JSObject and configure your WebSocket connection. Define socket for the WebSocket instance, like:

Example:

export default {
// WebSocket URL (change this URL based on your WebSocket server)
socketURL: 'ws://localhost:8080',

// For local development: ws://localhost:8080
// If the server is hosted: wss://your-domain.ngrok.io

// WebSocket instance
socket: null,

// Function to initialize the WebSocket connection
initWebSocket() {
this.socket = new WebSocket(this.socketURL);

// Event handler for successful connection
this.socket.onopen = () => {
console.log('WebSocket connection established successfully');
};

// Event handler for incoming messages
this.socket.onmessage = (event) => {
// Parse the incoming data
const responseData = JSON.parse(event.data);

// Log the raw data for reference
console.log('Received data:', event.data);

// Add the new data to the top of the receivedData array
this.receivedData.unshift(responseData);

// Log the updated array
console.log('Updated Data:', this.receivedData);
};

// Event handler for errors
this.socket.onerror = (error) => {
console.error('WebSocket error:', error);
};

// Event handler for connection closure
this.socket.onclose = (event) => {
console.log('WebSocket connection closed:', event);
};
}
};

This code sets up a WebSocket connection to the specified server URL and handles different events such as successful connection, incoming messages, errors, and connection closures. It logs the received data and manages the WebSocket lifecycle.

  1. Bind response data with widgets as needed.

Example: To display data in Table widget, set Table data property to:

{{WebsocketUtils.receivedData}}

Format the data and bind it to widgets based on your requirements. For more information on WebSockets, see Websockets for real-time updates.