AWS CDK Fargate Schedular Write to DynamoDB

In this post I’ll show you how to create a scheduled Fargate task, in this case a locally developed node js Docker container that can push items to DynamoDB on a 1-minute timer.

A simpler architecture can be developed using event bridge and a Lambda function but the lambda is not helpful if the task takes longer than 15 minutes to run.

To start using AWS scheduled Fargate tasks in the CDK you must first create a VPC and cluster. The VPC is a logically isolated network that contains our EC2 cluster for managing the scheduled tasks.

The following code creates our VPC and Cluster in a very simple way using two availability zones.

    const vpc = new Vpc(this, 'VpcName', {
      maxAzs: 2
    });

    const cluster = new Cluster(this, 'ClusterName', {
      vpc: vpc
    });

Creating the DynamoDB table is simple in CDK, we also will add a gateway endpoint to the VPC so that the Fargate task can access the tables.

    const dynamoTable = new Table(this, 'DynamoTable', {
      partitionKey: {name:'ID', type: AttributeType.STRING},
      billingMode: BillingMode.PAY_PER_REQUEST,
      removalPolicy: RemovalPolicy.DESTROY
    });

    const dynamoGatewayEndpoint = vpc.addGatewayEndpoint('dynamoGatewayEndpoint', {
      service: GatewayVpcEndpointAwsService.DYNAMODB
    });

Next, we can create the Container image from a local asset.

    const containerImage = ContainerImage.fromAsset(path.join(__dirname, '../containers/scheduler/'));

In the project root I have created the folder containers and within that a folder called scheduler but you can choose your own structure as long as its mirrored in the above.
In the directory we will create 3 files; Dockerfile, package.json, tsconfig.json. We will also create a folder called src and within that our index.ts.

Dockerfile, this builds and defines our container

FROM node:14-alpine
WORKDIR /usr
COPY package.json ./
COPY tsconfig.json ./
COPY src ./src
RUN npm install
RUN npm run build
CMD ["node","./dist/index.js"]

package.json

{
  "name": "typescript",
  "version": "1.0.0",
  "description": "deploys a typescript that interacts with DynamoDB",
  "scripts": {
    "build": "tsc",
    "prestart": "npm run build",
    "start": "node .",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "dependencies": {
    "aws-sdk": "^2.1009.0"
  },
  "devDependencies": {
    "@types/node": "^16.11.0",
    "typescript": "^4.4.4"
  }
}

tsconfig.json

{
  "compilerOptions": {
    "module": "commonjs",
    "esModuleInterop": true,
    "target": "es2017",
    "noImplicitAny": true,
    "moduleResolution": "node",
    "sourceMap": true,
    "outDir": "dist",
    "baseUrl": ".",
    "paths": {
      "*": [
        "node_modules/*"
      ]
    }
  },
  "include": [
    "src/index.ts"
  ]
}

index.ts, as an example this application creates a random number and adds it to the DynamoDB table we created above.

const aws = require('aws-sdk');
aws.config.update({region: process.env.region});
class App {
  constructor() {
  }
  async start() {
    const documentClient = new aws.DynamoDB.DocumentClient();
    const params = {
      TableName : process.env.databaseTable,
      Item: {
        ID: Math.floor(Math.random() * Math.floor(10000000)).toString(),
      }
    }
    console.log('Add to dynamo');
    const data = documentClient.put(params).promise();
  }
  
}
 
const app = new App();
app.start();

Now we can create the actual Fargate task itself. This example scheduled task will tick every minute.

const fargate = new ScheduledFargateTask(this, 'ScheduledFargateTask', {
      cluster,
      scheduledFargateTaskImageOptions: {
        image: containerImage,
        memoryLimitMiB: 512,
        environment: {
          databaseTable: dynamoTable.tableName,
          region: process.env.CDK_DEFAULT_REGION!
        },
      },
      schedule: Schedule.expression('rate(1 minute)'),
      platformVersion: FargatePlatformVersion.LATEST,
    });

Next up we need to add the permissions so that the Fargate task can do the put operation on the DynamoDB table and through the gateway.

    dynamoGatewayEndpoint.addToPolicy(
      new PolicyStatement({
        effect: Effect.ALLOW,
        principals: [new AnyPrincipal()],
        actions: [
          'dynamodb:PutItem',
        ],
        resources: [
          `${dynamoTable.tableArn}`
        ],
        conditions: {
          'ArnEquals': {
            'aws:PrincipalArn': `${fargate.taskDefinition.taskRole.roleArn}`
          }
        }
      })
    );

    // Write permissions for Fargate
    dynamoTable.grantWriteData(fargate.taskDefinition.taskRole);

Thats all the code needed; the stack can now be deployed. Once deployed you will be able to see the tasks in the AWS ECS dashboard.

We can also go to the DynamoDB table and see the entries getting added every minute.

See below the full stack code.

import * as cdk from '@aws-cdk/core';
import { CfnOutput, Construct, RemovalPolicy, Stack, StackProps} from '@aws-cdk/core';
import { Table, BillingMode, AttributeType } from '@aws-cdk/aws-dynamodb';
import { GatewayVpcEndpointAwsService, Vpc } from '@aws-cdk/aws-ec2';
import { Cluster, ContainerImage, FargatePlatformVersion } from '@aws-cdk/aws-ecs';
import { ApplicationLoadBalancedFargateService, ScheduledFargateTask } from '@aws-cdk/aws-ecs-patterns';
import { AnyPrincipal, Effect, PolicyStatement } from '@aws-cdk/aws-iam';
import path = require('path');
import { Schedule } from '@aws-cdk/aws-applicationautoscaling';


export class SchedulerExampleStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const dynamoTable = new Table(this, 'DynamoTable', {
      partitionKey: {name:'ID', type: AttributeType.STRING},
      billingMode: BillingMode.PAY_PER_REQUEST,
      removalPolicy: RemovalPolicy.DESTROY
    });

    const vpc = new Vpc(this, 'MyVpc', {
      maxAzs: 2
    });

    const dynamoGatewayEndpoint = vpc.addGatewayEndpoint('dynamoGatewayEndpoint', {
      service: GatewayVpcEndpointAwsService.DYNAMODB
    });

    const cluster = new Cluster(this, 'MyCluster', {
      vpc: vpc
    });

    const cntIcontainerImage = ContainerImage.fromAsset(path.join(__dirname, '../containers/test-scheduler/'));

    const fargate = new ScheduledFargateTask(this, 'ScheduledFargateTask', {
      cluster,
      scheduledFargateTaskImageOptions: {
        image: containerImage,
        memoryLimitMiB: 512,
        environment: {
          databaseTable: dynamoTable.tableName,
          region: process.env.CDK_DEFAULT_REGION!
        },
      },
      schedule: Schedule.expression('rate(1 minute)'),
      platformVersion: FargatePlatformVersion.LATEST,
    });

    // Allow PutItem action from the Fargate Task Definition only
    dynamoGatewayEndpoint.addToPolicy(
      new PolicyStatement({
        effect: Effect.ALLOW,
        principals: [new AnyPrincipal()],
        actions: [
          'dynamodb:PutItem',
        ],
        resources: [
          `${dynamoTable.tableArn}`
        ],
        conditions: {
          'ArnEquals': {
            'aws:PrincipalArn': `${fargate.taskDefinition.taskRole.roleArn}`
          }
        }
      })
    );

    // Write permissions for Fargate
    dynamoTable.grantWriteData(fargate.taskDefinition.taskRole);

    // Outputs
    new CfnOutput(this, 'DynamoDbTableName', { value: dynamoTable.tableName });

    }


}

Container code

index.json

const aws = require('aws-sdk');
aws.config.update({region: process.env.region});
class App {
  constructor() {
  }
  async start() {
    const documentClient = new aws.DynamoDB.DocumentClient();
    const params = {
      TableName : process.env.databaseTable,
      Item: {
        ID: Math.floor(Math.random() * Math.floor(10000000)).toString(),
      }
    }
    console.log('Add to dynamo');
    const data = documentClient.put(params).promise();
  }
  
}
 
const app = new App();
app.start();

Dockerfile

FROM node:14-alpine
WORKDIR /usr
COPY package.json ./
COPY tsconfig.json ./
COPY src ./src
RUN npm install
RUN npm run build
CMD ["node","./dist/index.js"]

package.json

{
  "name": "typescript",
  "version": "1.0.0",
  "description": "deploys a typescript that interacts with DynamoDB",
  "scripts": {
    "build": "tsc",
    "prestart": "npm run build",
    "start": "node .",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "dependencies": {
    "aws-sdk": "^2.1009.0"
  },
  "devDependencies": {
    "@types/node": "^16.11.0",
    "typescript": "^4.4.4"
  }
}

tsconfig.json

{
  "compilerOptions": {
    "module": "commonjs",
    "esModuleInterop": true,
    "target": "es2017",
    "noImplicitAny": true,
    "moduleResolution": "node",
    "sourceMap": true,
    "outDir": "dist",
    "baseUrl": ".",
    "paths": {
      "*": [
        "node_modules/*"
      ]
    }
  },
  "include": [
    "src/index.ts"
  ]
}


Categories: AWS

Tags: , , , , , , , ,

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: