Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 0005074 CSV to database replication not working #174

Open
wants to merge 4 commits into
base: 3.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion symmetric-assemble/src/asciidoc/examples/csv2db.ad
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ NOTE: 'TARGET_BASE_DIR' should be changed to your target base directory.

endif::pro[]

* Testing. You are now ready to test your csv to database routing. Add a csv file that matches your table specifications to your base directory or make a change to a pre-existing csv file in the directory. File sync tracker checks for changes in already sync'd files and runs every 5 minutes. File sync pull checks for new files to pull down and runs every 1 minute. Depending on the change you choose (changing a file or adding a file), wait the appropriate amount of time and then verify that the changes are shown in the target table.
* Testing. You are now ready to test your csv to database routing. Add a csv file that matches your table specifications to your base directory or make a change to a pre-existing csv file in the directory. Note that this CSV file must have a first row containing the column names for the table into which you want to insert. File sync tracker checks for changes in already sync'd files and runs every 5 minutes. File sync pull checks for new files to pull down and runs every 1 minute. Depending on the change you choose (changing a file or adding a file), wait the appropriate amount of time and then verify that the changes are shown in the target table.

.The following SQL statement will verify changes to the person table.
[source, SQL]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.service.impl;

import org.junit.Assert;
import java.lang.reflect.Field;
import java.util.Date;

import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerReBuildReason;
import org.jumpmind.symmetric.route.AbstractFileParsingRouter;
import org.jumpmind.symmetric.service.impl.DataExtractorService.SelectFromSymDataSource;
import org.junit.Test;
import org.mockito.Mockito;

public class DataExtractorServiceTest {
@SuppressWarnings("unchecked")
@Test
public void selectFromSymDataSource_csvValuesAreExtracted_triggerRouterIsNotMarkedAsMissing()
throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
DataExtractorService dataExtractorService = Mockito.mock(DataExtractorService.class);
TriggerRouterService triggerRouterService = Mockito.mock(TriggerRouterService.class);
Mockito.when(triggerRouterService.getTriggerRoutersByTriggerHist("target", false)).thenReturn(null);
Mockito.when(triggerRouterService.getRouterById("fooCsvRouter")).thenReturn(new Router());
ISymmetricDialect symmetricDialect = Mockito.mock(ISymmetricDialect.class);
Mockito.when(symmetricDialect.getBinaryEncoding()).thenReturn(BinaryEncoding.NONE);
Mockito.when(symmetricDialect.getName()).thenReturn("H2");
// mock a cursor with some data from csv router
ISqlReadCursor<Data> mockedCursor = Mockito.mock(ISqlReadCursor.class);
Data csvCapturedData = new Data(1, "1", "1", DataEventType.INSERT, "foo", new Date(), buildVirtualTriggerHistoryForParsedFile(),
"default", null, null);
Mockito.when(mockedCursor.next()).thenReturn(csvCapturedData);

setPrivateField(DataExtractorService.class.getSuperclass(), dataExtractorService, "symmetricDialect", symmetricDialect);
setPrivateField(DataExtractorService.class, dataExtractorService, "triggerRouterService", triggerRouterService);

SelectFromSymDataSource selectFromSymDataSource = dataExtractorService.new SelectFromSymDataSource(new OutgoingBatch(), new Node(),
new Node(), new ProcessInfo(), false);
setPrivateField(SelectFromSymDataSource.class, selectFromSymDataSource, "cursor", mockedCursor);
Assert.assertTrue(selectFromSymDataSource.next().equals(csvCapturedData));
}

protected TriggerHistory buildVirtualTriggerHistoryForParsedFile() {
TriggerHistory virtualTriggerHistory = new TriggerHistory();
virtualTriggerHistory.setColumnNames("foo");
virtualTriggerHistory.setCreateTime(new Date());
virtualTriggerHistory.setLastTriggerBuildReason(TriggerReBuildReason.NEW_TRIGGERS);
virtualTriggerHistory.setPkColumnNames("foo");
virtualTriggerHistory.setSourceTableName("foo");
virtualTriggerHistory.setTriggerHistoryId(1);
virtualTriggerHistory.setTriggerId(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER);
return virtualTriggerHistory;
}

protected void setPrivateField(Class<?> objectClass, Object objectWithPrivateField, String fieldName, Object value)
throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
Field field = objectClass.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(objectWithPrivateField, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2535,8 +2535,18 @@ public CsvData next() {
data = this.cursor.next();
if (data != null) {
TriggerHistory triggerHistory = data.getTriggerHistory();
TriggerRouter triggerRouter = triggerRoutersByTriggerHist.get(triggerHistory.getTriggerHistoryId());

boolean isFileParserRouter = data.getTriggerHistory().getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER);

TriggerRouter triggerRouter;
if (isFileParserRouter) {
triggerRouter = new TriggerRouter();
triggerRouter.setTriggerId(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER);
String routerId = AbstractFileParsingRouter.getRouterIdFromExternalData(data.getExternalData());
triggerRouter.setRouter(triggerRouterService.getRouterById(routerId));
triggerRouter.setTrigger(new Trigger());
} else {
triggerRouter = triggerRoutersByTriggerHist.get(triggerHistory.getTriggerHistoryId());
}
if (triggerRouter == null) {
CounterStat counterStat = missingTriggerRoutersByTriggerHist.get(triggerHistory.getTriggerHistoryId());
if (counterStat == null) {
Expand All @@ -2555,7 +2565,6 @@ public CsvData next() {
}

String routerId = triggerRouter.getRouterId();

if (data.getDataEventType() == DataEventType.RELOAD) {
processInfo.setCurrentTableName(triggerHistory.getSourceTableName());

Expand Down Expand Up @@ -2591,7 +2600,6 @@ public CsvData next() {
initialLoadSelect.length() - platform.getDatabaseInfo().getSqlCommandDelimiter().length());
}
}

SelectFromTableEvent event = new SelectFromTableEvent(targetNode,
triggerRouter, triggerHistory, initialLoadSelect);
this.reloadSource = new SelectFromTableSource(outgoingBatch, batch,
Expand All @@ -2606,7 +2614,6 @@ public CsvData next() {
}
} else {
Trigger trigger = triggerRouter.getTrigger();
boolean isFileParserRouter = triggerHistory.getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER);
if (lastTriggerHistory == null || lastTriggerHistory.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() ||
lastRouterId == null || !lastRouterId.equals(routerId)) {

Expand Down