Author: jonesde
Date: Sun Sep 9 01:13:25 2007 New Revision: 573956 URL: http://svn.apache.org/viewvc?rev=573956&view=rev Log: A few refinements to the deadlock retry test case, but the main part of this is the actual implementation of the deadlock retry in the service engine, and the implementation of that makes the deadlock test case pass now; this is a bit of rework in some low level code so this needs some good testing beyond the few general tests already done Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/GenericAsyncEngine.java ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/StandardJavaEngine.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/test/ServiceEngineTestServices.java ofbiz/trunk/framework/service/testdef/data/ServiceDeadLockRetryAssertData.xml Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java?rev=573956&r1=573955&r2=573956&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java Sun Sep 9 01:13:25 2007 @@ -61,6 +61,7 @@ public static final String module = ServiceDispatcher.class.getName(); public static final int lruLogSize = 200; + public static final int LOCK_RETRIES = 3; protected static final Map runLog = new LRUMap(lruLogSize); protected static Map dispatchers = FastMap.newInstance(); @@ -246,7 +247,7 @@ context = FastMap.newInstance(); } - // setup the result map + // setup the result map and other initial settings Map result = FastMap.newInstance(); boolean isFailure = false; boolean isError = false; @@ -264,7 +265,7 @@ DispatchContext ctx = (DispatchContext) localContext.get(localName); GenericEngine engine = this.getGenericEngine(modelService.engineName); - // setup default IN values + // set IN attributes with default-value as applicable modelService.updateDefaultValues(context, ModelService.IN_PARAM); Map ecaContext = null; @@ -283,80 +284,147 @@ // now start a new transaction beganTrans = TransactionUtil.begin(modelService.transactionTimeout); } - } - - // XAResource debugging - if (beganTrans && TransactionUtil.debugResources) { - DebugXaResource dxa = new DebugXaResource(modelService.name); - try { - dxa.enlist(); - } catch (Exception e) { - Debug.logError(e, module); + // enlist for XAResource debugging + if (beganTrans && TransactionUtil.debugResources) { + DebugXaResource dxa = new DebugXaResource(modelService.name); + try { + dxa.enlist(); + } catch (Exception e) { + Debug.logError(e, module); + } } } try { - // setup global transaction ECA listeners to execute later - if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-rollback", ctx, context, result, isError, isFailure); - if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-commit", ctx, context, result, isError, isFailure); - - // pre-auth ECA - if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "auth", ctx, context, result, isError, isFailure); - - // check for pre-auth failure/errors - isFailure = ServiceUtil.isFailure(result); - isError = ServiceUtil.isError(result); - context = checkAuth(localName, context, modelService); - Object userLogin = context.get("userLogin"); + int lockRetriesRemaining = LOCK_RETRIES; + boolean needsLockRetry = false; + + do { + lockRetriesRemaining--; + + // NOTE: general pattern here is to do everything up to the main service call, and retry it all if + //needed because those will be part of the same transaction and have been rolled back + // TODO: if there is an ECA called async or in a new transaciton it won't get rolled back + //but will be called again, which means the service may complete multiple times! that would be for + //pre-invoke and earlier events only of course + + + // setup global transaction ECA listeners to execute later + if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-rollback", ctx, context, result, isError, isFailure); + if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-commit", ctx, context, result, isError, isFailure); + + // pre-auth ECA + if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "auth", ctx, context, result, isError, isFailure); + + // check for pre-auth failure/errors + isFailure = ServiceUtil.isFailure(result); + isError = ServiceUtil.isError(result); + + //Debug.logInfo("After [" + modelService.name + "] pre-auth ECA, before auth; isFailure=" + isFailure + ", isError=" + isError, module); - if (modelService.auth && userLogin == null) { - throw new ServiceAuthException("User authorization is required for this service: " + modelService.name + modelService.debugInfo()); - } + context = checkAuth(localName, context, modelService); + Object userLogin = context.get("userLogin"); - // pre-validate ECA - if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "in-validate", ctx, context, result, isError, isFailure); + if (modelService.auth && userLogin == null) { + throw new ServiceAuthException("User authorization is required for this service: " + modelService.name + modelService.debugInfo()); + } - // check for pre-validate failure/errors - isFailure = ServiceUtil.isFailure(result); - isError = ServiceUtil.isError(result); + // pre-validate ECA + if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "in-validate", ctx, context, result, isError, isFailure); - // validate the context - if (modelService.validate && !isError && !isFailure) { - try { - modelService.validate(context, ModelService.IN_PARAM, locale); - } catch (ServiceValidationException e) { - Debug.logError(e, "Incoming context (in runSync : " + modelService.name + ") does not match expected requirements", module); - throw e; + // check for pre-validate failure/errors + isFailure = ServiceUtil.isFailure(result); + isError = ServiceUtil.isError(result); + + //Debug.logInfo("After [" + modelService.name + "] pre-in-validate ECA, before in-validate; isFailure=" + isFailure + ", isError=" + isError, module); + + // validate the context + if (modelService.validate && !isError && !isFailure) { + try { + modelService.validate(context, ModelService.IN_PARAM, locale); + } catch (ServiceValidationException e) { + Debug.logError(e, "Incoming context (in runSync : " + modelService.name + ") does not match expected requirements", module); + throw e; + } } - } - // pre-invoke ECA - if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "invoke", ctx, context, result, isError, isFailure); + // pre-invoke ECA + if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "invoke", ctx, context, result, isError, isFailure); - // check for pre-invoke failure/errors - isFailure = ServiceUtil.isFailure(result); - isError = ServiceUtil.isError(result); - - // ===== invoke the service ===== - if (!isError && !isFailure) { - Map invokeResult = engine.runSync(localName, modelService, context); - engine.sendCallbacks(modelService, context, invokeResult, GenericEngine.SYNC_MODE); - if (invokeResult != null) { - result.putAll(invokeResult); - } else { - Debug.logWarning("Service (in runSync : " + modelService.name + ") returns null result", module); + // check for pre-invoke failure/errors + isFailure = ServiceUtil.isFailure(result); + isError = ServiceUtil.isError(result); + + //Debug.logInfo("After [" + modelService.name + "] pre-invoke ECA, before invoke; isFailure=" + isFailure + ", isError=" + isError, module); + + // ===== invoke the service ===== + if (!isError && !isFailure) { + Map invokeResult = engine.runSync(localName, modelService, context); + engine.sendCallbacks(modelService, context, invokeResult, GenericEngine.SYNC_MODE); + if (invokeResult != null) { + result.putAll(invokeResult); + } else { + Debug.logWarning("Service (in runSync : " + modelService.name + ") returns null result", module); + } } - } - - // re-check the errors/failures - isFailure = ServiceUtil.isFailure(result); - isError = ServiceUtil.isError(result); + + // re-check the errors/failures + isFailure = ServiceUtil.isFailure(result); + isError = ServiceUtil.isError(result); + + //Debug.logInfo("After [" + modelService.name + "] invoke; isFailure=" + isFailure + ", isError=" + isError, module); + + // crazy stuff here: see if there was a deadlock error and if so retry... which we can ONLY do if we own the transaction! + if (beganTrans) { + // look for the string DEADLOCK in an upper-cased error message; tested on: Derby, MySQL + // - Derby 10.2.2.0 deadlock string: "A lock could not be obtained due to a deadlock" + // - MySQL TODO + // TODO need testing in other databases because they all return different error messages for this! + String errMsg = ServiceUtil.getErrorMessage(result); + // NOTE DEJ20070908 are there other things we need to check? I don't think so because these will + //be Entity Engine errors that will be caught and come back in an error message... IFF the + //service is written to not ignore it of course! + if (errMsg != null && errMsg.toUpperCase().indexOf("DEADLOCK") >= 0) { + // it's a deadlock! retry... + String retryMsg = "RETRYING SERVICE [" + modelService.name + "]: Deadlock error found in message [" + errMsg + "]; retry [" + (LOCK_RETRIES - lockRetriesRemaining) + "] of [" + LOCK_RETRIES + "]"; + + // make sure the old transaction is rolled back, and then start a new one + + // if there is an exception in these things, let the big overall thing handle it + TransactionUtil.rollback(beganTrans, retryMsg, null); + + beganTrans = TransactionUtil.begin(modelService.transactionTimeout); + // enlist for XAResource debugging + if (beganTrans && TransactionUtil.debugResources) { + DebugXaResource dxa = new DebugXaResource(modelService.name); + try { + dxa.enlist(); + } catch (Exception e) { + Debug.logError(e, module); + } + } + + if (!beganTrans) { + // just log and let things roll through, will be considered an error and ECAs, etc will run according to that + Debug.logError("After rollback attempt for lock retry did not begin a new transaction!", module); + } else { + needsLockRetry = true; + + // reset state variables + result = FastMap.newInstance(); + isFailure = false; + isError = false; + + Debug.logWarning(retryMsg, module); + } + } + } + } while (needsLockRetry && lockRetriesRemaining > 0); // create a new context with the results to pass to ECA services; necessary because caller may reuse this context ecaContext = FastMap.newInstance(); ecaContext.putAll(context); - // copy all results: don't worry parameters that aren't allowed won't be passed to the ECA services ecaContext.putAll(result); @@ -413,10 +481,9 @@ } finally { // if there was an error, rollback transaction, otherwise commit if (isError) { - String errMsg = "Service Error [" + modelService.name + "]: " + ServiceUtil.getErrorMessage(result); - // try to log the error + String errMsg = "Error is Service [" + modelService.name + "]: " + ServiceUtil.getErrorMessage(result); Debug.logError(errMsg, module); - + // rollback the transaction try { TransactionUtil.rollback(beganTrans, errMsg, null); @@ -519,22 +586,20 @@ try { if (service.useTransaction) { beganTrans = TransactionUtil.begin(service.transactionTimeout); - // isolate the transaction if defined if (service.requireNewTransaction && !beganTrans) { parentTransaction = TransactionUtil.suspend(); // now start a new transaction beganTrans = TransactionUtil.begin(service.transactionTimeout); } - } - - // XAResource debugging - if (beganTrans && TransactionUtil.debugResources) { - DebugXaResource dxa = new DebugXaResource(service.name); - try { - dxa.enlist(); - } catch (Exception e) { - Debug.logError(e, module); + // enlist for XAResource debugging + if (beganTrans && TransactionUtil.debugResources) { + DebugXaResource dxa = new DebugXaResource(service.name); + try { + dxa.enlist(); + } catch (Exception e) { + Debug.logError(e, module); + } } } @@ -548,8 +613,9 @@ context = checkAuth(localName, context, service); Object userLogin = context.get("userLogin"); - if (service.auth && userLogin == null) + if (service.auth && userLogin == null) { throw new ServiceAuthException("User authorization is required for this service: " + service.name + service.debugInfo()); + } // pre-validate ECA if (eventMap != null) ServiceEcaUtil.evalRules(service.name, eventMap, "in-validate", ctx, context, result, isError, isFailure); Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/GenericAsyncEngine.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/GenericAsyncEngine.java?rev=573956&r1=573955&r2=573956&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/GenericAsyncEngine.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/GenericAsyncEngine.java Sun Sep 9 01:13:25 2007 @@ -88,8 +88,6 @@ GenericValue jobV = null; // Build the value object(s). try { - List toBeStored = new LinkedList(); - // Create the runtime data String dataId = dispatcher.getDelegator().getNextSeqId("RuntimeData"); @@ -97,7 +95,7 @@ UtilMisc.toMap("runtimeDataId", dataId)); runtimeData.set("runtimeInfo", XmlSerializer.serialize(context)); - toBeStored.add(runtimeData); + runtimeData.create(); // Get the userLoginId out of the context String authUserLoginId = null; @@ -122,9 +120,7 @@ } jobV = dispatcher.getDelegator().makeValue("JobSandbox", jFields); - toBeStored.add(jobV); - dispatcher.getDelegator().storeAll(toBeStored); - + jobV.create(); } catch (GenericEntityException e) { throw new GenericServiceException("Unable to create persisted job", e); } catch (SerializeException e) { @@ -167,4 +163,3 @@ } } } - Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/StandardJavaEngine.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/StandardJavaEngine.java?rev=573956&r1=573955&r2=573956&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/StandardJavaEngine.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/engine/StandardJavaEngine.java Sun Sep 9 01:13:25 2007 @@ -52,8 +52,9 @@ public Map runSync(String localName, ModelService modelService, Map context) throws GenericServiceException { Object result = serviceInvoker(localName, modelService, context); - if (result == null || !(result instanceof Map)) + if (result == null || !(result instanceof Map)) { throw new GenericServiceException("Service did not return expected result"); + } return (Map) result; } @@ -62,28 +63,33 @@ // static java service methods should be: public Map methodName(DispatchContext dctx, Map context) DispatchContext dctx = dispatcher.getLocalContext(localName); - if (modelService == null) + if (modelService == null) { Debug.logError("ERROR: Null Model Service.", module); - if (dctx == null) + } + if (dctx == null) { Debug.logError("ERROR: Null DispatchContext.", module); - if (context == null) + } + if (context == null) { Debug.logError("ERROR: Null Service Context.", module); + } Class[] paramTypes = new Class[] {DispatchContext.class, Map.class}; Object[] params = new Object[] {dctx, context}; Object result = null; // check the package and method names - if (modelService.location == null || modelService.invoke == null) + if (modelService.location == null || modelService.invoke == null) { throw new GenericServiceException("Cannot locate service to invoke (location or invoke name missing)"); + } // get the classloader to use ClassLoader cl = null; - if (dctx == null) + if (dctx == null) { cl = this.getClass().getClassLoader(); - else + } else { cl = dctx.getClassLoader(); + } try { Class c = cl.loadClass(this.getLocation(modelService)); Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=573956&r1=573955&r2=573956&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sun Sep 9 01:13:25 2007 @@ -88,8 +88,9 @@ /** Queues a Job to run now. */ public void runJob(Job job) throws JobManagerException { - if (job.isValid()) + if (job.isValid()) { jp.queueNow(job); + } } /** Returns the ServiceDispatcher. */ Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/test/ServiceEngineTestServices.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/test/ServiceEngineTestServices.java?rev=573956&r1=573955&r2=573956&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/test/ServiceEngineTestServices.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/test/ServiceEngineTestServices.java Sun Sep 9 01:13:25 2007 @@ -18,8 +18,11 @@ */ package org.ofbiz.service.test; +import java.util.List; import java.util.Map; +import javolution.util.FastList; + import org.ofbiz.base.util.Debug; import org.ofbiz.base.util.UtilMisc; import org.ofbiz.entity.GenericDelegator; @@ -42,8 +45,18 @@ GenericResultWaiter threadAWaiter = dispatcher.runAsyncWait("testServiceDeadLockRetryThreadA", null, false); GenericResultWaiter threadBWaiter = dispatcher.runAsyncWait("testServiceDeadLockRetryThreadB", null, false); // make sure to wait for these to both finish to make sure results aren't checked until they are done - threadAWaiter.waitForResult(); - threadBWaiter.waitForResult(); + Map threadAResult = threadAWaiter.waitForResult(); + Map threadBResult = threadBWaiter.waitForResult(); + List errorList = FastList.newInstance(); + if (ServiceUtil.isError(threadAResult)) { + errorList.add("Error running testServiceDeadLockRetryThreadA: " + ServiceUtil.getErrorMessage(threadAResult)); + } + if (ServiceUtil.isError(threadBResult)) { + errorList.add("Error running testServiceDeadLockRetryThreadB: " + ServiceUtil.getErrorMessage(threadBResult)); + } + if (errorList.size() > 0) { + return ServiceUtil.returnError("Error(s) running sub-services in testServiceDeadLockRetry", errorList, null, null); + } } catch (Exception e) { String errMsg = "Error running deadlock test services: " + e.toString(); Debug.logError(e, errMsg, module); @@ -59,7 +72,7 @@ try { // grab entity SVCLRT_A by changing, then wait, then find and change SVCLRT_B GenericValue testingTypeA = delegator.findByPrimaryKey("TestingType", UtilMisc.toMap("testingTypeId", "SVCLRT_A")); - testingTypeA.set("description", "New description for SVCLRT_A from Thread A"); + testingTypeA.set("description", "New description for SVCLRT_A"); testingTypeA.store(); // wait at least long enough for the other method to have locked resource B @@ -68,7 +81,7 @@ Debug.logInfo("In testServiceDeadLockRetryThreadA done with wait, updating SVCLRT_B", module); GenericValue testingTypeB = delegator.findByPrimaryKey("TestingType", UtilMisc.toMap("testingTypeId", "SVCLRT_B")); - testingTypeB.set("description", "New description for SVCLRT_B from Thread A"); + testingTypeB.set("description", "New description for SVCLRT_B"); testingTypeB.store(); } catch (GenericEntityException e) { String errMsg = "Entity Engine Exception running dead lock test thread A: " + e.toString(); @@ -88,7 +101,7 @@ try { // grab entity SVCLRT_B by changing, then wait, then change SVCLRT_A GenericValue testingTypeB = delegator.findByPrimaryKey("TestingType", UtilMisc.toMap("testingTypeId", "SVCLRT_B")); - testingTypeB.set("description", "New description for SVCLRT_B from Thread B"); + testingTypeB.set("description", "New description for SVCLRT_B"); testingTypeB.store(); // wait at least long enough for the other method to have locked resource B @@ -97,7 +110,7 @@ Debug.logInfo("In testServiceDeadLockRetryThreadB done with wait, updating SVCLRT_A", module); GenericValue testingTypeA = delegator.findByPrimaryKey("TestingType", UtilMisc.toMap("testingTypeId", "SVCLRT_A")); - testingTypeA.set("description", "New description for SVCLRT_A from Thread B"); + testingTypeA.set("description", "New description for SVCLRT_A"); testingTypeA.store(); } catch (GenericEntityException e) { String errMsg = "Entity Engine Exception running dead lock test thread B: " + e.toString(); @@ -119,8 +132,18 @@ GenericResultWaiter grabberWaiter = dispatcher.runAsyncWait("testServiceLockWaitTimeoutRetryGrabber", null, false); GenericResultWaiter waiterWaiter = dispatcher.runAsyncWait("testServiceLockWaitTimeoutRetryWaiter", null, false); // make sure to wait for these to both finish to make sure results aren't checked until they are done - grabberWaiter.waitForResult(); - waiterWaiter.waitForResult(); + Map grabberResult = grabberWaiter.waitForResult(); + Map waiterResult = waiterWaiter.waitForResult(); + List errorList = FastList.newInstance(); + if (ServiceUtil.isError(grabberResult)) { + errorList.add("Error running testServiceLockWaitTimeoutRetryGrabber: " + ServiceUtil.getErrorMessage(grabberResult)); + } + if (ServiceUtil.isError(waiterResult)) { + errorList.add("Error running testServiceLockWaitTimeoutRetryWaiter: " + ServiceUtil.getErrorMessage(waiterResult)); + } + if (errorList.size() > 0) { + return ServiceUtil.returnError("Error(s) running sub-services in testServiceLockWaitTimeoutRetry", errorList, null, null); + } } catch (Exception e) { String errMsg = "Error running deadlock test services: " + e.toString(); Debug.logError(e, errMsg, module); @@ -206,7 +229,10 @@ Debug.logInfo("In testServiceLockWaitTimeoutRetryCantRecover (grabber) just updated SVCLWTRTCR, running sub-service in own transaction", module); // timeout is 5 seconds so it is longer than the tx timeout for this service, so will fail quickly; with this transaction keeping a lock on the record and that one trying to get it, bam we cause the error - dispatcher.runSync("testServiceLockWaitTimeoutRetryCantRecoverWaiter", null, 5, true); + Map waiterResult = dispatcher.runSync("testServiceLockWaitTimeoutRetryCantRecoverWaiter", null, 5, true); + if (ServiceUtil.isError(waiterResult)) { + return ServiceUtil.returnError("Error running testServiceLockWaitTimeoutRetryCantRecoverWaiter", null, null, waiterResult); + } Debug.logInfo("In testServiceLockWaitTimeoutRetryCantRecover (grabber) successfully finished running sub-service in own transaction", module); } catch (GenericServiceException e) { Modified: ofbiz/trunk/framework/service/testdef/data/ServiceDeadLockRetryAssertData.xml URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/testdef/data/ServiceDeadLockRetryAssertData.xml?rev=573956&r1=573955&r2=573956&view=diff ============================================================================== --- ofbiz/trunk/framework/service/testdef/data/ServiceDeadLockRetryAssertData.xml (original) +++ ofbiz/trunk/framework/service/testdef/data/ServiceDeadLockRetryAssertData.xml Sun Sep 9 01:13:25 2007 @@ -19,6 +19,9 @@ --> <entity-engine-xml> <!-- assert data for service dead lock recovery test --> - <TestingType testingTypeId="SVCLRT_A" description="New description for SVCLRT_A from Thread B"/> - <TestingType testingTypeId="SVCLRT_B" description="New description for SVCLRT_B from Thread A"/> + <!-- NOTE we don't know which thread (A or B) these will be from because that is a race condition thing decided by + the database about which transaction to terminate, and the terminated one will be the one that re-tries and + make it into the database at the end. --> + <TestingType testingTypeId="SVCLRT_A" description="New description for SVCLRT_A"/> + <TestingType testingTypeId="SVCLRT_B" description="New description for SVCLRT_B"/> </entity-engine-xml> |
Free forum by Nabble | Edit this page |