In Part 1, we focussed on Redis and its data structures, and in Part 2 we focussed on Redis commands. In this section we will focus on Lettuce, a Java client for Redis, and use it to connect to Redis and run commands.
- Lettuce is a scalable thread-safe Redis client for synchronous, asynchronous and reactive usage.
- Multiple threads may share one connection if they avoid blocking and transactional operations such as BLPOP and MULTI/EXEC.
- Lettuce is built with Netty.
- Supports advanced Redis features such as Sentinel, Cluster, Pipelining, Auto-Reconnect and Redis data models.
- Lettuce offers a natural interface for making asynchronous requests to the Redis database server and for creating streams.
- Also provides asynchronous support via Java 8's CompletionStage interface.
- Has support for Reactive Streams.
Maven Dependency Information
    <!-- https://mvnrepository.com/artifact/biz.paluch.redis/lettuce -->
    <dependency>
        <groupId>biz.paluch.redis</groupId>
        <artifactId>lettuce</artifactId>
        <version>4.4.4.Final</version>
    </dependency>
Gradle Dependency Information
    // https://mvnrepository.com/artifact/biz.paluch.redis/lettuce
    compile group: 'biz.paluch.redis', name: 'lettuce', version: '4.4.4.Final'
Creating a Connection
A StatefulRedisConnection is a thread-safe connection to a Redis server that will maintain its connection to the server and reconnect if needed. Once we have a connection, we can use it to execute Redis commands either synchronously or asynchronously.
Redis connections are designed to be long-lived and thread-safe, and if the connection is lost it will reconnect until close() is called. Pending commands that have not timed out will be (re)sent after successful reconnection.
RedisClient uses substantial system resources, as it holds Netty resources for communicating with the Redis server. Applications that require multiple connections should use a single RedisClient.
    //Create the RedisClient instance and provide a Redis URI pointing to localhost, Port 6379
    RedisClient redisClient = RedisClient.create(
            new RedisURI("localhost", 6379, 30, TimeUnit.SECONDS));

    //Open a Redis Standalone connection
    StatefulRedisConnection<String, String> connection = redisClient.connect();

    ...
    ...
    ...

    //shutdown the client
    redisClient.shutdown();
Synchronous String operations
    private static void executeSynchronousStringCommands(RedisClient redisClient) {
        //Open a Redis Standalone connection
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        //Obtain the command API for synchronous execution.
        RedisCommands<String, String> commands = connection.sync();

        String key1 = "auth:type";
        String key2 = "email:client:maxSize";

        //delete existing keys
        commands.del(key1, key2);

        //Simple SET and GET commands
        commands.set(key1, "LDAP");

        //Simple SET command with a timeout value specified in SetArgs.
        commands.set(key2, "30", SetArgs.Builder.ex(3600));

        //Simple GET Commands.
        String value1 = commands.get(key1);
        System.out.println(key1 + " = " + value1);

        String value2 = commands.get(key2);
        Long ttl2 = commands.ttl(key2);
        System.out.println(key2 + " = " + value2 + " ; TTL = " + ttl2);

        //Close the connection
        connection.close();
    }
Object Serialization
To store Java objects in Redis, we need to implement a RedisCodec for them.
The codec needs to implement four methods:
    // Decode the key returned by redis.
    K decodeKey(ByteBuffer bytes);

    // Decode the value returned by redis.
    V decodeValue(ByteBuffer bytes);

    // Encode the key sent to redis.
    ByteBuffer encodeKey(K key);

    // Encode the value sent to redis.
    ByteBuffer encodeValue(V value);
The following sample implementation is the one we will use in the code below:
    class SerializedObjectCodec implements RedisCodec<String, Object> {
        private Charset charset = Charset.forName("UTF-8");

        @Override
        public String decodeKey(ByteBuffer bytes) {
            return charset.decode(bytes).toString();
        }

        @Override
        public Object decodeValue(ByteBuffer bytes) {
            try {
                byte[] array = new byte[bytes.remaining()];
                bytes.get(array);
                ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(array));
                return is.readObject();
            } catch (Exception e) {
                return null;
            }
        }

        @Override
        public ByteBuffer encodeKey(String key) {
            return charset.encode(key);
        }

        @Override
        public ByteBuffer encodeValue(Object value) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream os = new ObjectOutputStream(bytes);
                os.writeObject(value);
                return ByteBuffer.wrap(bytes.toByteArray());
            } catch (IOException e) {
                return null;
            }
        }
    }
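To make the value half of such a codec concrete, here is a minimal round-trip sketch that uses only the JDK, with no Redis or Lettuce involved. CodecRoundTrip and its two static methods are hypothetical names introduced for illustration. They perform the same Java-serialization steps as encodeValue and decodeValue, but raise an exception on failure instead of returning null, which is an assumption about what is preferable rather than what the sample does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper (not part of Lettuce): value encoding and decoding in isolation.
final class CodecRoundTrip {

    // Serialize a value with Java serialization and wrap the bytes in a ByteBuffer.
    static ByteBuffer encodeValue(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream os = new ObjectOutputStream(bytes)) {
            os.writeObject(value);
        } catch (IOException e) {
            // Assumption: failing loudly is preferable to returning null.
            throw new UncheckedIOException(e);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    // Copy the remaining bytes out of the buffer and deserialize them.
    static Object decodeValue(ByteBuffer buffer) {
        byte[] array = new byte[buffer.remaining()];
        buffer.get(array);
        try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(array))) {
            return is.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not decode value", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> skills = new ArrayList<>(Arrays.asList("Java", "Redis"));
        Object copy = decodeValue(encodeValue(skills));
        System.out.println(copy.equals(skills)); // prints "true"
    }
}
```

Any value passed through such a codec must implement Serializable, which is why the Person and Address classes used later do so.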
List Operations
Sample code for list operations
    private static void executeSynchronousListOperations(RedisClient redisClient) {
        //Open a new Connection
        StatefulRedisConnection<String, Object> connection = redisClient.connect(new SerializedObjectCodec());

        //Returns synchronous command API for current connection.
        RedisCommands<String, Object> commands = connection.sync();

        String key = "users:skills";

        //delete existing keys
        commands.del(key);

        //Execute list command LPUSH
        commands.lpush(key, "Java", "Scala", "Spring", "Elastic Search", "Mongo DB", "Redis");
        System.out.println("Total Items in List = " + commands.llen(key));

        //Executes list command RPUSH
        commands.rpush(key, "Solr", "Jenkins", "Git");
        System.out.println("List values = " + commands.lrange(key, 0, -1));

        //Execute list command RPOP
        commands.rpop(key);

        //Iterates the list using LRANGE
        System.out.println("List values = " + commands.lrange(key, 0, -1));

        //Close Connection
        connection.close();
    }
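The order of the final list can be surprising at first, because LPUSH inserts each value at the head in turn, so the last argument ends up first. The following sketch models the three commands with a plain java.util.ArrayDeque. It does not talk to Redis, and ListOrderSketch is a hypothetical name used only here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of LPUSH, RPUSH and RPOP ordering (no Redis involved).
final class ListOrderSketch {

    static List<String> simulate() {
        Deque<String> list = new ArrayDeque<>();

        // LPUSH: every value is pushed onto the head, one after another.
        for (String v : new String[] {"Java", "Scala", "Spring", "Elastic Search", "Mongo DB", "Redis"}) {
            list.addFirst(v);
        }

        // RPUSH: every value is appended to the tail, in argument order.
        for (String v : new String[] {"Solr", "Jenkins", "Git"}) {
            list.addLast(v);
        }

        // RPOP: remove the element at the tail ("Git").
        list.pollLast();

        // Equivalent of LRANGE key 0 -1: the whole list, head to tail.
        return new ArrayList<>(list);
    }

    public static void main(String[] args) {
        // prints [Redis, Mongo DB, Elastic Search, Spring, Scala, Java, Solr, Jenkins]
        System.out.println(simulate());
    }
}
```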
Hash Operations
For hash operations, we will create two POJOs, Person and Address, as shown below.
    class Person implements Serializable {
        private String name;
        private int age;
        private Address address;

        public Person(String name, int age, Address address) {
            this.name = name;
            this.age = age;
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    ", address=" + address +
                    '}';
        }
    }

    class Address implements Serializable {
        private String city;
        private String stateCode;
        private String country;

        public Address(String city, String stateCode, String country) {
            this.city = city;
            this.stateCode = stateCode;
            this.country = country;
        }

        @Override
        public String toString() {
            return "Address{" +
                    "city='" + city + '\'' +
                    ", stateCode='" + stateCode + '\'' +
                    ", country='" + country + '\'' +
                    '}';
        }
    }
Now we will execute synchronous hash operations, and insert person objects into the hash.
    private static void executeSynchronousObjectOperations(RedisClient redisClient) {
        //Open a new Connection
        StatefulRedisConnection<String, Object> connection = redisClient.connect(new SerializedObjectCodec());

        //Returns synchronous command API for current connection.
        RedisCommands<String, Object> commands = connection.sync();

        String key = "users:sync";

        //delete existing keys
        commands.del(key);

        //Create person records
        Person person1 = new Person("John Doe", 35, new Address("San Francisco", "CA", "US"));
        Person person2 = new Person("Samantha", 21, new Address("Portland", "OR", "US"));

        //Execute hash command HSET
        commands.hset(key, "1000", person1);
        commands.hset(key, "1001", person2);

        //Execute hash command HEXISTS
        System.out.println(commands.hexists(key, "1000"));

        //Execute hash command HGETALL
        commands.hgetall(key)
                .entrySet()
                .stream()
                .forEach(entry -> System.out.println(entry.getKey() + " : " + (Person)entry.getValue()));

        //Close Connection
        connection.close();
    }
Asynchronous Operations
We will retrieve a set of RedisAsyncCommands from the connection, in the same way we obtained the synchronous command set.
Every command invocation on the asynchronous API creates a RedisFuture<T> that can be canceled, awaited and subscribed to (with a listener).
A CompletableFuture<T> or RedisFuture<T> is a pointer to a result that is initially unknown, since the computation of its value is not yet complete.
A RedisFuture<T> provides operations for synchronization and chaining.
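Because RedisFuture<T> extends the JDK's CompletionStage<T> and Future<T>, synchronization and chaining use the standard Java 8 operators. The sketch below illustrates both with a plain CompletableFuture standing in for a RedisFuture, so it runs without a Redis server. FutureChainingSketch and fakeHget are hypothetical names, and the returned string is made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for asynchronous Redis calls, built only on the JDK.
final class FutureChainingSketch {

    // Pretends to be an async HGET: the value is computed on another thread.
    static CompletableFuture<String> fakeHget(String field) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + field);
    }

    // Chaining: derive a new future from the first one without blocking.
    static CompletableFuture<Integer> lengthOf(String field) {
        return fakeHget(field).thenApply(String::length);
    }

    public static void main(String[] args) throws Exception {
        // Synchronization: block until the result arrives, or give up after a timeout.
        String value = fakeHget("1002").get(5, TimeUnit.SECONDS);
        System.out.println(value); // prints "value-of-1002"

        // Chaining with a consumer; get() here only keeps main alive until the callback ran.
        lengthOf("1002")
                .thenAccept(len -> System.out.println("length = " + len)) // prints "length = 13"
                .get(5, TimeUnit.SECONDS);
    }
}
```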
    private static void executeAsynchronousObjectOperations(RedisClient redisClient) {
        //Open a new Connection
        StatefulRedisConnection<String, Object> connection = redisClient.connect(new SerializedObjectCodec());

        //Returns asynchronous commands API for given connection
        RedisAsyncCommands<String, Object> commands = connection.async();

        String key = "users:async";

        //Del the key from the cache (synchronously) first.
        connection.sync().del(key);

        //Create the person records
        Person person1 = new Person("John Doe", 35, new Address("San Francisco", "CA", "US"));
        Person person2 = new Person("Samantha", 21, new Address("Portland", "OR", "US"));

        //Create redis future commands for HSET
        RedisFuture<Boolean> command1 = commands.hset(key, "1002", person1);
        RedisFuture<Boolean> command2 = commands.hset(key, "1003", person2);

        //Wait for async commands (command1 and command2) to complete
        LettuceFutures.awaitAll(120, TimeUnit.SECONDS, command1, command2);

        //Execute Future Command HEXISTS; keep the returned stage so we can wait for the callback
        CompletableFuture<Void> existsPrinted = commands.hexists(key, "1003")
                .thenAccept(System.out::println)
                .toCompletableFuture();

        //Execute Future Command HGET, using awaitOrCancel
        Person value1 = (Person)LettuceFutures
                .awaitOrCancel(commands.hget(key, "1002"), 60, TimeUnit.SECONDS);
        System.out.println("Value1 : " + value1);

        //Execute Future Command HGETALL, using whenComplete
        CompletableFuture<Map<String, Object>> allPrinted = commands.hgetall(key)
                .whenComplete((result, ex) -> {
                    if (result != null)
                        System.out.println("All Results : " + result);
                    else
                        System.out.println("All Results : " + ex.toString());
                })
                .toCompletableFuture();

        //Execute Future Command HGETALL, using thenAccept
        CompletableFuture<Void> entriesPrinted = commands.hgetall(key)
                .thenAccept(map -> map.entrySet().stream().forEach(entry ->
                        System.out.println("Results : " + entry.getKey() + " : " + (Person)entry.getValue())))
                .toCompletableFuture();

        //Wait for the callbacks before closing. If the connection is closed while commands
        //are still pending, the results are not always printed and we sometimes get
        //java.util.concurrent.CancellationException.
        LettuceFutures.awaitAll(60, TimeUnit.SECONDS, existsPrinted, allPrinted, entriesPrinted);

        //Close Connection
        connection.close();
    }
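When callbacks are registered with thenAccept or whenComplete, closing the connection right away can cancel commands that have not completed yet, so their callbacks may never print anything. One way to avoid this is to keep the stage returned by the callback registration and wait for it before closing. The sketch below shows the idea with plain CompletableFuture objects in place of Redis commands. AwaitCallbacksSketch and slowCommand are hypothetical names, and the 50 ms delay is only there to simulate network latency.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only model of "wait for the callbacks, then close".
final class AwaitCallbacksSketch {

    // Pretends to be an async command that needs a little time to complete.
    static CompletableFuture<String> slowCommand(String name) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name + ":done";
        });
    }

    static List<String> runAndAwait() {
        List<String> seen = new CopyOnWriteArrayList<>();

        // Keep the stages returned by thenAccept: they complete after the callback has run.
        CompletableFuture<Void> first = slowCommand("hexists").thenAccept(seen::add);
        CompletableFuture<Void> second = slowCommand("hgetall").thenAccept(seen::add);

        // Block until both callbacks have finished.
        CompletableFuture.allOf(first, second).join();

        // In real code, this is the point where closing the connection would be safe.
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait().size()); // prints "2"
    }
}
```

Waiting on the stage returned by thenAccept, rather than on the original future, matters here: the original future can report completion slightly before its dependent callbacks have finished running.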