In this tutorial, we will learn how to create a basic application for publishing real-time notifications via websocket api from API Gateway. Before having API Gateway support for websockets we had to
have a separate websocket server to publish notifications or sending messages to the available connections at that point of time. API Gateway literally simplifies this process and fits it into the serverless world.
The flow basically consists of persisting the connections in a data store where we will be using DynamoDB
and getting connections which we want to send notifications to. Websocket API has 3 routes;
- $connect: This route is triggered automatically when a client sends a connect request.
- $disconnect: This route is triggered automatically when a client sends a disconnect request.
- $default: Obviously this is the default handler if there are no any routes matched.
API Gateway selects routes based on the request.body.action
value, meaning we can define custom routes to handle different type of logic. And if there is no any matching route then API Gateway will forward to request to the $default
handler.
We won’t be using $default
handler in this tutorial since there are many examples around the web. Instead, we will see how to send real time notifications like; notify clients when an order is submitted. We can take a book seller company in Amazon
marketplace with a few online employees as an example in this tutorial. When a client orders a book belonging to this merchant all the online employees will be notified. The related flow will be like;
- Client sends a connect request then
$connect
handler persists the connection into DynamoDB by itsconnectionId
coming fromevent.requestContext.connectionId
. - Another client calls an API which will generate a notification for other online clients.
- Client sends a disconnect request which will invoke
$disconnect
handler and the relatedconnectionId
will be deleted from DynamoDB. This
DynamoDB Schema
I will be using the simplest schema for this example where the connectionId
will be the Primary Key
of the table. In most cases, connectionId
of the clients are being kept along with an entity id which can be the Primary Key
and connectionId
will be the Sort Key
so that we can get all specific online clients based on that entity id. Anyway, the REST API lambda will send a Scan
request to get all connections.
Lambda Handlers
We will have 3 Lambda functions where two of them belongs to Websocket API to handle $connect
, $disconnect
requests and one as a REST API handler.
$connect Route Lambda Handler
When a client sends a connection request this Lambda simply gets the connectionId
and persists into DynamoDB.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import {
APIGatewayProxyResultV2,
APIGatewayProxyWebsocketEventV2,
APIGatewayProxyWebsocketHandlerV2,
} from 'aws-lambda';
import 'source-map-support/register';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
const dynamoDBClient = new DynamoDBClient({
region: process.env.AWS_REGION,
});
const dynamoDBDocClient = DynamoDBDocumentClient.from(dynamoDBClient);
export const handler: APIGatewayProxyWebsocketHandlerV2 =
async (event: APIGatewayProxyWebsocketEventV2): Promise<APIGatewayProxyResultV2> => {
console.log('Event:', JSON.stringify(event));
const putCommand = new PutCommand({
TableName: process.env.CONN_TABLE_NAME,
Item: {
connectionId: event.requestContext.connectionId,
},
});
const resp = await dynamoDBDocClient.send(putCommand);
console.log(`putCommand resp => ${JSON.stringify(resp)}`);
return { statusCode: 200 };
};
$disconnect Route Lambda Handler
It is same as $connect
Lambda handler instead it will delete the connectionId
from DynamoDB.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import {
APIGatewayProxyResultV2,
APIGatewayProxyWebsocketEventV2,
APIGatewayProxyWebsocketHandlerV2,
} from 'aws-lambda';
import 'source-map-support/register';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DeleteCommand, DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
const dynamoDBClient = new DynamoDBClient({
region: process.env.AWS_REGION,
});
const dynamoDBDocClient = DynamoDBDocumentClient.from(dynamoDBClient);
export const handler: APIGatewayProxyWebsocketHandlerV2 =
async (event: APIGatewayProxyWebsocketEventV2): Promise<APIGatewayProxyResultV2> => {
console.log('Event:', JSON.stringify(event));
const deleteCommand = new DeleteCommand({
TableName: process.env.CONN_TABLE_NAME,
Key: {
connectionId: event.requestContext.connectionId,
},
});
const resp = await dynamoDBDocClient.send(deleteCommand);
console.log(`deleteCommand resp => ${JSON.stringify(resp)}`);
return { statusCode: 200 };
};
REST API Lambda Handler
As mentioned before, our REST API Lambda will scan the DynamoDB table to get all the connections for sending push notifications. We will be using ApiGatewayManagementApiClient from AWS SDK.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, ScanCommand } from '@aws-sdk/lib-dynamodb';
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
import { TextEncoder } from 'util';
const dynamoDBClient = new DynamoDBClient({
region: process.env.AWS_REGION,
});
const dynamoDBDocClient = DynamoDBDocumentClient.from(dynamoDBClient);
const apiGwManApiClient = new ApiGatewayManagementApiClient({
region: process.env.AWS_REGION,
endpoint: process.env.WS_API_ENDPOINT,
});
export const handler = async function (event: APIGatewayProxyEventV2): Promise<APIGatewayProxyResultV2> {
console.log(`event => ${JSON.stringify(event)}`);
const scanCommand = new ScanCommand({
TableName: process.env.CONN_TABLE_NAME,
});
const scanCommandResp = await dynamoDBDocClient.send(scanCommand);
console.log(`scanCommand resp => ${JSON.stringify(scanCommandResp)}`);
const textEncoder = new TextEncoder();
const connectionItems = scanCommandResp.Items || [];
for (let ind = 0; ind < connectionItems.length; ind++) {
const postToConnectionCommandResp = await apiGwManApiClient.send(new PostToConnectionCommand({
ConnectionId: connectionItems[ind].connectionId,
Data: textEncoder.encode('A new review published for book with id 123'),
}));
console.log(`postToConnectionCommand resp => ${JSON.stringify(postToConnectionCommandResp)}`);
}
return {
statusCode: 200,
body: JSON.stringify({
id: 123,
review: 'awesome book',
}),
};
};
endpoint
: It will be in the form ofhttps://{api-id}.execute-api.{region}.amazonaws.com/{stage}
which we are passing from CDK side into Lambda as environment variable.PostToConnectionCommand
: This is the command object we need to use for sending messages to connections. Hence the fact that,Data
field expects an encoded string instead of raw string value, so we need to useTextEncoder
to encode the data. If we are sending a JSON object then we need to first serialize it into a string and then encode it.
CDK Code
We will be provisioning a DynamoDB
table, three Lambda
functions, one WebSocketApi
and one RestApi
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import * as cdk from 'aws-cdk-lib';
import * as apigw from 'aws-cdk-lib/aws-apigateway';
import * as apigw2 from '@aws-cdk/aws-apigatewayv2-alpha';
import * as apigw2Integrations from '@aws-cdk/aws-apigatewayv2-integrations-alpha';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambdaNodejs from 'aws-cdk-lib/aws-lambda-nodejs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
export class AppStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const connectionTable = new dynamodb.Table(this, 'main-dynamo-table', {
partitionKey: {
name: 'connectionId',
type: dynamodb.AttributeType.STRING,
},
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
const wsConnectLambda = new lambdaNodejs.NodejsFunction(this, 'ws-connect-lambda', {
entry: './src/ws-connect-lambda.ts',
handler: 'handler',
runtime: lambda.Runtime.NODEJS_18_X,
environment: {
CONN_TABLE_NAME: connectionTable.tableName,
NODE_OPTIONS: '--enable-source-maps',
},
});
connectionTable.grantWriteData(wsConnectLambda);
const wsDisconnectLambda = new lambdaNodejs.NodejsFunction(this, 'ws-disconnect-lambda', {
entry: './src/ws-disconnect-lambda.ts',
handler: 'handler',
runtime: lambda.Runtime.NODEJS_18_X,
environment: {
CONN_TABLE_NAME: connectionTable.tableName,
NODE_OPTIONS: '--enable-source-maps',
},
});
connectionTable.grantWriteData(wsDisconnectLambda);
const webSocketApi = new apigw2.WebSocketApi(this, 'websocket-api', {
connectRouteOptions: {
integration: new apigw2Integrations.WebSocketLambdaIntegration('ws-connect-integration', wsConnectLambda),
},
disconnectRouteOptions: {
integration: new apigw2Integrations.WebSocketLambdaIntegration('ws-disconnect-integration', wsDisconnectLambda),
},
});
const webSocketStage = new apigw2.WebSocketStage(this, 'websocket-stage', {
webSocketApi: webSocketApi,
stageName: 'prod',
autoDeploy: true,
});
const restApi = new apigw.RestApi(this, 'rest-api', {
deployOptions: {
stageName: 'prod',
},
deploy: true,
defaultCorsPreflightOptions: {
allowMethods: ['POST', 'OPTIONS'],
allowOrigins: apigw.Cors.ALL_ORIGINS,
},
});
const apiLambda = new lambdaNodejs.NodejsFunction(this, 'api-lambda', {
entry: './src/api-lambda.ts',
handler: 'handler',
runtime: lambda.Runtime.NODEJS_18_X,
environment: {
WS_API_ENDPOINT: `https://${webSocketApi.apiId}.execute-api.${props?.env?.region}.amazonaws.com/${webSocketStage.stageName}`,
CONN_TABLE_NAME: connectionTable.tableName,
NODE_OPTIONS: '--enable-source-maps',
},
});
connectionTable.grantReadData(apiLambda);
webSocketApi.grantManageConnections(apiLambda);
restApi.root.addMethod('POST', new apigw.LambdaIntegration(apiLambda));
}
}
Testing
Our testing phase will consist of two steps. We will connect to our API Gateway Websocket Server from command line. And send a POST request from another terminal to verify a post notification is being sent to the online client.
Connect to Websocket Server
We will be using wscat
module for this. You can either install it as a global library from npm or homebrew.
1
2
> wscat -c wss://8po1pcbuke.execute-api.us-east-1.amazonaws.com/prod
Connected (press CTRL+C to quit)
After a successful connection is initiated we can also verify that the related connectionId
exists in our DynamoDB table.
1
2
3
4
5
6
7
8
9
10
11
12
> aws dynamodb scan --table-name AppStack-maindynamotableAB55E21F-8QAZTQJQFQ6C --region us-east-1
{
"Items": [{
"connectionId": {
"S": "CzRC9d73IAMCIZA="
}
}],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
Publish a Notification
We will send a POST request from a new terminal and our API Lambda will get all the available connections, send a push notification to each available connection and return back a JSON response.
1
2
3
> curl -i -X POST 'https://y69qybuse7.execute-api.us-east-1.amazonaws.com/prod/'
{"id":123,"review":"awesome book"}
After sending the request, a new message should pop-up from the terminal window with a message A new review published for book with id 123
.
Disconnect from Websocket Server
We will close the terminal window to disconnect from Websocket Server and verify that there are no any connection items in the DynamoDB table.
1
2
3
4
5
6
7
8
> aws dynamodb scan --table-name AppStack-maindynamotableAB55E21F-8QAZTQJQFQ6C --region us-east-1
{
"Items": [],
"Count": 0,
"ScannedCount": 0,
"ConsumedCapacity": null
}
Synth Stack(s)
1
2
3
> cdk ls
AppStack
Deploy Stack(s)
1
> cdk deploy --all
Destroy Stack(s)
Don’t forget to delete the stack after your testing.
1
> cdk destroy --all
Final Note
We implemented a very simple application for sending push notifications. However, there is one important point that we didn’t handle which is security. As you can see, all of our APIs (Websocket and REST) are public and not authorizing the inbound requests. There are a couple of ways to add authorization into Websocket requests;
- Using a
REQUEST
type Lambda Custom Authorizer for$connect
route (remember my previous post) - Handle the authorization logic inside the
$connect
route handler by passing a token withSec-WebSocket-Protocol
header. This was the way I used in one of our prod application, and it worked as expected without any issues. For more information you can follow the AWS guide.
You can also find my related github repository here.
Comments powered by Disqus.