Intro to Apache Flink

Arun Mathew
8 min readMay 10, 2023

--

My work has recently been involved with data engineering and as part of that, I had to spend some time learning real-time data analytics, specifically with Apache Flink. In this article, I want to share some of the basics that I learned as part of this exercise.

What is Apache Flink?

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

This is the definition from the official Apache Flink docs. Over the course of this article, we will try to understand the definition a bit better.

Unbounded and Bounded data streams

Most data generated in a real life scenario are generated as a stream of events. Some examples are the measurements recorded by a sensor or logs generated by a server. The data generated by them were artificially restricted to a batch due to the limitation of processing engines available to us. Now with the advancement in real-time processors, we can process the data as is with frameworks like Flink, which excel in processing both bounded and unbounded streams.

  • Unbounded streams: Streams have a defined start but no end. Data is continuously generated and has to be processed as we receive it.
  • Bounded streams: Streams have a defined start and end, the processor waits to receive all events in the particular batch before performing some computation.

Leverage in-memory performance

The high performance of Flink can be attributed to the fact that it stores state in memory and if the size of the state exceeds the available memory, then it is stored on disk (managed by RocksDB). The state is stored locally to the tasks that perform computation over it, hence providing very low latencies.

Flink also provides fault tolerance in case of failures by backing up the local state in durable/remote storage like HDFS or Amazon S3, by incrementally and asynchronously checkpointing local state.

Deploy anywhere, run at any scale

Flink can be run as stand-alone single node clusters (mostly for local development and debugging) or on top of popular cluster resource managers like YARN, Apache Mesos, or Kubernetes. Before starting a Flink application it requests the required resources from the cluster manager or in case of a failure to replace the failed container.

Since Flink is a distributed parallel processing engine, the applications are broken down into smaller tasks and can be executed on clusters in parallel. More resources can be added to the cluster and Flink will be able to leverage it. This is why Flink can be considered to run at any scale.

Source and Sink

Sources are where Flink reads data from and Sinks are where it writes data to. Flink has some basic data sources/sinks like files, sockets, stdout, etc. It also comes bundled with some third-party connectors to connect with systems like Kafka, RabbitMQ, Cassandra, and many more.

APIs and Libraries

Flink provides mainly three types of API for different use cases:

  • Datastream API — For stream processing
  • Dataset API(Legacy) — For batch processing
  • Table API — SQL-like paradigm for both stream and batch processing

It also provides libraries for Event processing and Graph processing but we are not interested in those at the moment.

Flink Components

Once the Flink app has been developed, it can be submitted for execution via a Dispatcher. It starts a JobManager and hands over the application to it

JobManager is the central control for job execution. Each application will have a single JobManager. The job manager requests the necessary resources from the ResourceManager as TaskManager slots and then divides the application to run on the available task slots

ResourceManager is responsible for TaskManager slots. When a JobManager requests resources, the ResourceManager instructs the TaskManagers with idle slots to offer them up. If there are not enough task slots available to meet the JobManagers requirements, ResourceManager talks to a resource provider(YARN, Mesos, or Kubernetes) to provide more resources

TaskManagers are where the data is processed in a Flink application. There are typically several TaskManagers running, each providing multiple slots. The task slots are registered with the ResourceManager and depending on the requirement of the application the task slots are offered to the JobManager.

Anomaly Detector with Flink

In this section, we will try to build demo application that detects anomalies from a stream of data. To set some context, let’s assume we have a server, that serves some response when a user calls its endpoint and the server logs the user ID to a Kafka topic whenever it gets a request.

Our rule to detect an anomaly is as follows

A user making more than 5 requests in 15 seconds is considered an anomaly

Once an anomaly is detected, the Flink application will log the user ID and the count of requests received into another Kafka topic and from here another service can pick these messages and decide on how to handle these users.

Now let us look at some code

DataStream.java

public class DataStreamJob {

public static void main(String[] args) throws Exception {
// Sets up the execution environment, which is the main entry point
// to building Flink applications.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<Request> source = KafkaSource.<Request>builder()
.setBootstrapServers("localhost:9092")
.setTopics("flink_input")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new RequestDeserializationSchema())
.build();

DataStream<Request> inputStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

KafkaSink<RequestWithCount> sink = KafkaSink.<RequestWithCount>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("flink_output")
.setValueSerializationSchema(new RequestSerializationSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

inputStream.
keyBy(Request::getUserID).
process(new AnomalyDetector()).
filter(new RequestFilter()).
sinkTo(sink);

env.execute("Flink Java API Skeleton");
}
}

This is the entry point for our Flink job. We are creating our execution environment, setting up our source and sink by configuring it to read from Kafka and setting up our serializer/deserializer as well.

inputStream.
keyBy(Request::getUserID).
process(new AnomalyDetector()).
filter(new RequestFilter()).
sinkTo(sink);

This section is where the core of our business logic lies. We will look at this section once we cover all the other parts of the Flink job.

Request.java

public class Request {
public String UserID;

public String getUserID() {
return UserID;
}
}

The Request class shows the structure of the input we receive from the input Kafka topic, UserID denoting the user who made the request to our server.

RequestWithCount.java

public class RequestWithCount {
public String UserID;
public Integer Count;
}

This class represents the structure of our output, which we will be publishing to the output Kafka topic — The user identifier and the number of times they made a request to the server.

RequestDeserializationSchema.java

public class RequestDeserializationSchema implements DeserializationSchema<Request> {
private static final ObjectMapper objectMapper = new ObjectMapper();


@Override
public Request deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, Request.class);
}

@Override
public boolean isEndOfStream(Request request) {
return false;
}

@Override
public TypeInformation<Request> getProducedType() {
return TypeInformation.of(Request.class);
}
}

RequestSerializationSchema.java

public class RequestSerializationSchema implements SerializationSchema<RequestWithCount> {
private static final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(RequestWithCount request) {
try {
return objectMapper.writeValueAsBytes(request);
} catch (Exception e) {
throw new IllegalArgumentException("Could not serialize record: " + request, e);
}
}
}

Serializer/Deserializer to convert the bytes we receive from input Kafka to the request object and convert our RequestWithCount to bytes for the output Kafka topic.

AnomalyDetector.java

public class AnomalyDetector extends KeyedProcessFunction<String, Request, RequestWithCount> {

private transient ValueState<Integer> countState;
private transient ValueState<Long> timerState;

@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>(
"count",
Types.INT);
countState = getRuntimeContext().getState(countDescriptor);

ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}

@Override
public void processElement(Request request, Context context, Collector<RequestWithCount> collector) throws Exception {
Integer currentCount = countState.value();
if (currentCount == null) {
long timer = context.timerService().currentProcessingTime() + (15 * 1000);
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
currentCount = 0;
}
RequestWithCount r = new RequestWithCount();

countState.update(currentCount+1);
r.Count = currentCount+1;
r.UserID = request.UserID;

collector.collect(r);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<RequestWithCount> out) {
System.out.println("TIMER AND COUNTER cleared");
timerState.clear();
countState.clear();
}
}

The AnomalyDetector is where most of our logic lies. The detector class is called in context of a user ID

inputStream.
keyBy(Request::getUserID).
process(new AnomalyDetector())

which essentially means the logs are grouped by the userID and they will be sharing the same context. The first time we receive a userID, the detector will initialize two variables, countDescriptor to keep track of the number of times the user sent a request to the server and a timerDescriptor to keep count for 15 seconds.

Every time the Flink job gets a request from Kafka it updates the count for that specific user and is sent out via the collector. After 15 seconds, the onTimer hook is called and both the timer and count are reset.

RequestFilter.java

public class RequestFilter implements FilterFunction<RequestWithCount> {

@Override
public boolean filter(RequestWithCount request) throws Exception {
System.out.println(request.Count > 5);
return request.Count > 5;
}
}

The next step after the detector is a simple filter that checks if the count is greater than 5. If the count is greater than 5, it is published into our output Kafka topic because based on our rule mentioned above, these users will be considered anomalous. The users with count less than 5 are dropped.

At this point, we have gone through all the code for the Flink application. The application can be run locally by following the steps mentioned in the readme of this Github repo.

Once the application is running, you can publish messages to the input Kafka topic to test it out. I have added a Golang script in the repo to publish some messages to the input Kafka topic and trigger anomaly detection.

Here we are consuming the output Kafka topic and are able to see users who hit our endpoint more than 5 times in 15 seconds. This information will help us handle the anomaly.

Conclusion

This is a brief summary of my understanding of streaming application with Flink. You can find the full code, steps to run and the utility script in this repo.

Sign up to discover human stories that deepen your understanding of the world.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

--

--

Arun Mathew
Arun Mathew

Written by Arun Mathew

Software Engineer - Currently learning and working on Data Engineering systems, Feel free to reach out on twitter @arun_v_m

No responses yet

Write a response