Friday, October 24, 2025

An Example of Kafka Topic Consume and Produce by AWS Lambda in Go

In this article, I demonstrate how to consume messages from Amazon MSK (Managed Streaming for Apache Kafka) with AWS Lambda and then produce messages using Lambda in Go.

As a security best practice, it is recommended to authenticate to MSK with SASL using IAM (the AWS_MSK_IAM mechanism) when sending messages.

Currently, there are only a few Go libraries available for interacting with MSK. Some of them rely on C dependencies, which is not ideal for Go developers, as it requires enabling CGO and managing additional language dependencies.

To avoid these issues, I used the segmentio/kafka-go library, which is a Kafka client implemented entirely in Go.
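If you want to follow along, the pure-Go client and the AWS SDK v2 config module can be pulled in with `go get` (module paths as of the time of writing; note that `aws_msk_iam_v2` lives in its own submodule because of its AWS SDK dependency):

```shell
go get github.com/segmentio/kafka-go
go get github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2
go get github.com/aws/aws-sdk-go-v2/config
```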

MSK IAM SASL Implementation Example

// Imports and the Producer type, shown here for completeness
import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

type Producer struct {
	kafkaWriter *kafka.Writer
}

func NewProducer() (*Producer, error) {
	kafkaBrokers := os.Getenv("KAFKA_BROKERS")
	if kafkaBrokers == "" {
		return nil, fmt.Errorf("KAFKA_BROKERS environment variable is required")
	}

	// Validate that we have at least one broker
	brokers := strings.Split(kafkaBrokers, ",")
	if len(brokers) == 0 {
		return nil, fmt.Errorf("KAFKA_BROKERS must contain at least one broker")
	}

	// Trim whitespace from broker addresses
	for i, broker := range brokers {
		brokers[i] = strings.TrimSpace(broker)
		if brokers[i] == "" {
			return nil, fmt.Errorf("invalid empty broker address in KAFKA_BROKERS")
		}
	}

	log.Printf("Kafka producer initializing with brokers: %v", brokers)

	awsCfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Create IAM SASL mechanism with credentials provider (not static credentials)
	iamSaslMechanism := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: awsCfg.Credentials,
		Region:      awsCfg.Region,
	}

	// Configure transport with TLS and SASL
	sharedTransport := &kafka.Transport{
		SASL: iamSaslMechanism,
		TLS:  &tls.Config{},
	}

	// Initialize Kafka writer immediately for provisioned Lambda
	kafkaWriter := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		RequiredAcks: kafka.RequireOne,
		BatchTimeout: 10 * time.Millisecond, // For low latency
		BatchSize:    1,                     // Send messages immediately
		Compression:  kafka.Snappy,          // Use Snappy compression
		Transport:    sharedTransport,
	}

	log.Printf("Kafka producer initialized successfully with %d brokers", len(brokers))

	return &Producer{
		kafkaWriter: kafkaWriter,
	}, nil
}
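The post does not show the send path itself, so here is a hypothetical convenience method (the name `Produce` and its parameters are my own, not from the original code) sketching how the writer above could be used. Because `NewProducer` leaves the Writer's `Topic` field unset, each `kafka.Message` must carry its own `Topic`:

```go
// Produce sends a single message to the given topic. Since the Writer was
// created without a default Topic, the topic is set per message.
func (p *Producer) Produce(ctx context.Context, topic string, key, value []byte) error {
	return p.kafkaWriter.WriteMessages(ctx, kafka.Message{
		Topic: topic,
		Key:   key,
		Value: value,
	})
}
```

With `BatchSize: 1` and a 10 ms `BatchTimeout`, each call flushes almost immediately, trading throughput for latency, which is a reasonable fit for a short-lived Lambda invocation.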
There is a Go test where you can set a breakpoint and step through how the handler works. You may need LocalStack or an AWS mock to exercise it fully.

func TestHandlerWithJSONFile(t *testing.T) {

	// Set up test environment (optional)
	os.Setenv("KAFKA_BROKERS", "localhost:9098,localhost:9098")

	// Read the JSON file
	jsonData, err := os.ReadFile("test/kafka-event-id-41350679.json")
	if err != nil {
		t.Fatalf("Failed to read test JSON file: %v", err)
	}

	// Unmarshal into KafkaEvent
	var event events.KafkaEvent
	if err := json.Unmarshal(jsonData, &event); err != nil {
		t.Fatalf("Failed to unmarshal JSON into KafkaEvent: %v", err)
	}

	// Call the handler function
	ctx := context.Background()
	err = handler(ctx, event)

	// Check result
	if err != nil {
		t.Errorf("Handler returned error: %v", err)
	} else {
		t.Log("Handler executed successfully")
	}

	// Log some details about what was processed
	for topic, records := range event.Records {
		t.Logf("Processed topic: %s with %d records", topic, len(records))
		for _, record := range records {
			t.Logf("  - Partition: %d, Offset: %d", record.Partition, record.Offset)
		}
	}
}
For a complete implementation example, you can refer to the pull request (and its branch) linked below, which shows exactly how to set this up.

https://github.com/gogo-boot/cdk-lambda-go/pull/1


