
Creating a Data Pipeline with AWS CloudFormation

Something I’ve found to be highly useful in several projects is to store data in JSON format on S3. The data can later be pulled down for ad hoc analysis, queried in place with Athena, or loaded into a big data platform such as Redshift or Databricks.

A simple way to do this is to post JSON objects to a Kinesis Data Stream (KDS) and configure a Kinesis Data Firehose (KDF) delivery stream to take that data and store it in an S3 bucket. By default, Firehose partitions the data within the bucket by year, month, day, and hour (based on UTC).
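
For a concrete sense of that layout, here is roughly the key structure Firehose produces with its default prefix; the bucket name and account ID below are made up, and the object names also include the delivery stream name, a timestamp, and a random suffix:

  s3://123456789012-paywallbot/2022/05/01/14/123456789012-paywallbot-firehose-1-2022-05-01-14-03-27-<random-suffix>
  s3://123456789012-paywallbot/2022/05/01/15/123456789012-paywallbot-firehose-1-2022-05-01-15-02-41-<random-suffix>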

Naturally, when I created a Discord bot that ridicules someone for posting a paywall, I decided I wanted to track who’s posting paywalls. Not wanting to think too hard about table structure, I found the pattern above to be a natural fit for capturing this metric.

I’m a big, big fan of AWS CloudFormation (AWS’s answer to Infrastructure as Code): it gives you version control and a history of your infrastructure, and it makes it easy to borrow snippets of someone else’s setup. It does take a bit of getting used to, though.

I had to piece together several different examples and dig through the documentation to set up even this simple data pipeline with CloudFormation, and I hope this real-world example helps someone else looking to work with it.

Note: even though you can write CloudFormation templates in JSON, I prefer to write them in YAML. Hence the blog name 🙂

You can view the complete template at https://github.com/gmlyth/paywallbot/blob/main/infrastructure/paywallbot.yaml, but it may be a doozy to look at all at once. So let’s build it piece-by-piece.
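
One quick note on structure before diving in: every snippet below lives under the top-level Resources section of the template, and names like S3BucketPaywalls are logical IDs of my own choosing (they’re how the resources reference each other). As a rough, trimmed-down sketch of the scaffolding:

AWSTemplateFormatVersion: '2010-09-09'
Description: PaywallBot data pipeline (Kinesis Data Stream -> Firehose -> S3)
Resources:
  S3BucketPaywalls:
    Type: 'AWS::S3::Bucket'
    # ...
  KinesisDataStreamPaywall:
    Type: AWS::Kinesis::Stream
    # ...
  KinesisFirehosePaywall:
    Type: AWS::KinesisFirehose::DeliveryStream
    # ...
  # ...plus the IAM roles covered below

When it comes time to create the stack, the AWS CLI’s aws cloudformation deploy command works nicely; just remember to pass --capabilities CAPABILITY_IAM, since the template creates IAM roles.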

  S3BucketPaywalls:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Sub "${AWS::AccountId}-paywallbot"
      Tags:
        - Key: Vendor
          Value: PaywallBot
      PublicAccessBlockConfiguration:
        BlockPublicAcls : true
        BlockPublicPolicy : true
        IgnorePublicAcls : true
        RestrictPublicBuckets : true

Probably the best thing to start with in CloudFormation is making an S3 bucket. A resource in CloudFormation has a Type, in this case ‘AWS::S3::Bucket’, as well as a collection of Properties. The Tags section is just for fun, but giving the bucket a unique name is important: in this case I wanted it to be my AWS account ID plus “-paywallbot.” If you leave the BucketName property out, CloudFormation will generate a unique name for you. The PublicAccessBlockConfiguration is optional; I don’t want my bucket to be public, even though there’s not much interesting going on in it.

Here you can see the main tab of the bucket that CloudFormation creates, along with the Tags section of the Properties tab. The Vendor tag I defined in the template shows up there, alongside several tags CloudFormation adds on its own; those are part of how CloudFormation keeps track of what to update or delete as you modify your templates.
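
If you’re not following along in the console, the tag set on the bucket ends up looking roughly like this (the stack name and IDs will be whatever your own stack uses):

  Vendor                           PaywallBot
  aws:cloudformation:logical-id    S3BucketPaywalls
  aws:cloudformation:stack-name    paywallbot
  aws:cloudformation:stack-id      arn:aws:cloudformation:us-east-2:<account-id>:stack/paywallbot/<guid>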

The next part of this is a Kinesis Data Stream.

  KinesisDataStreamPaywall:
    Type: AWS::Kinesis::Stream
    Properties: 
      Name: !Sub "${AWS::AccountId}-paywallbot-stream"
      RetentionPeriodHours: 24
      ShardCount: 1
      Tags: 
        - Key: Vendor
          Value: PaywallBot
        - Key: Name
          Value: !Sub "${AWS::AccountId}-paywallbot-stream"

What’s different here versus the S3 bucket is the inclusion of a couple of new properties. I’m setting the RetentionPeriodHours to 24 and the ShardCount to 1 – the smallest possible values for each – because the data will be dumped to S3 almost immediately and I don’t need to look back at the stream itself. I also don’t anticipate coming anywhere close to a single Kinesis shard’s write limit of 1 MB or 1,000 records per second.

Once we have the data stream, we need a Kinesis Data Firehose delivery stream.

  KinesisFirehosePaywall:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties: 
      DeliveryStreamName: !Sub "${AWS::AccountId}-paywallbot-firehose"
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !GetAtt KinesisDataStreamPaywall.Arn
        RoleARN: !GetAtt KinesisSourceIAMRole.Arn
      S3DestinationConfiguration: 
        BucketARN: !GetAtt S3BucketPaywalls.Arn
        RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 5
      Tags: 
        - Key: Vendor
          Value: PaywallBot
        - Key: Name
          Value: !Sub "${AWS::AccountId}-paywallbot-firehose" 

This is where things start getting kind of interesting. There are a lot of options for destinations for a Kinesis Data Firehose, as you’ll see if you look at the documentation here: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html

However, a lot of that is noise for the task at hand. Let’s look at this resource definition broken down a bit.

      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !GetAtt KinesisDataStreamPaywall.Arn
        RoleARN: !GetAtt KinesisSourceIAMRole.Arn

Here, we’re specifying that the delivery stream (the Kinesis Data Firehose) expects a Kinesis Data Stream as its source. Then we’re configuring that source by giving it the ARN (Amazon Resource Name) of the Kinesis Data Stream we created above. CloudFormation figures out the dependency ordering on its own, creating the data stream first so its ARN exists by the time the firehose needs it.

We’re also specifying the ARN for the role that will let the Kinesis Data Firehose get data *from* the Kinesis Data Stream. We haven’t seen that role definition yet, but here it is:

  KinesisSourceIAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Sid: ''
          Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole
      Tags:
        - Key: Vendor
          Value: PaywallBot
        - Key: Name
          Value: !Sub "${AWS::AccountId}-kinesis-source-role"
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "kinesis:*"
                Resource: !GetAtt KinesisDataStreamPaywall.Arn

That’s a whole bunch of stuff, especially because I insist on adding tags, wrapped around a few key lines:

          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole
....
              - Effect: Allow
                Action: "kinesis:*"
                Resource: !GetAtt KinesisDataStreamPaywall.Arn

Essentially this role definition says that a Kinesis Data Firehose can assume the role, and that the role allows any Kinesis action (API call) on the Kinesis Data Stream we created. The delivery stream will assume this role whenever it needs to fetch data from the source data stream.

As for how it delivers to S3, let’s look at that section from the firehose resource definition:

      S3DestinationConfiguration: 
        BucketARN: !GetAtt S3BucketPaywalls.Arn
        RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 5

It’s giving the ARN of the bucket we created at the very beginning, as that’s where it’s delivering the data. It’s going to assume another role to deliver the data (we’ll see that in a second), and we’ve set the buffer to 300 seconds and 5 MB. Once it has 5 MB of data in the buffer, it will write it to the S3 bucket as an object of newline-delimited JSON records. If the buffer doesn’t fill within 300 seconds, whatever the firehose has gets written at that point instead.

For the S3 delivery role:

  FirehoseDeliveryIAMRole:
    Type: AWS::IAM::Role 
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Sid: ''
          Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole
      Tags:
        - Key: Vendor
          Value: PaywallBot
        - Key: Name
          Value: !Sub "${AWS::AccountId}-firehose-role"           
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "s3:*"
                Resource: 
                  - !GetAtt S3BucketPaywalls.Arn   
                  - !Sub "${S3BucketPaywalls.Arn}/*"  

Again, a whole bunch of extra stuff. Here’s what’s pertinent.

          Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole
....
              - Effect: Allow
                Action: "s3:*"
                Resource: 
                  - !GetAtt S3BucketPaywalls.Arn   
                  - !Sub "${S3BucketPaywalls.Arn}/*"

Again, we’re allowing the firehose.amazonaws.com service to assume this role. As for what the role does, it allows any S3 API call on the bucket we created, as well as on any objects in the bucket (that’s what the /* is for).

Now that we have all of this, the last thing we need is a role with the rights to put records onto the Kinesis Data Stream. For that, here’s an example with as much extra stuff stripped out as possible:

  PaywallBotTaskRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Sid: ''
          Effect: Allow
          Principal:
            Service: ecs-tasks.amazonaws.com
          Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:  
              - Effect: Allow
                Action: "kinesis:*"
                Resource:
                  - !GetAtt KinesisDataStreamPaywall.Arn   

In this case, I’m allowing an ECS task (i.e., a container running on ECS) to assume this role, and to do anything it wants to with the data stream.

Once data is written to the data stream, the firehose will assume the role that lets it read from the stream, do so, and then after 5 MB or 300 seconds, write that data to the S3 bucket.

Here’s a code snippet of writing to the stream with Python (using the boto3 library):

import datetime
import json

import boto3

# "cache" holds the bot's shared state and "message" is the discord.py Message
# that triggered the bot; both come from elsewhere in the bot code.
cache.kinesis_client = boto3.client('kinesis', region_name='us-east-2')

# Build one JSON record describing who posted the paywalled link.
record = {'guild_id': message.guild.id,
          'guild_name': message.guild.name,
          'paywall_url': message.content,
          'timestamp': datetime.datetime.now().strftime("%m/%d/%Y %H:%M:%S"),
          'author_id': message.author.id,
          'author_name': message.author.display_name}

# Keep the records newline-delimited so each delivered S3 object is easy to parse.
record_massaged = json.dumps(record) + "\n"

cache.kinesis_client.put_record(
    StreamName=cache.KINESIS_STREAM_NAME,
    Data=record_massaged.encode('utf-8'),
    PartitionKey='string'  # only one shard, so the partition key doesn't matter much
)
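
If you’d rather verify the delivered data from code instead of clicking around the console, here’s a rough sketch of pulling the objects back down with boto3. The bucket name and prefix below are placeholders, and remember that each delivered object is a batch of newline-delimited JSON records:

import json

import boto3

s3 = boto3.client('s3', region_name='us-east-2')

# List whatever Firehose delivered for a given hour (the prefixes are UTC-based).
response = s3.list_objects_v2(
    Bucket='123456789012-paywallbot',   # placeholder; use your own bucket
    Prefix='2022/05/01/14/'             # placeholder year/month/day/hour
)

for obj in response.get('Contents', []):
    body = s3.get_object(Bucket='123456789012-paywallbot', Key=obj['Key'])['Body'].read()
    # Each object holds one or more newline-delimited JSON records.
    for line in body.decode('utf-8').splitlines():
        if line.strip():
            print(json.loads(line))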

And here’s a screenshot of the ultimate result after I posted the URL for Wordle to my Discord server:

If you made it this far: congratulations! It’s a long-winded explanation, but hopefully it helps more than just posting the whole template. That said, feel free to take a look at the template on GitHub, and use it as a starting point for your own!