Friday, October 24, 2025

An Example of Kafka Topic Consume and Produce by AWS Lambda in Go

In this article, I demonstrate how to consume messages from Amazon MSK (Managed Streaming for Apache Kafka) with AWS Lambda and then produce messages using Lambda in Go.

For security best practices, it’s recommended to use IAM authentication with SASL when sending messages to MSK.

Currently, there are only a few Go libraries available for interacting with MSK. Some of them rely on C dependencies, which is not ideal for Go developers, as it requires enabling CGO and managing additional language dependencies.

To avoid these issues, I used the segmentio/kafka-go library, which is a Kafka client implemented entirely in Go.

MSK IAM SASL Implementation Example

func NewProducer() (*Producer, error) {
	kafkaBrokers := os.Getenv("KAFKA_BROKERS")
	if kafkaBrokers == "" {
		return nil, fmt.Errorf("KAFKA_BROKERS environment variable is required")
	}

	// Validate that we have at least one broker
	brokers := strings.Split(kafkaBrokers, ",")
	if len(brokers) == 0 {
		return nil, fmt.Errorf("KAFKA_BROKERS must contain at least one broker")
	}

	// Trim whitespace from broker addresses
	for i, broker := range brokers {
		brokers[i] = strings.TrimSpace(broker)
		if brokers[i] == "" {
			return nil, fmt.Errorf("invalid empty broker address in KAFKA_BROKERS")
		}
	}

	log.Printf("Kafka producer initializing with brokers: %v", brokers)

	awsCfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Create IAM SASL mechanism with credentials provider (not static credentials)
	iamSaslMechanism := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: awsCfg.Credentials,
		Region:      awsCfg.Region,
	}

	// Configure transport with TLS and SASL
	sharedTransport := &kafka.Transport{
		SASL: iamSaslMechanism,
		TLS:  &tls.Config{},
	}

	// Initialize Kafka writer immediately for provisioned Lambda
	kafkaWriter := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		RequiredAcks: kafka.RequireOne,
		BatchTimeout: 10 * time.Millisecond, // For Low latency
		BatchSize:    1,                     // Send messages immediately
		Compression:  kafka.Snappy,          // Use Snappy compression
		Transport:    sharedTransport,
	}

	log.Printf("Kafka producer initialized successfully with %d brokers", len(brokers))

	return &Producer{
		kafkaWriter: kafkaWriter,
	}, nil
}
There is a Go Test, you can put a break point and follow the process how it works. you may need a local stack or AWS Mock to test fully

func TestHandlerWithJSONFile(t *testing.T) {

	// Set up test environment (optional)
	os.Setenv("KAFKA_BROKERS", "localhost:9098,localhost:9098")

	// Read the JSON file
	jsonData, err := os.ReadFile("test/kafka-event-id-41350679.json")
	if err != nil {
		t.Fatalf("Failed to read test JSON file: %v", err)
	}

	// Unmarshal into KafkaEvent
	var event events.KafkaEvent
	if err := json.Unmarshal(jsonData, &event); err != nil {
		t.Fatalf("Failed to unmarshal JSON into KafkaEvent: %v", err)
	}

	// Call the handler function
	ctx := context.Background()
	err = handler(ctx, event)

	// Check result
	if err != nil {
		t.Errorf("Handler returned error: %v", err)
	} else {
		t.Log("Handler executed successfully")
	}

	// Log some details about what was processed
	for topic, records := range event.Records {
		t.Logf("Processed topic: %s with %d records", topic, len(records))
		for _, record := range records {
			t.Logf("  - Partition: %d, Offset: %d", record.Partition, record.Offset)
		}
	}
}
For a whole implementation example, you can refer to the pull request or the branch linked below, which show exactly how to set this up.

https://github.com/gogo-boot/cdk-lambda-go/pull/1



A Practical Guide to Deploying Go Lambdas with AWS CDK

I needed to create a Lambda function with low latency and fast computation. To achieve this, I chose Go, as it is easy to learn and consumes fewer resources compared to other languages.

However, I found that there are very few examples available for building Lambda functions in Go, especially when using AWS CDK or SAM for deployment. Even AI tools couldn't provide sufficient guidance, likely because there isn’t much related content available online.

As a result, I decided to build the solution myself.

Below is a simple example, It is deployed using AWS CDK and utilizes an AWS Linux image, which is lightweight. The CDK makes it straightforward to deploy and manage the Lambda function end to end.

import * as cdk from 'aws-cdk-lib';
import {Duration, RemovalPolicy} from "aws-cdk-lib";
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as path from "path";


export interface MyLambdaStackProps extends cdk.StackProps {}

export class MyLambdaStack extends cdk.Stack {

    constructor(scope: Construct, id: string, props: MyLambdaStackProps) {
        super(scope, id, props);

        // Define the ExampleLambda Lambda function
        const goLambda = new lambda.Function(this, "ExampleLambda", {
            runtime: lambda.Runtime.PROVIDED_AL2023, // Use the custom runtime
            handler: "bootstrap", // Go binary name
            architecture: lambda.Architecture.ARM_64, // Use ARM architecture, cheaper and better performance

            currentVersionOptions: {
                removalPolicy: RemovalPolicy.RETAIN
            },

            // lambda.Code.fromAsset will pack artifact from "/asset-output" directory.
            // therefor the "bootstrap" file must be located in this directory.
            code: lambda.Code.fromAsset(path.join(__dirname, '../example-lambda'), {  // Path to Go binary
                bundling: {
                    image: cdk.DockerImage.fromRegistry("golang:1.25"),
                    command: [
                        'bash', '-c',
                        'GOARCH=arm64 GOOS=linux CGO_ENABLED=0 go build -tags lambda.norpc -o /asset-output/bootstrap main.go'
                    ],
                },
            }),
            // environment: {
            //     'DB_HOST': '',
            // },
            // vpc: this.vpc,
            // vpcSubnets: {
            //     subnets: [subnet1, subnet2, subnet3]
            // },
            // securityGroups: [MySecurityGroup],
            // timeout: cdk.Duration.seconds(10) // Limit timeout to 10 seconds
        });
    }
}


you can have full example code https://github.com/gogo-boot/cdk-lambda-go It has better description in README.md.

You may put the Eventsource `goLambda.addEventSource` for triggering the Lambda by event.

also Provision the Lambda and Auto Scale by `goLambda.addAutoScaling({ minCapacity: 1, maxCapacity: 5 });`, so you lambda will be pre provisioned and run without cold start. so it responses very fast.