Mike Yawn
Mike Yawn is a Senior Solutions Architect at Hazelcast, which, by an amazing coincidence, provides products including an in-memory operational data store (Hazelcast IMDG) and an in-memory batch and stream processing engine (Hazelcast Jet) as suggested here for future-proofing your application architecture.

Monolithic apps are difficult to evolve and are almost certain to be bogged down with years of technical debt. This is why software architects are turning to microservices: breaking apart a monolithic app into separate services with well-defined APIs is a key enabler for everything that follows. As a developer with a background in object-oriented programming, I believe the key concepts that were drilled into us regarding encapsulation (designing object boundaries for loose coupling and high cohesion) give us the skills needed to decompose a complex monolithic app into a well-designed set of microservices.

But how do you handle data in a microservices architecture?

As you’re working through the decomposition of the application into a set of services, one question that will come to mind is what to do with the data. Chances are, the application stores data in a relational database (RDBMS). In the move to microservices, it is generally considered good practice to let every service own its own data. The reason for this is clear — breaking apart an app into a set of services allows different components to evolve at different rates; some might be very stable, while others may change very frequently. But if all of them share a common underlying database, then you can’t change the database schema without affecting every service. Essentially, you’re still bound by the same rules and limitations that were in place when the app was a monolith.

There are different ways to handle this; one is of course to split up the data, divvying up the pieces between services. You get a database … you get a database … you get a database. If a service requires access to a data item it isn’t the owner of, then it makes a call to the service that is the owner for that piece of data. This is the approach that adheres most closely to microservice best practices, but software licensing costs (for proprietary databases) and the increased database administration workload of having many micro-installations may make this impractical.

A much better approach is to apply separation of concerns, and leave the System of Record (the RDBMS) as-is while adding an operational data store to meet the needs of the microservices. This makes it possible to present a customized data view to each service without necessarily changing the underlying database schema. What are the characteristics that are important in an operational data store? Given the trends we explored earlier — in particular, supporting use cases that are increasingly online and mobile, with the need for speed (low latency) being an ever-increasing demand — I would argue that an in-memory technology is required for services that demand high performance.

A Basic Caching Use Case for our Operational Data Store

There are a number of open source, in-memory platforms that can fill the role of an operational data store. All of them provide key-value store capability that handles basic caching use cases, a good fit for an operational data store. Some go well beyond this with capabilities such as distributed computation. Given the suggested criteria that the technologies in our toolkit be multi-function and versatile, I’m going to make the case for a full-function in-memory compute platform such as Hazelcast IMDG.

Here are some of the capabilities we add to our technology stack when we incorporate the open-source version of Hazelcast IMDG into our platform (a short code sketch follows the list):

  • High-speed, low-latency caching capabilities, offloading the RDBMS or other system-of-record while improving the speed of retrieval by at least an order of magnitude.
  • The ability to cache a subset or all of the data; support for caching patterns such as read-through, write-through, and write-behind depending on application needs.
  • The ability to manage cache size with Time-to-Live (TTL) and eviction policies.
  • The ability to retrieve data not just by primary key, but also to query the cache using boolean logic or SQL-like predicate syntax.
  • The ability to store full objects in the cache; any object-relational mapping can be done infrequently, as the cache is loaded, rather than on every use of the data, as would be required if the O/R mapping were done during a fetch from the database.
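To make these capabilities concrete, here is a minimal sketch of what cache access can look like from a service, written against a Hazelcast 4.x-style IMap API; the map name, the Customer class, and the attribute values are illustrative assumptions rather than code from any particular application.

```java
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.query.Predicates;

import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

public class CustomerCacheExample {

    // Illustrative domain object; full objects are stored in the cache as-is.
    public static class Customer implements Serializable {
        private final String id;
        private final String name;
        private final boolean active;
        private final double balance;

        public Customer(String id, String name, boolean active, double balance) {
            this.id = id;
            this.name = name;
            this.active = active;
            this.balance = balance;
        }

        public String getId() { return id; }
        public String getName() { return name; }
        public boolean isActive() { return active; }
        public double getBalance() { return balance; }
    }

    public static void main(String[] args) {
        // Connect to the IMDG cluster as a client; each microservice does the same.
        HazelcastInstance client = HazelcastClient.newHazelcastClient();

        // A distributed map holding full Customer objects, keyed by customer id.
        IMap<String, Customer> customers = client.getMap("customers");

        // Basic put/get; the grid routes each call to the node that owns the key's partition.
        customers.put("42", new Customer("42", "Acme Corp", true, 1200.0));
        Customer c = customers.get("42");

        // Per-entry time-to-live: this entry is evicted 10 minutes after it is written.
        customers.put("43", new Customer("43", "Globex", true, 50.0), 10, TimeUnit.MINUTES);

        // Query by attribute rather than by key, using a SQL-like predicate.
        Collection<Customer> bigActive =
                customers.values(Predicates.sql("active = true AND balance > 1000"));

        System.out.println(c.getName() + " loaded; " + bigActive.size() + " large active accounts");
        client.shutdown();
    }
}
```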

Just as our application services can be scaled up or down to meet workload demands, our operational data store is also elastically scalable — additional nodes can be added to the data grid, and the software will automatically re-balance the data partitions to take advantage of the increased capacity (when scaling up) or consolidate data onto fewer nodes (when scaling down). Backups of each data partition are automatically maintained so that in the event of an unplanned node outage, no data is lost.

Let’s take a look at how this basic caching architecture looks in practice:

Figure 1 shows a basic cache; data is loaded from the system-of-record into the operational data store, which can spread the data over some number of cache nodes. Microservices access the cache through a single unified API; that is, they don’t worry about how the cache data is distributed — they just call map.get(key) and get the corresponding object in return, or alternatively invoke a query and get back a collection of matching objects.

Caching historically has focused on minimizing cache size and keeping the objects most likely to be used in memory; as memory costs have dropped and latency requirements have become tighter, it’s not unusual to see cases where 100% of the data is kept in cache so that all accesses meet the required SLAs. With a partial cache, we can lazily load data in response to cache misses; with a full cache, we’ll more likely load everything at startup time. If the underlying database changes, in this simple architecture we will have stale data in the cache; setting a time-to-live will cause the cache to be refreshed periodically, which minimizes (but does not eliminate) staleness. In cases where stale data is unacceptable, a more advanced caching architecture, as shown below, can integrate the cache more tightly with the database.

If our application changes data in the cache, the data grid software will update the system of record, either synchronously (a write-through cache) or after-the-fact (a write-behind cache, which can efficiently batch updates to the database and improve transaction throughput).
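As a sketch of how the read-through, write-through, and write-behind patterns mentioned above are typically wired up in Hazelcast IMDG, the configuration below registers a placeholder MapStore implementation and sets a write delay so updates are batched to the database; the class name, map name, and timing values are illustrative assumptions, and the persistence methods are stubs standing in for whatever JDBC/ORM code the application already has.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.MapStore;

import java.util.Collection;
import java.util.Map;

public class WriteBehindCacheConfig {

    // Placeholder persistence adapter: Hazelcast calls these methods to read from and
    // write to the system of record (the RDBMS) on behalf of the map. In a real
    // application the value type would be a domain object and the bodies would hold
    // the existing database access code.
    public static class CustomerMapStore implements MapStore<String, String> {
        @Override public String load(String key) { return null; /* SELECT ... WHERE id = ? */ }
        @Override public Map<String, String> loadAll(Collection<String> keys) { return null; }
        @Override public Iterable<String> loadAllKeys() { return null; /* optional pre-load */ }
        @Override public void store(String key, String value) { /* INSERT or UPDATE */ }
        @Override public void storeAll(Map<String, String> entries) { /* batched writes */ }
        @Override public void delete(String key) { /* DELETE */ }
        @Override public void deleteAll(Collection<String> keys) { }
    }

    public static void main(String[] args) {
        MapStoreConfig storeConfig = new MapStoreConfig()
                .setEnabled(true)
                .setImplementation(new CustomerMapStore())
                // 0 = write-through (synchronous); > 0 = write-behind, with updates
                // batched to the database every N seconds.
                .setWriteDelaySeconds(5);

        MapConfig mapConfig = new MapConfig("customers")
                .setTimeToLiveSeconds(600)      // entries expire and reload after 10 minutes
                .setMapStoreConfig(storeConfig);

        Config config = new Config();
        config.addMapConfig(mapConfig);
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}
```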

An Advanced Caching Use Case for an Operational Data Store with Stream Processing

So far, we’ve described an application deployed as a set of microservices, and an operational data store as a place to hold our data — but how does the data originally come into the system? We assumed data was already held in a database, but obviously it had to get there somehow. We’ll turn our attention to that part of the system and see if we can improve it.

Increasingly, modern application architectures include some sort of streaming data source (Apache Kafka being perhaps the most ubiquitous). The data could be coming in from web pages, from mobile devices, from edge devices, or somewhere else, but it’s coming at us fast and needs to be handled as it arrives.

A stream processing engine — such as Hazelcast Jet (or competing products such as Apache Spark or Apache Flink) — can give you the capability to process this incoming data stream. The data can be filtered, transformed, analyzed, and stored. A useful combination is to have your stream processing engine serve as an extract-transform-load (ETL) engine for your operational data store or system of record. In some cases, this same combination can be hooked up to a Change Data Capture platform such as Debezium to help keep your operational store tightly synchronized with your System of Record by streaming database changes as events. Depending on the incoming data rate and how tight your latency tolerances are, you may want this solution to be in-memory as well. In some cases, such as the open source Hazelcast IMDG and Hazelcast Jet combination, a common platform makes it possible to tightly integrate the stream processing and data store capabilities.

In the figure above, we still have an in-memory cache provided by IMDG, a system of record represented here as a relational database, and clients that access the data grid to get lower latency access to the data. We’ve added Hazelcast Jet, a stream processing engine that is built on the foundation of Hazelcast IMDG, to give us a number of additional capabilities including:

  • Jet’s Kafka connector is used to consume incoming events (transactions) from Kafka. This allows us to move database update code from our legacy app into our modern cloud native architecture, where it can scale more easily (using the processing resources of all processors across all nodes of our Jet cluster, if needed). A short sketch of this Kafka-to-cache path follows the list.
  • Jet can write updates directly into the IMDG cache. Jet may additionally update the system of record, or it can let the cache’s write-through or write-behind capabilities be responsible for updating the database. Both are shown in the figure, as different services may take different approaches; but for any given transaction type we would expect to choose one method or the other.
  • If no other services or legacy apps are concurrently updating the database, then we know our cache will have the most up-to-date data. But in cases where the database receives updates from other sources, we can use a Change Data Capture (CDC) platform such as the open source Debezium to capture those updates and forward them to Jet. Jet’s Debezium connector makes it a simple matter to connect to Debezium, and changes can then be made to the IMDG cache.
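Here is a minimal sketch of the Kafka-to-cache path described in the first two bullets, written against Jet 4.x-style APIs; the broker address, the "transactions" topic, and the map name are illustrative assumptions, not details taken from the architecture above.

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

import java.util.Properties;

public class KafkaToCachePipeline {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "earliest");

        Pipeline p = Pipeline.create();
        // Each Kafka record arrives as a key/value entry; here it is written straight
        // into the IMDG map. A real pipeline would typically transform or enrich the
        // event first, and could also write to the system of record.
        p.readFrom(KafkaSources.<String, String>kafka(props, "transactions"))
         .withoutTimestamps()
         .writeTo(Sinks.map("transactions"));

        JetInstance jet = Jet.newJetInstance();
        jet.newJob(p).join();
    }
}
```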

Compared to the basic caching architecture described above, we’ve added event stream processing, which can perform extract-transform-load functionality for the data grid, the database, or both; it can also perform transaction processing, as we’ll see in an example shortly. We have also added a CDC capability, which is only needed if our database is being updated through means other than the updates originating from the event stream.

We’re leaning on Jet here for a lot of our processing workload because of how efficiently it scales up to handle production workloads. Jet programs, or tasks, are written in the form of pipelines. A pipeline is simply a series of steps chained together; this might be a completely linear series of steps (which fits our mental image of a pipeline), but it may also be a graph of processing steps where the data flow can fork or merge to create a more complex dataflow.

For illustration, below is a simple pipeline to load data from a JDBC data source and insert it into an IMDG Map. More complex pipelines could filter the data, transform it (object-relational mapping), or enrich it by joining items from the primary table we’re reading with additional data from other tables or other data sources.
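A minimal sketch of such a pipeline, written against Jet 4.x-style APIs, might look like the following; the connection string, query, and map name are placeholders rather than the exact code from the referenced repository.

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class JdbcToMapPipeline {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Read rows from the system of record, turn each row into a key/value entry
        // (this is where any object-relational mapping would happen), and write the
        // entries into the "customers" IMap.
        p.readFrom(Sources.jdbc(
                "jdbc:mysql://localhost:3306/inventory?user=app&password=secret",
                "SELECT id, name FROM customers",
                rs -> Util.entry(rs.getString("id"), rs.getString("name"))))
         .writeTo(Sinks.map("customers"));

        JetInstance jet = Jet.newJetInstance();
        jet.newJob(p).join();
    }
}
```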

The full source of this example is available from this GitHub repository.

The integration with the Debezium CDC platform doesn’t require any more steps than the basic database code shown above; here’s an example that takes change events from a MySQL database and simply logs them to the console. Writing them into an IMDG Map instead would simply require replacing the final writeTo step of the pipeline to target a Map, as shown in the previous example. (The full source of this example is here.)
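A rough reconstruction of that CDC pipeline, assuming Jet's hazelcast-jet-cdc-mysql module (Jet 4.2-style API), might look like this; the connection details, database, and table names are illustrative rather than taken from the referenced example.

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;

public class MySqlCdcToConsole {
    public static void main(String[] args) {
        // Debezium-based source that streams row-level changes from MySQL's binlog.
        StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("customers-cdc")
                .setDatabaseAddress("127.0.0.1")
                .setDatabasePort(3306)
                .setDatabaseUser("debezium")
                .setDatabasePassword("dbz")
                .setClusterName("dbserver1")
                .setDatabaseWhitelist("inventory")
                .setTableWhitelist("inventory.customers")
                .build();

        Pipeline p = Pipeline.create();
        // Log each change event; swapping the sink for Sinks.map(...) (plus a map step
        // that builds key/value entries) would write the changes into the cache instead.
        p.readFrom(source)
         .withNativeTimestamps(0)
         .writeTo(Sinks.logger());

        JetInstance jet = Jet.newJetInstance();
        jet.newJob(p).join();
    }
}
```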

Transaction Processing Use Cases

A stream processing engine is such a powerful processing platform that once in place, you’ll no doubt find it filling multiple roles — data ingestion and change data capture, as shown above, as well as analytics and transaction processing.

These use cases were a great way to introduce stream processing, since we already had a caching use case and a stream processing engine gave us some useful ways to extend the functionality of our cache. But more useful, and generally more strategic, applications for the business can be enabled when we use our stream processing engine to perform vital application processing that demands low-latency and high-throughput performance characteristics.

For example, Jet is used by some credit card processors to perform fraud detection. The incoming event stream is point-of-sale transactions, coming in from merchants all over the world. Because of the network latency involved in just getting the data to and from the central data center, typically the credit card processor has a very narrow time window — a single-digit number of milliseconds, in some cases — to decide whether to approve the transaction. During this time, the processor will need to perform a number of checks, preferably concurrently — is the account in good standing? Does the transaction exceed the available credit for the cardholder? Based on the card history, current location, merchant info, and many other factors, does the transaction appear likely to be fraudulent? All of these checks and calculations can only be performed in the required time window if all the data needed to make a determination is stored in memory (a use case for IMDG), and the calculations can be distributed and parallelized to a great extent (a use case for Jet).
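As a rough sketch of how one such check can be expressed as a pipeline (the Txn and Account types, map name, and approval rule below are invented for illustration, not taken from any real fraud system), each incoming transaction is enriched with the cardholder's in-memory account record and evaluated, with Jet spreading that work across the cluster:

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import java.io.Serializable;
import java.util.Map.Entry;

public class FraudCheckSketch {

    // Invented types for illustration only.
    public static class Txn implements Serializable {
        public final String cardId;
        public final double amount;
        public Txn(String cardId, double amount) { this.cardId = cardId; this.amount = amount; }
    }

    public static class Account implements Serializable {
        public final boolean goodStanding;
        public final double availableCredit;
        public Account(boolean goodStanding, double availableCredit) {
            this.goodStanding = goodStanding;
            this.availableCredit = availableCredit;
        }
    }

    public static void main(String[] args) {
        JetInstance jet = Jet.newJetInstance();

        // Account data lives in an in-memory map, keyed by card id (normally loaded from the DB).
        jet.getMap("accounts").put("card-1", new Account(true, 5000.0));

        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items(new Txn("card-1", 120.0), new Txn("card-1", 9000.0)))
         // Enrich each transaction with the cardholder's in-memory account record;
         // the lookups and the approval rule run in parallel across the cluster.
         .mapUsingIMap("accounts",
                 (Txn txn) -> txn.cardId,
                 (Txn txn, Account acct) -> Util.entry(txn,
                         acct != null && acct.goodStanding && txn.amount <= acct.availableCredit))
         .writeTo(Sinks.logger((Entry<Txn, Boolean> e) ->
                 e.getKey().cardId + " amount=" + e.getKey().amount + " approved=" + e.getValue()));

        jet.newJob(p).join();
        jet.shutdown();
    }
}
```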


Similar use cases are found throughout a number of industries — replace the credit card point-of-sale terminals with various edge devices (such as Internet of Things sensors), and the same architecture can support real-time tracking of a fleet of vehicles, or monitor the flow of material throughout the factory floor of a major manufacturer.

In one of the more exciting emerging markets for stream processing technology, a stream processing engine turns out to be a perfect tool for operationalizing machine learning. The models that are developed by data scientists may not be easily operationalized at large scale using the same toolset the data scientists use to build and train them. Here again, the issue is primarily one of scale: training a model is very much a batch process and can be performed sequentially, and while the time to complete the training is important, it is not holding up business operations. Once the model becomes part of a production workflow, however, the ability to scale it up to handle many concurrent tasks becomes critical.

The same components we used for our caching architecture can be reused in a transaction processing architecture that supports high-volume, low-latency processing of streaming input data (from an event source such as Kafka), integrated with a machine learning model that performs analysis on the transactions.

In the figure above, we have an event stream coming in from a source like Kafka or a message broker, and these events are handled by a Jet processing pipeline as we have shown in previous examples. The difference here is that our machine learning model is not internal to Jet; we need to invoke an external service. Since machine learning models aren’t typically written in Java, Jet provides specific pipeline stages that can be used to invoke an external Python task, or one written in any of the various languages that support the gRPC procedure calling mechanism. Results from the execution of the external service can then be passed on to downstream stages in the pipeline, to update the IMDG cache as shown in the diagram, or they can be passed out of Jet via any of the supported connectors, such as Kafka, JDBC, or JMS.

The code snippet below shows how this external service can be invoked. We read incoming items from Kafka, and then invoke a machine learning model that in this case is implemented in Python. In this example we set up two Python runtimes to help split the load; this value can be tuned depending on the incoming transaction rate and how much processing is required for each execution of the ML model.
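The snippet below is a rough reconstruction of that combination rather than the exact code from the referenced examples; it assumes Jet 4.x's Kafka connector and the hazelcast-jet-python module, and the topic name, base directory, and handler module are illustrative.

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.python.PythonServiceConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Properties;

import static com.hazelcast.jet.python.PythonTransforms.mapUsingPython;

public class PythonScoringPipeline {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");

        // The Python handler module (score.py in ./model, both names illustrative) must
        // define transform_list(items): it receives a batch of strings and returns the
        // scored results as strings -- this is where the ML model actually runs.
        PythonServiceConfig pythonConfig = new PythonServiceConfig()
                .setBaseDir("./model")
                .setHandlerModule("score");

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.kafka(props,
                (ConsumerRecord<String, String> rec) -> rec.value(), "transactions"))
         .withoutTimestamps()
         .apply(mapUsingPython(pythonConfig))
         // Two Python runtimes per Jet node; tune this to the incoming rate and model cost.
         .setLocalParallelism(2)
         .writeTo(Sinks.logger());

        JetInstance jet = Jet.newJetInstance();
        jet.newJob(p).join();
    }
}
```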

The code above is a mash-up of a couple of different examples; the full example showing execution of Python helper routines is here, while an example showing ingestion from a Kafka data source is here.

Where does this leave us?

There are a number of technology trends that define the landscape we expect to operate in over the near to mid-term future; the good news is that a relatively small set of key technologies, employed in your software stacks, can help address all these requirements.

These key technologies are microservices (a design pattern), container management and orchestration technologies such as Docker and Kubernetes, an operational data store (such as an in-memory data grid), and a stream processing engine. All of these are powerful individually, but the combination can provide a reusable software stack that can be applied to nearly any application domain, giving you performance at scale and flexibility to evolve to meet future trends as they emerge.

InApps is a wholly owned subsidiary of Insight Partners, an investor in the following companies mentioned in this article: Docker.

Feature image via Pixabay.