Modified: ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncContext.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncContext.java?rev=757070&r1=757069&r2=757070&view=diff ============================================================================== --- ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncContext.java (original) +++ ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncContext.java Sat Mar 21 23:23:34 2009 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -64,9 +64,9 @@ * Entity Engine Sync Services */ public class EntitySyncContext { - + public static final String module = EntitySyncContext.class.getName(); - + // set default split to 10 seconds, ie try not to get too much data moving over at once public static final long defaultSyncSplitMillis = 10000; @@ -91,20 +91,20 @@ public String targetServiceName; public String targetDelegatorName; - + public Timestamp syncEndStamp; public long offlineSyncSplitMillis = defaultOfflineSyncSplitMillis; public long syncSplitMillis = defaultSyncSplitMillis; public long syncEndBufferMillis = defaultSyncEndBufferMillis; public long maxRunningNoUpdateMillis = defaultMaxRunningNoUpdateMillis; - + public Timestamp lastSuccessfulSynchTime; public List<ModelEntity> entityModelToUseList; public Set<String> entityNameToUseSet; public Timestamp currentRunStartTime; public Timestamp currentRunEndTime; - // these values are used to make this more efficient; if we run into an entity that has 0 + // these values are used to make this more efficient; if we run into an entity that has 0 //results for a given time block, we will do a query to find the next create/update/remove //time for that entity, and also keep track of a global next with the lowest future next value; //using these we can skip a lot of queries and speed this up significantly @@ -113,7 +113,7 @@ public Timestamp nextCreateTxTime = null; public Timestamp nextUpdateTxTime = null; public Timestamp nextRemoveTxTime = null; - + // this is the other part of the history PK, leave null until we create the history object public Timestamp startDate = null; @@ -153,7 +153,7 @@ this.userLogin = (GenericValue) context.get("userLogin"); - + this.entitySyncId = (String) context.get("entitySyncId"); Debug.logInfo("Creating EntitySyncContext with entitySyncId=" + entitySyncId, module); @@ -172,8 +172,8 @@ targetServiceName = entitySync.getString("targetServiceName"); targetDelegatorName = entitySync.getString("targetDelegatorName"); - - // make the last time to sync X minutes before the current time so that if this machines clock is up to that amount of time + + // make the last time to sync X minutes before the current time so that if this machines clock is up to that amount of time //ahead of another machine writing to the DB it will still work fine and not lose any data syncEndStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis); @@ -185,11 +185,11 @@ this.lastSuccessfulSynchTime = entitySync.getTimestamp("lastSuccessfulSynchTime"); this.entityModelToUseList = this.makeEntityModelToUseList(); this.entityNameToUseSet = this.makeEntityNameToUseSet(); - + // set start and end times for the first/current pass this.currentRunStartTime = getCurrentRunStartTime(lastSuccessfulSynchTime, entityModelToUseList, delegator); this.setCurrentRunEndTime(); - + // this is mostly for the pull side... will always be null for at the beginning of a push process, to be filled in later this.startDate = (Timestamp) context.get("startDate"); } catch (GenericEntityException e) { @@ -207,9 +207,9 @@ throw new SyncDataErrorException("Unable to commit transaction", e); } } - + /** - * To see if it is running check: + * To see if it is running check: * - in the running status * - AND when the entitySync was last updated, and if it was more than maxRunningNoUpdateMillis ago, then don't consider it to be running * @return boolean representing if the EntitySync should be considered running @@ -221,7 +221,7 @@ if (!isInRunning) { return false; } - + Timestamp esLastUpdated = this.entitySync.getTimestamp(ModelEntity.STAMP_FIELD); if (esLastUpdated == null) { // shouldn't ever happen, but just in case; assume is running if we don't know when it was last updated @@ -234,18 +234,18 @@ // it has been longer than the maxRunningNoUpdateMillis, so don't consider it running return false; } - + return true; } - + public boolean hasMoreTimeToSync() { return currentRunStartTime.before(syncEndStamp); } - + protected void setCurrentRunEndTime() { this.currentRunEndTime = getNextRunEndTime(); } - + protected Timestamp getNextRunEndTime() { long syncSplit = this.isOfflineSync ? offlineSyncSplitMillis : syncSplitMillis; Timestamp nextRunEndTime = new Timestamp(this.currentRunStartTime.getTime() + syncSplit); @@ -254,7 +254,7 @@ } return nextRunEndTime; } - + public void advanceRunTimes() { this.currentRunStartTime = this.currentRunEndTime; this.setCurrentRunEndTime(); @@ -263,7 +263,7 @@ public void setSplitStartTime() { this.splitStartTime = System.currentTimeMillis(); } - + protected static long getSyncSplitMillis(GenericValue entitySync) { long splitMillis = defaultSyncSplitMillis; Long syncSplitMillis = entitySync.getLong("syncSplitMillis"); @@ -290,7 +290,7 @@ } return syncEndBufferMillis; } - + protected static long getMaxRunningNoUpdateMillis(GenericValue entitySync) { long maxRunningNoUpdateMillis = defaultMaxRunningNoUpdateMillis; Long maxRunningNoUpdateMillisLong = entitySync.getLong("maxRunningNoUpdateMillis"); @@ -299,7 +299,7 @@ } return maxRunningNoUpdateMillis; } - + /** create history record, target service should run in own tx */ public void createInitialHistory() throws SyncDataErrorException, SyncServiceErrorException { String errorMsg = "Not running EntitySync [" + entitySyncId + "], could not create EntitySyncHistory"; @@ -313,20 +313,20 @@ throw new SyncServiceErrorException(errorMsg, e); } } - + public ArrayList<GenericValue> assembleValuesToCreate() throws SyncDataErrorException { // first grab all values inserted in the date range, then get the updates (leaving out all values inserted in the data range) ArrayList<GenericValue> valuesToCreate = new ArrayList<GenericValue>(); // make it an ArrayList to easily merge in sorted lists - + if (this.nextCreateTxTime != null && (this.nextCreateTxTime.equals(currentRunEndTime) || this.nextCreateTxTime.after(currentRunEndTime))) { // this means that for all entities in this pack we found on the last pass that there would be nothing for this one, so just return nothing... return valuesToCreate; } //Debug.logInfo("Getting values to create; currentRunStartTime=" + currentRunStartTime + ", currentRunEndTime=" + currentRunEndTime, module); - + int entitiesSkippedForKnownNext = 0; - + // iterate through entities, get all records with tx stamp in the current time range, put all in a single list for (ModelEntity modelEntity: entityModelToUseList) { int insertBefore = 0; @@ -349,13 +349,13 @@ try { // get the values created within the current time range EntityCondition findValCondition = EntityCondition.makeCondition( - EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), + EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime)); EntityListIterator eli = delegator.find(modelEntity.getEntityName(), findValCondition, null, null, UtilMisc.toList(ModelEntity.CREATE_STAMP_TX_FIELD, ModelEntity.CREATE_STAMP_FIELD), null); GenericValue nextValue = null; long valuesPerEntity = 0; while ((nextValue = eli.next()) != null) { - // sort by the tx stamp and then the record stamp + // sort by the tx stamp and then the record stamp // find first value in valuesToStore list, starting with the current insertBefore value, that has a CREATE_STAMP_TX_FIELD after the nextValue.CREATE_STAMP_TX_FIELD, then do the same with CREATE_STAMP_FIELD while (insertBefore < valuesToCreate.size() && valuesToCreate.get(insertBefore).getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD).before(nextValue.getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD))) { insertBefore++; @@ -367,16 +367,16 @@ valuesPerEntity++; } eli.close(); - + // definately remove this message and related data gathering //long preCount = delegator.findCountByCondition(modelEntity.getEntityName(), findValCondition, null); //long entityTotalCount = delegator.findCountByCondition(modelEntity.getEntityName(), null, null); //if (entityTotalCount > 0 || preCount > 0 || valuesPerEntity > 0) Debug.logInfo("Got " + valuesPerEntity + "/" + preCount + "/" + entityTotalCount + " values for entity " + modelEntity.getEntityName(), module); - + // if we didn't find anything for this entity, find the next value's Timestamp and keep track of it if (valuesPerEntity == 0) { Timestamp startCheckStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis); - + EntityCondition findNextCondition = EntityCondition.makeCondition( EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null), EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunEndTime)); @@ -404,7 +404,7 @@ } catch (GenericEntityException e) { try { TransactionUtil.rollback(beganTransaction, "Entity Engine error in assembleValuesToCreate", e); - + } catch (GenericTransactionException e2) { Debug.logWarning(e2, "Unable to call rollback()", module); } @@ -428,7 +428,7 @@ if (entitiesSkippedForKnownNext > 0) { if (Debug.infoOn()) Debug.logInfo("In assembleValuesToCreate skipped [" + entitiesSkippedForKnownNext + "/" + entityModelToUseList + "] entities for the time period ending at [" + currentRunEndTime + "] because of next known create times", module); } - + // TEST SECTION: leave false for normal use boolean logValues = false; if (logValues && valuesToCreate.size() > 0) { @@ -443,7 +443,7 @@ } Debug.logInfo(toCreateInfo.toString(), module); } - + return valuesToCreate; } @@ -457,13 +457,13 @@ } // Debug.logInfo("Getting values to store; currentRunStartTime=" + currentRunStartTime + ", currentRunEndTime=" + currentRunEndTime, module); - + int entitiesSkippedForKnownNext = 0; - + // iterate through entities, get all records with tx stamp in the current time range, put all in a single list for (ModelEntity modelEntity: entityModelToUseList) { int insertBefore = 0; - + // first test to see if we know that there are no records for this entity in this time period... Timestamp knownNextUpdateTime = this.nextEntityUpdateTxTime.get(modelEntity.getEntityName()); if (knownNextUpdateTime != null && (knownNextUpdateTime.equals(currentRunEndTime) || knownNextUpdateTime.after(currentRunEndTime))) { @@ -481,18 +481,18 @@ try { // get all values that were updated, but NOT created in the current time range; if no info on created stamp, that's okay we'll include it here because it won't have been included in the valuesToCreate list EntityCondition createdBeforeStartCond = EntityCondition.makeCondition( - EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.EQUALS, null), - EntityOperator.OR, + EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.EQUALS, null), + EntityOperator.OR, EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunStartTime)); EntityCondition findValCondition = EntityCondition.makeCondition( - EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), - EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime), + EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), + EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime), createdBeforeStartCond); EntityListIterator eli = delegator.find(modelEntity.getEntityName(), findValCondition, null, null, UtilMisc.toList(ModelEntity.STAMP_TX_FIELD, ModelEntity.STAMP_FIELD), null); GenericValue nextValue = null; long valuesPerEntity = 0; while ((nextValue = eli.next()) != null) { - // sort by the tx stamp and then the record stamp + // sort by the tx stamp and then the record stamp // find first value in valuesToStore list, starting with the current insertBefore value, that has a STAMP_TX_FIELD after the nextValue.STAMP_TX_FIELD, then do the same with STAMP_FIELD while (insertBefore < valuesToStore.size() && valuesToStore.get(insertBefore).getTimestamp(ModelEntity.STAMP_TX_FIELD).before(nextValue.getTimestamp(ModelEntity.STAMP_TX_FIELD))) { insertBefore++; @@ -504,7 +504,7 @@ valuesPerEntity++; } eli.close(); - + // definately remove this message and related data gathering //long preCount = delegator.findCountByCondition(modelEntity.getEntityName(), findValCondition, null); //long entityTotalCount = delegator.findCountByCondition(modelEntity.getEntityName(), null, null); @@ -513,9 +513,9 @@ // if we didn't find anything for this entity, find the next value's Timestamp and keep track of it if (valuesPerEntity == 0) { Timestamp startCheckStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis); - + EntityCondition findNextCondition = EntityCondition.makeCondition( - EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null), + EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null), EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunEndTime), EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null), EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime)); @@ -566,7 +566,7 @@ if (entitiesSkippedForKnownNext > 0) { if (Debug.infoOn()) Debug.logInfo("In assembleValuesToStore skipped [" + entitiesSkippedForKnownNext + "/" + entityModelToUseList + "] entities for the time period ending at [" + currentRunEndTime + "] because of next known update times", module); } - + // TEST SECTION: leave false for normal use boolean logValues = false; if (logValues && valuesToStore.size() > 0) { @@ -581,7 +581,7 @@ } Debug.logInfo(toStoreInfo.toString(), module); } - + return valuesToStore; } @@ -606,12 +606,12 @@ try { // find all instances of this entity with the STAMP_TX_FIELD != null, sort ascending to get lowest/oldest value first, then grab first and consider as candidate currentRunStartTime EntityCondition findValCondition = EntityCondition.makeCondition( - EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), + EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime)); EntityListIterator removeEli = delegator.find("EntitySyncRemove", findValCondition, null, null, UtilMisc.toList(ModelEntity.STAMP_TX_FIELD, ModelEntity.STAMP_FIELD), null); GenericValue entitySyncRemove = null; while ((entitySyncRemove = removeEli.next()) != null) { - // pull the PK from the EntitySyncRemove in the primaryKeyRemoved field, de-XML-serialize it + // pull the PK from the EntitySyncRemove in the primaryKeyRemoved field, de-XML-serialize it String primaryKeyRemoved = entitySyncRemove.getString("primaryKeyRemoved"); GenericEntity pkToRemove = null; try { @@ -633,7 +633,7 @@ Debug.logError(e, errorMsg, module); throw new SyncDataErrorException(errorMsg, e); } - + // set the stamp fields for future reference pkToRemove.set(ModelEntity.STAMP_TX_FIELD, entitySyncRemove.get(ModelEntity.STAMP_TX_FIELD)); pkToRemove.set(ModelEntity.STAMP_FIELD, entitySyncRemove.get(ModelEntity.STAMP_FIELD)); @@ -696,10 +696,10 @@ } Debug.logInfo(toRemoveInfo.toString(), module); } - + return keysToRemove; } - + public void saveResultsReportedFromDataStore() throws SyncDataErrorException, SyncServiceErrorException { try { long runningTimeMillis = System.currentTimeMillis() - startDate.getTime(); @@ -712,7 +712,7 @@ if (splitTotalTime > this.perSplitMaxMillis) { this.perSplitMaxMillis = splitTotalTime; } - + // start the timer for the next split setSplitStartTime(); @@ -734,8 +734,8 @@ Map<String, Object> updateEsRunResult = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "lastSuccessfulSynchTime", this.currentRunEndTime, "userLogin", userLogin)); // store result of service call on history with results so far, should run in own tx - Map<String, Object> updateHistoryMap = UtilMisc.toMap("entitySyncId", entitySyncId, "startDate", startDate, - "lastSuccessfulSynchTime", this.currentRunEndTime, "lastCandidateEndTime", this.getNextRunEndTime(), + Map<String, Object> updateHistoryMap = UtilMisc.toMap("entitySyncId", entitySyncId, "startDate", startDate, + "lastSuccessfulSynchTime", this.currentRunEndTime, "lastCandidateEndTime", this.getNextRunEndTime(), "lastSplitStartTime", Long.valueOf(this.splitStartTime)); updateHistoryMap.put("toCreateInserted", Long.valueOf(toCreateInserted)); updateHistoryMap.put("toCreateUpdated", Long.valueOf(toCreateUpdated)); @@ -758,13 +758,13 @@ updateHistoryMap.put("perSplitMaxItems", Long.valueOf(perSplitMaxItems)); updateHistoryMap.put("userLogin", userLogin); Map<String, Object> updateEsHistRunResult = dispatcher.runSync("updateEntitySyncHistory", updateHistoryMap); - + // now we have updated EntitySync and EntitySyncHistory, check both ops for errors... if (ServiceUtil.isError(updateEsRunResult)) { String errorMsg = "Error running EntitySync [" + entitySyncId + "], update of EntitySync record with lastSuccessfulSynchTime failed."; throw new SyncDataErrorException(errorMsg, null, null, updateEsRunResult, null); } - + if (ServiceUtil.isError(updateEsHistRunResult)) { String errorMsg = "Error running EntitySync [" + entitySyncId + "], update of EntitySyncHistory (startDate:[" + startDate + "]) record with lastSuccessfulSynchTime and result stats failed."; throw new SyncDataErrorException(errorMsg, null, null, updateEsHistRunResult, null); @@ -773,7 +773,7 @@ throw new SyncServiceErrorException("Error saving results reported from data store", e); } } - + public void saveFinalSyncResults() throws SyncDataErrorException, SyncServiceErrorException { String newStatusId = "ESR_COMPLETE"; if (this.isOfflineSync && totalRowsExported > 0) { @@ -791,7 +791,7 @@ } catch (GenericServiceException e) { throw new SyncServiceErrorException(esErrMsg, e); } - + // if nothing moved over, remove the history record, otherwise store status long totalRows = totalRowsToCreate + totalRowsToStore + totalRowsToRemove; if (totalRows == 0) { @@ -817,7 +817,7 @@ throw new SyncServiceErrorException(eshCompleteErrMsg, e); } } - + if (Debug.infoOn()) Debug.logInfo("Finished saveFinalSyncResults [" + entitySyncId + "]: totalRows=" + totalRows + ", totalRowsToCreate=" + totalRowsToCreate + ", totalRowsToStore=" + totalRowsToStore + ", totalRowsToRemove=" + totalRowsToRemove, module); } @@ -828,7 +828,7 @@ } return entityNameToUseSet; } - + /** prepare a list of all entities we want to synchronize: remove all view-entities and all entities that don't match the patterns attached to this EntitySync */ protected List<ModelEntity> makeEntityModelToUseList() throws GenericEntityException { List<GenericValue> entitySyncIncludes = entitySync.getRelated("EntitySyncInclude"); @@ -837,7 +837,7 @@ entitySyncIncludes.addAll(entitySyncGroupIncludes); List<ModelEntity> entityModelToUseList = EntityGroupUtil.getModelEntitiesFromRecords(entitySyncIncludes, delegator, true); - + if (Debug.infoOn()) Debug.logInfo("In makeEntityModelToUseList for EntitySync with ID [" + entitySync.get("entitySyncId") + "] syncing " + entityModelToUseList.size() + " entities", module); return entityModelToUseList; } @@ -895,12 +895,12 @@ if (UtilValidate.isEmpty(targetServiceName)) { throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], no targetServiceName is specified, where do we send the data?"); } - + // check to see if this sync is already running, if so return error if (this.isEntitySyncRunning()) { throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], an instance is already running."); } - + String markErrorMsg = "Could not start Entity Sync service, could not mark as running"; try { // not running, get started NOW @@ -912,11 +912,11 @@ } catch (GenericServiceException e) { throw new SyncServiceErrorException(markErrorMsg, e); } - + // finally create the initial history record this.createInitialHistory(); } - + public long setTotalRowCounts(ArrayList<GenericValue> valuesToCreate, ArrayList<GenericValue> valuesToStore, List<GenericEntity> keysToRemove) { this.totalRowsToCreate = valuesToCreate.size(); this.totalRowsToStore = valuesToStore.size(); @@ -924,11 +924,11 @@ this.totalRowsPerSplit = this.totalRowsToCreate + this.totalRowsToStore + this.totalRowsToRemove; return this.totalRowsPerSplit; } - + public void runPushSendData(ArrayList<GenericValue> valuesToCreate, ArrayList<GenericValue> valuesToStore, List<GenericEntity> keysToRemove) throws SyncOtherErrorException, SyncServiceErrorException { // grab the totals for this data this.setTotalRowCounts(valuesToCreate, valuesToStore, keysToRemove); - + // call service named on EntitySync, IFF there is actually data to send over if (this.totalRowsPerSplit > 0) { Map<String, Object> targetServiceMap = UtilMisc.toMap("entitySyncId", entitySyncId, "valuesToCreate", valuesToCreate, "valuesToStore", valuesToStore, "keysToRemove", keysToRemove, "userLogin", userLogin); @@ -941,9 +941,9 @@ if (ServiceUtil.isError(remoteStoreResult)) { throw new SyncOtherErrorException(serviceErrorMsg, null, null, remoteStoreResult, null); } - + this.totalStoreCalls++; - + long toCreateInsertedCur = remoteStoreResult.get("toCreateInserted") == null ? 0 : ((Long) remoteStoreResult.get("toCreateInserted")).longValue(); long toCreateUpdatedCur = remoteStoreResult.get("toCreateUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toCreateUpdated")).longValue(); long toCreateNotUpdatedCur = remoteStoreResult.get("toCreateNotUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toCreateNotUpdated")).longValue(); @@ -952,7 +952,7 @@ long toStoreNotUpdatedCur = remoteStoreResult.get("toStoreNotUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toStoreNotUpdated")).longValue(); long toRemoveDeletedCur = remoteStoreResult.get("toRemoveDeleted") == null ? 0 : ((Long) remoteStoreResult.get("toRemoveDeleted")).longValue(); long toRemoveAlreadyDeletedCur = remoteStoreResult.get("toRemoveAlreadyDeleted") == null ? 0 : ((Long) remoteStoreResult.get("toRemoveAlreadyDeleted")).longValue(); - + this.toCreateInserted += toCreateInsertedCur; this.toCreateUpdated += toCreateUpdatedCur; this.toCreateNotUpdated += toCreateNotUpdatedCur; @@ -966,14 +966,14 @@ } } } - + // ======================== PULL Methods ======================== public void runPullStartOrRestoreSavedResults() throws SyncDataErrorException, SyncServiceErrorException, SyncAbortException { // if EntitySync.statusId is ESR_RUNNING, make sure startDate matches EntitySync.lastHistoryStartDate; or return error if (isEntitySyncRunning() && this.startDate == null) { throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], an instance is already running and no startDate for the current run was passed."); } - + if (this.startDate == null) { // get it started! String markErrorMsg = "Could not start Entity Sync service, could not mark as running"; @@ -987,7 +987,7 @@ } catch (GenericServiceException e) { throw new SyncServiceErrorException(markErrorMsg, e); } - + // finally create the initial history record this.createInitialHistory(); this.setSplitStartTime(); @@ -1021,7 +1021,7 @@ } catch (GenericEntityException e) { throw new SyncDataErrorException("Error getting existing EntitySyncHistory values", e); } - + // got the previous values, now add to them with the values from the context... this.toCreateInserted += UtilMisc.toLong(this.context.get("toCreateInserted")); this.toCreateUpdated += UtilMisc.toLong(this.context.get("toCreateUpdated")); @@ -1031,7 +1031,7 @@ this.toStoreNotUpdated += UtilMisc.toLong(this.context.get("toStoreNotUpdated")); this.toRemoveDeleted += UtilMisc.toLong(this.context.get("toRemoveDeleted")); this.toRemoveAlreadyDeleted += UtilMisc.toLong(this.context.get("toRemoveAlreadyDeleted")); - + this.totalStoreCalls++; this.saveResultsReportedFromDataStore(); @@ -1109,7 +1109,7 @@ public SyncErrorException(String str, List<Object> errorMsgList, Map<String, Object> errorMsgMap, Map<String, Object> nestedServiceResult, Throwable nested) { super(str, errorMsgList, errorMsgMap, nestedServiceResult, nested); } public abstract void saveSyncErrorInfo(EntitySyncContext esc); } - + /** This class signifies an error condition, so the state of the EntitySync value and the EntitySyncHistory value in the datasource should be changed to reflect the error */ public static class SyncOtherErrorException extends SyncErrorException { public SyncOtherErrorException() { super(); } @@ -1125,7 +1125,7 @@ } } } - + /** This class signifies an error condition, so the state of the EntitySync value and the EntitySyncHistory value in the datasource should be changed to reflect the error */ public static class SyncDataErrorException extends SyncErrorException { public SyncDataErrorException() { super(); } @@ -1141,7 +1141,7 @@ } } } - + /** This class signifies an error condition, so the state of the EntitySync value and the EntitySyncHistory value in the datasource should be changed to reflect the error */ public static class SyncServiceErrorException extends SyncErrorException { public SyncServiceErrorException() { super(); } Modified: ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncServices.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncServices.java?rev=757070&r1=757069&r2=757070&view=diff ============================================================================== --- ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncServices.java (original) +++ ofbiz/trunk/framework/entityext/src/org/ofbiz/entityext/synchronization/EntitySyncServices.java Sat Mar 21 23:23:34 2009 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -66,9 +66,9 @@ * Entity Engine Sync Services */ public class EntitySyncServices { - + public static final String module = EntitySyncServices.class.getName(); - + /** * Run an Entity Sync (checks to see if other already running, etc) *@param dctx The DispatchContext that this service is operating in @@ -88,41 +88,41 @@ // increment starting time to run until now esc.setSplitStartTime(); // just run this the first time, will be updated between each loop automatically while (esc.hasMoreTimeToSync()) { - + // this will result in lots of log messages, so leaving commented out unless needed/wanted later // Debug.logInfo("Doing runEntitySync split, currentRunStartTime=" + esc.currentRunStartTime + ", currentRunEndTime=" + esc.currentRunEndTime, module); - + esc.totalSplits++; - + // tx times are indexed // keep track of how long these sync runs take and store that info on the history table // saves info about removed, all entities that don't have no-auto-stamp set, this will be done in the GenericDAO like the stamp sets - + // ===== INSERTS ===== ArrayList<GenericValue> valuesToCreate = esc.assembleValuesToCreate(); // ===== UPDATES ===== ArrayList<GenericValue> valuesToStore = esc.assembleValuesToStore(); // ===== DELETES ===== List<GenericEntity> keysToRemove = esc.assembleKeysToRemove(); - + esc.runPushSendData(valuesToCreate, valuesToStore, keysToRemove); - + esc.saveResultsReportedFromDataStore(); esc.advanceRunTimes(); } esc.saveFinalSyncResults(); - + } catch (SyncAbortException e) { return e.returnError(module); } catch (SyncErrorException e) { e.saveSyncErrorInfo(esc); return e.returnError(module); } - + return ServiceUtil.returnSuccess(); } - + /** * Store Entity Sync Data *@param dctx The DispatchContext that this service is operating in @@ -139,7 +139,7 @@ } } //LocalDispatcher dispatcher = dctx.getDispatcher(); - + String entitySyncId = (String) context.get("entitySyncId"); // incoming lists will already be sorted by lastUpdatedStamp (or lastCreatedStamp) List valuesToCreate = (List) context.get("valuesToCreate"); @@ -156,14 +156,14 @@ long toStoreNotUpdated = 0; long toRemoveDeleted = 0; long toRemoveAlreadyDeleted = 0; - + // create all values in the valuesToCreate List; if the value already exists update it, or if exists and was updated more recently than this one dont update it Iterator valueToCreateIter = valuesToCreate.iterator(); while (valueToCreateIter.hasNext()) { GenericValue valueToCreate = (GenericValue) valueToCreateIter.next(); // to Create check if exists (find by pk), if not insert; if exists check lastUpdatedStamp: if null or before the candidate value insert, otherwise don't insert // NOTE: use the delegator from this DispatchContext rather than the one named in the GenericValue - + // maintain the original timestamps when doing storage of synced data, by default with will update the timestamps to now valueToCreate.setIsFromEntitySync(true); @@ -184,13 +184,13 @@ } } } - + // iterate through to store list and store each Iterator valueToStoreIter = valuesToStore.iterator(); while (valueToStoreIter.hasNext()) { GenericValue valueToStore = (GenericValue) valueToStoreIter.next(); // to store check if exists (find by pk), if not insert; if exists check lastUpdatedStamp: if null or before the candidate value insert, otherwise don't insert - + // maintain the original timestamps when doing storage of synced data, by default with will update the timestamps to now valueToStore.setIsFromEntitySync(true); @@ -211,22 +211,22 @@ } } } - + // iterate through to remove list and remove each Iterator keyToRemoveIter = keysToRemove.iterator(); while (keyToRemoveIter.hasNext()) { GenericEntity pkToRemove = (GenericEntity) keyToRemoveIter.next(); - + // check to see if it exists, if so remove and count, if not just count already removed // always do a removeByAnd, if it was a removeByAnd great, if it was a removeByPrimaryKey, this will also work and save us a query pkToRemove.setIsFromEntitySync(true); - + // remove the stamp fields inserted by EntitySyncContext.java at or near line 646 pkToRemove.remove(ModelEntity.STAMP_TX_FIELD); pkToRemove.remove(ModelEntity.STAMP_FIELD); pkToRemove.remove(ModelEntity.CREATE_STAMP_TX_FIELD); pkToRemove.remove(ModelEntity.CREATE_STAMP_FIELD); - + int numRemByAnd = delegator.removeByAnd(pkToRemove.getEntityName(), pkToRemove); if (numRemByAnd == 0) { toRemoveAlreadyDeleted++; @@ -234,7 +234,7 @@ toRemoveDeleted++; } } - + Map<String, Object> result = ServiceUtil.returnSuccess(); result.put("toCreateInserted", Long.valueOf(toCreateInserted)); result.put("toCreateUpdated", Long.valueOf(toCreateUpdated)); @@ -265,7 +265,7 @@ */ public static Map runPullEntitySync(DispatchContext dctx, Map context) { LocalDispatcher dispatcher = dctx.getDispatcher(); - + String entitySyncId = (String) context.get("entitySyncId"); String remotePullAndReportEntitySyncDataName = (String) context.get("remotePullAndReportEntitySyncDataName"); @@ -273,7 +273,7 @@ // loop until no data is returned to store boolean gotMoreData = true; - + Timestamp startDate = null; Long toCreateInserted = null; Long toCreateUpdated = null; @@ -283,10 +283,10 @@ Long toStoreNotUpdated = null; Long toRemoveDeleted = null; Long toRemoveAlreadyDeleted = null; - + while (gotMoreData) { gotMoreData = false; - + // call pullAndReportEntitySyncData, initially with no results, then with results from last loop Map<String, Object> remoteCallContext = FastMap.newInstance(); remoteCallContext.put("entitySyncId", entitySyncId); @@ -302,24 +302,24 @@ remoteCallContext.put("toStoreNotUpdated", toStoreNotUpdated); remoteCallContext.put("toRemoveDeleted", toRemoveDeleted); remoteCallContext.put("toRemoveAlreadyDeleted", toRemoveAlreadyDeleted); - + try { Map<String, Object> result = dispatcher.runSync(remotePullAndReportEntitySyncDataName, remoteCallContext); if (ServiceUtil.isError(result)) { String errMsg = "Error calling remote pull and report EntitySync service with name: " + remotePullAndReportEntitySyncDataName; return ServiceUtil.returnError(errMsg, null, null, result); } - + startDate = (Timestamp) result.get("startDate"); - + try { // store data returned, get results (just call storeEntitySyncData locally, get the numbers back and boom shakalaka) - + // anything to store locally? - if (startDate != null && (!UtilValidate.isEmpty((Collection) result.get("valuesToCreate")) || + if (startDate != null && (!UtilValidate.isEmpty((Collection) result.get("valuesToCreate")) || !UtilValidate.isEmpty((Collection) result.get("valuesToStore")) || !UtilValidate.isEmpty((Collection) result.get("keysToRemove")))) { - + // yep, we got more data gotMoreData = true; @@ -330,18 +330,18 @@ if (valuesToStore == null) valuesToStore = Collections.emptyList(); List<GenericEntity> keysToRemove = checkList(result.get("keysToRemove"), GenericEntity.class); if (keysToRemove == null) keysToRemove = Collections.emptyList(); - + Map<String, Object> callLocalStoreContext = UtilMisc.toMap("entitySyncId", entitySyncId, "delegatorName", context.get("localDelegatorName"), - "valuesToCreate", valuesToCreate, "valuesToStore", valuesToStore, + "valuesToCreate", valuesToCreate, "valuesToStore", valuesToStore, "keysToRemove", keysToRemove); - + callLocalStoreContext.put("userLogin", context.get("userLogin")); Map<String, Object> storeResult = dispatcher.runSync("storeEntitySyncData", callLocalStoreContext); if (ServiceUtil.isError(storeResult)) { String errMsg = "Error calling service to store data locally"; return ServiceUtil.returnError(errMsg, null, null, storeResult); } - + // get results for next pass toCreateInserted = (Long) storeResult.get("toCreateInserted"); toCreateUpdated = (Long) storeResult.get("toCreateUpdated"); @@ -367,7 +367,7 @@ return ServiceUtil.returnError(errMsg); } } - + return ServiceUtil.returnSuccess(); } @@ -381,9 +381,9 @@ EntitySyncContext esc = null; try { esc = new EntitySyncContext(dctx, context); - + Debug.logInfo("Doing pullAndReportEntitySyncData for entitySyncId=" + esc.entitySyncId + ", currentRunStartTime=" + esc.currentRunStartTime + ", currentRunEndTime=" + esc.currentRunEndTime, module); - + if ("Y".equals(esc.entitySync.get("forPushOnly"))) { return ServiceUtil.returnError("Cannot do Entity Sync Pull because entitySyncId [] is set for Push Only."); } @@ -391,19 +391,19 @@ // Part 1: if any results are passed, store the results for the given startDate, update EntitySync, etc // restore info from last pull, or if no results start new run esc.runPullStartOrRestoreSavedResults(); - + // increment starting time to run until now while (esc.hasMoreTimeToSync()) { // make sure the following message is commented out before commit: // Debug.logInfo("(loop)Doing pullAndReportEntitySyncData split, currentRunStartTime=" + esc.currentRunStartTime + ", currentRunEndTime=" + esc.currentRunEndTime, module); - + esc.totalSplits++; - + // tx times are indexed // keep track of how long these sync runs take and store that info on the history table // saves info about removed, all entities that don't have no-auto-stamp set, this will be done in the GenericDAO like the stamp sets - + // Part 2: get the next set of data for the given entitySyncId // Part 2a: return it back for storage but leave the EntitySyncHistory without results, and don't update the EntitySync last time @@ -413,10 +413,10 @@ ArrayList<GenericValue> valuesToStore = esc.assembleValuesToStore(); // ===== DELETES ===== List<GenericEntity> keysToRemove = esc.assembleKeysToRemove(); - + esc.setTotalRowCounts(valuesToCreate, valuesToStore, keysToRemove); - if (Debug.infoOn()) Debug.logInfo("Service pullAndReportEntitySyncData returning - [" + valuesToCreate.size() + "] to create; [" + valuesToStore.size() + "] to store; [" + keysToRemove.size() + "] to remove; [" + esc.totalRowsPerSplit + "] total rows per split.", module); + if (Debug.infoOn()) Debug.logInfo("Service pullAndReportEntitySyncData returning - [" + valuesToCreate.size() + "] to create; [" + valuesToStore.size() + "] to store; [" + keysToRemove.size() + "] to remove; [" + esc.totalRowsPerSplit + "] total rows per split.", module); if (esc.totalRowsPerSplit > 0) { // stop if we found some data, otherwise look and try again Map<String, Object> result = ServiceUtil.returnSuccess(); @@ -431,7 +431,7 @@ esc.advanceRunTimes(); } } - + // if no more results from database to return, save final settings if (!esc.hasMoreTimeToSync() ) { esc.saveFinalSyncResults(); @@ -599,11 +599,11 @@ public static Map<String, Object> cleanSyncRemoveInfo(DispatchContext dctx, Map<String, ? extends Object> context) { Debug.logInfo("Running cleanSyncRemoveInfo", module); GenericDelegator delegator = dctx.getDelegator(); - + try { // find the largest keepRemoveInfoHours value on an EntitySyncRemove and kill everything before that, if none found default to 10 days (240 hours) double keepRemoveInfoHours = 24; - + List<GenericValue> entitySyncRemoveList = delegator.findList("EntitySync", null, null, null, null, false); for (GenericValue entitySyncRemove: entitySyncRemoveList) { Double curKrih = entitySyncRemove.getDouble("keepRemoveInfoHours"); @@ -614,18 +614,18 @@ } } } - - + + int keepSeconds = (int) Math.floor(keepRemoveInfoHours * 60); - + Calendar nowCal = Calendar.getInstance(); nowCal.setTimeInMillis(System.currentTimeMillis()); nowCal.add(Calendar.SECOND, -keepSeconds); Timestamp keepAfterStamp = new Timestamp(nowCal.getTimeInMillis()); - + int numRemoved = delegator.removeByCondition("EntitySyncRemove", EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.LESS_THAN, keepAfterStamp)); Debug.logInfo("In cleanSyncRemoveInfo removed [" + numRemoved + "] values with TX timestamp before [" + keepAfterStamp + "]", module); - + return ServiceUtil.returnSuccess(); } catch (GenericEntityException e) { String errorMsg = "Error cleaning out EntitySyncRemove info: " + e.toString(); |
Free forum by Nabble | Edit this page |