In this example we will look at how to create a Kinesis Data Stream in our serverless app using SST.

Requirements

Create an SST app

Let’s start by creating an SST app.

$ npx create-sst@latest --template=base/example kinesisstream
$ cd kinesisstream
$ npm install

By default, our app will be deployed to the us-east-1 AWS region. This can be changed in the sst.config.ts in your project root.

import { SSTConfig } from "sst";

export default {
  config(_input) {
    return {
      name: "kinesisstream",
      region: "us-east-1",
    };
  },
} satisfies SSTConfig;

Project layout

An SST app is made up of two parts.

  1. stacks/ — App Infrastructure

    The code that describes the infrastructure of your serverless app is placed in the stacks/ directory of your project. SST uses AWS CDK to create the infrastructure.

  2. packages/functions/ — App Code

    The code that’s run when your API is invoked is placed in the packages/functions/ directory of your project.

Adding a Kinesis Data Stream

Amazon Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams at any scale. Note that a stream is billed by the hour for as long as it exists, even when it is idle, so remove it once you are done with this example.

Replace the stacks/ExampleStack.ts with the following.

import { Api, KinesisStream, StackContext } from "sst/constructs";

export function ExampleStack({ stack }: StackContext) {
  // create a kinesis stream
  const stream = new KinesisStream(stack, "Stream", {
    consumers: {
      consumer1: "packages/functions/src/consumer1.handler",
      consumer2: "packages/functions/src/consumer2.handler",
    },
  });
}

This creates a Kinesis Data Stream using KinesisStream, with two consumers that poll the stream for messages. Each consumer function runs whenever it has polled one or more messages.

Setting up the API

Now let’s add the API.

Add this below the KinesisStream definition in stacks/ExampleStack.ts.

// Create an HTTP API
const api = new Api(stack, "Api", {
  defaults: {
    function: {
      bind: [stream],
    },
  },
  routes: {
    "POST /": "packages/functions/src/lambda.handler",
  },
});

// Show the endpoint in the output
stack.addOutputs({
  ApiEndpoint: api.url,
});

Our API simply has one endpoint (the root). When we make a POST request to this endpoint the Lambda function called handler in packages/functions/src/lambda.ts will get invoked.

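We also bind our stream to the API's functions. This gives them permission to access the stream and lets the handler look up the stream name at runtime.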

Adding function code

We will create three functions, one for handling the API request, and the other two for the consumers.

Replace the packages/functions/src/lambda.ts with the following.

export async function handler() {
  console.log("Message queued!");
  return {
    statusCode: 200,
    body: JSON.stringify({ status: "successful" }),
  };
}

Add a packages/functions/src/consumer1.ts.

export async function handler() {
  console.log("Message 1 processed!");
  return {};
}

Add a packages/functions/src/consumer2.ts.

export async function handler() {
  console.log("Message 2 processed!");
  return {};
}

Now let’s test our new API.

Starting your dev environment

SST features a Live Lambda Development environment that allows you to work on your serverless apps live.

$ npm run dev

The first time you run this command it’ll take a couple of minutes to deploy your app and a debug stack to power the Live Lambda Development environment.

===============
 Deploying app
===============

Preparing your SST app
Transpiling source
Linting source
Deploying stacks
dev-kinesisstream-ExampleStack: deploying...

 ✅  dev-kinesisstream-ExampleStack


Stack dev-kinesisstream-ExampleStack
  Status: deployed
  Outputs:
    ApiEndpoint: https://i8ia1epqnh.execute-api.us-east-1.amazonaws.com

The ApiEndpoint is the API we just created.

Let’s test our endpoint with the SST Console. The SST Console is a web-based dashboard for managing your SST apps. Learn more about it in our docs.

Go to the Functions tab and click the Invoke button of the POST / function to send a POST request.

Functions tab invoke button

After you see a success status in the logs, go to the Local tab in the console to see all function invocations. The Local tab displays real-time logs from your Live Lambda Dev environment.

Local tab response without kinesis

You should see Message queued! logged in the console.
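As an alternative to the Console, the same POST request can be sent from a script. The following is a minimal sketch, not part of the example app: the helper name postToApi and the FetchLike type are hypothetical, and the fetch implementation is passed in so the helper does not depend on any particular runtime.

```typescript
// Minimal description of the fetch features this sketch relies on.
type FetchLike = (
  url: string,
  init: { method: string }
) => Promise<{ status: number; text(): Promise<string> }>;

// Hypothetical helper: send a POST to the API root and return the
// HTTP status together with the parsed JSON body.
export async function postToApi(endpoint: string, fetchImpl: FetchLike) {
  const response = await fetchImpl(endpoint, { method: "POST" });
  const body = await response.text();
  return { status: response.status, body: JSON.parse(body) };
}
```

On a runtime that provides a global fetch, you could call postToApi with the ApiEndpoint value from the deploy output and pass fetch as the second argument.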

Sending messages to our Kinesis Data Stream

Now let’s send a message to our Kinesis Data Stream.

Replace the packages/functions/src/lambda.ts with the following.

import AWS from "aws-sdk";
import { KinesisStream } from "sst/node/kinesis-stream";

const stream = new AWS.Kinesis();

export async function handler() {
  await stream
    .putRecord({
      Data: JSON.stringify({
        message: "Hello from Lambda!",
      }),
      PartitionKey: "key",
      StreamName: KinesisStream.Stream.streamName,
    })
    .promise();

  console.log("Message queued!");
  return {
    statusCode: 200,
    body: JSON.stringify({ status: "successful" }),
  };
}

Here we read the Kinesis Data Stream name from the bound resource, KinesisStream.Stream.streamName, and then send a message to the stream.
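The handler above uses the constant partition key "key" for every record. Kinesis hashes the partition key to choose a shard, so records that share a key always land on the same shard. As a rough sketch of how the parameters could be built with a key taken from the data instead, consider the helper below. The names buildPutRecordParams and PutRecordParams are hypothetical and only declare the three fields this example uses.

```typescript
// The putRecord parameters used in this example.
interface PutRecordParams {
  StreamName: string;
  PartitionKey: string;
  Data: string;
}

// Hypothetical helper: serialize the payload and pair it with a
// caller-supplied partition key (for example a user or order id).
export function buildPutRecordParams(
  streamName: string,
  partitionKey: string,
  payload: object
): PutRecordParams {
  return {
    StreamName: streamName,
    PartitionKey: partitionKey,
    Data: JSON.stringify(payload),
  };
}
```

The result could then be passed to stream.putRecord(...) in place of the inline object, with KinesisStream.Stream.streamName as the first argument.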

Let’s install the aws-sdk package in the packages/functions/ folder.

$ npm install aws-sdk

Now head over to your console and invoke the function again. You’ll notice in the Local tab that our consumers are called. You should see Message 1 processed! and Message 2 processed! printed out.

Local tab response with kinesis
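The consumers in this example ignore their input. Lambda delivers Kinesis records in batches, with each record's payload base64-encoded, so a consumer that wants to read the message has to decode it first. The following is a minimal sketch of what that might look like. The decodeRecords helper and the two interfaces are hypothetical and declare only the fields used here, and the sketch assumes every record contains JSON, as the records sent by our API do.

```typescript
// Minimal shape of the event a Kinesis consumer receives.
// Only the fields used below are declared; the real event carries more.
interface KinesisRecordLike {
  kinesis: {
    partitionKey: string;
    data: string; // base64-encoded payload
  };
}

interface KinesisEventLike {
  Records: KinesisRecordLike[];
}

// Hypothetical helper: decode each record in the batch back into the
// JSON value that the producer sent.
export function decodeRecords(event: KinesisEventLike): unknown[] {
  return event.Records.map((record) => {
    const json = Buffer.from(record.kinesis.data, "base64").toString("utf8");
    return JSON.parse(json);
  });
}

export async function handler(event: KinesisEventLike) {
  for (const payload of decodeRecords(event)) {
    console.log("Message 1 processed!", payload);
  }
  return {};
}
```

A production consumer would also need to decide what to do with a record that fails to parse; this sketch lets the error propagate.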

Deploying to prod

To wrap things up we’ll deploy our app to prod.

$ npx sst deploy --stage prod

This allows us to separate our environments, so when we are working in dev, it doesn’t break the API for our users.

Cleaning up

Finally, you can remove the resources created in this example using the following commands.

$ npx sst remove
$ npx sst remove --stage prod

Conclusion

And that’s it! We’ve got a completely serverless Kinesis Data Stream system. Check out the repo below for the code we used in this example. And leave a comment if you have any questions!