In Part 1, we focussed on Redis and its data structures, and in Part 2 we focussed on Redis commands. In this section we will focus on Lettuce Java client for Redis, and use lettuce to connect to Redis and run commands.

  • Lettuce is a scalable thread-safe Redis client for synchronous, asynchronous and reactive usage.
  • Multiple threads may share one connection if they avoid blocking and transactional operations such as BLPOP and MULTI/EXEC.
  • Lettuce is built with netty.
  • Supports advanced Redis features such as Sentinel, Cluster, Pipelining, Auto-Reconnect and Redis data models.
  • Lettuce offers a natural interface for making asynchronous requests from the Redis database server and for creating streams.
    • Also provides asynchronous support via the Java 8’s CompletionStage interface
    • Has support for Reactive Streams.

 

Maven Dependency Information

<!-- https://mvnrepository.com/artifact/biz.paluch.redis/lettuce -->
<dependency>
 <groupId>biz.paluch.redis</groupId>
 <artifactId>lettuce</artifactId>
 <version>4.4.4.Final</version>
</dependency>

 

Gradle Dependency Information

// https://mvnrepository.com/artifact/biz.paluch.redis/lettuce
compile group: 'biz.paluch.redis', name: 'lettuce', version: '4.4.4.Final'

 

Creating a Connection

StatefulRedisConnection is a thread-safe connection to a Redis server that will maintain its connection to the server and reconnect if needed. Once we have a connection, we can use it to execute Redis commands either synchronously or asynchronously.

Redis connections are designed to be long-lived and thread-safe, and if the connection is lost it will reconnect until close() is called. Pending commands that have not timed out will be (re)sent after successful reconnection.

RedisClient uses substantial system resources, as it holds Netty resources for communicating with the Redis server. Applications that require multiple connections should use a single RedisClient.

//Create the RedisClient instance and provide a Redis URI pointing to localhost, Port 6379
RedisClient redisClient = RedisClient.create(
        new RedisURI("localhost", 6379, 30, TimeUnit.SECONDS));


//Open a Redis Standalone connection
StatefulRedisConnection<String, String> connection = redisClient.connect();


...
...
...

 //shutdown the client
 redisClient.shutdown();

Synchronous String operations

private static void executeSynchronousStringCommands(RedisClient redisClient) {
    //Open a Redis Standalone connection
    StatefulRedisConnection<String, String> connection = redisClient.connect();

    //Obtain the command API for synchronous execution.
    RedisCommands<String, String> commands = connection.sync();

    String key1 = "auth:type";
    String key2 = "email:client:maxSize";

    //delete existing keys
    commands.del(key1, key2);

    //Simple SET and GET commands
    commands.set(key1, "LDAP");


    //Simple SET command with a timeout value specified in SetArgs.
    commands.set(key2 , "30", SetArgs.Builder.ex(3600));


    //Simple GET Commands.
    String value1 = commands.get(key1);
    System.out.println(key1 + " = " + value1);

    String value2 = commands.get(key2);
    Long ttl2 = commands.ttl(key2);
    System.out.println(key2 + " = " + value2 + " ; TTL = " + ttl2);

    //Close the connection
    connection.close();
}

Object Serialization

For putting objects into Redis Cache, we need to implement a RedisCodec for the objects.
The codec needs to implement 4 methods

 // Decode the key returned by redis.
 K decodeKey(ByteBuffer bytes);

 // Decode the value returned by redis.
 V decodeValue(ByteBuffer bytes);

 // Encode the key sent to redis.
 ByteBuffer encodeKey(K key);

 // Encode the value sent to redis.
 ByteBuffer encodeValue(V value);

Following is the sample implementation that we will use in our code below

class SerializedObjectCodec implements RedisCodec<String, Object> {

    private Charset charset = Charset.forName("UTF-8");

    @Override
    public String decodeKey(ByteBuffer bytes) {
        return charset.decode(bytes).toString();
    }

    @Override
    public Object decodeValue(ByteBuffer bytes) {
        try {
            byte[] array = new byte[bytes.remaining()];
            bytes.get(array);
            ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(array));
            return is.readObject();
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public ByteBuffer encodeKey(String key) {
        return charset.encode(key);
    }

    @Override
    public ByteBuffer encodeValue(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream(bytes);
            os.writeObject(value);
            return ByteBuffer.wrap(bytes.toByteArray());
        } catch (IOException e) {
            return null;
        }
    }
}

 

 

List Operations

Sample code for list operations

private static void executeSynchronousListOperations(RedisClient redisClient) {
    //Open a new Connection
    StatefulRedisConnection<String, Object> connection = redisClient.connect(new SerializedObjectCodec());

    //Returns synchronous command API for current connection.
    RedisCommands<String, Object> commands = connection.sync();

    String key = "users:skills";

    //delete existing keys
    commands.del(key);

    //Execute list command LPUSH
    commands.lpush(key, "Java", "Scala", "Spring", "Elastic Search", "Mongo DB", "Redis");

    System.out.println("Total Items in List = " + commands.llen(key));

    //Executes list command RPUSH
    commands.rpush(key, "Solr", "Jenkins", "Git");

    System.out.println("List values = " + commands.lrange(key, 0, -1));

    //Execute list command RPOP
    commands.rpop(key);

    //Iterates the list using LRANGE
    System.out.println("List values = " + commands.lrange(key, 0, -1));

    //Close Connection
    connection.close();
}

Hash Operations

For Hash Operations, we will create two POJO Objects Person and Address, as given below.

class Person implements Serializable {
    private String name;
    private int age;
    private Address address;


    public Person(String name, int age, Address address) {
        this.name = name;
        this.age = age;
        this.address = address;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", address=" + address +
                '}';
    }
}

class Address implements Serializable {
    private String city;
    private String stateCode;
    private String country;

    public Address(String city, String stateCode, String country) {
        this.city = city;
        this.stateCode = stateCode;
        this.country = country;
    }

    @Override
    public String toString() {
        return "Address{" +
                "city='" + city + '\'' +
                ", stateCode='" + stateCode + '\'' +
                ", country='" + country + '\'' +
                '}';
    }
}

 

Now we will execute synchronous hash operations, and insert person objects into the hash.

private static void executeSynchronousObjectOperations(RedisClient redisClient) {
    //Open a new Connection
    StatefulRedisConnection<String, Object> connection = redisClient.connect(new SerializedObjectCodec());

    //Returns synchronous command API for current connection.
    RedisCommands<String, Object> commands = connection.sync();

    String key = "users:sync";

    //delete existing keys
    commands.del(key);

    //Create person records
    Person person1 = new Person("John Doe", 35, new Address("San Francisco", "CA", "US"));
    Person person2 = new Person("Samantha", 21, new Address("Portland", "OR", "US"));


    //Execute hash command HSET
    commands.hset(key, "1000", person1);
    commands.hset(key, "1001", person2);


    //Execute hash command HEXISTS
    System.out.println(commands.hexists(key, "1000"));

    //Execute hash command HGETALL
    commands.hgetall(key)
            .entrySet()
            .stream()
            .forEach(entry ->
                System.out.println(entry.getKey() + " : " + (Person)entry.getValue()));

    //Close Connection
    connection.close();
}

Asynchronous Operations

We will retrieve a set of RedisAsyncCommands from the connection, in the same way we received the synchronous commands set.

Every command invocation on the asynchronous API creates a RedisFuture<T> that can be canceled, awaited and subscribed (listener).

CompleteableFuture<T> or RedisFuture<T> is a pointer to the result that is initially unknown since the computation of its value is yet incomplete.

RedisFuture<T> provides operations for synchronization and chaining.

 

private static void executeAsynchronousObjectOperations(RedisClient redisClient) {
    //Open a new Connection
    StatefulRedisConnection<String, Object> connection = redisClient.connect(new SerializedObjectCodec());

    //Returns asynchronous commands API for given connection
    RedisAsyncCommands<String, Object> commands = connection.async();

    String key = "users:async";

    //Del the key from the cache (synchronously) first.
    connection.sync().del(key);


    //Create the person records
    Person person1 = new Person("John Doe", 35, new Address("San Francisco", "CA", "US"));
    Person person2 = new Person("Samantha", 21, new Address("Portland", "OR", "US"));


    //Create redis future commands for HSET
    RedisFuture<Boolean> command1 = commands.hset(key, "1002", person1);
    RedisFuture<Boolean> command2 = commands.hset(key, "1003", person2);

    //Wait for async commands (command1 and command2) to complete
    LettuceFutures.awaitAll(120, TimeUnit.SECONDS, command1, command2);

    //Execute Future Command HEXISTS
    commands.hexists(key, "1003").thenAccept(System.out::println);


    //Execute Future Command HGET, using awaitOrCancel
    Person value1 =
            (Person)LettuceFutures
                    .awaitOrCancel(commands.hget(key, "1002"), 60, TimeUnit.SECONDS);

    System.out.println("Value1 : " + value1);

    //Execute Future Command HGETALL, using whenComplete
    //The results don't get printed always.
    //Sometimes we get the exception: java.util.concurrent.CancellationException
    commands.hgetall(key).whenComplete( (result, ex) -> {
        if(result != null)
            System.out.println("All Results : " + result);
        else
            System.out.println("All Results : " + ex.toString());
    });


    //Execute Future Command HGETALL, using thenAccept
    //Does not always prints the results
    commands.hgetall(key).thenAccept(map ->
        map.entrySet().stream().forEach(entry ->
                System.out.println("Results : " + entry.getKey() + " : " + (Person)entry.getValue())));


    //Close Connection
    connection.close();
}

 

 

References

https://redislabs.com/lp/redis-java/

https://github.com/lettuce-io/lettuce-core