Sunday, August 18, 2013

Simple and lightweight pool implementation

Object pools are containers that hold a specified number of objects. When an object is taken from the pool, it is not available in the pool until it is put back. Objects in the pool have a lifecycle: creation, validation, destruction, etc. A pool helps to manage available resources more efficiently. There are many examples of their use; application servers in particular have data source pools, thread pools, etc. Pools should be used in cases such as:
  • The same objects are used very frequently
  • Objects are very big and consume much memory
  • Objects need much time for initialization
  • Objects use massive IO operations (Streams, Sockets, DB, etc.)
  • Objects are not thread-safe
When I looked for a pool implementation for one of my Java projects, I found that many people refer to Apache Commons Pool, which provides an object-pooling API. It has the interfaces ObjectPool, ObjectPoolFactory and PoolableObjectFactory, along with many implementations. A pool provides the methods addObject, borrowObject, invalidateObject and returnObject to add, take, remove and return objects. PoolableObjectFactory defines the behavior of objects within a pool and provides various callbacks for the pool's operations.

After looking into the implementation details, I found that Apache Commons Pool is not a lightweight implementation and would be overhead for my purposes. Furthermore, many of its methods rely on the synchronized keyword, which I wanted to avoid. Java 5 introduced the java.util.concurrent package, including the Executor framework, and I prefer to build on it here. I decided to implement a simple and lightweight pool, which I would like to present here. It is only one Java class. I think it is enough if you don't need callbacks and other advanced features. I created a project called easy-pool on GitHub.

The pool implementation is based on ConcurrentLinkedQueue from the java.util.concurrent package. ConcurrentLinkedQueue is a thread-safe queue based on linked nodes. It orders elements according to the FIFO principle (first in, first out). My implementation of a generic pool looks as follows:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class ObjectPool<T>
{
    private ConcurrentLinkedQueue<T> pool;

    private ScheduledExecutorService executorService;

    /**
     * Creates the pool.
     *
     * @param minIdle minimum number of objects residing in the pool
     */
    public ObjectPool(final int minIdle) {
        // initialize pool
        initialize(minIdle);
    }

    /**
     * Creates the pool.
     *
     * @param minIdle            minimum number of objects residing in the pool
     * @param maxIdle            maximum number of objects residing in the pool
     * @param validationInterval time in seconds for periodical checking of minIdle / maxIdle conditions in a separate thread.
     *                           When the number of objects is less than minIdle, missing instances will be created.
     *                           When the number of objects is greater than maxIdle, too many instances will be removed.
     */
    public ObjectPool(final int minIdle, final int maxIdle, final long validationInterval) {
        // initialize pool
        initialize(minIdle);

        // check pool conditions in a separate thread
        executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleWithFixedDelay(new Runnable()
        {
            @Override
            public void run() {
                int size = pool.size();
                if (size < minIdle) {
                    int sizeToBeAdded = minIdle - size;
                    for (int i = 0; i < sizeToBeAdded; i++) {
                        pool.add(createObject());
                    }
                } else if (size > maxIdle) {
                    int sizeToBeRemoved = size - maxIdle;
                    for (int i = 0; i < sizeToBeRemoved; i++) {
                        pool.poll();
                    }
                }
            }
        }, validationInterval, validationInterval, TimeUnit.SECONDS);
    }

    /**
     * Gets the next free object from the pool. If the pool doesn't contain any objects,
     * a new object will be created and returned to the caller of this method.
     *
     * @return T borrowed object
     */
    public T borrowObject() {
        T object;
        if ((object = pool.poll()) == null) {
            object = createObject();
        }

        return object;
    }

    /**
     * Returns object back to the pool.
     *
     * @param object object to be returned
     */
    public void returnObject(T object) {
        if (object == null) {
            return;
        }

        this.pool.offer(object);
    }

    /**
     * Shutdown this pool.
     */
    public void shutdown() {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    /**
     * Creates a new object.
     *
     * @return T new object
     */
    protected abstract T createObject();

    private void initialize(final int minIdle) {
        pool = new ConcurrentLinkedQueue<T>();

        for (int i = 0; i < minIdle; i++) {
            pool.add(createObject());
        }
    }
}
The abstract class ObjectPool provides two main methods: borrowObject to get the next free object from the pool and returnObject to return the borrowed object to the pool. If the pool doesn't contain any objects, a new object is created and handed to the caller of borrowObject. The object creation happens in the method createObject. Any class that extends the abstract class ObjectPool only needs to implement this method, and the pool is ready to use.

As you can see, I also use ScheduledExecutorService from the java.util.concurrent package. What is it good for? You can specify the minimum and maximum number of objects residing in the pool. ScheduledExecutorService runs a task in a separate thread that periodically, at a specified interval (parameter validationInterval, in seconds), checks the number of objects in the pool against the minimum and maximum. When the number of objects is less than the minimum, the missing instances are created. When the number of objects is greater than the maximum, the surplus instances are removed. This is sometimes useful for keeping memory-consuming objects in the pool in balance, and more.
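
The minIdle / maxIdle check can also be looked at in isolation. The following is a minimal sketch of that one step, assuming a hypothetical helper class PoolRebalancer and a Supplier as the object factory; neither is part of the ObjectPool class above. Note that ConcurrentLinkedQueue.size() traverses the queue and may be inexact while other threads modify it, so the result is only approximate under concurrent access.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the article's ObjectPool class.
final class PoolRebalancer {

    private PoolRebalancer() {
    }

    /**
     * Brings the number of idle objects back into the range [minIdle, maxIdle].
     *
     * @return number of objects added (positive), removed (negative), or 0 if nothing changed
     */
    static <T> int rebalance(Queue<T> idle, int minIdle, int maxIdle, Supplier<T> factory) {
        int size = idle.size();
        if (size < minIdle) {
            int missing = minIdle - size;
            for (int i = 0; i < missing; i++) {
                idle.add(factory.get());
            }
            return missing;
        }
        if (size > maxIdle) {
            int surplus = size - maxIdle;
            int removed = 0;
            for (int i = 0; i < surplus; i++) {
                // poll() returns null if another thread has emptied the queue in the meantime
                if (idle.poll() != null) {
                    removed++;
                }
            }
            return -removed;
        }
        return 0;
    }

    public static void main(String[] args) {
        Queue<String> idle = new ConcurrentLinkedQueue<>();
        int[] counter = {0};
        Supplier<String> factory = () -> "object-" + (++counter[0]);

        System.out.println(rebalance(idle, 2, 4, factory)); // prints 2: the pool was empty
        for (int i = 0; i < 5; i++) {
            idle.add(factory.get()); // simulate many returned objects
        }
        System.out.println(rebalance(idle, 2, 4, factory)); // prints -3: seven idle, three removed
        System.out.println(idle.size());                    // prints 4
    }
}
```

Because poll() removes from the head of the queue, the objects that have been idle the longest are discarded first.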

Let's implement test classes to show how a concrete pool is used. First, we need a class representing the objects in the pool, one that simulates a time-consuming process. This class, called ExportingProcess, needs some time to be instantiated.
public class ExportingProcess {

    private String location;

    private long processNo = 0;

    public ExportingProcess(String location, long processNo) {
        this.location = location;
        this.processNo = processNo;

        // perform some time-consuming calls / tasks
        // ...

        // for-loop is just for simulation
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
        }

        System.out.println("Object with process no. " + processNo + " was created");
    }

    public String getLocation() {
        return location;
    }

    public long getProcessNo() {
        return processNo;
    }
}
The second class implements the Runnable interface and simulates a task performed by a thread. In the run method, we borrow an instance of ExportingProcess and later return it to the pool.
public class ExportingTask implements Runnable {

    private ObjectPool<ExportingProcess> pool;

    private int threadNo;

    public ExportingTask(ObjectPool<ExportingProcess> pool, int threadNo) {
        this.pool = pool;
        this.threadNo = threadNo;
    }

    public void run() {
        // get an object from the pool
        ExportingProcess exportingProcess = pool.borrowObject();

        System.out.println("Thread " + threadNo + 
                ": Object with process no. " + exportingProcess.getProcessNo() + " was borrowed");

        // do something
        // ...

        // for-loop is just for simulation
        for (int i = 0; i < 100000; i++) {
        }

        // return ExportingProcess instance back to the pool
        pool.returnObject(exportingProcess);

        System.out.println("Thread " + threadNo + 
                ": Object with process no. " + exportingProcess.getProcessNo() + " was returned");
    }
}
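
In the task above, the object is only returned if the work in between completes normally. One possible way to guard against losing objects when the work throws is to return them in a finally block. The following is a minimal sketch, assuming a hypothetical helper named withBorrowed that receives the borrow and return operations as functional interfaces; it is not part of easy-pool, and the demo uses a plain queue as a stand-in for a pool.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper for illustration; the names are not part of easy-pool.
final class Borrowing {

    private Borrowing() {
    }

    /** Borrows an object, applies the work to it and always hands the object back. */
    static <T, R> R withBorrowed(Supplier<T> borrow, Consumer<T> giveBack, Function<T, R> work) {
        T object = borrow.get();
        try {
            return work.apply(object);
        } finally {
            // runs on normal completion and also when the work throws
            giveBack.accept(object);
        }
    }

    public static void main(String[] args) {
        // stand-in for a pool: a queue holding one idle object
        ConcurrentLinkedQueue<StringBuilder> idle = new ConcurrentLinkedQueue<>();
        idle.add(new StringBuilder("exporter"));

        String result = withBorrowed(idle::poll, idle::offer, sb -> sb.toString().toUpperCase());
        System.out.println(result + ", idle objects: " + idle.size());

        try {
            withBorrowed(idle::poll, idle::offer, sb -> {
                throw new IllegalStateException("export failed");
            });
        } catch (IllegalStateException e) {
            // the object is back in the queue although the work failed
            System.out.println("after failure, idle objects: " + idle.size());
        }
    }
}
```

With the ObjectPool class from this post, a call could look like withBorrowed(pool::borrowObject, pool::returnObject, process -> process.getLocation()).
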
Now, in the JUnit class TestObjectPool, we create a pool of objects of type ExportingProcess. This is done with new ObjectPool<ExportingProcess>(4, 10, 5) and an anonymous subclass that implements createObject. The parameters are described in the comments below.
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestObjectPool
{
    private ObjectPool<ExportingProcess> pool;

    private AtomicLong processNo = new AtomicLong(0);

    @Before
    public void setUp() {
        // Create a pool of objects of type ExportingProcess. Parameters:
        // 1) Minimum number of special ExportingProcess instances residing in the pool = 4
        // 2) Maximum number of special ExportingProcess instances residing in the pool = 10
        // 3) Time in seconds for periodical checking of minIdle / maxIdle conditions in a separate thread = 5.
        //    When the number of ExportingProcess instances is less than minIdle, missing instances will be created.
        //    When the number of ExportingProcess instances is greater than maxIdle, too many instances will be removed.
        //    Note: the interval must be positive, because scheduleWithFixedDelay rejects
        //    a delay <= 0 with an IllegalArgumentException.
        pool = new ObjectPool<ExportingProcess>(4, 10, 5)
        {
            protected ExportingProcess createObject() {
                // create a test object which takes some time for creation
                return new ExportingProcess("/home/temp/", processNo.incrementAndGet());
            }
        };
    }

    @After
    public void tearDown() {
        pool.shutdown();
    }

    @Test
    public void testObjectPool() {
        ExecutorService executor = Executors.newFixedThreadPool(8);

        // execute 8 tasks in separate threads
        executor.execute(new ExportingTask(pool, 1));
        executor.execute(new ExportingTask(pool, 2));
        executor.execute(new ExportingTask(pool, 3));
        executor.execute(new ExportingTask(pool, 4));
        executor.execute(new ExportingTask(pool, 5));
        executor.execute(new ExportingTask(pool, 6));
        executor.execute(new ExportingTask(pool, 7));
        executor.execute(new ExportingTask(pool, 8));

        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The test output looks like this:
Object with process no. 1 was created
Object with process no. 2 was created
Object with process no. 3 was created
Object with process no. 4 was created
Thread 2: Object with process no. 2 was borrowed
Thread 1: Object with process no. 1 was borrowed
Thread 2: Object with process no. 2 was returned
Thread 3: Object with process no. 3 was borrowed
Thread 4: Object with process no. 4 was borrowed
Thread 1: Object with process no. 1 was returned
Thread 4: Object with process no. 4 was returned
Thread 8: Object with process no. 4 was borrowed
Thread 5: Object with process no. 1 was borrowed
Thread 7: Object with process no. 3 was borrowed
Thread 3: Object with process no. 3 was returned
Thread 6: Object with process no. 2 was borrowed
Thread 7: Object with process no. 3 was returned
Thread 5: Object with process no. 1 was returned
Thread 8: Object with process no. 4 was returned
Thread 6: Object with process no. 2 was returned
As can be seen, the pool creates the minimum number of objects when it is constructed in setUp, before any thread accesses it. Running this test class multiple times, we can discover that sometimes 4 objects get borrowed one after another and a fifth object is then created for the next borrower. All test classes are available on GitHub.
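
The case where more objects are borrowed than are idle can also be reproduced deterministically in a single thread. The following is a minimal sketch, assuming a simplified stand-in class TinyPool (hypothetical, without the scheduler and with a Supplier as factory) rather than the ObjectPool class from this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified stand-in for the article's ObjectPool (no scheduler); names are illustrative.
final class TinyPool<T> {

    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    TinyPool(int minIdle, Supplier<T> factory) {
        this.factory = factory;
        for (int i = 0; i < minIdle; i++) {
            idle.add(factory.get());
        }
    }

    T borrowObject() {
        T object = idle.poll();
        // an empty pool does not block; it creates a new object instead
        return object != null ? object : factory.get();
    }

    void returnObject(T object) {
        if (object != null) {
            idle.offer(object);
        }
    }

    int idleCount() {
        return idle.size();
    }
}

final class FifthObjectDemo {

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // every pooled "object" is simply its creation number
        TinyPool<Integer> pool = new TinyPool<>(4, created::incrementAndGet);

        List<Integer> borrowed = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            borrowed.add(pool.borrowObject()); // the fifth call finds the pool empty
        }
        System.out.println("created: " + created.get()); // prints created: 5

        for (Integer object : borrowed) {
            pool.returnObject(object);
        }
        System.out.println("idle: " + pool.idleCount()); // prints idle: 5
    }
}
```

After all five objects are returned, the pool holds more than its initial four. In the ObjectPool class above, only the periodic check of the three-argument constructor removes idle objects, and only once their number exceeds maxIdle.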

5 comments:

  1. Good post
    In your shutdown() method , you seem to only shutdown the executorservice. Why don't you cleanup the ConcurrentLinkedQueue pool as well ?

    MK

  2. Good point. Thanks!

    In the real application a cleanup of the ConcurrentLinkedQueue happens by garbage collector because shutdown is called on application shutdown hook. It doesn't matter when the queue is cleaned - immediately on shutdown or later by GC :-)

  3. Isn't object pooling in many cases something that has been discouraged for years? Of course it depends on the weight of the object being reused (connection objects most certainly should be reused), but I think it is fair to say that by pooling your objects you add complexity, a possible bottleneck and a possible resource leak to your code while getting a questionable performance impact, because the VM is quite good with short-lived objects too (depending on your GC of choice)...

  4. Hey Oleg,

    Great job. Where in Russia are you located, if I may ask? We are interested in people like you :)

    James,
    http://www.scnsoft.com/

    1. Hi James. Thanks. Can you tell me your e-mail please? I would like to send you a private e-mail.
