For security best practices, it’s recommended to use IAM authentication with SASL when sending messages to MSK.
Currently, there are only a few Go libraries available for interacting with MSK. Some of them rely on C dependencies, which is not ideal for Go developers, as it requires enabling CGO and managing additional language dependencies.
To avoid these issues, I used the segmentio/kafka-go library, which is a Kafka client implemented entirely in Go.
MSK IAM SASL Implementation Example
func NewProducer() (*Producer, error) {
kafkaBrokers := os.Getenv("KAFKA_BROKERS")
if kafkaBrokers == "" {
return nil, fmt.Errorf("KAFKA_BROKERS environment variable is required")
}
// Validate that we have at least one broker
brokers := strings.Split(kafkaBrokers, ",")
if len(brokers) == 0 {
return nil, fmt.Errorf("KAFKA_BROKERS must contain at least one broker")
}
// Trim whitespace from broker addresses
for i, broker := range brokers {
brokers[i] = strings.TrimSpace(broker)
if brokers[i] == "" {
return nil, fmt.Errorf("invalid empty broker address in KAFKA_BROKERS")
}
}
log.Printf("Kafka producer initializing with brokers: %v", brokers)
awsCfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}
// Create IAM SASL mechanism with credentials provider (not static credentials)
iamSaslMechanism := &aws_msk_iam_v2.Mechanism{
Signer: signer.NewSigner(),
Credentials: awsCfg.Credentials,
Region: awsCfg.Region,
}
// Configure transport with TLS and SASL
sharedTransport := &kafka.Transport{
SASL: iamSaslMechanism,
TLS: &tls.Config{},
}
// Initialize Kafka writer immediately for provisioned Lambda
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(brokers...),
RequiredAcks: kafka.RequireOne,
BatchTimeout: 10 * time.Millisecond, // For Low latency
BatchSize: 1, // Send messages immediately
Compression: kafka.Snappy, // Use Snappy compression
Transport: sharedTransport,
}
log.Printf("Kafka producer initialized successfully with %d brokers", len(brokers))
return &Producer{
kafkaWriter: kafkaWriter,
}, nil
}
There is a Go Test, you can put a break point and follow the process how it works. you may need a local stack or AWS Mock to test fully
func TestHandlerWithJSONFile(t *testing.T) {
// Set up test environment (optional)
os.Setenv("KAFKA_BROKERS", "localhost:9098,localhost:9098")
// Read the JSON file
jsonData, err := os.ReadFile("test/kafka-event-id-41350679.json")
if err != nil {
t.Fatalf("Failed to read test JSON file: %v", err)
}
// Unmarshal into KafkaEvent
var event events.KafkaEvent
if err := json.Unmarshal(jsonData, &event); err != nil {
t.Fatalf("Failed to unmarshal JSON into KafkaEvent: %v", err)
}
// Call the handler function
ctx := context.Background()
err = handler(ctx, event)
// Check result
if err != nil {
t.Errorf("Handler returned error: %v", err)
} else {
t.Log("Handler executed successfully")
}
// Log some details about what was processed
for topic, records := range event.Records {
t.Logf("Processed topic: %s with %d records", topic, len(records))
for _, record := range records {
t.Logf(" - Partition: %d, Offset: %d", record.Partition, record.Offset)
}
}
}
For a whole implementation example, you can refer to the pull request or the branch linked below, which show exactly how to set this up.https://github.com/gogo-boot/cdk-lambda-go/pull/1
No comments:
Post a Comment