Amazon Kinesis Data Streams

  • Collects and processes large streams of data records in real time.
  • Lets you create data-processing applications, known as Kinesis Data Streams applications.
  • A Kinesis Data Streams application reads data from a data stream as data records.
  • Suited to rapid and continuous data intake and aggregation.
  • Input data can include logs, social media, market data feeds, and web clickstream data.
  • The processing is typically lightweight.
  • Kinesis Data Streams: collect and process large streams of data in real time.
  • Kinesis Data Analytics: run standard SQL queries against streaming data.
  • Kinesis Data Firehose: load streaming data from many sources into AWS destinations such as S3 and Redshift.
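  • Kinesis Streams vs SQS: use Kinesis when the same data must be processed by different consumers, either at the same time or within 24 hours; records can be re-read within that 24-hour window.
  • Kinesis Streams retains data for 24 hours by default.
  • SQS can deliver to multiple queues using a fanout pattern, but a message cannot be reused once it has been consumed; SQS retains messages for up to 14 days.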

High-level architecture of Kinesis Data Streams –

  • Producers continually push data to Kinesis Data Streams.
  • Consumers process the data in real time.
  • A consumer can be a custom application running on Amazon EC2 or an Amazon Kinesis Data Firehose delivery stream.
  • Consumers can store their results using Amazon DynamoDB, Amazon Redshift, or Amazon S3.

Kinesis Data Streams Terminology

  • Kinesis Data Stream – A Kinesis data stream is a set of shards. Each shard has a sequence of data records. Each data record has a sequence number that is assigned by Kinesis Data Streams.
  • Data Record – A data record is the unit of data stored in a Kinesis data stream. Data records are composed of a sequence number, a partition key, and a data blob, which is an immutable sequence of bytes. Kinesis Data Streams does not inspect, interpret, or change the data in the blob in any way. A data blob can be up to 1 MB.
  • Retention Period – The retention period is the length of time that data records are accessible after they are added to the stream. A stream’s retention period is set to a default of 24 hours after creation. You can increase the retention period up to 168 hours (7 days) using the IncreaseStreamRetentionPeriod operation, and decrease the retention period down to a minimum of 24 hours using the DecreaseStreamRetentionPeriod operation. Additional charges apply for streams with a retention period set to more than 24 hours.
  • Producer – Producers put records into Amazon Kinesis Data Streams. For example, a web server sending log data to a stream is a producer.
  • Consumer – Consumers get records from Amazon Kinesis Data Streams and process them. These consumers are known as Amazon Kinesis Data Streams applications.
  • Amazon Kinesis Data Streams Application – An Amazon Kinesis Data Streams application is a consumer of a stream that commonly runs on a fleet of EC2 instances. There are two types of consumers that you can develop: shared fan-out consumers and enhanced fan-out consumers.
  • Shard – A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity. For reads, each shard can support up to 5 transactions per second, up to a maximum total data read rate of 2 MB per second. For writes, each shard can support up to 1,000 records per second, up to a maximum total data write rate of 1 MB per second (including partition keys). The data capacity of a stream is a function of the number of shards that you specify for the stream. The total capacity of the stream is the sum of the capacities of its shards.
  • Partition Key – A partition key is used to group data by shard within a stream. Kinesis Data Streams segregates the data records belonging to a stream into multiple shards. It uses the partition key that is associated with each data record to determine which shard a given data record belongs to. Partition keys are Unicode strings with a maximum length limit of 256 bytes. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.
  • Sequence Number – Each data record has a sequence number that is unique per partition-key within its shard. Kinesis Data Streams assigns the sequence number after you write to the stream with client.putRecords or client.putRecord. Sequence numbers for the same partition key generally increase over time. The longer the time period between write requests, the larger the sequence numbers become.
  • Kinesis Client Library – The Kinesis Client Library is compiled into your application to enable fault-tolerant consumption of data from the stream. The Kinesis Client Library ensures that for every shard there is a record processor running and processing that shard. The library also simplifies reading data from the stream. The Kinesis Client Library uses an Amazon DynamoDB table to store control data. It creates one table per application that is processing data.
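
The partition key entry above describes how an MD5 hash maps each key to a 128-bit integer and then to a shard. A minimal sketch of that idea follows. It assumes the key is hashed as UTF-8 bytes and read as a big-endian integer, and it uses a hypothetical layout in which the hash key space is divided evenly among the shards; the function names are illustrative and are not part of any AWS SDK.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # largest value a 128-bit MD5 digest can represent


def hash_key(partition_key):
    """Return the 128-bit integer hash key for a partition key (MD5 of its UTF-8 bytes)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


def even_shard_ranges(shard_count):
    """Assumed layout: split the hash key space into contiguous, near-equal ranges."""
    step = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * step
        # The last range absorbs any remainder so the whole space is covered.
        end = MAX_HASH_KEY if i == shard_count - 1 else start + step - 1
        ranges.append((start, end))
    return ranges


def shard_index_for(partition_key, ranges):
    """Return the index of the range that contains the key's hash key."""
    key = hash_key(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= key <= end:
            return index
    raise ValueError("hash key is not covered by any range")


ranges = even_shard_ranges(4)
# Records with the same partition key always land in the same range.
print(shard_index_for("user-42", ranges))
```

Because the mapping depends only on the partition key, all records that share a key stay in one shard, which is what keeps them ordered relative to each other.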

AWS Kinesis Summary

Data Structures

  • The unit of data stored by Kinesis Data Streams is a data record.
  • A stream represents a group of data records.
  • The data records in a stream are distributed into shards.
  • A shard has a sequence of data records in a stream.

Stream Creation

  • When you create a stream, specify the number of shards for the stream.
  • Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second.
  • Shards support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys).
  • The total capacity of a stream is the sum of the capacities of its shards.
  • Increase or decrease the number of shards in a stream as needed.
  • Billing is per provisioned shard, and a stream can have as many shards as you need. Records are ordered within each shard.
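
The per-shard figures above can be turned into a rough sizing calculation. The sketch below is only an estimate under the limits quoted in this section (1 MB/s and 1,000 records/s for writes, 2 MB/s for reads); the function name and the example workload numbers are hypothetical.

```python
import math

WRITE_MB_PER_SHARD = 1.0         # MB per second, including partition keys
WRITE_RECORDS_PER_SHARD = 1000   # records per second
READ_MB_PER_SHARD = 2.0          # MB per second


def shards_needed(write_mb_per_s, records_per_s, read_mb_per_s):
    """Return the smallest shard count that satisfies all three per-shard limits."""
    return max(
        1,  # a stream always has at least one shard
        math.ceil(write_mb_per_s / WRITE_MB_PER_SHARD),
        math.ceil(records_per_s / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb_per_s / READ_MB_PER_SHARD),
    )


# Hypothetical workload: 3.5 MB/s in, 2,500 records/s, 10 MB/s read by all consumers.
print(shards_needed(3.5, 2500, 10))
```

The result is driven by whichever limit is tightest, so a read-heavy workload can need more shards than its write volume alone would suggest.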

Kinesis Data Stream

  • A data record in a Kinesis data stream is composed of
    • a sequence number – the unique ID of the record within its shard. Type: String
    • a partition key – identifies which shard in the stream the data record is assigned to. Type: String
    • a data blob – the data in the blob is opaque and immutable, so it is not inspected, interpreted, or changed in any way. Type: Base64-encoded binary data object
  • The size of the data blob (the payload before base64-encoding) plus the size of the partition key must not exceed the maximum record size of 1 MB.
  • For sensitive data, you can use server-side encryption with Amazon Kinesis Data Firehose, but only when a Kinesis stream is the data source; in that setup the data is stored at rest in the Kinesis stream.
  • When you send data to a Kinesis stream with server-side encryption enabled, the data is encrypted using an AWS KMS key before it is stored at rest.
  • When a Kinesis Data Firehose delivery stream reads the data from the Kinesis stream, the Kinesis Data Streams service first decrypts the data and then sends it to Kinesis Data Firehose. Kinesis Data Firehose buffers the data in memory based on the buffering hints that you specify and then delivers it to destinations without storing the unencrypted data at rest.
  • In Kinesis, to prevent skipped records, handle all exceptions within processRecords appropriately.
  • For each Amazon Kinesis Data Streams application, the Kinesis Client Library (KCL) uses a unique Amazon DynamoDB table to keep track of the application’s state. The KCL uses the application name to create the name of the table, so each application name must be unique.
  • If your Amazon Kinesis Data Streams application receives provisioned-throughput exceptions, increase the provisioned throughput for the DynamoDB table. The KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second, but this might not be sufficient for your application. For example, if the application checkpoints frequently or operates on a stream that is composed of many shards, you might need more throughput.
  • PutRecord returns the shard ID of where the data record was placed and the sequence number that was assigned to the data record.
  • Sequence numbers increase over time and are specific to a shard within a stream, not across all shards within a stream.
  • For guaranteeing strict increasing ordering, write serially to a shard and use the SequenceNumberForOrdering parameter.
  • For live streaming, Kinesis is ruled out if the record size is greater than 1 MB; in that case, Kafka can support bigger records.
  • To use Lambda with Kinesis Streams, create a Lambda function that automatically reads batches of records from the Kinesis stream and processes them when records are detected on the stream. AWS Lambda polls the stream periodically (once per second) for new records. Only one Lambda invocation per shard is triggered at a time.
  • In Kinesis Data Firehose, the PutRecordBatch operation can take up to 500 records per call or 4 MB per call, whichever is smaller. The Firehose buffer size ranges from 1 MB to 128 MB.
  • In circumstances where data delivery to the destination is falling behind data ingestion into the delivery stream, Amazon Kinesis Firehose raises the buffer size automatically so that all data is delivered to the destination.
  • If data delivery from Kinesis Firehose to Redshift fails, Amazon Kinesis Firehose retries data delivery every 5 minutes for up to a maximum period of 60 minutes. After 60 minutes, Amazon Kinesis Firehose skips the current batch of S3 objects that are ready for COPY and moves on to the next batch. The information about the skipped objects is delivered to the S3 bucket as a manifest file in the errors folder, which you can use for manual backfill.
  • If data delivery to the Amazon S3 bucket fails, Amazon Kinesis Firehose retries delivery every 5 seconds for up to a maximum period of 24 hours. If the issue continues beyond the 24-hour maximum retention period, it discards the data.
  • Aggregation refers to the storage of multiple records in a single Kinesis Data Streams record. Aggregation allows customers to increase the number of records sent per API call, which effectively increases producer throughput. Collection refers to using the API operation PutRecords to send multiple Kinesis Data Streams records to one or more shards in a Kinesis data stream. The two can be combined: first aggregate user records into stream records, then send those stream records to multiple shards using PutRecords.
  • Spark Streaming uses the Kinesis Client Library (KCL) to consume data from a Kinesis stream. The KCL handles complex tasks like load balancing, failure recovery, and checkpointing.
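
The PutRecordBatch limits quoted above (500 records or 4 MB per call, whichever is reached first) imply that a producer has to split its payloads into batches before sending. A minimal sketch of that splitting step follows. It makes no AWS calls, treats 4 MB as 4 × 1024 × 1024 bytes (an assumption), and the helper name `chunk_batches` is hypothetical.

```python
def chunk_batches(payloads, max_records=500, max_bytes=4 * 1024 * 1024):
    """Yield lists of byte payloads that each stay within the per-call limits."""
    batch, batch_bytes = [], 0
    for payload in payloads:
        size = len(payload)
        if size > max_bytes:
            raise ValueError("a single payload exceeds the per-call byte limit")
        # Start a new batch when adding this payload would break either limit.
        if batch and (len(batch) == max_records or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(payload)
        batch_bytes += size
    if batch:
        yield batch


small = [b"x" * 1024] * 1200
print([len(b) for b in chunk_batches(small)])
```

Each yielded batch could then be handed to whatever client call the producer uses; the limits are parameters so they can be adjusted if the service quotas differ.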

Amazon Kinesis Data Streams limits

  • There is no upper limit on the number of shards in a stream or account; a workload may have thousands of shards.
  • No upper limit on the number of streams you can have in an account.
  • A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes. A stream with 5,000 shards can therefore ingest about 5 GiB per second or 5 million records per second. For more ingest capacity, increase the number of shards in the stream using the AWS Management Console or the UpdateShardCount API.
  • Default shard limit is 500 shards for the following AWS Regions: US East (N. Virginia), US West (Oregon), and EU (Ireland). For all other Regions, the default shard limit is 200 shards.
  • The maximum size of the data payload of a record before base64-encoding is up to 1 MiB.
  • GetRecords can retrieve up to 10 MiB of data per call from a single shard, and up to 10,000 records per call. Each call to GetRecords is counted as one read transaction.
  • Each shard can support up to five read transactions per second. Each read transaction can provide up to 10,000 records with an upper limit of 10 MiB per transaction.
  • Each shard can support up to a maximum total data read rate of 2 MiB per second via GetRecords. If a call to GetRecords returns 10 MiB, subsequent calls made within the next 5 seconds throw an exception.
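
The limits above are simple arithmetic, and working them through shows where the 5-second figure comes from: a 10 MiB GetRecords response uses up five seconds of a shard's 2 MiB per second read budget. The sketch below reproduces that calculation and the 5,000-shard ingest example; the function names are hypothetical.

```python
MIB = 1024 * 1024
READ_BYTES_PER_SECOND = 2 * MIB      # per-shard read rate via GetRecords
WRITE_MIB_PER_SHARD = 1              # per-shard ingest rate in MiB per second
WRITE_RECORDS_PER_SHARD = 1000       # per-shard ingest rate in records per second


def read_budget_seconds(bytes_returned):
    """Seconds of a shard's read budget consumed by one GetRecords response."""
    return bytes_returned / READ_BYTES_PER_SECOND


def ingest_capacity(shard_count):
    """Return (MiB per second, records per second) that a stream can ingest."""
    return shard_count * WRITE_MIB_PER_SHARD, shard_count * WRITE_RECORDS_PER_SHARD


print(read_budget_seconds(10 * MIB))   # 5.0
print(ingest_capacity(5000))           # (5000, 5000000)
```

Note that 5,000 MiB per second is slightly under 5 GiB per second (about 4.88 GiB), so the 5 GiB figure is a rounded value.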

Creating a Stream in Amazon Kinesis

A stream can be created by

  • the Kinesis Data Streams console
  • the Kinesis Data Streams API
  • the AWS Command Line Interface (AWS CLI).

Steps to create a data stream using the console

  • Sign in to the AWS Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis.
  • In the navigation bar, expand the Region selector and choose a Region.
  • Choose Create data stream.
  • On the Create Kinesis stream page, enter a name for the stream and the number of shards you need, and then choose Create Kinesis stream.
  • On the Kinesis streams page, the stream’s Status is Creating while the stream is being created. When the stream is ready to use, the Status changes to Active.
  • Choose the name of the stream. The Stream Details page displays a summary of the stream configuration, along with monitoring information.

Kinesis Data Streams Producers

A producer puts data records into Amazon Kinesis data streams. For example, a web server sending log data to a Kinesis data stream is a producer.

To put data into a stream, you must specify the following:

  • the name of the stream
  • a partition key, which determines which shard in the stream the data record is added to and is chosen according to application logic. The number of distinct partition keys should usually be much greater than the number of shards so that records are distributed evenly.
  • the data blob to be added to the stream.

All the data in the shard is sent to the same worker that is processing the shard.
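
The three inputs listed above, together with the 1 MB record limit described earlier (data blob plus partition key), can be checked before a record is sent. The sketch below is a hypothetical helper, not an SDK function. It assumes 1 MB means 1024 × 1024 bytes and that the partition key is measured as UTF-8 bytes. The dictionary keys mirror the PutRecord request parameter names StreamName, PartitionKey, and Data.

```python
MAX_RECORD_BYTES = 1024 * 1024  # 1 MB limit, taken here as 1 MiB (assumption)


def build_put_request(stream_name, partition_key, data):
    """Validate the three producer inputs and return them as a request dict."""
    if not stream_name:
        raise ValueError("stream name is required")
    if not partition_key:
        raise ValueError("partition key is required")
    # The limit applies to the blob before base64-encoding plus the partition key.
    total = len(data) + len(partition_key.encode("utf-8"))
    if total > MAX_RECORD_BYTES:
        raise ValueError("data blob plus partition key exceeds the record size limit")
    return {"StreamName": stream_name, "PartitionKey": partition_key, "Data": data}


request = build_put_request("example-stream", "user-42", b'{"event": "click"}')
print(request["PartitionKey"])
```

A dictionary in this shape could be passed as keyword arguments to a client's put-record call, although that step is outside this sketch.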

Using KPL

  • The KPL (Kinesis Producer Library) is an easy-to-use, highly configurable library for writing to a Kinesis data stream.
  • KPL writes with an automatic and configurable retry mechanism
  • Acts as an intermediary between producer application code and the Kinesis Data Streams API actions.
  • Collects records and uses PutRecords to write multiple records to multiple shards per request
  • Aggregates user records to increase payload size and improve throughput
  • Integrates seamlessly with the Kinesis Client Library (KCL) to de-aggregate batched records on the consumer
  • Submits Amazon CloudWatch metrics on your behalf to provide visibility into producer performance

Using the Amazon Kinesis Data Streams API –

  • Develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java.
  • Two different operations to add data to a stream – PutRecords and PutRecord.
  • PutRecords sends multiple records to the stream per HTTP request.
  • PutRecord sends records to the stream one at a time (a separate HTTP request is required for each record).
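
The practical difference between the two operations is the number of HTTP requests a producer makes. The sketch below compares them for a given record count. It assumes a cap of 500 records per PutRecords request, which is not stated in the list above, and the function name is hypothetical.

```python
import math

MAX_RECORDS_PER_PUT_RECORDS = 500  # assumed per-request cap for PutRecords


def http_requests_needed(record_count, use_put_records):
    """Return how many HTTP requests are needed to send record_count records."""
    if record_count <= 0:
        return 0
    if use_put_records:
        # PutRecords packs many records into each request.
        return math.ceil(record_count / MAX_RECORDS_PER_PUT_RECORDS)
    # PutRecord needs one request per record.
    return record_count


print(http_requests_needed(1200, use_put_records=True))   # 3
print(http_requests_needed(1200, use_put_records=False))  # 1200
```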

Using Kinesis Agent –

  • Kinesis Agent is a stand-alone Java software application
  • It easily collects and sends data to Kinesis Data Streams.
  • The agent continuously monitors a set of files and sends new data to stream.
  • The agent handles file rotation, checkpointing, and retry upon failures.
  • It delivers all of your data in a reliable, timely, and simple manner.
  • It also emits Amazon CloudWatch metrics to help you better monitor and troubleshoot the streaming process.
  • By default, records are parsed from each file based on the newline (‘\n’) character.
  • The operating system must be either Amazon Linux AMI version 2015.09 or later, or Red Hat Enterprise Linux version 7 or later.

Consumers with Enhanced Fan-Out

  • Enhanced fan-out enables consumers to receive records from a stream with throughput of up to 2 MiB of data per second per shard.
  • This throughput is dedicated, which means that consumers that use enhanced fan-out don’t have to contend with other consumers that are receiving data from the stream.
  • Kinesis Data Streams pushes data records from the stream to consumers that use enhanced fan-out.
  • Therefore, these consumers don’t need to poll for data.
  • You can register up to five consumers per stream to use enhanced fan-out.
  • If you need to register more than five consumers, you can request a limit increase.
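
To see why dedicated throughput matters, compare the per-consumer read rate on one shard with and without enhanced fan-out. The sketch below assumes that consumers without enhanced fan-out split the shard's 2 MiB per second evenly, which is an idealization, while each enhanced fan-out consumer gets the full 2 MiB per second. The function name is hypothetical.

```python
SHARD_READ_MIB_PER_S = 2.0  # read throughput of a single shard


def per_consumer_throughput(consumer_count, enhanced_fan_out):
    """Return the MiB per second each consumer can read from one shard."""
    if consumer_count < 1:
        raise ValueError("at least one consumer is required")
    if enhanced_fan_out:
        # Dedicated throughput: no contention with other consumers.
        return SHARD_READ_MIB_PER_S
    # Shared throughput: assume an even split between consumers.
    return SHARD_READ_MIB_PER_S / consumer_count


print(per_consumer_throughput(4, enhanced_fan_out=False))  # 0.5
print(per_consumer_throughput(4, enhanced_fan_out=True))   # 2.0
```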

Splitting a Shard –

  • When splitting a shard, you need to specify how hash key values from the parent shard should be redistributed to the child shards.
  • When you add a data record to a stream, it is assigned to a shard based on a hash key value.
  • The hash key value is the MD5 hash of the partition key that you specify for the data record at the time that you add the data record to the stream.
  • Data records that have the same partition key also have the same hash key value.
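
One common way to redistribute hash key values is to split the parent shard's range in half. The sketch below computes the two child ranges for such an even split. The choice of midpoint is an assumption made for illustration, since any value inside the parent range could serve as the starting hash key of the second child, and the function name is hypothetical.

```python
def split_evenly(start, end):
    """Return the two child hash key ranges for an even split of [start, end]."""
    if end <= start:
        raise ValueError("range must contain at least two hash key values")
    # Starting hash key of the second child; the first child ends just before it.
    new_start = (start + end + 1) // 2
    return (start, new_start - 1), (new_start, end)


# Splitting a shard that covers the whole 128-bit hash key space.
first_child, second_child = split_evenly(0, 2**128 - 1)
print(second_child[0] == 2**127)  # True
```

An uneven split point can be useful when a few hot partition keys cluster in one part of the range.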

Merging Two Shards –

  • A merge takes two specified shards and combines them into a single shard.
  • After the merge, the single child shard receives data for all hash key values covered by the two parent shards.
  • To be merged, the two shards must be adjacent.
  • Two shards are considered adjacent if the union of their hash key ranges forms a contiguous set with no gaps. For example, two shards with hash key ranges of 276…381 and 382…454 can be merged into a single shard with a hash key range of 276…454.
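
The adjacency rule can be expressed as a small check on the two hash key ranges. The sketch below uses the example ranges from the text; the function names are hypothetical, and each range is written as an inclusive (start, end) pair.

```python
def are_adjacent(range_a, range_b):
    """Return True if the two inclusive ranges touch with no gap and no overlap."""
    first, second = sorted([range_a, range_b])
    return first[1] + 1 == second[0]


def merge_ranges(range_a, range_b):
    """Return the hash key range of the child shard produced by a merge."""
    if not are_adjacent(range_a, range_b):
        raise ValueError("shards must be adjacent to be merged")
    first, second = sorted([range_a, range_b])
    return (first[0], second[1])


print(merge_ranges((276, 381), (382, 454)))  # (276, 454)
```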

Kinesis Data Streams Consumers

  • A consumer, or Amazon Kinesis Data Streams application, reads and processes data records from Kinesis data streams.
  • To send stream records directly to services like Amazon S3, Redshift, Elasticsearch Service or Splunk, use a Kinesis Data Firehose delivery stream instead of creating a consumer application.