How to build an Event Pipeline in AWS within 1 hour and a minimum of code

Bahadir Cambel · Published in Big Data WAL · 8 min read · Feb 18, 2018

Goal: Build an event pipeline that can accept arbitrary JSON messages and store them in S3.


Note: Check out our latest Serverless Smart Radio series

Rationale: Many businesses today need much more intelligence to quickly decide and react to what is going on, answering questions such as:

  • Is this product converting?
  • Is this marketing campaign worthwhile?
  • Are we paying too much for ads?
  • Do people like this product?
  • Who is interacting with what?
  • Can we play with the price of the product?
  • Multi-armed bandit tests? A/B tests?

To answer all these questions, the traditional method of developing around a database server will not get you answers quickly and efficiently. Existing in-house designs will force you awake in the middle of the night; you will end up with sleepless nights. But should it be this way? Can we design systems that are much simpler to reason about, handle downtime more gracefully, and heal themselves? Or even run different versions of the system at the same time while handling 1000+ requests per second?

Answer: Event Pipe

Amazon Services that we will be utilizing

AWS API Gateway: Accepts events from different sources and forwards them to Kinesis.

AWS Kinesis: Where events are sequenced and ready to be consumed (persisted to disk).

AWS Kinesis Firehose: Consumes the Kinesis stream and serializes it to a destination in batches of N minutes or N megabytes.

AWS S3: Stores all the events in batches.

API GW -> Kinesis -> Kinesis Firehose -> S3

Final API Gateway endpoints

As the end result, we will create the following endpoints in API Gateway: one to figure out which streams we can access, one to check their status, and, to place data into those streams, one using the PUT operation. API Gateway is the cornerstone of our service and will be exposed to the rest of the world or to our internal services. It's important to get it right.

Before we start building the API Gateway, we need to create a Kinesis stream first. Head over to your Kinesis console, click Get Started, and you will end up with the following screen showing all the available Kinesis services. We need a Data Stream, which is where our events will be sequenced and stored.

Select “Create Data Stream”
Input Stream name and # of shards

Input the Kinesis stream name as event-pipe, set the # of shards, and click the Create Kinesis Stream button. Voilà!
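If you would rather script the stream creation than click through the console, a minimal boto3 sketch (assuming your AWS credentials and default region are already configured) looks like this:

```python
import boto3

# Script equivalent of the console steps above.
kinesis = boto3.client("kinesis")
kinesis.create_stream(StreamName="event-pipe", ShardCount=1)

# Block until the stream is ACTIVE before wiring anything to it.
kinesis.get_waiter("stream_exists").wait(StreamName="event-pipe")
```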

Now let's get to the next stage and start building the API Gateway endpoints. Go to the API Gateway console and create a new API.

First we have to create the general resource.

Once the resource is created, before getting into the next resource, we have to give the API Gateway service permission to talk to the Kinesis service on our behalf.

  • Go to the IAM console and click Roles.
  • Create a role, selecting API Gateway as the trusted entity.
  • Click Next: Permissions.
  • Type in the role name and click the "Create Role" button.
Trusted Entity is API Gateway

Once the role is created, we have to give it Kinesis full access (the AmazonKinesisFullAccess managed policy).

Attach Kinesis Full Access to the Role

Once the policy is attached to the role, we are ready to go. Copy the role ARN; in my case it's arn:aws:iam::XXXXXXXXXX:role/apigw-eventpipe. Store your role ARN somewhere easy to grab from.
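The console steps above can also be scripted; a boto3 sketch that creates an equivalent role (using the article's role name) would be:

```python
import json
import boto3

iam = boto3.client("iam")

# Trust policy that lets API Gateway assume this role on our behalf.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "apigateway.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

role = iam.create_role(
    RoleName="apigw-eventpipe",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the AWS-managed Kinesis full-access policy.
iam.attach_role_policy(
    RoleName="apigw-eventpipe",
    PolicyArn="arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
)

print(role["Role"]["Arn"])  # keep this ARN handy for the API Gateway setup
```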

After that, it's time to create the child resources. With this endpoint, we will be able to list all the available Kinesis streams that are readable through our role. Click Create Resource from the Actions menu again and type in Streams.

Create a Child resource

Create a GET method on the /Streams resource and fill in the following information:

  • Integration Type: AWS Service
  • AWS Region: whichever region you created the Kinesis stream in
  • AWS Service: Kinesis
  • HTTP Method: POST
  • Action: ListStreams
  • Execution Role: arn:aws:iam::XXXXXXXXXX:role/apigw-eventpipe
The ListStreams GET action

Once you hit the Save button, you will land on the Method Execution flow screen.

Click Integration Request on the top right. Once the screen opens, expand the HTTP Headers section and add a Content-Type header; its value must be wrapped in single quotes (the Kinesis API expects 'application/x-amz-json-1.1').

Now expand Body Mapping Templates, select the option as in the following picture, add a mapping template for application/json, and type {} into the visible textbox. Since we already specified the ListStreams action in the previous step, all we need to send to the Kinesis API is the HTTP headers and an empty JSON body.
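In effect, this integration issues a ListStreams call on our behalf; for comparison, the direct boto3 equivalent is just:

```python
import boto3

# The same ListStreams action the GET /Streams method now proxies.
kinesis = boto3.client("kinesis")
print(kinesis.list_streams())
# -> roughly {'StreamNames': ['event-pipe'], 'HasMoreStreams': False, ...}
```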

After you hit the Save button, click the Streams GET method, go back to the flow diagram, and click the Test link.

Hit the Test button and look at the returned results. As seen on the right side, the request took 478 ms and returned the available streams. We will use this name to get the details of the Kinesis stream in the next section.

Kinesis Stream Details

In order to get the details of a specific Kinesis stream, we need to specify the stream that we want to work on, and we will accomplish this via a URL parameter. The final form of our API will be /Streams/{stream-name}, or in our case, /Streams/event-pipe.

{stream-name} is the template-argument syntax API Gateway uses behind the scenes. We will use this template argument further down the pipe as well. First, create the child resource below the /Streams endpoint as follows.

Next up is answering a GET request with a bit more detail about the stream.

We will use the DescribeStream action of Kinesis to tell our consumers about the stream they are interested in.

This time, for the Integration Request, we have to pass the stream-name as a parameter in order to describe the stream.
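The template itself isn't preserved in this copy of the post; following the standard API-Gateway-to-Kinesis proxy pattern, the application/json body mapping template pulls the stream name out of the URL path:

```json
{
    "StreamName": "$input.params('stream-name')"
}
```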

Again set the HTTP headers, add the Content-Type "application/json", and set the template above as the body.

Integration Request Details

Once the settings are saved, run the test; JSON like the following is returned in the test result.
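The exact screenshot isn't reproduced here; for a one-shard stream, the DescribeStream result looks roughly like this (values abbreviated):

```json
{
  "StreamDescription": {
    "StreamName": "event-pipe",
    "StreamARN": "arn:aws:kinesis:us-east-1:XXXXXXXXXX:stream/event-pipe",
    "StreamStatus": "ACTIVE",
    "RetentionPeriodHours": 24,
    "HasMoreShards": false,
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "StartingHashKey": "0",
          "EndingHashKey": "340282366920938463463374607431768211455"
        }
      }
    ]
  }
}
```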

Kinesis Stream PUT Record

So far we have created the following general endpoints:

  • /Streams Listing all the available Streams
  • /Streams/{stream-name} Describe the given stream

The next step is to create an endpoint to accept data. This time, when creating the child resource, we will select Enable API Gateway CORS so that we can send requests from browsers.

The most crucial part of the setup is the following Integration Request body mapping template, the piece of JSON that structures the data.
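The original gist isn't preserved in this copy; a working equivalent, modeled on AWS's standard API-Gateway-to-Kinesis proxy tutorial, looks like this. Note that the Data and PartitionKey field names are simply the request-body contract we choose for our clients (Kinesis itself only requires that Data ends up base64-encoded):

```json
{
    "StreamName": "$input.params('stream-name')",
    "Data": "$util.base64Encode($input.json('$.Data'))",
    "PartitionKey": "$input.path('$.PartitionKey')"
}
```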

This template transforms the input data into the request that Kinesis will receive.

Enter event-pipe into the stream-name text field and add a matching request body (see the example below) to test the functionality. Once you click the Test button, you should get a similar response body back from Kinesis.
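For illustration (the original screenshot isn't reproduced here, and the event fields are made up), a request body that matches the template above could be:

```json
{
  "Data": { "event": "page_view", "user_id": 42 },
  "PartitionKey": "user-42"
}
```

and Kinesis answers with the shard and sequence number the record was written to:

```json
{
  "SequenceNumber": "<sequence-number>",
  "ShardId": "shardId-000000000000"
}
```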

Kinesis Firehose

Once we go back to the Kinesis dashboard, you will be able to see the Data Streams and Firehose delivery streams. What we need to do next is create a "Delivery Stream".

On the left side, a Kinesis Firehose is created to write data into S3. In the first step of the creation, we select which stream the Firehose should consume from.

Next up is to choose whether we want to transform our incoming data into another format. For the brevity of this article, we won't get into Lambda transformations, so pick Disabled and move to the next step.

Firehose gives us the opportunity to load the Kinesis data stream into different destinations, such as:

  • AWS S3 (choose this)
  • Redshift
  • Elasticsearch Service
  • Splunk
Pick an S3 bucket and leave the prefix empty

Finally, we select how we would like our data to be batched. The default settings are as follows:

  • Every 5 MB of data
  • Every 300 seconds

I wish there were an option to also batch by number of records, but there isn't. For now, leave the S3 compression disabled; once we go to production, we had better select GZIP S3 compression for the finalized data, since it's pointless to pay extra for storage.
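If you later want to automate this wizard, a boto3 sketch along these lines creates an equivalent delivery stream. The role and bucket names are placeholders: Firehose needs its own IAM role that can read the stream and write to the bucket, which the console wizard normally creates for you.

```python
import boto3

firehose = boto3.client("firehose")

# Placeholders: substitute your own account id, role, and bucket.
firehose.create_delivery_stream(
    DeliveryStreamName="event-pipe-to-s3",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:XXXXXXXXXX:stream/event-pipe",
        "RoleARN": "arn:aws:iam::XXXXXXXXXX:role/firehose-eventpipe",
    },
    S3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::XXXXXXXXXX:role/firehose-eventpipe",
        "BucketARN": "arn:aws:s3:::my-event-pipe-bucket",
        # The default batching settings described above.
        "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 300},
        "CompressionFormat": "UNCOMPRESSED",  # switch to "GZIP" for production
    },
)
```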

The final page of the wizard shows you the review, and that's it. It's time to deploy our API Gateway: choose Deploy API from the Actions menu in the API Gateway console and create a stage (mine is named test).

Now let's head over to our API. Mine is at the following address; yours will be provided at the top part of the screen as the Invoke URL. As you can see, the URL contains the stage name, "test".

Use the Invoke URL to test your API

https://m2v3xe5shl.execute-api.us-east-1.amazonaws.com/test/Streams

{"HasMoreStreams":false,"StreamNames":["event-pipe"]}

https://m2v3xe5shl.execute-api.us-east-1.amazonaws.com/test/Streams/event-pipe

will return the details of the stream,

And finally, it's time to put a record into our pipe.
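The original snippet isn't preserved here, but a minimal Python sketch against the deployed API might look like this. The /record child-resource name is an assumption; use whatever you named the PUT resource.

```python
import requests

# Invoke URL of the "test" stage; replace with your own deployment.
# The trailing /record segment is a hypothetical resource name.
url = "https://m2v3xe5shl.execute-api.us-east-1.amazonaws.com/test/Streams/event-pipe/record"

payload = {
    "Data": {"event": "page_view", "user_id": 42},  # any arbitrary JSON event
    "PartitionKey": "user-42",                      # decides the target shard
}

resp = requests.put(url, json=payload, timeout=10)
print(resp.status_code, resp.json())  # expect a ShardId and SequenceNumber
```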

Now head over to S3 after a couple of minutes, and you should be able to see records within the S3 bucket in the following format: YYYY/MM/DD/Events-1-<uuid>
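To check from code instead of the console, a quick boto3 sketch (again with a placeholder bucket name):

```python
import boto3

# List the batched objects Firehose has delivered so far.
s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="my-event-pipe-bucket")  # placeholder bucket
for obj in resp.get("Contents", []):
    print(obj["Key"])  # e.g. 2018/02/18/... followed by the object name
```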

Part 2 is here: Transforming Records Using Lambda Functions
