Create AWS Timestream Scheduled Queries

The aws:timestreamquery/scheduledQuery:ScheduledQuery resource, part of the Pulumi AWS provider, defines a Timestream scheduled query that runs SQL aggregations on time-series data at regular intervals and writes results to a target table. This guide focuses on two capabilities: infrastructure setup for scheduled query execution and SQL-based aggregations with result mapping.

Scheduled queries require source and results databases with tables, an IAM execution role, an S3 bucket for error reports, and an SNS topic for notifications. The examples are intentionally small. Combine them with your own data ingestion pipeline and monitoring setup.

Provision infrastructure for scheduled query execution

Before creating a scheduled query, you need supporting infrastructure: databases, tables, IAM permissions, error reporting, and notifications.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const test = new aws.s3.Bucket("test", {
    bucket: "example",
    forceDestroy: true,
});
const testTopic = new aws.sns.Topic("test", {name: "example"});
const testQueue = new aws.sqs.Queue("test", {
    name: "example",
    sqsManagedSseEnabled: true,
});
const testTopicSubscription = new aws.sns.TopicSubscription("test", {
    topic: testTopic.arn,
    protocol: "sqs",
    endpoint: testQueue.arn,
});
const testQueuePolicy = new aws.sqs.QueuePolicy("test", {
    queueUrl: testQueue.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Principal: {
                AWS: "*",
            },
            Action: ["sqs:SendMessage"],
            Resource: testQueue.arn,
            Condition: {
                ArnEquals: {
                    "aws:SourceArn": testTopic.arn,
                },
            },
        }],
    }),
});
const testRole = new aws.iam.Role("test", {
    name: "example",
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Principal: {
                Service: "timestream.amazonaws.com",
            },
            Action: "sts:AssumeRole",
        }],
    }),
    tags: {
        Name: "example",
    },
});
const testRolePolicy = new aws.iam.RolePolicy("test", {
    name: "example",
    role: testRole.id,
    policy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Action: [
                "kms:Decrypt",
                "sns:Publish",
                "timestream:describeEndpoints",
                "timestream:Select",
                "timestream:SelectValues",
                "timestream:WriteRecords",
                "s3:PutObject",
            ],
            Resource: "*",
            Effect: "Allow",
        }],
    }),
});
const testDatabase = new aws.timestreamwrite.Database("test", {databaseName: "exampledatabase"});
const testTable = new aws.timestreamwrite.Table("test", {
    databaseName: testDatabase.databaseName,
    tableName: "exampletable",
    magneticStoreWriteProperties: {
        enableMagneticStoreWrites: true,
    },
    retentionProperties: {
        magneticStoreRetentionPeriodInDays: 1,
        memoryStoreRetentionPeriodInHours: 1,
    },
});
const results = new aws.timestreamwrite.Database("results", {databaseName: "exampledatabase-results"});
const resultsTable = new aws.timestreamwrite.Table("results", {
    databaseName: results.databaseName,
    tableName: "exampletable-results",
    magneticStoreWriteProperties: {
        enableMagneticStoreWrites: true,
    },
    retentionProperties: {
        magneticStoreRetentionPeriodInDays: 1,
        memoryStoreRetentionPeriodInHours: 1,
    },
});
import pulumi
import json
import pulumi_aws as aws

test = aws.s3.Bucket("test",
    bucket="example",
    force_destroy=True)
test_topic = aws.sns.Topic("test", name="example")
test_queue = aws.sqs.Queue("test",
    name="example",
    sqs_managed_sse_enabled=True)
test_topic_subscription = aws.sns.TopicSubscription("test",
    topic=test_topic.arn,
    protocol="sqs",
    endpoint=test_queue.arn)
test_queue_policy = aws.sqs.QueuePolicy("test",
    queue_url=test_queue.id,
    policy=pulumi.Output.json_dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "AWS": "*",
            },
            "Action": ["sqs:SendMessage"],
            "Resource": test_queue.arn,
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": test_topic.arn,
                },
            },
        }],
    }))
test_role = aws.iam.Role("test",
    name="example",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "timestream.amazonaws.com",
            },
            "Action": "sts:AssumeRole",
        }],
    }),
    tags={
        "Name": "example",
    })
test_role_policy = aws.iam.RolePolicy("test",
    name="example",
    role=test_role.id,
    policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Action": [
                "kms:Decrypt",
                "sns:Publish",
                "timestream:describeEndpoints",
                "timestream:Select",
                "timestream:SelectValues",
                "timestream:WriteRecords",
                "s3:PutObject",
            ],
            "Resource": "*",
            "Effect": "Allow",
        }],
    }))
test_database = aws.timestreamwrite.Database("test", database_name="exampledatabase")
test_table = aws.timestreamwrite.Table("test",
    database_name=test_database.database_name,
    table_name="exampletable",
    magnetic_store_write_properties={
        "enable_magnetic_store_writes": True,
    },
    retention_properties={
        "magnetic_store_retention_period_in_days": 1,
        "memory_store_retention_period_in_hours": 1,
    })
results = aws.timestreamwrite.Database("results", database_name="exampledatabase-results")
results_table = aws.timestreamwrite.Table("results",
    database_name=results.database_name,
    table_name="exampletable-results",
    magnetic_store_write_properties={
        "enable_magnetic_store_writes": True,
    },
    retention_properties={
        "magnetic_store_retention_period_in_days": 1,
        "memory_store_retention_period_in_hours": 1,
    })
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sns"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sqs"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/timestreamwrite"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := s3.NewBucket(ctx, "test", &s3.BucketArgs{
			Bucket:       pulumi.String("example"),
			ForceDestroy: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		testTopic, err := sns.NewTopic(ctx, "test", &sns.TopicArgs{
			Name: pulumi.String("example"),
		})
		if err != nil {
			return err
		}
		testQueue, err := sqs.NewQueue(ctx, "test", &sqs.QueueArgs{
			Name:                 pulumi.String("example"),
			SqsManagedSseEnabled: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = sns.NewTopicSubscription(ctx, "test", &sns.TopicSubscriptionArgs{
			Topic:    testTopic.Arn,
			Protocol: pulumi.String("sqs"),
			Endpoint: testQueue.Arn,
		})
		if err != nil {
			return err
		}
		_, err = sqs.NewQueuePolicy(ctx, "test", &sqs.QueuePolicyArgs{
			QueueUrl: testQueue.ID(),
			Policy: pulumi.All(testQueue.Arn, testTopic.Arn).ApplyT(func(_args []interface{}) (string, error) {
				testQueueArn := _args[0].(string)
				testTopicArn := _args[1].(string)
				var _zero string
				tmpJSON0, err := json.Marshal(map[string]interface{}{
					"Version": "2012-10-17",
					"Statement": []map[string]interface{}{
						map[string]interface{}{
							"Effect": "Allow",
							"Principal": map[string]interface{}{
								"AWS": "*",
							},
							"Action": []string{
								"sqs:SendMessage",
							},
							"Resource": testQueueArn,
							"Condition": map[string]interface{}{
								"ArnEquals": map[string]interface{}{
									"aws:SourceArn": testTopicArn,
								},
							},
						},
					},
				})
				if err != nil {
					return _zero, err
				}
				json0 := string(tmpJSON0)
				return json0, nil
			}).(pulumi.StringOutput),
		})
		if err != nil {
			return err
		}
		tmpJSON1, err := json.Marshal(map[string]interface{}{
			"Version": "2012-10-17",
			"Statement": []map[string]interface{}{
				map[string]interface{}{
					"Effect": "Allow",
					"Principal": map[string]interface{}{
						"Service": "timestream.amazonaws.com",
					},
					"Action": "sts:AssumeRole",
				},
			},
		})
		if err != nil {
			return err
		}
		json1 := string(tmpJSON1)
		testRole, err := iam.NewRole(ctx, "test", &iam.RoleArgs{
			Name:             pulumi.String("example"),
			AssumeRolePolicy: pulumi.String(json1),
			Tags: pulumi.StringMap{
				"Name": pulumi.String("example"),
			},
		})
		if err != nil {
			return err
		}
		tmpJSON2, err := json.Marshal(map[string]interface{}{
			"Version": "2012-10-17",
			"Statement": []map[string]interface{}{
				map[string]interface{}{
					"Action": []string{
						"kms:Decrypt",
						"sns:Publish",
						"timestream:describeEndpoints",
						"timestream:Select",
						"timestream:SelectValues",
						"timestream:WriteRecords",
						"s3:PutObject",
					},
					"Resource": "*",
					"Effect":   "Allow",
				},
			},
		})
		if err != nil {
			return err
		}
		json2 := string(tmpJSON2)
		_, err = iam.NewRolePolicy(ctx, "test", &iam.RolePolicyArgs{
			Name:   pulumi.String("example"),
			Role:   testRole.ID(),
			Policy: pulumi.String(json2),
		})
		if err != nil {
			return err
		}
		testDatabase, err := timestreamwrite.NewDatabase(ctx, "test", &timestreamwrite.DatabaseArgs{
			DatabaseName: pulumi.String("exampledatabase"),
		})
		if err != nil {
			return err
		}
		_, err = timestreamwrite.NewTable(ctx, "test", &timestreamwrite.TableArgs{
			DatabaseName: testDatabase.DatabaseName,
			TableName:    pulumi.String("exampletable"),
			MagneticStoreWriteProperties: &timestreamwrite.TableMagneticStoreWritePropertiesArgs{
				EnableMagneticStoreWrites: pulumi.Bool(true),
			},
			RetentionProperties: &timestreamwrite.TableRetentionPropertiesArgs{
				MagneticStoreRetentionPeriodInDays: pulumi.Int(1),
				MemoryStoreRetentionPeriodInHours:  pulumi.Int(1),
			},
		})
		if err != nil {
			return err
		}
		results, err := timestreamwrite.NewDatabase(ctx, "results", &timestreamwrite.DatabaseArgs{
			DatabaseName: pulumi.String("exampledatabase-results"),
		})
		if err != nil {
			return err
		}
		_, err = timestreamwrite.NewTable(ctx, "results", &timestreamwrite.TableArgs{
			DatabaseName: results.DatabaseName,
			TableName:    pulumi.String("exampletable-results"),
			MagneticStoreWriteProperties: &timestreamwrite.TableMagneticStoreWritePropertiesArgs{
				EnableMagneticStoreWrites: pulumi.Bool(true),
			},
			RetentionProperties: &timestreamwrite.TableRetentionPropertiesArgs{
				MagneticStoreRetentionPeriodInDays: pulumi.Int(1),
				MemoryStoreRetentionPeriodInHours:  pulumi.Int(1),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var test = new Aws.S3.Bucket("test", new()
    {
        BucketName = "example",
        ForceDestroy = true,
    });

    var testTopic = new Aws.Sns.Topic("test", new()
    {
        Name = "example",
    });

    var testQueue = new Aws.Sqs.Queue("test", new()
    {
        Name = "example",
        SqsManagedSseEnabled = true,
    });

    var testTopicSubscription = new Aws.Sns.TopicSubscription("test", new()
    {
        Topic = testTopic.Arn,
        Protocol = "sqs",
        Endpoint = testQueue.Arn,
    });

    var testQueuePolicy = new Aws.Sqs.QueuePolicy("test", new()
    {
        QueueUrl = testQueue.Id,
        Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
        {
            ["Version"] = "2012-10-17",
            ["Statement"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["Effect"] = "Allow",
                    ["Principal"] = new Dictionary<string, object?>
                    {
                        ["AWS"] = "*",
                    },
                    ["Action"] = new[]
                    {
                        "sqs:SendMessage",
                    },
                    ["Resource"] = testQueue.Arn,
                    ["Condition"] = new Dictionary<string, object?>
                    {
                        ["ArnEquals"] = new Dictionary<string, object?>
                        {
                            ["aws:SourceArn"] = testTopic.Arn,
                        },
                    },
                },
            },
        })),
    });

    var testRole = new Aws.Iam.Role("test", new()
    {
        Name = "example",
        AssumeRolePolicy = JsonSerializer.Serialize(new Dictionary<string, object?>
        {
            ["Version"] = "2012-10-17",
            ["Statement"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["Effect"] = "Allow",
                    ["Principal"] = new Dictionary<string, object?>
                    {
                        ["Service"] = "timestream.amazonaws.com",
                    },
                    ["Action"] = "sts:AssumeRole",
                },
            },
        }),
        Tags = 
        {
            { "Name", "example" },
        },
    });

    var testRolePolicy = new Aws.Iam.RolePolicy("test", new()
    {
        Name = "example",
        Role = testRole.Id,
        Policy = JsonSerializer.Serialize(new Dictionary<string, object?>
        {
            ["Version"] = "2012-10-17",
            ["Statement"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["Action"] = new[]
                    {
                        "kms:Decrypt",
                        "sns:Publish",
                        "timestream:describeEndpoints",
                        "timestream:Select",
                        "timestream:SelectValues",
                        "timestream:WriteRecords",
                        "s3:PutObject",
                    },
                    ["Resource"] = "*",
                    ["Effect"] = "Allow",
                },
            },
        }),
    });

    var testDatabase = new Aws.TimestreamWrite.Database("test", new()
    {
        DatabaseName = "exampledatabase",
    });

    var testTable = new Aws.TimestreamWrite.Table("test", new()
    {
        DatabaseName = testDatabase.DatabaseName,
        TableName = "exampletable",
        MagneticStoreWriteProperties = new Aws.TimestreamWrite.Inputs.TableMagneticStoreWritePropertiesArgs
        {
            EnableMagneticStoreWrites = true,
        },
        RetentionProperties = new Aws.TimestreamWrite.Inputs.TableRetentionPropertiesArgs
        {
            MagneticStoreRetentionPeriodInDays = 1,
            MemoryStoreRetentionPeriodInHours = 1,
        },
    });

    var results = new Aws.TimestreamWrite.Database("results", new()
    {
        DatabaseName = "exampledatabase-results",
    });

    var resultsTable = new Aws.TimestreamWrite.Table("results", new()
    {
        DatabaseName = results.DatabaseName,
        TableName = "exampletable-results",
        MagneticStoreWriteProperties = new Aws.TimestreamWrite.Inputs.TableMagneticStoreWritePropertiesArgs
        {
            EnableMagneticStoreWrites = true,
        },
        RetentionProperties = new Aws.TimestreamWrite.Inputs.TableRetentionPropertiesArgs
        {
            MagneticStoreRetentionPeriodInDays = 1,
            MemoryStoreRetentionPeriodInHours = 1,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.sns.Topic;
import com.pulumi.aws.sns.TopicArgs;
import com.pulumi.aws.sqs.Queue;
import com.pulumi.aws.sqs.QueueArgs;
import com.pulumi.aws.sns.TopicSubscription;
import com.pulumi.aws.sns.TopicSubscriptionArgs;
import com.pulumi.aws.sqs.QueuePolicy;
import com.pulumi.aws.sqs.QueuePolicyArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.timestreamwrite.Database;
import com.pulumi.aws.timestreamwrite.DatabaseArgs;
import com.pulumi.aws.timestreamwrite.Table;
import com.pulumi.aws.timestreamwrite.TableArgs;
import com.pulumi.aws.timestreamwrite.inputs.TableMagneticStoreWritePropertiesArgs;
import com.pulumi.aws.timestreamwrite.inputs.TableRetentionPropertiesArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Bucket("test", BucketArgs.builder()
            .bucket("example")
            .forceDestroy(true)
            .build());

        var testTopic = new Topic("testTopic", TopicArgs.builder()
            .name("example")
            .build());

        var testQueue = new Queue("testQueue", QueueArgs.builder()
            .name("example")
            .sqsManagedSseEnabled(true)
            .build());

        var testTopicSubscription = new TopicSubscription("testTopicSubscription", TopicSubscriptionArgs.builder()
            .topic(testTopic.arn())
            .protocol("sqs")
            .endpoint(testQueue.arn())
            .build());

        var testQueuePolicy = new QueuePolicy("testQueuePolicy", QueuePolicyArgs.builder()
            .queueUrl(testQueue.id())
            .policy(Output.tuple(testQueue.arn(), testTopic.arn()).applyValue(values -> {
                var testQueueArn = values.t1;
                var testTopicArn = values.t2;
                return serializeJson(
                    jsonObject(
                        jsonProperty("Version", "2012-10-17"),
                        jsonProperty("Statement", jsonArray(jsonObject(
                            jsonProperty("Effect", "Allow"),
                            jsonProperty("Principal", jsonObject(
                                jsonProperty("AWS", "*")
                            )),
                            jsonProperty("Action", jsonArray("sqs:SendMessage")),
                            jsonProperty("Resource", testQueueArn),
                            jsonProperty("Condition", jsonObject(
                                jsonProperty("ArnEquals", jsonObject(
                                    jsonProperty("aws:SourceArn", testTopicArn)
                                ))
                            ))
                        )))
                    ));
            }))
            .build());

        var testRole = new Role("testRole", RoleArgs.builder()
            .name("example")
            .assumeRolePolicy(serializeJson(
                jsonObject(
                    jsonProperty("Version", "2012-10-17"),
                    jsonProperty("Statement", jsonArray(jsonObject(
                        jsonProperty("Effect", "Allow"),
                        jsonProperty("Principal", jsonObject(
                            jsonProperty("Service", "timestream.amazonaws.com")
                        )),
                        jsonProperty("Action", "sts:AssumeRole")
                    )))
                )))
            .tags(Map.of("Name", "example"))
            .build());

        var testRolePolicy = new RolePolicy("testRolePolicy", RolePolicyArgs.builder()
            .name("example")
            .role(testRole.id())
            .policy(serializeJson(
                jsonObject(
                    jsonProperty("Version", "2012-10-17"),
                    jsonProperty("Statement", jsonArray(jsonObject(
                        jsonProperty("Action", jsonArray(
                            "kms:Decrypt", 
                            "sns:Publish", 
                            "timestream:describeEndpoints", 
                            "timestream:Select", 
                            "timestream:SelectValues", 
                            "timestream:WriteRecords", 
                            "s3:PutObject"
                        )),
                        jsonProperty("Resource", "*"),
                        jsonProperty("Effect", "Allow")
                    )))
                )))
            .build());

        var testDatabase = new Database("testDatabase", DatabaseArgs.builder()
            .databaseName("exampledatabase")
            .build());

        var testTable = new Table("testTable", TableArgs.builder()
            .databaseName(testDatabase.databaseName())
            .tableName("exampletable")
            .magneticStoreWriteProperties(TableMagneticStoreWritePropertiesArgs.builder()
                .enableMagneticStoreWrites(true)
                .build())
            .retentionProperties(TableRetentionPropertiesArgs.builder()
                .magneticStoreRetentionPeriodInDays(1)
                .memoryStoreRetentionPeriodInHours(1)
                .build())
            .build());

        var results = new Database("results", DatabaseArgs.builder()
            .databaseName("exampledatabase-results")
            .build());

        var resultsTable = new Table("resultsTable", TableArgs.builder()
            .databaseName(results.databaseName())
            .tableName("exampletable-results")
            .magneticStoreWriteProperties(TableMagneticStoreWritePropertiesArgs.builder()
                .enableMagneticStoreWrites(true)
                .build())
            .retentionProperties(TableRetentionPropertiesArgs.builder()
                .magneticStoreRetentionPeriodInDays(1)
                .memoryStoreRetentionPeriodInHours(1)
                .build())
            .build());

    }
}
resources:
  test:
    type: aws:s3:Bucket
    properties:
      bucket: example
      forceDestroy: true
  testTopic:
    type: aws:sns:Topic
    name: test
    properties:
      name: example
  testQueue:
    type: aws:sqs:Queue
    name: test
    properties:
      name: example
      sqsManagedSseEnabled: true
  testTopicSubscription:
    type: aws:sns:TopicSubscription
    name: test
    properties:
      topic: ${testTopic.arn}
      protocol: sqs
      endpoint: ${testQueue.arn}
  testQueuePolicy:
    type: aws:sqs:QueuePolicy
    name: test
    properties:
      queueUrl: ${testQueue.id}
      policy:
        fn::toJSON:
          Version: 2012-10-17
          Statement:
            - Effect: Allow
              Principal:
                AWS: '*'
              Action:
                - sqs:SendMessage
              Resource: ${testQueue.arn}
              Condition:
                ArnEquals:
                  aws:SourceArn: ${testTopic.arn}
  testRole:
    type: aws:iam:Role
    name: test
    properties:
      name: example
      assumeRolePolicy:
        fn::toJSON:
          Version: 2012-10-17
          Statement:
            - Effect: Allow
              Principal:
                Service: timestream.amazonaws.com
              Action: sts:AssumeRole
      tags:
        Name: example
  testRolePolicy:
    type: aws:iam:RolePolicy
    name: test
    properties:
      name: example
      role: ${testRole.id}
      policy:
        fn::toJSON:
          Version: 2012-10-17
          Statement:
            - Action:
                - kms:Decrypt
                - sns:Publish
                - timestream:describeEndpoints
                - timestream:Select
                - timestream:SelectValues
                - timestream:WriteRecords
                - s3:PutObject
              Resource: '*'
              Effect: Allow
  testDatabase:
    type: aws:timestreamwrite:Database
    name: test
    properties:
      databaseName: exampledatabase
  testTable:
    type: aws:timestreamwrite:Table
    name: test
    properties:
      databaseName: ${testDatabase.databaseName}
      tableName: exampletable
      magneticStoreWriteProperties:
        enableMagneticStoreWrites: true
      retentionProperties:
        magneticStoreRetentionPeriodInDays: 1
        memoryStoreRetentionPeriodInHours: 1
  results:
    type: aws:timestreamwrite:Database
    properties:
      databaseName: exampledatabase-results
  resultsTable:
    type: aws:timestreamwrite:Table
    name: results
    properties:
      databaseName: ${results.databaseName}
      tableName: exampletable-results
      magneticStoreWriteProperties:
        enableMagneticStoreWrites: true
      retentionProperties:
        magneticStoreRetentionPeriodInDays: 1
        memoryStoreRetentionPeriodInHours: 1

The IAM role’s assumeRolePolicy lets the Timestream service assume the role; the attached role policy then allows it to read from source tables, write to results tables, publish to SNS, and store error reports in S3. Both the source and results tables enable magneticStoreWriteProperties for long-term storage and set retentionProperties to control the data lifecycle. The SNS topic fans out to an SQS queue for notification delivery.
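
To verify the notification path after deployment, you can export the queue URL and topic ARN as stack outputs and inspect the queue once the first scheduled run completes. A minimal TypeScript sketch, assuming the testQueue and testTopic resources defined above:

// Stack outputs for post-deployment verification, e.g. with
// `aws sqs receive-message --queue-url <notificationQueueUrl>`.
export const notificationTopicArn = testTopic.arn;
export const notificationQueueUrl = testQueue.url;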

Run aggregation queries on a recurring schedule

Time-series workloads often pre-compute aggregations at regular intervals to power dashboards without re-scanning raw data.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.timestreamquery.ScheduledQuery("example", {
    executionRoleArn: exampleAwsIamRole.arn,
    name: exampleAwsTimestreamwriteTable.tableName,
    queryString: `SELECT region, az, hostname, BIN(time, 15s) AS binned_timestamp,
    ROUND(AVG(cpu_utilization), 2) AS avg_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.9), 2) AS p90_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.95), 2) AS p95_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.99), 2) AS p99_cpu_utilization
FROM exampledatabase.exampletable
WHERE measure_name = 'metrics' AND time > ago(2h)
GROUP BY region, hostname, az, BIN(time, 15s)
ORDER BY binned_timestamp ASC
LIMIT 5
`,
    errorReportConfiguration: {
        s3Configuration: {
            bucketName: exampleAwsS3Bucket.bucket,
        },
    },
    notificationConfiguration: {
        snsConfiguration: {
            topicArn: exampleAwsSnsTopic.arn,
        },
    },
    scheduleConfiguration: {
        scheduleExpression: "rate(1 hour)",
    },
    targetConfiguration: {
        timestreamConfiguration: {
            databaseName: results.databaseName,
            tableName: resultsAwsTimestreamwriteTable.tableName,
            timeColumn: "binned_timestamp",
            dimensionMappings: [
                {
                    dimensionValueType: "VARCHAR",
                    name: "az",
                },
                {
                    dimensionValueType: "VARCHAR",
                    name: "region",
                },
                {
                    dimensionValueType: "VARCHAR",
                    name: "hostname",
                },
            ],
            multiMeasureMappings: {
                targetMultiMeasureName: "multi-metrics",
                multiMeasureAttributeMappings: [
                    {
                        measureValueType: "DOUBLE",
                        sourceColumn: "avg_cpu_utilization",
                    },
                    {
                        measureValueType: "DOUBLE",
                        sourceColumn: "p90_cpu_utilization",
                    },
                    {
                        measureValueType: "DOUBLE",
                        sourceColumn: "p95_cpu_utilization",
                    },
                    {
                        measureValueType: "DOUBLE",
                        sourceColumn: "p99_cpu_utilization",
                    },
                ],
            },
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.timestreamquery.ScheduledQuery("example",
    execution_role_arn=example_aws_iam_role["arn"],
    name=example_aws_timestreamwrite_table["tableName"],
    query_string="""SELECT region, az, hostname, BIN(time, 15s) AS binned_timestamp,
    ROUND(AVG(cpu_utilization), 2) AS avg_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.9), 2) AS p90_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.95), 2) AS p95_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.99), 2) AS p99_cpu_utilization
FROM exampledatabase.exampletable
WHERE measure_name = 'metrics' AND time > ago(2h)
GROUP BY region, hostname, az, BIN(time, 15s)
ORDER BY binned_timestamp ASC
LIMIT 5
""",
    error_report_configuration={
        "s3_configuration": {
            "bucket_name": example_aws_s3_bucket["bucket"],
        },
    },
    notification_configuration={
        "sns_configuration": {
            "topic_arn": example_aws_sns_topic["arn"],
        },
    },
    schedule_configuration={
        "schedule_expression": "rate(1 hour)",
    },
    target_configuration={
        "timestream_configuration": {
            "database_name": results["databaseName"],
            "table_name": results_aws_timestreamwrite_table["tableName"],
            "time_column": "binned_timestamp",
            "dimension_mappings": [
                {
                    "dimension_value_type": "VARCHAR",
                    "name": "az",
                },
                {
                    "dimension_value_type": "VARCHAR",
                    "name": "region",
                },
                {
                    "dimension_value_type": "VARCHAR",
                    "name": "hostname",
                },
            ],
            "multi_measure_mappings": {
                "target_multi_measure_name": "multi-metrics",
                "multi_measure_attribute_mappings": [
                    {
                        "measure_value_type": "DOUBLE",
                        "source_column": "avg_cpu_utilization",
                    },
                    {
                        "measure_value_type": "DOUBLE",
                        "source_column": "p90_cpu_utilization",
                    },
                    {
                        "measure_value_type": "DOUBLE",
                        "source_column": "p95_cpu_utilization",
                    },
                    {
                        "measure_value_type": "DOUBLE",
                        "source_column": "p99_cpu_utilization",
                    },
                ],
            },
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/timestreamquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := timestreamquery.NewScheduledQuery(ctx, "example", &timestreamquery.ScheduledQueryArgs{
			ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
			Name:             pulumi.Any(exampleAwsTimestreamwriteTable.TableName),
			QueryString: pulumi.String(`SELECT region, az, hostname, BIN(time, 15s) AS binned_timestamp,
    ROUND(AVG(cpu_utilization), 2) AS avg_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.9), 2) AS p90_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.95), 2) AS p95_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.99), 2) AS p99_cpu_utilization
FROM exampledatabase.exampletable
WHERE measure_name = 'metrics' AND time > ago(2h)
GROUP BY region, hostname, az, BIN(time, 15s)
ORDER BY binned_timestamp ASC
LIMIT 5
`),
			ErrorReportConfiguration: &timestreamquery.ScheduledQueryErrorReportConfigurationArgs{
				S3Configuration: &timestreamquery.ScheduledQueryErrorReportConfigurationS3ConfigurationArgs{
					BucketName: pulumi.Any(exampleAwsS3Bucket.Bucket),
				},
			},
			NotificationConfiguration: &timestreamquery.ScheduledQueryNotificationConfigurationArgs{
				SnsConfiguration: &timestreamquery.ScheduledQueryNotificationConfigurationSnsConfigurationArgs{
					TopicArn: pulumi.Any(exampleAwsSnsTopic.Arn),
				},
			},
			ScheduleConfiguration: &timestreamquery.ScheduledQueryScheduleConfigurationArgs{
				ScheduleExpression: pulumi.String("rate(1 hour)"),
			},
			TargetConfiguration: &timestreamquery.ScheduledQueryTargetConfigurationArgs{
				TimestreamConfiguration: &timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationArgs{
					DatabaseName: pulumi.Any(results.DatabaseName),
					TableName:    pulumi.Any(resultsAwsTimestreamwriteTable.TableName),
					TimeColumn:   pulumi.String("binned_timestamp"),
					DimensionMappings: timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArray{
						&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs{
							DimensionValueType: pulumi.String("VARCHAR"),
							Name:               pulumi.String("az"),
						},
						&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs{
							DimensionValueType: pulumi.String("VARCHAR"),
							Name:               pulumi.String("region"),
						},
						&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs{
							DimensionValueType: pulumi.String("VARCHAR"),
							Name:               pulumi.String("hostname"),
						},
					},
					MultiMeasureMappings: &timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsArgs{
						TargetMultiMeasureName: pulumi.String("multi-metrics"),
						MultiMeasureAttributeMappings: timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArray{
							&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs{
								MeasureValueType: pulumi.String("DOUBLE"),
								SourceColumn:     pulumi.String("avg_cpu_utilization"),
							},
							&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs{
								MeasureValueType: pulumi.String("DOUBLE"),
								SourceColumn:     pulumi.String("p90_cpu_utilization"),
							},
							&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs{
								MeasureValueType: pulumi.String("DOUBLE"),
								SourceColumn:     pulumi.String("p95_cpu_utilization"),
							},
							&timestreamquery.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs{
								MeasureValueType: pulumi.String("DOUBLE"),
								SourceColumn:     pulumi.String("p99_cpu_utilization"),
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.TimestreamQuery.ScheduledQuery("example", new()
    {
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        Name = exampleAwsTimestreamwriteTable.TableName,
        QueryString = @"SELECT region, az, hostname, BIN(time, 15s) AS binned_timestamp,
    ROUND(AVG(cpu_utilization), 2) AS avg_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.9), 2) AS p90_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.95), 2) AS p95_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.99), 2) AS p99_cpu_utilization
FROM exampledatabase.exampletable
WHERE measure_name = 'metrics' AND time > ago(2h)
GROUP BY region, hostname, az, BIN(time, 15s)
ORDER BY binned_timestamp ASC
LIMIT 5
",
        ErrorReportConfiguration = new Aws.TimestreamQuery.Inputs.ScheduledQueryErrorReportConfigurationArgs
        {
            S3Configuration = new Aws.TimestreamQuery.Inputs.ScheduledQueryErrorReportConfigurationS3ConfigurationArgs
            {
                BucketName = exampleAwsS3Bucket.Bucket,
            },
        },
        NotificationConfiguration = new Aws.TimestreamQuery.Inputs.ScheduledQueryNotificationConfigurationArgs
        {
            SnsConfiguration = new Aws.TimestreamQuery.Inputs.ScheduledQueryNotificationConfigurationSnsConfigurationArgs
            {
                TopicArn = exampleAwsSnsTopic.Arn,
            },
        },
        ScheduleConfiguration = new Aws.TimestreamQuery.Inputs.ScheduledQueryScheduleConfigurationArgs
        {
            ScheduleExpression = "rate(1 hour)",
        },
        TargetConfiguration = new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationArgs
        {
            TimestreamConfiguration = new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationArgs
            {
                DatabaseName = results.DatabaseName,
                TableName = resultsAwsTimestreamwriteTable.TableName,
                TimeColumn = "binned_timestamp",
                DimensionMappings = new[]
                {
                    new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs
                    {
                        DimensionValueType = "VARCHAR",
                        Name = "az",
                    },
                    new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs
                    {
                        DimensionValueType = "VARCHAR",
                        Name = "region",
                    },
                    new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs
                    {
                        DimensionValueType = "VARCHAR",
                        Name = "hostname",
                    },
                },
                MultiMeasureMappings = new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsArgs
                {
                    TargetMultiMeasureName = "multi-metrics",
                    MultiMeasureAttributeMappings = new[]
                    {
                        new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs
                        {
                            MeasureValueType = "DOUBLE",
                            SourceColumn = "avg_cpu_utilization",
                        },
                        new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs
                        {
                            MeasureValueType = "DOUBLE",
                            SourceColumn = "p90_cpu_utilization",
                        },
                        new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs
                        {
                            MeasureValueType = "DOUBLE",
                            SourceColumn = "p95_cpu_utilization",
                        },
                        new Aws.TimestreamQuery.Inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs
                        {
                            MeasureValueType = "DOUBLE",
                            SourceColumn = "p99_cpu_utilization",
                        },
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.timestreamquery.ScheduledQuery;
import com.pulumi.aws.timestreamquery.ScheduledQueryArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryErrorReportConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryErrorReportConfigurationS3ConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryNotificationConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryNotificationConfigurationSnsConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryScheduleConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryTargetConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsArgs;
import com.pulumi.aws.timestreamquery.inputs.ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new ScheduledQuery("example", ScheduledQueryArgs.builder()
            .executionRoleArn(exampleAwsIamRole.arn())
            .name(exampleAwsTimestreamwriteTable.tableName())
            .queryString("""
SELECT region, az, hostname, BIN(time, 15s) AS binned_timestamp,
    ROUND(AVG(cpu_utilization), 2) AS avg_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.9), 2) AS p90_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.95), 2) AS p95_cpu_utilization,
    ROUND(APPROX_PERCENTILE(cpu_utilization, 0.99), 2) AS p99_cpu_utilization
FROM exampledatabase.exampletable
WHERE measure_name = 'metrics' AND time > ago(2h)
GROUP BY region, hostname, az, BIN(time, 15s)
ORDER BY binned_timestamp ASC
LIMIT 5
            """)
            .errorReportConfiguration(ScheduledQueryErrorReportConfigurationArgs.builder()
                .s3Configuration(ScheduledQueryErrorReportConfigurationS3ConfigurationArgs.builder()
                    .bucketName(exampleAwsS3Bucket.bucket())
                    .build())
                .build())
            .notificationConfiguration(ScheduledQueryNotificationConfigurationArgs.builder()
                .snsConfiguration(ScheduledQueryNotificationConfigurationSnsConfigurationArgs.builder()
                    .topicArn(exampleAwsSnsTopic.arn())
                    .build())
                .build())
            .scheduleConfiguration(ScheduledQueryScheduleConfigurationArgs.builder()
                .scheduleExpression("rate(1 hour)")
                .build())
            .targetConfiguration(ScheduledQueryTargetConfigurationArgs.builder()
                .timestreamConfiguration(ScheduledQueryTargetConfigurationTimestreamConfigurationArgs.builder()
                    .databaseName(results.databaseName())
                    .tableName(resultsAwsTimestreamwriteTable.tableName())
                    .timeColumn("binned_timestamp")
                    .dimensionMappings(                    
                        ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs.builder()
                            .dimensionValueType("VARCHAR")
                            .name("az")
                            .build(),
                        ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs.builder()
                            .dimensionValueType("VARCHAR")
                            .name("region")
                            .build(),
                        ScheduledQueryTargetConfigurationTimestreamConfigurationDimensionMappingArgs.builder()
                            .dimensionValueType("VARCHAR")
                            .name("hostname")
                            .build())
                    .multiMeasureMappings(ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsArgs.builder()
                        .targetMultiMeasureName("multi-metrics")
                        .multiMeasureAttributeMappings(                        
                            ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs.builder()
                                .measureValueType("DOUBLE")
                                .sourceColumn("avg_cpu_utilization")
                                .build(),
                            ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs.builder()
                                .measureValueType("DOUBLE")
                                .sourceColumn("p90_cpu_utilization")
                                .build(),
                            ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs.builder()
                                .measureValueType("DOUBLE")
                                .sourceColumn("p95_cpu_utilization")
                                .build(),
                            ScheduledQueryTargetConfigurationTimestreamConfigurationMultiMeasureMappingsMultiMeasureAttributeMappingArgs.builder()
                                .measureValueType("DOUBLE")
                                .sourceColumn("p99_cpu_utilization")
                                .build())
                        .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:timestreamquery:ScheduledQuery
    properties:
      executionRoleArn: ${exampleAwsIamRole.arn}
      name: ${exampleAwsTimestreamwriteTable.tableName}
      queryString: |
        SELECT region, az, hostname, BIN(time, 15s) AS binned_timestamp,
            ROUND(AVG(cpu_utilization), 2) AS avg_cpu_utilization,
            ROUND(APPROX_PERCENTILE(cpu_utilization, 0.9), 2) AS p90_cpu_utilization,
            ROUND(APPROX_PERCENTILE(cpu_utilization, 0.95), 2) AS p95_cpu_utilization,
            ROUND(APPROX_PERCENTILE(cpu_utilization, 0.99), 2) AS p99_cpu_utilization
        FROM exampledatabase.exampletable
        WHERE measure_name = 'metrics' AND time > ago(2h)
        GROUP BY region, hostname, az, BIN(time, 15s)
        ORDER BY binned_timestamp ASC
        LIMIT 5        
      errorReportConfiguration:
        s3Configuration:
          bucketName: ${exampleAwsS3Bucket.bucket}
      notificationConfiguration:
        snsConfiguration:
          topicArn: ${exampleAwsSnsTopic.arn}
      scheduleConfiguration:
        scheduleExpression: rate(1 hour)
      targetConfiguration:
        timestreamConfiguration:
          databaseName: ${results.databaseName}
          tableName: ${resultsAwsTimestreamwriteTable.tableName}
          timeColumn: binned_timestamp
          dimensionMappings:
            - dimensionValueType: VARCHAR
              name: az
            - dimensionValueType: VARCHAR
              name: region
            - dimensionValueType: VARCHAR
              name: hostname
          multiMeasureMappings:
            targetMultiMeasureName: multi-metrics
            multiMeasureAttributeMappings:
              - measureValueType: DOUBLE
                sourceColumn: avg_cpu_utilization
              - measureValueType: DOUBLE
                sourceColumn: p90_cpu_utilization
              - measureValueType: DOUBLE
                sourceColumn: p95_cpu_utilization
              - measureValueType: DOUBLE
                sourceColumn: p99_cpu_utilization

The queryString defines a SQL query that bins time-series data into 15-second intervals and calculates CPU utilization statistics (average, p90, p95, p99). The scheduleConfiguration sets the query to run every hour using a rate expression. The targetConfiguration maps query results to the results table: timeColumn specifies which query column contains timestamps, dimensionMappings defines metadata columns (region, az, hostname), and multiMeasureMappings groups numeric metrics under a single multi-measure name. The errorReportConfiguration sends failures to S3, while notificationConfiguration publishes state changes to SNS.
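
After the query has run at least once, you can read the pre-aggregated rows back from the results table. A minimal sketch using the AWS SDK for JavaScript v3; the database and table names are the ones created earlier, so adjust them if you renamed anything:

import { TimestreamQueryClient, QueryCommand } from "@aws-sdk/client-timestream-query";

const client = new TimestreamQueryClient({});
// Fetch the most recent aggregates written by the scheduled query.
const result = await client.send(new QueryCommand({
    QueryString: `SELECT * FROM "exampledatabase-results"."exampletable-results" ORDER BY time DESC LIMIT 10`,
}));
console.log(JSON.stringify(result.Rows, null, 2));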

Beyond these examples

These snippets focus on specific scheduled query features: infrastructure provisioning, SQL-based aggregation with time binning, and multi-measure result mapping. They’re intentionally minimal rather than full time-series analytics pipelines.

The examples assume pre-existing infrastructure such as the AWS provider with configured region and time-series data ingested into source tables via the WriteRecords API. They focus on configuring the scheduled query rather than data ingestion or downstream consumption.
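
For reference, here is a minimal ingestion sketch using the AWS SDK for JavaScript v3; the dimension and measure names line up with the columns the scheduled query expects, while the values themselves are placeholders:

import { TimestreamWriteClient, WriteRecordsCommand } from "@aws-sdk/client-timestream-write";

const writer = new TimestreamWriteClient({});
// Write a single multi-measure "metrics" record into the source table.
await writer.send(new WriteRecordsCommand({
    DatabaseName: "exampledatabase",
    TableName: "exampletable",
    Records: [{
        Dimensions: [
            { Name: "region", Value: "us-east-1" },
            { Name: "az", Value: "us-east-1a" },
            { Name: "hostname", Value: "host-1" },
        ],
        MeasureName: "metrics",
        MeasureValueType: "MULTI",
        MeasureValues: [{ Name: "cpu_utilization", Value: "42.5", Type: "DOUBLE" }],
        Time: Date.now().toString(),
        TimeUnit: "MILLISECONDS",
    }],
}));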

To keep things focused, common scheduled query patterns are omitted, including:

  • KMS encryption for query resources and error reports (kmsKeyId)
  • Resource tagging for organization and cost tracking
  • Query parameters beyond @scheduled_runtime
  • Mixed measure mappings (combining single and multi-measure outputs)

These omissions are intentional: the goal is to illustrate how scheduled query features are wired together, not to provide drop-in analytics modules. See the Timestream Scheduled Query resource reference for all available configuration options.
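
As one illustration, here is a sketch of how the optional encryption and tagging arguments would attach to the resource; the key alias and tag values are placeholders, and the remaining arguments are the ones shown in the full example above:

const extraArgs = {
    // Encrypts the scheduled query at rest; with SSE_KMS error
    // reporting, the same key also encrypts error reports.
    kmsKeyId: "alias/example-key",
    tags: {
        Team: "analytics",
        Environment: "dev",
    },
};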

Frequently Asked Questions

Setup & Prerequisites
What do I need before creating a scheduled query?
You need a source database and table with ingested data, a results database and table, an error report S3 bucket, an SNS topic, and an IAM role with appropriate permissions.
How do I ingest data into the source table?
Data ingestion is done using the Amazon Timestream Write WriteRecords API, not through Pulumi.
Query Configuration
What's the @scheduled_runtime parameter?
A reserved parameter initialized to the scheduled execution timestamp. For a query scheduled at 2021-12-01 00:00:00, @scheduled_runtime equals that timestamp.
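For example, a sketch of a query string that aggregates only the hour preceding each run (column and table names follow the example above):

const windowedQuery = `SELECT region, hostname, ROUND(AVG(cpu_utilization), 2) AS avg_cpu
FROM exampledatabase.exampletable
WHERE time BETWEEN @scheduled_runtime - 1h AND @scheduled_runtime
GROUP BY region, hostname`;
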
How do I set the query schedule?
Configure scheduleConfiguration with a scheduleExpression like "rate(1 hour)".
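Cron expressions are also accepted; for example, a sketch that fires at the top of every hour (UTC):

const schedule = { scheduleExpression: "cron(0 * * * ? *)" };
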
Where do query results get written?
Results are written to the location specified in targetConfiguration, which includes the target database, table, time column, dimension mappings, and measure mappings.
IAM & Permissions
What permissions does my execution role need?
The role needs kms:Decrypt, sns:Publish, timestream:describeEndpoints, timestream:Select, timestream:SelectValues, timestream:WriteRecords, and s3:PutObject.
Results & Error Handling
How are query errors reported?
Errors are reported to the S3 bucket specified in errorReportConfiguration.
What notifications does Timestream send?
Timestream sends notifications to the configured SNS topic when a scheduled query is created, its state is updated, or when it’s deleted.
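A minimal sketch of reading those notifications from the subscribed queue with the AWS SDK for JavaScript v3 (the queue URL is a placeholder):

import { SQSClient, ReceiveMessageCommand } from "@aws-sdk/client-sqs";

const sqs = new SQSClient({});
// Long-poll the queue subscribed to the notification topic.
const received = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/example",
    WaitTimeSeconds: 10,
}));
console.log(received.Messages?.map(m => m.Body));
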
Encryption & State
What does kmsKeyId encrypt?
It encrypts the scheduled query resource at rest. If errorReportConfiguration uses SSE_KMS encryption, the same kmsKeyId also encrypts error reports.
What are the possible states for a scheduled query?
Either ENABLED or DISABLED.
