Create Consumers
This page describes how to create a consumer.
Prerequisites
- A Macrometa account with admin permissions.
- An API key with admin permissions. For more information, refer to Create API Keys.
- The appropriate SDK installed. For more information, refer to Install SDKs.
Create Consumer Code
This code creates a new client, requests a stream object, and then creates a consumer.
If you're using JavaScript, the code creates a jsc8
client. If Python, it creates a C8Client
- JavaScript SDK
- Python SDK
const jsc8 = require("jsc8");
const BASE_URL = "https://play.paas.macrometa.io/"
client = new jsc8({
url: BASE_URL,
apiKey: "xxxxxx", // Update this with your API key
fabricName: "_system",
});
const streamName = "streamQuickstart";
const subscriptionName = "consumer-subscription"
async function main () {
async function createStream() {
if (await client.hasStream(streamName, false)) {
console.log("This stream already exists!");
console.log(`Existing Consumer = c8globals.${streamName}`);
} else {
console.log("\nCreating global stream...");
// To create a global stream, set the second parameter to false
// There is an option to create a local stream, which is only accessible within the region
const streamInfo = await client.createStream(streamName, false);
console.log(`New Consumer = ${streamInfo.result["stream-id"]}`);
}
}
async function consumer() {
try {
await console.log("\nConnecting consumer to global stream...");
// Create stream only if stream does not exist
createStream();
// Request stream object
const stream = client.stream(streamName, false);
// Request one-time password
const consumerOTP = await stream.getOtp();
// Create consumer
const consumer = await stream.consumer(subscriptionName, BASE_URL.replace("https://",""), {
otp: consumerOTP
});
// Run consumer - open connection to server
consumer.on("message", (msg) => {
const { payload, messageId } = JSON.parse(msg);
// Received message payload
console.log(Buffer.from(payload, "base64").toString("ascii"));
// Send message acknowledgement
consumer.send(JSON.stringify({ messageId }));
});
} catch (e) {
await console.log("Could not receive messages " + e);
}
}
consumer();
}
main();
import base64
import json
import os
from c8 import C8Client
""" For Python SDK we can omit https:// part of the URL """
BASE_URL = "play.paas.macrometa.io/"
stream_name = "stream_quickstart"
""" Connect to GDN """
client = C8Client(
protocol='https',
host=BASE_URL,
port=443,
apikey="xxxxxx",
geofabric="_system"
)
""" Create consumer and receive data through a stream """
def create_consumer():
print("\nConnecting consumer to global stream...")
consumer = client.subscribe(
stream_name,
local=False,
subscription_name="consumer_subscription"
)
while True:
message = json.loads(consumer.recv())
decoded_message = base64.b64decode(message['payload']).decode('utf-8')
print(f"Received message '{decoded_message}' id='{message['messageId']}'")
consumer.send(json.dumps(
{'messageId': message['messageId']}))
create_consumer()