Matillion Roadmap and Ideas

Welcome to the Matillion Roadmap

Last updated: 04/28/2025
Check out our recently delivered features here.

Output directly to a CloudWatch log stream

I am currently working with a client that requires the ability to output several data points (text strings) into a CloudWatch log stream. Currently, we only support the ability to output metrics into an event stream.

I have been able to work around this requirement by using Python, but this has facilitated the move to CHA.

import boto3
import json
import time



# AWS settings: the region used for every client, and the name of the
# Secrets Manager entry that stores this script's JSON configuration.
aws_region = "eu-west-1"
config_secret_name = "MATILLION_CONFIG"

# Fetch the secret and decode its SecretString payload as JSON.
secret_client = boto3.client("secretsmanager", region_name=aws_region)
_secret_value = secret_client.get_secret_value(SecretId=config_secret_name)
config_secret = json.loads(_secret_value["SecretString"])

# Client for the "logs" service in the same region.
client = boto3.client("logs", region_name=aws_region)

def creatre_log_group():
    """Create the log group named by ``config_secret['log_group_name']``.

    The raw API response is printed to stdout; nothing is returned.
    """
    group_name = config_secret['log_group_name']
    result = client.create_log_group(logGroupName=group_name)
    print(result)

def create_log_stream():
    """Create the configured log stream inside the configured log group.

    Both names are read from ``config_secret``. The raw API response is
    printed to stdout; nothing is returned.
    """
    stream_request = {
        'logGroupName': config_secret['log_group_name'],
        'logStreamName': config_secret['log_stream_name'],
    }
    result = client.create_log_stream(**stream_request)
    print(result)

def check_log_group_exists():
    """Return True when a log group with exactly the configured name exists.

    The lookup filters by name *prefix*, so a query for ``app`` also returns
    groups such as ``app-prod``. Counting any result as a hit would report
    the group as present when only a similarly named one exists, so each
    returned group is compared against the full configured name.

    Returns:
        bool: True if a group named ``config_secret['log_group_name']``
        exists, otherwise False.
    """
    group_name = config_secret['log_group_name']
    # Paginate so a match is found even when many groups share the prefix.
    pages = client.get_paginator('describe_log_groups').paginate(
        logGroupNamePrefix=group_name
    )
    for page in pages:
        for group in page['logGroups']:
            if group['logGroupName'] == group_name:
                return True
    return False

def check_log_stream_exists():
    """Return True when the configured log stream exists in the log group.

    The lookup filters by name *prefix*, so a query for ``run`` also returns
    streams such as ``run-2``. Counting any result as a hit would report the
    stream as present when only a similarly named one exists, so each
    returned stream is compared against the full configured name.

    Returns:
        bool: True if a stream named ``config_secret['log_stream_name']``
        exists in ``config_secret['log_group_name']``, otherwise False.
    """
    stream_name = config_secret['log_stream_name']
    # Paginate so a match is found even when many streams share the prefix.
    pages = client.get_paginator('describe_log_streams').paginate(
        logGroupName=config_secret['log_group_name'],
        logStreamNamePrefix=stream_name
    )
    for page in pages:
        for stream in page['logStreams']:
            if stream['logStreamName'] == stream_name:
                return True
    return False

def log_to_cloudwatch(log_message):
    """Send one text message to the configured log group and stream.

    The message is echoed to stdout, then submitted as a single log event
    stamped with the current time in milliseconds since the epoch. The
    outcome is reported on stdout only; nothing is returned.

    Args:
        log_message (str): Text to record as the event message.
    """
    print("Log context: " + log_message)
    response = client.put_log_events(
        logGroupName=config_secret['log_group_name'],
        logStreamName=config_secret['log_stream_name'],
        logEvents=[
            {
                'timestamp': int(round(time.time() * 1000)),
                'message': log_message
            },
        ]
    )

    # A 200 status alone does not mean the event was stored: the service can
    # accept the request yet reject events (too old, too new, or expired)
    # and report that under 'rejectedLogEventsInfo'.
    status_ok = response['ResponseMetadata']['HTTPStatusCode'] == 200
    rejected = response.get('rejectedLogEventsInfo')

    if status_ok and not rejected:
        print("Log message sent successfully")
    else:
        print("Failed to send log message")

def main():
    """Create the log group and stream if missing, then send a test message.

    The group is checked (and created) before the stream, in that order.
    """
    group_missing = not check_log_group_exists()
    if group_missing:
        creatre_log_group()

    stream_missing = not check_log_stream_exists()
    if stream_missing:
        create_log_stream()

    log_to_cloudwatch("This is a test message")


if __name__ == "__main__":
    main()