This tutorial builds a service that transforms a stream of items. The service uses a data connection to obtain connections to a relational database, and uses a table in that database to enrich a stream of numbers with a textual representation of each number's last digit.
Before you begin
To complete this tutorial, you need the following:
| Prerequisites | Useful resources | 
|---|---|
| Java 17 or newer | |
| Maven 3.9 | |
| Docker | |
Step 1. Create and populate database
This tutorial uses Docker to run the Postgres database.
Run the following command to start Postgres:
docker run --name postgres --rm -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres

Start the psql client:
docker exec -it postgres psql -U postgres

Create a table my_table and populate it with data:
CREATE TABLE my_table(id INTEGER PRIMARY KEY, value VARCHAR(128));
INSERT INTO my_table VALUES (0, 'zero');
INSERT INTO my_table VALUES (1, 'one');
INSERT INTO my_table VALUES (2, 'two');
INSERT INTO my_table VALUES (3, 'three');
INSERT INTO my_table VALUES (4, 'four');
INSERT INTO my_table VALUES (5, 'five');
INSERT INTO my_table VALUES (6, 'six');
INSERT INTO my_table VALUES (7, 'seven');
INSERT INTO my_table VALUES (8, 'eight');
INSERT INTO my_table VALUES (9, 'nine');
Step 2. Create new Java project

Create a blank Java project named `pipeline-service-data-connection-example` and copy the following Maven pom.xml into it:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>pipeline-service-data-connection-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>pipeline-service-data-connection-example</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.release>17</maven.compiler.release>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>5.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.24.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j2-impl</artifactId>
            <version>2.24.1</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.7.4</version>
        </dependency>
    </dependencies>
</project>
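Before moving on, you can optionally confirm that the database from Step 1 is reachable and populated. The following throwaway class (the name VerifyTable is ours, not part of the tutorial) uses plain JDBC with the driver declared in the pom above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Throwaway sanity check: counts the rows inserted in Step 1.
public class VerifyTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT count(*) FROM my_table")) {
            rs.next();
            System.out.println("rows in my_table: " + rs.getLong(1)); // expect 10
        }
    }
}
```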
Step 3. Create pipeline and job

Create a class EnrichUsingDataConnection with a main method that will be the main job class. The main method does the following:
- configures the data connection
- creates a service factory and mapping function using the data connection
- creates a pipeline that:
  - takes a sequence of numbers
  - maps each number using the service
  - prints the result to the logger sink
- submits the job
To configure the data connection, copy the following code snippet:
import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class EnrichUsingDataConnection {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        // register a JDBC data connection pointing at the Postgres instance from Step 1
        DataConnectionConfig dcc = new DataConnectionConfig("my_data_connection");
        dcc.setType("JDBC");
        dcc.setProperty("jdbcUrl", "jdbc:postgresql://localhost:5432/postgres");
        dcc.setProperty("user", "postgres");
        dcc.setProperty("password", "postgres");
        hz.getConfig().addDataConnectionConfig(dcc);
        // ...
    }
}
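The snippet above registers the data connection dynamically on an already running instance. As a design alternative (a sketch, not part of this tutorial; the class name is hypothetical), the same connection can be registered in the member configuration before startup:

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

// Registers the data connection as part of the member config at startup
// instead of adding it dynamically afterwards.
public class StartMemberWithDataConnection {
    public static void main(String[] args) {
        Config config = new Config();
        DataConnectionConfig dcc = new DataConnectionConfig("my_data_connection");
        dcc.setType("JDBC");
        dcc.setProperty("jdbcUrl", "jdbc:postgresql://localhost:5432/postgres");
        dcc.setProperty("user", "postgres");
        dcc.setProperty("password", "postgres");
        config.addDataConnectionConfig(dcc);
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
        System.out.println("member started: " + hz.getName());
    }
}
```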
Use the following to create the service factory:

public class EnrichUsingDataConnection {
    public static void main(String[] args) {
        // ...
        ServiceFactory<DataConnectionService, JdbcDataConnection> sf =
                ServiceFactory.withCreateContextFn(Context::dataConnectionService)
                        .withCreateServiceFn(
                                // retain the data connection for the lifetime of the job
                                (context, dcs) -> dcs.getAndRetainDataConnection("my_data_connection", JdbcDataConnection.class)
                        )
                        // release it again when the job completes
                        .withDestroyServiceFn(DataConnectionBase::release);
        // ...
    }
}
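Since JDBC calls block and Jet's cooperative threads should not, you may want to mark the service factory non-cooperative for real workloads. A minimal sketch using ServiceFactory's toNonCooperative():

```java
// Runs the mapping stage on threads that are allowed to block on JDBC calls.
// For this tutorial's one-item-per-second demo stream it makes no practical difference.
ServiceFactory<DataConnectionService, JdbcDataConnection> blockingSf = sf.toNonCooperative();
```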
Use the following to create the mapping function:

        BiFunctionEx<JdbcDataConnection, Long, Tuple2<Long, String>> mapFunction = (dc, key) -> {
            try (Connection connection = dc.getConnection();
                 PreparedStatement statement = connection.prepareStatement(
                         "SELECT value FROM my_table WHERE id = ?")) {
                statement.setLong(1, key % 10);
                try (ResultSet resultSet = statement.executeQuery()) {
                    String value = null;
                    if (resultSet.next()) {
                        value = resultSet.getString("value");
                    }
                    // enrich the key with the textual name of its last digit
                    return tuple2(key, value);
                }
            } catch (SQLException e) {
                throw new RuntimeException("Failed to load value for key=" + key, e);
            }
        };
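The key % 10 in the query is what makes this work for any number: only digits 0 through 9 exist in my_table, so each number is looked up by its last digit. If you want to spot-check the function before wiring it into a pipeline, the following hypothetical lines could be dropped into main after mapFunction is defined (they assume HazelcastInstance exposes getDataConnectionService(), as recent Hazelcast versions do, and are not part of the tutorial's job):

```java
// Hypothetical spot check: retain the data connection, apply the function once, release.
JdbcDataConnection dc = hz.getDataConnectionService()
        .getAndRetainDataConnection("my_data_connection", JdbcDataConnection.class);
System.out.println(mapFunction.apply(dc, 42L)); // expected: (42, two)
dc.release(); // every retain needs a matching release
```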
Now, you can create the pipeline and submit it:

public class EnrichUsingDataConnection {
    public static void main(String[] args) {
        // ...
        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.itemStream(1)) // emits one SimpleEvent per second
                .withoutTimestamps()
                .map(SimpleEvent::sequence) // extract the sequence number
                .mapUsingService(sf, mapFunction) // enrich via the JDBC data connection
                .writeTo(Sinks.logger());
        hz.getJet().newJob(p).join();
    }
}

Running the main method should produce a log containing lines like the following:
13:21:41.479 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [5.7.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (0, zero)
13:21:42.250 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [5.7.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (1, one)
13:21:43.253 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [5.7.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (2, two)
...

Next steps
To learn how to submit the job to a running cluster, see the Submitting Jobs page.