Leverage Flink Windowing to process streams based on event time
The concepts explained in this article require working knowledge of Flink. If you have no prior experience, I’d recommend reading one of my previous articles, which explore the fundamental concepts of Flink.
What’s Windowing and why you might need it:
Imagine splitting an unbounded stream of events and processing them as individual buckets (or sets). You can perform computations on these individual sets of events to derive a meaningful event stream flowing out of Flink. There is a variety of use cases you can address with the operators/functions that Flink provides as part of its framework; figuring out how to implement and chain those operators together rests with you. In this article, we’ll explore the basics of the window operator and how you can process out-of-order events.
Types of windows:
Event streams may be keyed or non-keyed; this decides whether the windowing computation occurs in parallel across multiple tasks or in a single task.
Windows can be of 4 types:
- Tumbling windows — Non-overlapping windows of a fixed duration (the window size); each event is assigned to exactly one window.
- Sliding windows — Like tumbling windows, they have a fixed window size, but you also specify a slide interval that decides how often a new window starts. When the slide is smaller than the window size, windows overlap and share events between them.
- Session windows — In contrast to tumbling and sliding windows, session windows don’t have a fixed size. Instead, a window closes after a gap of inactivity (a duration you define) since the last event, and a new window starts with the next event that arrives.
- Global windows — The global window assigner assigns all elements with the same key to a single global window. This windowing scheme is only useful if you also specify a custom trigger; otherwise no computation is performed, because a global window has no natural end at which the aggregated elements could be processed.
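To make the difference between tumbling and sliding windows concrete, here is a small plain-Java sketch (no Flink dependency) of how an event timestamp maps to window start times. The arithmetic mirrors what Flink’s assigners do internally, assuming a zero window offset:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignmentDemo {

    // Tumbling: each timestamp falls into exactly one window of `sizeMs` width.
    public static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Sliding: a timestamp belongs to every window whose range covers it.
    // With size = 10 s and slide = 5 s, each event lands in two windows.
    public static List<Long> slidingWindowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingWindowStart(12_345L, 10_000L));            // 10000
        System.out.println(slidingWindowStarts(12_345L, 10_000L, 5_000L));    // [10000, 5000]
    }
}
```

An event at timestamp 12 345 ms falls into the single tumbling window starting at 10 000 ms, but into the two overlapping sliding windows starting at 10 000 ms and 5 000 ms.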
Setting up the Flink Job:
For the purposes of an example, we’ll look at processing events based on event time.
Before using the window operator/assigner, the source stream needs a WatermarkStrategy. Flink provides two built-in watermark generators:
- forMonotonousTimestamps() — To be used when arriving events are known to always be in order (timestamps are monotonically increasing).
- forBoundedOutOfOrderness() — To be used when events may arrive out of order. You specify the maximum out-of-orderness to tolerate (as a duration in minutes/seconds/milliseconds); the watermark then trails the highest event timestamp seen so far by that amount. An event arriving after the watermark has already passed its window is discarded from the stream (unless allowedLateness extends the window’s lifetime beyond this bound).
There are different kinds of watermarking strategies that you can employ depending on your specific needs. To explore more, refer to the official Flink documentation.
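As a rough illustration of the bounded-out-of-orderness idea, the following dependency-free sketch tracks the maximum timestamp seen so far and lets the watermark trail it by the configured bound, much like Flink’s built-in generator does internally (the extra `- 1` reflects that the watermark marks the last timestamp that will still be accepted):

```java
public class BoundedOutOfOrdernessDemo {

    private final long outOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessDemo(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
    }

    // Track the highest event timestamp observed so far.
    public void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    // The watermark trails the max timestamp by the tolerated out-of-orderness.
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    // An event at or below the watermark is considered late.
    public boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a 5-second bound, after seeing an event at t = 10 000 ms the watermark sits at 4 999 ms: an event arriving at t = 4 000 ms is late, while one at t = 6 000 ms is still accepted.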
Using the withTimestampAssigner() function, you can then extract the timestamp value that is embedded in the event’s body. Note that timestamps and watermarks are both specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
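Putting these pieces together, a job setup along the following lines is typical. `SensorEvent`, `getDeviceId()`, `getEventTimeMillis()`, and `MyWindowFunction` are hypothetical names, and the durations are arbitrary choices for illustration:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<SensorEvent> events = /* source: a Kafka topic or Kinesis data stream */;

events
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            // tolerate events arriving up to 20 s out of order
            .<SensorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            // pull the event-time timestamp (epoch millis) out of the event body
            .withTimestampAssigner((event, recordTs) -> event.getEventTimeMillis()))
    .keyBy(SensorEvent::getDeviceId)
    // event-time tumbling windows, one minute wide
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // keep windows alive 30 s past the watermark for late firings
    .allowedLateness(Time.seconds(30))
    // your custom ProcessWindowFunction
    .process(new MyWindowFunction());
```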
Here is a breakdown of the setup,
- You first watermark every event that comes in from the source.
- The timestamp needs to be extracted from the incoming event and converted to milliseconds.
- Convert the stream to a keyed stream and place the window operator. Observe that the TumblingEventTimeWindows assigner is used, since we’re dealing with event time rather than processing time.
- Notice we’ve also used the optional allowedLateness() function, which lets us process events that arrive beyond the bound set with forBoundedOutOfOrderness(). A late event that still falls within the allowed lateness is added to the window it originally belonged to, and that window fires again with the updated contents (a late firing), even though it has already emitted a result.
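The interaction between the watermark and allowedLateness can be modeled with a small dependency-free predicate: a window fires late for an element only while the watermark sits between the window’s end and its cleanup time. This is a simplified model of Flink’s behavior, not its actual implementation:

```java
public class LateFiringDemo {

    // A window [windowStart, windowEnd) fires late for an element when:
    //  - the element's timestamp falls inside the window,
    //  - the watermark has already passed the window end (the window fired once),
    //  - but the watermark has not yet reached windowEnd + allowedLateness
    //    (the window's state has not been cleaned up).
    public static boolean firesLate(long elementTs, long windowStart, long windowEnd,
                                    long watermark, long allowedLatenessMs) {
        boolean inWindow = elementTs >= windowStart && elementTs < windowEnd;
        boolean pastEnd = watermark >= windowEnd;
        boolean beforeCleanup = watermark < windowEnd + allowedLatenessMs;
        return inWindow && pastEnd && beforeCleanup;
    }
}
```

For a window [0 s, 10 s) with 30 s allowed lateness, an element at t = 5 s arriving while the watermark is at 12 s triggers a late firing; once the watermark passes 40 s, the window’s state is gone and the element is dropped.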
- Before we move to the next step, note that you can optionally place a trigger() and an evictor() right after the window assigner if your use case requires that functionality. Strategic use of these functions can solve unique scenarios, so it’s worth exploring them before writing other custom logic.
- Finally, the last piece is the process function, where you place your custom windowing logic for each set of events. It is quite similar to the generic process function, except that the window’s events arrive as an Iterable.
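As a minimal sketch, a ProcessWindowFunction might count the events per key per window. `SensorEvent` is again a placeholder type; the four type parameters are the input, output, key, and window types:

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Counts the events that fell into each window for each key.
public class MyWindowFunction
        extends ProcessWindowFunction<SensorEvent, String, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<SensorEvent> events,
                        Collector<String> out) {
        long count = 0;
        for (SensorEvent e : events) {
            count++;
        }
        // Emit one summary record per window firing (including late firings).
        out.collect("key=" + key
                + " window=[" + context.window().getStart()
                + "," + context.window().getEnd() + ")"
                + " count=" + count);
    }
}
```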
Deploying Flink apps as Serverless:
After your application logic is ready, running the Flink job is straightforward with Amazon Kinesis Data Analytics. You drop the built .jar file into an S3 bucket, create a new Flink application pointing at that bucket, and that’s it.
The event source can be either a Kafka topic or a Kinesis data stream.
You set the computing power of the Flink application through the parallelism value. This value decides the number of KPUs (Kinesis Processing Units) that AWS will provision for your application. You can also enable auto-scaling so the application can handle increased throughput in case your initial parallelism isn’t sufficient.
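Based on AWS’s documented sizing model (a KPU provides 1 vCPU and 4 GB of memory, and the ParallelismPerKPU setting defaults to 1), the provisioned KPU count works out to a simple ceiling division. This is a hedged sketch of the sizing arithmetic only; actual billing may include additional per-application overhead:

```java
public class KpuSizing {

    // KPUs provisioned = ceil(parallelism / parallelismPerKpu),
    // following the Kinesis Data Analytics sizing model.
    public static int kpus(int parallelism, int parallelismPerKpu) {
        return (parallelism + parallelismPerKpu - 1) / parallelismPerKpu;
    }
}
```

For example, parallelism 8 with the default ParallelismPerKPU of 1 needs 8 KPUs, while parallelism 10 packed at 4 subtasks per KPU needs 3.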
Conclusion:
Windowing is at the heart of the Flink framework. Beyond the built-in window assigners we saw, it is also possible to build your own custom windowing logic. And, as with any other keyed data stream, you can make use of state if your computations need it. Once you gain enough understanding of the source event stream you’re working with, windowing can solve some complex problems you might have. With that being said, experiment with windows and let me know if you have questions!