SequenceUtil and ConcurrentHashMap(non-blocking)

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

SequenceUtil and ConcurrentHashMap(non-blocking)

Adam Heath-2
So, one year ago I was tasked with fixing some deadlocks in
Webslinger(the website container Brainfood has developed).  While
investigating solutions to the problems I was seeing, I stumbled across
the book, "Java Concurrency in Practice".

OH
MY
GOD
!

It's not normal for me to get all ecstatic about a programming book, but
this one has to be at least an 11(on a scale of 1-10).  Anyone doing
*any* multi-threaded programming in java absolutely *must* read this book.

Anyways, after reading said book, I completely rewrote the internals of
webslinger, following the guidelines I had read about.  It's now been a
year, and what I figured out seems to be running great; no normal code
paths thru webslinger take out *any* locks now.

I've decided to start applying this knowledge against OfBiz.  My first
attempt is attached.

While SequenceUtil is not a highly-contended resource, it is simple
enough, and used often enough, so that others can understand what is
going on, and it's possible to see if the change actually breaks anything.

Summarizing, here is how it functions(at a high-level).

1: Any fields that have to be modified together have to be moved to
separate class.
2: Said class is made final, and *all* fields are also made final;
equals() and hashCode() are implemented.
3: The parent class is modified to use an AtomicReference.
4: Any time the parent class needs to change one of the original fields,
it makes a copy of the current reference with the new values, then does
an atomic compareAndSet.  This can cause multiple allocations when
contended, however, java 1.6 is smart enough to do some allocations on
the stack, so in general this is not a problem.

These changes invariably increase the size of the code; however, they do
reduce overhead in the long run, so I consider them beneficial.

Attached you will find SequenceUtil.java; I've modified it to be
non-blocking.  I've done "ant run-install", with no problems, but have
*not* yet run any test cases.

I'm sending it here(to this list) first, before checking it in, as this
is a radical departure from simple programming practices, and would like
others to discuss if I should do this in other places.

/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.ofbiz.entity.util;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import javax.transaction.Transaction;

import org.ofbiz.base.util.Debug;
import org.ofbiz.entity.GenericEntityException;
import org.ofbiz.entity.jdbc.ConnectionFactory;
import org.ofbiz.entity.model.ModelEntity;
import org.ofbiz.entity.model.ModelField;
import org.ofbiz.entity.transaction.GenericTransactionException;
import org.ofbiz.entity.transaction.TransactionUtil;

/**
 * Sequence Utility to get unique sequences from named sequence banks
 * Uses a collision detection approach to safely get unique sequenced ids in banks from the database
 */
public class SequenceUtil {

    public static final String module = SequenceUtil.class.getName();

    ConcurrentHashMap<String, SequenceBank> sequences = new ConcurrentHashMap<String, SequenceBank>();
    String helperName;
    ModelEntity seqEntity;
    String tableName;
    String nameColName;
    String idColName;

    private SequenceUtil() {}

    public SequenceUtil(String helperName, ModelEntity seqEntity, String nameFieldName, String idFieldName) {
        this.helperName = helperName;
        this.seqEntity = seqEntity;
        if (seqEntity == null) {
            throw new IllegalArgumentException("The sequence model entity was null but is required.");
        }
        this.tableName = seqEntity.getTableName(helperName);

        ModelField nameField = seqEntity.getField(nameFieldName);

        if (nameField == null) {
            throw new IllegalArgumentException("Could not find the field definition for the sequence name field " + nameFieldName);
        }
        this.nameColName = nameField.getColName();

        ModelField idField = seqEntity.getField(idFieldName);

        if (idField == null) {
            throw new IllegalArgumentException("Could not find the field definition for the sequence id field " + idFieldName);
        }
        this.idColName = idField.getColName();
    }

    public Long getNextSeqId(String seqName, long staggerMax, ModelEntity seqModelEntity) {
        SequenceBank bank = this.getBank(seqName, seqModelEntity);
        return bank.getNextSeqId(staggerMax);
    }
   
    public void forceBankRefresh(String seqName, long staggerMax) {
        // don't use the get method because we don't want to create if it fails
        SequenceBank bank = sequences.get(seqName);
        if (bank == null) {
            return;
        }
       
        bank.refresh(staggerMax);
    }
   
    private SequenceBank getBank(String seqName, ModelEntity seqModelEntity) {
        SequenceBank bank = sequences.get(seqName);

        if (bank == null) {
            bank = new SequenceBank(seqName, seqModelEntity, this);
            SequenceBank oldBank = sequences.putIfAbsent(seqName, bank);
            if (oldBank != null) bank = oldBank;
        }
       
        return bank;
    }

    class SequenceBank {
        public static final long defaultBankSize = 10;
        public static final long maxBankSize = 5000;
        public static final long startSeqId = 10000;
        public static final int minWaitMillis = 5;
        public static final int maxWaitMillis = 50;
        public static final int maxTries = 5;

        final AtomicReference<SequenceValue> ref;
        String seqName;
        SequenceUtil parentUtil;
        ModelEntity seqModelEntity;

        public SequenceBank(String seqName, ModelEntity seqModelEntity, SequenceUtil parentUtil) {
            this.seqName = seqName;
            this.parentUtil = parentUtil;
            this.seqModelEntity = seqModelEntity;
            ref = new AtomicReference<SequenceValue>(new SequenceValue(0, 0).fillBank(1));
        }

        public Long getNextSeqId(long staggerMax) {
            long stagger = 1;
            if (staggerMax > 1) {
                stagger = Math.round(Math.random() * staggerMax);
                if (stagger == 0) stagger = 1;
            }
            SequenceValue value;
            do {
                value = ref.get();
                if ((value.curSeqId + stagger) > value.maxSeqId) {
                    value = value.fillBank(stagger);
                    if ((value.curSeqId + stagger) > value.maxSeqId) {
                        Debug.logError("[SequenceUtil.SequenceBank.getNextSeqId] Fill bank failed, returning null", module);
                        return null;
                    }
                }
            } while (!ref.compareAndSet(value, new SequenceValue(value.curSeqId + stagger, value.maxSeqId)));
            return Long.valueOf(value.curSeqId);
        }
       
        public void refresh(long staggerMax) {
            SequenceValue value;
            do {
                value = ref.get();
            } while (!ref.compareAndSet(value, value.refresh(staggerMax)));
        }



    protected final class SequenceValue {
        protected final long curSeqId;
        protected final long maxSeqId;

        protected SequenceValue(long curSeqId, long maxSeqId) {
            this.curSeqId = curSeqId;
            this.maxSeqId = maxSeqId;
        }

        public int hashCode() {
            long r = curSeqId ^ maxSeqId;
            return (int) ((r >> 32) ^ (r & 0xffff));
        }

        public boolean equals(Object o) {
            if (!(o instanceof SequenceValue)) return false;
            SequenceValue other = (SequenceValue) o;
            return curSeqId == other.curSeqId && maxSeqId == other.maxSeqId;
        }

        protected SequenceValue refresh(long staggerMax) {
            return fillBank(staggerMax, this.curSeqId);
        }

        protected SequenceValue fillBank(long stagger) {
            return fillBank(stagger, this.curSeqId);
        }

        private SequenceValue fillBank(long stagger, long curSeqId) {
            //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Starting fillBank Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);

            // no need to get a new bank, SeqIds available
            if ((curSeqId + stagger) <= maxSeqId) return this;
           
            long bankSize = defaultBankSize;
            if (seqModelEntity != null && seqModelEntity.getSequenceBankSize() != null) {
                bankSize = seqModelEntity.getSequenceBankSize().longValue();
            }
            if (stagger > 1) {
                // NOTE: could use staggerMax for this, but if that is done it would be easier to guess a valid next id without a brute force attack
                bankSize = stagger * defaultBankSize;
            }
             
            if (bankSize > maxBankSize) bankSize = maxBankSize;
           
            long val1 = 0;
            long val2 = 0;

            // NOTE: the fancy ethernet type stuff is for the case where transactions not available, or something funny happens with really sensitive timing (between first select and update, for example)
            int numTries = 0;

            while (val1 + bankSize != val2) {
                if (Debug.verboseOn()) Debug.logVerbose("[SequenceUtil.SequenceBank.fillBank] Trying to get a bank of sequenced ids for " +
                        seqName + "; start of loop val1=" + val1 + ", val2=" + val2 + ", bankSize=" + bankSize, module);
               
                // not sure if this synchronized block is totally necessary, the method is synchronized but it does do a wait/sleep
                //outside of this block, and this is the really sensitive block, so making sure it is isolated; there is some overhead
                //to this, but very bad things can happen if we try to do too many of these at once for a single sequencer
                    Transaction suspendedTransaction = null;
                    try {
                        //if we can suspend the transaction, we'll try to do this in a local manual transaction
                        suspendedTransaction = TransactionUtil.suspend();
                       
                        boolean beganTransaction = false;
                        try {
                            beganTransaction = TransactionUtil.begin();
       
                            Connection connection = null;
                            Statement stmt = null;
                            ResultSet rs = null;
       
                            try {
                                connection = ConnectionFactory.getConnection(parentUtil.helperName);
                            } catch (SQLException sqle) {
                                Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database... Error was:" + sqle.toString(), module);
                                throw sqle;
                            } catch (GenericEntityException e) {
                                Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database... Error was: " + e.toString(), module);
                                throw e;
                            }
                           
                            if (connection == null) {
                                throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database, connection was null...");
                            }
       
                            String sql = null;
       
                            try {
                                // we shouldn't need this, and some TX managers complain about it, so not including it: connection.setAutoCommit(false);
       
                                stmt = connection.createStatement();
                                   
                                sql = "SELECT " + parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " + parentUtil.nameColName + "='" + seqName + "'";
                                rs = stmt.executeQuery(sql);
                                boolean gotVal1 = false;
                                if (rs.next()) {
                                    val1 = rs.getLong(parentUtil.idColName);
                                    gotVal1 = true;
                                }
                                rs.close();
                               
                                if (!gotVal1) {
                                    Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] first select failed: will try to add new row, result set was empty for sequence [" + seqName + "] \nUsed SQL: " + sql + " \n Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
                                    sql = "INSERT INTO " + parentUtil.tableName + " (" + parentUtil.nameColName + ", " + parentUtil.idColName + ") VALUES ('" + seqName + "', " + startSeqId + ")";
                                    if (stmt.executeUpdate(sql) <= 0) {
                                        throw new GenericEntityException("No rows changed when trying insert new sequence row with this SQL: " + sql);
                                    }
                                    continue;
                                }
       
                                sql = "UPDATE " + parentUtil.tableName + " SET " + parentUtil.idColName + "=" + parentUtil.idColName + "+" + bankSize + " WHERE " + parentUtil.nameColName + "='" + seqName + "'";
                                if (stmt.executeUpdate(sql) <= 0) {
                                    throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank] update failed, no rows changes for seqName: " + seqName);
                                }
       
                                sql = "SELECT " + parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " + parentUtil.nameColName + "='" + seqName + "'";
                                rs = stmt.executeQuery(sql);
                                boolean gotVal2 = false;
                                if (rs.next()) {
                                    val2 = rs.getLong(parentUtil.idColName);
                                    gotVal2 = true;
                                }
       
                                rs.close();
                               
                                if (!gotVal2) {
                                    throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank] second select failed: aborting, result set was empty for sequence: " + seqName);
                                }
                               
                                // got val1 and val2 at this point, if we don't have the right difference between them, force a rollback (with
                                //setRollbackOnly and NOT with an exception because we don't want to break from the loop, just err out and
                                //continue), then flow out to allow the wait and loop thing to happen
                                if (val1 + bankSize != val2) {
                                    TransactionUtil.setRollbackOnly("Forcing transaction rollback in sequence increment because we didn't get a clean update, ie a conflict was found, so not saving the results", null);
                                }
                            } catch (SQLException sqle) {
                                Debug.logWarning(sqle, "[SequenceUtil.SequenceBank.fillBank] SQL Exception while executing the following:\n" + sql + "\nError was:" + sqle.getMessage(), module);
                                throw sqle;
                            } finally {
                                try {
                                    if (stmt != null) stmt.close();
                                } catch (SQLException sqle) {
                                    Debug.logWarning(sqle, "Error closing statement in sequence util", module);
                                }
                                try {
                                    if (connection != null) connection.close();
                                } catch (SQLException sqle) {
                                    Debug.logWarning(sqle, "Error closing connection in sequence util", module);
                                }
                            }
                        } catch (Exception e) {
                            String errMsg = "General error in getting a sequenced ID";
                            Debug.logError(e, errMsg, module);
                            try {
                                TransactionUtil.rollback(beganTransaction, errMsg, e);
                            } catch (GenericTransactionException gte2) {
                                Debug.logError(gte2, "Unable to rollback transaction", module);
                            }
                           
                            // error, break out of the loop to not try to continue forever
                            break;
                        } finally {
                            try {
                                TransactionUtil.commit(beganTransaction);
                            } catch (GenericTransactionException gte) {
                                Debug.logError(gte, "Unable to commit sequence increment transaction, continuing anyway though", module);
                            }
                        }
                    } catch (GenericTransactionException e) {
                        Debug.logError(e, "System Error suspending transaction in sequence util", module);
                    } finally {
                        if (suspendedTransaction != null) {
                            try {
                                TransactionUtil.resume(suspendedTransaction);
                            } catch (GenericTransactionException e) {
                                Debug.logError(e, "Error resuming suspended transaction in sequence util", module);
                            }
                        }
                    }
           
                if (val1 + bankSize != val2) {
                    if (numTries >= maxTries) {
                        String errMsg = "[SequenceUtil.SequenceBank.fillBank] maxTries (" + maxTries + ") reached for seqName [" + seqName + "], giving up.";
                        Debug.logError(errMsg, module);
                        return this;
                    }
                   
                    // collision happened, wait a bounded random amount of time then continue
                    int waitTime = (Double.valueOf(Math.random() * (maxWaitMillis - minWaitMillis))).intValue() + minWaitMillis;

                    Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Collision found for seqName [" + seqName + "], val1=" + val1 + ", val2=" + val2 + ", val1+bankSize=" + (val1 + bankSize) + ", bankSize=" + bankSize + ", waitTime=" + waitTime, module);

                    try {
                        // using the Thread.sleep to more reliably lock this thread: this.wait(waitTime);
                        java.lang.Thread.sleep(waitTime);
                    } catch (Exception e) {
                        Debug.logWarning(e, "Error waiting in sequence util", module);
                        return this;
                    }
                }

                numTries++;
            }

            if (Debug.infoOn()) Debug.logInfo("Got bank of sequenced IDs for [" + seqName + "]; curSeqId=" + val1 + ", maxSeqId=" + val2 + ", bankSize=" + bankSize, module);
            return new SequenceValue(val1, val2);
            //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Ending fillBank Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
        }

    }
    }
}
Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

Adrian Crum
Adam,

It would be helpful if this was in a Jira issue where we could comment,
and submit a patch, so we can tell what changed.

-Adrian

Adam Heath wrote:

> So, one year ago I was tasked with fixing some deadlocks in
> Webslinger(the website container Brainfood has developed).  While
> investigating solutions to the problems I was seeing, I stumbled across
> the book, "Java Concurrency in Practice".
>
> OH
> MY
> GOD
> !
>
> It's not normal for me to get all ecstatic about a programming book, but
> this one has to be at least an 11(on a scale of 1-10).  Anyone doing
> *any* multi-threaded programming in java absolutely *must* read this book.
>
> Anyways, after reading said book, I completely rewrote the internals of
> webslinger, following the guidelines I had read about.  It's now been a
> year, and what I figured out seems to be running great; no normal code
> paths thru webslinger take out *any* locks now.
>
> I've decided to start applying this knowledge against OfBiz.  My first
> attempt is attached.
>
> While SequenceUtil is not a highly-contended resource, it is simple
> enough, and used often enough, so that others can understand what is
> going on, and it's possible to see if the change actually breaks anything.
>
> Summarizing, here is how it functions(at a high-level).
>
> 1: Any fields that have to be modified together have to be moved to
> separate class.
> 2: Said class is made final, and *all* fields are also made final;
> equals() and hashCode() are implemented.
> 3: The parent class is modified to use an AtomicReference.
> 4: Any time the parent class needs to change one of the original fields,
> it makes a copy of the current reference with the new values, then does
> an atomic compareAndSet.  This can cause multiple allocations when
> contended, however, java 1.6 is smart enough to do some allocations on
> the stack, so in general this is not a problem.
>
> These changes invariably increase the size of the code; however, they do
> reduce overhead in the long run, so I consider them beneficial.
>
> Attached you will find SequenceUtil.java; I've modified it to be
> non-blocking.  I've done "ant run-install", with no problems, but have
> *not* yet run any test cases.
>
> I'm sending it here(to this list) first, before checking it in, as this
> is a radical departure from simple programming practices, and would like
> others to discuss if I should do this in other places.
>
Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

Tim Ruppert
+1 for that.  Adam, this is really great that you've taken this  
initiative to hone these patterns.  Much appreciated.

Cheers,
Tim
--
Tim Ruppert
HotWax Media
http://www.hotwaxmedia.com

o:801.649.6594
f:801.649.6595


On Jun 30, 2008, at 10:28 AM, Adrian Crum wrote:

> Adam,
>
> It would be helpful if this was in a Jira issue where we could  
> comment, and submit a patch, so we can tell what changed.
>
> -Adrian
>
> Adam Heath wrote:
>> So, one year ago I was tasked with fixing some deadlocks in  
>> Webslinger(the website container Brainfood has developed).  While  
>> investigating solutions to the problems I was seeing, I stumbled  
>> across the book, "Java Concurrency in Practice".
>> OH
>> MY
>> GOD
>> !
>> It's not normal for me to get all ecstatic about a programming  
>> book, but this one has to be at least an 11(on a scale of 1-10).  
>> Anyone doing *any* multi-threaded programming in java absolutely  
>> *must* read this book.
>> Anyways, after reading said book, I completely rewrote the  
>> internals of webslinger, following the guidelines I had read  
>> about.  It's now been a year, and what I figured out seems to be  
>> running great; no normal code paths thru webslinger take out *any*  
>> locks now.
>> I've decided to start applying this knowledge against OfBiz.  My  
>> first attempt is attached.
>> While SequenceUtil is not a highly-contended resource, it is simple  
>> enough, and used often enough, so that others can understand what  
>> is going on, and it's possible to see if the change actually breaks  
>> anything.
>> Summarizing, here is how it functions(at a high-level).
>> 1: Any fields that have to be modified together have to be moved to  
>> separate class.
>> 2: Said class is made final, and *all* fields are also made final;  
>> equals() and hashCode() are implemented.
>> 3: The parent class is modified to use an AtomicReference.
>> 4: Any time the parent class needs to change one of the original  
>> fields, it makes a copy of the current reference with the new  
>> values, then does an atomic compareAndSet.  This can cause multiple  
>> allocations when contended, however, java 1.6 is smart enough to do  
>> some allocations on the stack, so in general this is not a problem.
>> These changes invariably increase the size of the code; however,  
>> they do reduce overhead in the long run, so I consider them  
>> beneficial.
>> Attached you will find SequenceUtil.java; I've modified it to be  
>> non-blocking.  I've done "ant run-install", with no problems, but  
>> have *not* yet run any test cases.
>> I'm sending it here(to this list) first, before checking it in, as  
>> this is a radical departure from simple programming practices, and  
>> would like others to discuss if I should do this in other places.


smime.p7s (3K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

Jacopo Cappellato-3
In reply to this post by Adam Heath-2
Hi Adam,

On Jun 30, 2008, at 6:21 PM, Adam Heath wrote:
>
> Attached you will find SequenceUtil.java; I've modified it to be non-
> blocking.  I've done "ant run-install", with no problems, but have  
> *not* yet run any test cases.
>
> I'm sending it here(to this list) first, before checking it in, as  
> this is a radical departure from simple programming practices, and  
> would like others to discuss if I should do this in other places.

thanks for your work and yes, please, add this code to Jira to make it  
easier to browse and comment. Also, before checking it in, please run  
some test cases.

Cheers,

Jacopo

Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

Adam Heath-2
Jacopo Cappellato wrote:

> Hi Adam,
>
> On Jun 30, 2008, at 6:21 PM, Adam Heath wrote:
>>
>> Attached you will find SequenceUtil.java; I've modified it to be
>> non-blocking.  I've done "ant run-install", with no problems, but have
>> *not* yet run any test cases.
>>
>> I'm sending it here(to this list) first, before checking it in, as
>> this is a radical departure from simple programming practices, and
>> would like others to discuss if I should do this in other places.
>
> thanks for your work and yes, please, add this code to Jira to make it
> easier to browse and comment. Also, before checking it in, please run
> some test cases.

Well, I could, but not without doing other stuff first.

As part of the official build process for webslinger(which means
uploading a debian package to our local repo), I *always* run it's test
cases, and they *must* pass.  And, as part of that, I always run them
under cobertura, a code-coverage tool.  I am currently at 41% coverage,
with some parts over 90%.

I'd like to add such support to ofbiz, but can't use cobertura(it's
gpl).  I'm open for suggestions.
Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

David E Jones
In reply to this post by Adam Heath-2

I'd have to second the request for a patch... either here or on a Jira  
issue. As familiar as I am with that class I don't have it memorized  
enough to be able to pick out the differences.

Anyway, yes, looking forward to seeing more.

-David


On Jun 30, 2008, at 10:21 AM, Adam Heath wrote:

> So, one year ago I was tasked with fixing some deadlocks in  
> Webslinger(the website container Brainfood has developed).  While  
> investigating solutions to the problems I was seeing, I stumbled  
> across the book, "Java Concurrency in Practice".
>
> OH
> MY
> GOD
> !
>
> It's not normal for me to get all ecstatic about a programming book,  
> but this one has to be at least an 11(on a scale of 1-10).  Anyone  
> doing *any* multi-threaded programming in java absolutely *must*  
> read this book.
>
> Anyways, after reading said book, I completely rewrote the internals  
> of webslinger, following the guidelines I had read about.  It's now  
> been a year, and what I figured out seems to be running great; no  
> normal code paths thru webslinger take out *any* locks now.
>
> I've decided to start applying this knowledge against OfBiz.  My  
> first attempt is attached.
>
> While SequenceUtil is not a highly-contended resource, it is simple  
> enough, and used often enough, so that others can understand what is  
> going on, and it's possible to see if the change actually breaks  
> anything.
>
> Summarizing, here is how it functions(at a high-level).
>
> 1: Any fields that have to be modified together have to be moved to  
> separate class.
> 2: Said class is made final, and *all* fields are also made final;  
> equals() and hashCode() are implemented.
> 3: The parent class is modified to use an AtomicReference.
> 4: Any time the parent class needs to change one of the original  
> fields, it makes a copy of the current reference with the new  
> values, then does an atomic compareAndSet.  This can cause multiple  
> allocations when contended, however, java 1.6 is smart enough to do  
> some allocations on the stack, so in general this is not a problem.
>
> These changes invariably increase the size of the code; however,  
> they do reduce overhead in the long run, so I consider them  
> beneficial.
>
> Attached you will find SequenceUtil.java; I've modified it to be non-
> blocking.  I've done "ant run-install", with no problems, but have  
> *not* yet run any test cases.
>
> I'm sending it here(to this list) first, before checking it in, as  
> this is a radical departure from simple programming practices, and  
> would like others to discuss if I should do this in other places.
> /
> *******************************************************************************
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements.  See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership.  The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License.  You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing,
> * software distributed under the License is distributed on an
> * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> * KIND, either express or implied.  See the License for the
> * specific language governing permissions and limitations
> * under the License.
> *******************************************************************************/
> package org.ofbiz.entity.util;
>
> import java.sql.Connection;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.sql.Statement;
> import java.util.Hashtable;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.atomic.AtomicReference;
>
> import javax.transaction.Transaction;
>
> import org.ofbiz.base.util.Debug;
> import org.ofbiz.entity.GenericEntityException;
> import org.ofbiz.entity.jdbc.ConnectionFactory;
> import org.ofbiz.entity.model.ModelEntity;
> import org.ofbiz.entity.model.ModelField;
> import org.ofbiz.entity.transaction.GenericTransactionException;
> import org.ofbiz.entity.transaction.TransactionUtil;
>
> /**
> * Sequence Utility to get unique sequences from named sequence banks
> * Uses a collision detection approach to safely get unique sequenced  
> ids in banks from the database
> */
> public class SequenceUtil {
>
>    public static final String module = SequenceUtil.class.getName();
>
>    ConcurrentHashMap<String, SequenceBank> sequences = new  
> ConcurrentHashMap<String, SequenceBank>();
>    String helperName;
>    ModelEntity seqEntity;
>    String tableName;
>    String nameColName;
>    String idColName;
>
>    private SequenceUtil() {}
>
>    public SequenceUtil(String helperName, ModelEntity seqEntity,  
> String nameFieldName, String idFieldName) {
>        this.helperName = helperName;
>        this.seqEntity = seqEntity;
>        if (seqEntity == null) {
>            throw new IllegalArgumentException("The sequence model  
> entity was null but is required.");
>        }
>        this.tableName = seqEntity.getTableName(helperName);
>
>        ModelField nameField = seqEntity.getField(nameFieldName);
>
>        if (nameField == null) {
>            throw new IllegalArgumentException("Could not find the  
> field definition for the sequence name field " + nameFieldName);
>        }
>        this.nameColName = nameField.getColName();
>
>        ModelField idField = seqEntity.getField(idFieldName);
>
>        if (idField == null) {
>            throw new IllegalArgumentException("Could not find the  
> field definition for the sequence id field " + idFieldName);
>        }
>        this.idColName = idField.getColName();
>    }
>
>    public Long getNextSeqId(String seqName, long staggerMax,  
> ModelEntity seqModelEntity) {
>        SequenceBank bank = this.getBank(seqName, seqModelEntity);
>        return bank.getNextSeqId(staggerMax);
>    }
>
>    public void forceBankRefresh(String seqName, long staggerMax) {
>        // don't use the get method because we don't want to create  
> if it fails
>        SequenceBank bank = sequences.get(seqName);
>        if (bank == null) {
>            return;
>        }
>
>        bank.refresh(staggerMax);
>    }
>
>    private SequenceBank getBank(String seqName, ModelEntity  
> seqModelEntity) {
>        SequenceBank bank = sequences.get(seqName);
>
>        if (bank == null) {
>            bank = new SequenceBank(seqName, seqModelEntity, this);
>            SequenceBank oldBank = sequences.putIfAbsent(seqName,  
> bank);
>            if (oldBank != null) bank = oldBank;
>        }
>
>        return bank;
>    }
>
>    class SequenceBank {
>        public static final long defaultBankSize = 10;
>        public static final long maxBankSize = 5000;
>        public static final long startSeqId = 10000;
>        public static final int minWaitMillis = 5;
>        public static final int maxWaitMillis = 50;
>        public static final int maxTries = 5;
>
>        final AtomicReference<SequenceValue> ref;
>        String seqName;
>        SequenceUtil parentUtil;
>        ModelEntity seqModelEntity;
>
>        public SequenceBank(String seqName, ModelEntity  
> seqModelEntity, SequenceUtil parentUtil) {
>            this.seqName = seqName;
>            this.parentUtil = parentUtil;
>            this.seqModelEntity = seqModelEntity;
>            ref = new AtomicReference<SequenceValue>(new  
> SequenceValue(0, 0).fillBank(1));
>        }
>
>        public Long getNextSeqId(long staggerMax) {
>            long stagger = 1;
>            if (staggerMax > 1) {
>                stagger = Math.round(Math.random() * staggerMax);
>                if (stagger == 0) stagger = 1;
>            }
>            SequenceValue value;
>            do {
>                value = ref.get();
>                if ((value.curSeqId + stagger) > value.maxSeqId) {
>                    value = value.fillBank(stagger);
>                    if ((value.curSeqId + stagger) > value.maxSeqId) {
>                        
> Debug.logError("[SequenceUtil.SequenceBank.getNextSeqId] Fill bank  
> failed, returning null", module);
>                        return null;
>                    }
>                }
>            } while (!ref.compareAndSet(value, new  
> SequenceValue(value.curSeqId + stagger, value.maxSeqId)));
>            return Long.valueOf(value.curSeqId);
>        }
>
>        public void refresh(long staggerMax) {
>            SequenceValue value;
>            do {
>                value = ref.get();
>            } while (!ref.compareAndSet(value,  
> value.refresh(staggerMax)));
>        }
>
>
>
>    protected final class SequenceValue {
>        protected final long curSeqId;
>        protected final long maxSeqId;
>
>        protected SequenceValue(long curSeqId, long maxSeqId) {
>            this.curSeqId = curSeqId;
>            this.maxSeqId = maxSeqId;
>        }
>
>        public int hashCode() {
>            long r = curSeqId ^ maxSeqId;
>            return (int) ((r >> 32) ^ (r & 0xffff));
>        }
>
>        public boolean equals(Object o) {
>            if (!(o instanceof SequenceValue)) return false;
>            SequenceValue other = (SequenceValue) o;
>            return curSeqId == other.curSeqId && maxSeqId ==  
> other.maxSeqId;
>        }
>
>        protected SequenceValue refresh(long staggerMax) {
>            return fillBank(staggerMax, this.curSeqId);
>        }
>
>        protected SequenceValue fillBank(long stagger) {
>            return fillBank(stagger, this.curSeqId);
>        }
>
>        private SequenceValue fillBank(long stagger, long curSeqId) {
>            //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]  
> Starting fillBank Thread Name is: " +  
> Thread.currentThread().getName() + ":" +  
> Thread.currentThread().toString(), module);
>
>            // no need to get a new bank, SeqIds available
>            if ((curSeqId + stagger) <= maxSeqId) return this;
>
>            long bankSize = defaultBankSize;
>            if (seqModelEntity != null &&  
> seqModelEntity.getSequenceBankSize() != null) {
>                bankSize =  
> seqModelEntity.getSequenceBankSize().longValue();
>            }
>            if (stagger > 1) {
>                // NOTE: could use staggerMax for this, but if that  
> is done it would be easier to guess a valid next id without a brute  
> force attack
>                bankSize = stagger * defaultBankSize;
>            }
>
>            if (bankSize > maxBankSize) bankSize = maxBankSize;
>
>            long val1 = 0;
>            long val2 = 0;
>
>            // NOTE: the fancy ethernet type stuff is for the case  
> where transactions not available, or something funny happens with  
> really sensitive timing (between first select and update, for example)
>            int numTries = 0;
>
>            while (val1 + bankSize != val2) {
>                if (Debug.verboseOn())  
> Debug.logVerbose("[SequenceUtil.SequenceBank.fillBank] Trying to get  
> a bank of sequenced ids for " +
>                        seqName + "; start of loop val1=" + val1 + ",  
> val2=" + val2 + ", bankSize=" + bankSize, module);
>
>                // not sure if this synchronized block is totally  
> necessary, the method is synchronized but it does do a wait/sleep
>                //outside of this block, and this is the really  
> sensitive block, so making sure it is isolated; there is some overhead
>                //to this, but very bad things can happen if we try  
> to do too many of these at once for a single sequencer
>                    Transaction suspendedTransaction = null;
>                    try {
>                        //if we can suspend the transaction, we'll  
> try to do this in a local manual transaction
>                        suspendedTransaction =  
> TransactionUtil.suspend();
>
>                        boolean beganTransaction = false;
>                        try {
>                            beganTransaction = TransactionUtil.begin();
>
>                            Connection connection = null;
>                            Statement stmt = null;
>                            ResultSet rs = null;
>
>                            try {
>                                connection =  
> ConnectionFactory.getConnection(parentUtil.helperName);
>                            } catch (SQLException sqle) {
>                                
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to  
> esablish a connection with the database... Error was:" +  
> sqle.toString(), module);
>                                throw sqle;
>                            } catch (GenericEntityException e) {
>                                
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to  
> esablish a connection with the database... Error was: " +  
> e.toString(), module);
>                                throw e;
>                            }
>
>                            if (connection == null) {
>                                throw new  
> GenericEntityException("[SequenceUtil.SequenceBank.fillBank]: Unable  
> to esablish a connection with the database, connection was null...");
>                            }
>
>                            String sql = null;
>
>                            try {
>                                // we shouldn't need this, and some  
> TX managers complain about it, so not including it:  
> connection.setAutoCommit(false);
>
>                                stmt = connection.createStatement();
>
>                                sql = "SELECT " +  
> parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " +  
> parentUtil.nameColName + "='" + seqName + "'";
>                                rs = stmt.executeQuery(sql);
>                                boolean gotVal1 = false;
>                                if (rs.next()) {
>                                    val1 =  
> rs.getLong(parentUtil.idColName);
>                                    gotVal1 = true;
>                                }
>                                rs.close();
>
>                                if (!gotVal1) {
>                                    
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] first select  
> failed: will try to add new row, result set was empty for sequence  
> [" + seqName + "] \nUsed SQL: " + sql + " \n Thread Name is: " +  
> Thread.currentThread().getName() + ":" +  
> Thread.currentThread().toString(), module);
>                                    sql = "INSERT INTO " +  
> parentUtil.tableName + " (" + parentUtil.nameColName + ", " +  
> parentUtil.idColName + ") VALUES ('" + seqName + "', " + startSeqId  
> + ")";
>                                    if (stmt.executeUpdate(sql) <= 0) {
>                                        throw new  
> GenericEntityException("No rows changed when trying insert new  
> sequence row with this SQL: " + sql);
>                                    }
>                                    continue;
>                                }
>
>                                sql = "UPDATE " +  
> parentUtil.tableName + " SET " + parentUtil.idColName + "=" +  
> parentUtil.idColName + "+" + bankSize + " WHERE " +  
> parentUtil.nameColName + "='" + seqName + "'";
>                                if (stmt.executeUpdate(sql) <= 0) {
>                                    throw new  
> GenericEntityException("[SequenceUtil.SequenceBank.fillBank] update  
> failed, no rows changes for seqName: " + seqName);
>                                }
>
>                                sql = "SELECT " +  
> parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " +  
> parentUtil.nameColName + "='" + seqName + "'";
>                                rs = stmt.executeQuery(sql);
>                                boolean gotVal2 = false;
>                                if (rs.next()) {
>                                    val2 =  
> rs.getLong(parentUtil.idColName);
>                                    gotVal2 = true;
>                                }
>
>                                rs.close();
>
>                                if (!gotVal2) {
>                                    throw new  
> GenericEntityException("[SequenceUtil.SequenceBank.fillBank] second  
> select failed: aborting, result set was empty for sequence: " +  
> seqName);
>                                }
>
>                                // got val1 and val2 at this point,  
> if we don't have the right difference between them, force a rollback  
> (with
>                                //setRollbackOnly and NOT with an  
> exception because we don't want to break from the loop, just err out  
> and
>                                //continue), then flow out to allow  
> the wait and loop thing to happen
>                                if (val1 + bankSize != val2) {
>                                    
> TransactionUtil.setRollbackOnly("Forcing transaction rollback in  
> sequence increment because we didn't get a clean update, ie a  
> conflict was found, so not saving the results", null);
>                                }
>                            } catch (SQLException sqle) {
>                                Debug.logWarning(sqle,  
> "[SequenceUtil.SequenceBank.fillBank] SQL Exception while executing  
> the following:\n" + sql + "\nError was:" + sqle.getMessage(), module);
>                                throw sqle;
>                            } finally {
>                                try {
>                                    if (stmt != null) stmt.close();
>                                } catch (SQLException sqle) {
>                                    Debug.logWarning(sqle, "Error  
> closing statement in sequence util", module);
>                                }
>                                try {
>                                    if (connection != null)  
> connection.close();
>                                } catch (SQLException sqle) {
>                                    Debug.logWarning(sqle, "Error  
> closing connection in sequence util", module);
>                                }
>                            }
>                        } catch (Exception e) {
>                            String errMsg = "General error in getting  
> a sequenced ID";
>                            Debug.logError(e, errMsg, module);
>                            try {
>                                
> TransactionUtil.rollback(beganTransaction, errMsg, e);
>                            } catch (GenericTransactionException  
> gte2) {
>                                Debug.logError(gte2, "Unable to  
> rollback transaction", module);
>                            }
>
>                            // error, break out of the loop to not  
> try to continue forever
>                            break;
>                        } finally {
>                            try {
>                                
> TransactionUtil.commit(beganTransaction);
>                            } catch (GenericTransactionException gte) {
>                                Debug.logError(gte, "Unable to commit  
> sequence increment transaction, continuing anyway though", module);
>                            }
>                        }
>                    } catch (GenericTransactionException e) {
>                        Debug.logError(e, "System Error suspending  
> transaction in sequence util", module);
>                    } finally {
>                        if (suspendedTransaction != null) {
>                            try {
>                                
> TransactionUtil.resume(suspendedTransaction);
>                            } catch (GenericTransactionException e) {
>                                Debug.logError(e, "Error resuming  
> suspended transaction in sequence util", module);
>                            }
>                        }
>                    }
>
>                if (val1 + bankSize != val2) {
>                    if (numTries >= maxTries) {
>                        String errMsg =  
> "[SequenceUtil.SequenceBank.fillBank] maxTries (" + maxTries + ")  
> reached for seqName [" + seqName + "], giving up.";
>                        Debug.logError(errMsg, module);
>                        return this;
>                    }
>
>                    // collision happened, wait a bounded random  
> amount of time then continue
>                    int waitTime = (Double.valueOf(Math.random() *  
> (maxWaitMillis - minWaitMillis))).intValue() + minWaitMillis;
>
>                    
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Collision  
> found for seqName [" + seqName + "], val1=" + val1 + ", val2=" +  
> val2 + ", val1+bankSize=" + (val1 + bankSize) + ", bankSize=" +  
> bankSize + ", waitTime=" + waitTime, module);
>
>                    try {
>                        // using the Thread.sleep to more reliably  
> lock this thread: this.wait(waitTime);
>                        java.lang.Thread.sleep(waitTime);
>                    } catch (Exception e) {
>                        Debug.logWarning(e, "Error waiting in  
> sequence util", module);
>                        return this;
>                    }
>                }
>
>                numTries++;
>            }
>
>            if (Debug.infoOn()) Debug.logInfo("Got bank of sequenced  
> IDs for [" + seqName + "]; curSeqId=" + val1 + ", maxSeqId=" + val2  
> + ", bankSize=" + bankSize, module);
>            return new SequenceValue(val1, val2);
>            //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]  
> Ending fillBank Thread Name is: " + Thread.currentThread().getName()  
> + ":" + Thread.currentThread().toString(), module);
>        }
>
>    }
>    }
> }

Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

David E Jones
In reply to this post by Adam Heath-2

On Jun 30, 2008, at 1:43 PM, Adam Heath wrote:

> Jacopo Cappellato wrote:
>> Hi Adam,
>> On Jun 30, 2008, at 6:21 PM, Adam Heath wrote:
>>>
>>> Attached you will find SequenceUtil.java; I've modified it to be  
>>> non-blocking.  I've done "ant run-install", with no problems, but  
>>> have *not* yet run any test cases.
>>>
>>> I'm sending it here(to this list) first, before checking it in, as  
>>> this is a radical departure from simple programming practices, and  
>>> would like others to discuss if I should do this in other places.
>> thanks for your work and yes, please, add this code to Jira to make  
>> it easier to browse and comment. Also, before checking it in,  
>> please run some test cases.
>
> Well, I could, but not without doing other stuff first.
>
> As part of the official build process for webslinger(which means  
> uploading a debian package to our local repo), I *always* run it's  
> test cases, and they *must* pass.  And, as part of that, I always  
> run them under cobertura, a code-coverage tool.  I am currently at  
> 41% coverage, with some parts over 90%.
>
> I'd like to add such support to ofbiz, but can't use cobertura(it's  
> gpl).  I'm open for suggestions.

Do you really need a code coverage tool in order to do tests? I guess  
I'm not quite sure what you're getting at with this question...

There are quite a few potential tools for testing these things, but a  
JUnit class or a Service Engine service run as a test are nice options.

-David


Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

Adam Heath-2
David E Jones wrote:
> Do you really need a code coverage tool in order to do tests? I guess
> I'm not quite sure what you're getting at with this question...

So that you know every branch, every test, every exception block, every
method, is actually being executed.

If you have some if statement, who's true block is never executed, then
how can you be certain whether the condition is specified correctly?

Since most of ofbiz is loaded thru a custom classloader, it makes it
simple to deal with; For each class in each jar loaded, add the
instrumentation(by use of a temp file).  I've got experience with
cobertura, but that's GPL, so not compatible with apache software.

> There are quite a few potential tools for testing these things, but a
> JUnit class or a Service Engine service run as a test are nice options.

The version I sent earlier was completely broken.  I've since fixed it,
but will run with it a bit more before opening this up thru jira.
Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

David E Jones

On Jun 30, 2008, at 11:14 PM, Adam Heath wrote:

> David E Jones wrote:
>> Do you really need a code coverage tool in order to do tests? I  
>> guess I'm not quite sure what you're getting at with this question...
>
> So that you know every branch, every test, every exception block,  
> every method, is actually being executed.
>
> If you have some if statement, who's true block is never executed,  
> then how can you be certain whether the condition is specified  
> correctly?
>
> Since most of ofbiz is loaded thru a custom classloader, it makes it  
> simple to deal with; For each class in each jar loaded, add the  
> instrumentation(by use of a temp file).  I've got experience with  
> cobertura, but that's GPL, so not compatible with apache software.

Was this another joke? I'm not getting the punch line...

Thank you for assuming I'm an idiot and don't know what a code  
coverage tool is or does. That wasn't my question... my question was:  
why do you need one in order to write the tests that were mentioned?  
In fact, what does it have to do with it at all?

BTW, just to take this opportunity to slip in my personal opinion:  
code coverage tools are of very limited usefulness. They are what  
people use when they don't have real requirements or designs to test  
against. In essence you toss out all of that and have one requirement  
"run every line of code". Even that is a silly one, because running  
the same code and going through the same path in the code with  
different data can do very different things, and meet (or fail to  
meet!) different requirements.

>> There are quite a few potential tools for testing these things, but  
>> a JUnit class or a Service Engine service run as a test are nice  
>> options.
>
> The version I sent earlier was completely broken.  I've since fixed  
> it, but will run with it a bit more before opening this up thru jira.

Was that a response to what I wrote? I guess not, sounds good though.

-David


Reply | Threaded
Open this post in threaded view
|

Re: SequenceUtil and ConcurrentHashMap(non-blocking)

Jacopo Cappellato-3
Hi Adam,

what I really meant when I asked you to run some tests before  
committing was actually... "run some tests before committing" :-)
If you can create some automated tests (like the few ones we already  
have in OFBiz) that would be great... but at least run some manual  
tests so that you can be rather confident the modified code is still  
working fine when used by the existing appliications (from Java,  
Minilang etc...).

Cheers,

Jacopo

On Jul 1, 2008, at 7:24 AM, David E Jones wrote:

>
> On Jun 30, 2008, at 11:14 PM, Adam Heath wrote:
>
>> David E Jones wrote:
>>> Do you really need a code coverage tool in order to do tests? I  
>>> guess I'm not quite sure what you're getting at with this  
>>> question...
>>
>> So that you know every branch, every test, every exception block,  
>> every method, is actually being executed.
>>
>> If you have some if statement, who's true block is never executed,  
>> then how can you be certain whether the condition is specified  
>> correctly?
>>
>> Since most of ofbiz is loaded thru a custom classloader, it makes  
>> it simple to deal with; For each class in each jar loaded, add the  
>> instrumentation(by use of a temp file).  I've got experience with  
>> cobertura, but that's GPL, so not compatible with apache software.
>
> Was this another joke? I'm not getting the punch line...
>
> Thank you for assuming I'm an idiot and don't know what a code  
> coverage tool is or does. That wasn't my question... my question  
> was: why do you need one in order to write the tests that were  
> mentioned? In fact, what does it have to do with it at all?
>
> BTW, just to take this opportunity to slip in my personal opinion:  
> code coverage tools are of very limited usefulness. They are what  
> people use when they don't have real requirements or designs to test  
> against. In essence you toss out all of that and have one  
> requirement "run every line of code". Even that is a silly one,  
> because running the same code and going through the same path in the  
> code with different data can do very different things, and meet (or  
> fail to meet!) different requirements.
>
>>> There are quite a few potential tools for testing these things,  
>>> but a JUnit class or a Service Engine service run as a test are  
>>> nice options.
>>
>> The version I sent earlier was completely broken.  I've since fixed  
>> it, but will run with it a bit more before opening this up thru jira.
>
> Was that a response to what I wrote? I guess not, sounds good though.
>
> -David
>
>