Skip to content

Commit

Permalink
Use native JDBC for updating channel definition type and implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed Sep 30, 2024
1 parent 720ec94 commit 5388dce
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@
*/
package org.flowable.eventregistry.impl.cmd;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.List;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.commons.io.IOUtils;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.common.engine.impl.db.DbUpgradeStep;
import org.flowable.common.engine.impl.db.SchemaManagerDatabaseConfiguration;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.eventregistry.api.ChannelDefinition;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.impl.EventRegistryEngineConfiguration;
import org.flowable.eventregistry.impl.persistence.entity.ChannelDefinitionEntityManager;
import org.flowable.eventregistry.impl.util.CommandContextUtil;
import org.flowable.eventregistry.model.ChannelModel;

Expand All @@ -42,25 +40,73 @@ public void execute() throws Exception {

@Override
public Void execute(CommandContext commandContext) {
SchemaManagerDatabaseConfiguration databaseConfiguration = getDatabaseConfiguration(commandContext);
Connection connection = databaseConfiguration.getConnection();
EventRegistryEngineConfiguration configuration = CommandContextUtil.getEventRegistryConfiguration(commandContext);
EventRepositoryService repositoryService = configuration.getEventRepositoryService();
List<ChannelDefinition> channelDefinitions = repositoryService.createChannelDefinitionQuery().list();
ChannelDefinitionEntityManager entityManager = configuration.getChannelDefinitionEntityManager();
String encoding = configuration.getXmlEncoding();
Charset encodingCharset = encoding != null ? Charset.forName(encoding) : Charset.defaultCharset();
for (ChannelDefinition channelDefinition : channelDefinitions) {
// We are explicitly not using EventRepositoryService#getChannelModelById.
// When the repository service is used, then it will trigger a deployment of the channel.
// However, a channel should not be deployed that early during the update
ChannelModel model;
try (InputStream stream = repositoryService.getResourceAsStream(channelDefinition.getDeploymentId(), channelDefinition.getResourceName())) {
model = configuration.getChannelJsonConverter().convertToChannelModel(IOUtils.toString(stream, encodingCharset));
} catch (IOException e) {
throw new FlowableException("Failed to close resource", e);

String channelDefinitionTableName = "FLW_CHANNEL_DEFINITION";
String eventResourceTableName = "FLW_EVENT_RESOURCE";

if (!databaseConfiguration.isTablePrefixIsSchema()) {
channelDefinitionTableName = databaseConfiguration.getDatabaseTablePrefix() + channelDefinitionTableName;
eventResourceTableName = databaseConfiguration.getDatabaseTablePrefix() + eventResourceTableName;
}

try (PreparedStatement queryStatement = connection.prepareStatement("""
select DEF.ID_ as ID_, RES.RESOURCE_BYTES_ as RESOURCE_BYTES_
from %s DEF
inner join %s RES on DEF.DEPLOYMENT_ID_ = RES.DEPLOYMENT_ID_ and DEF.RESOURCE_NAME_ = RES.NAME_
""".formatted(channelDefinitionTableName, eventResourceTableName));
PreparedStatement updateStatement = connection.prepareStatement(
"update %s set TYPE_ = ?, IMPLEMENTATION_ = ? where ID_ = ?".formatted(channelDefinitionTableName)
)
) {
ResultSet resultSet = queryStatement.executeQuery();
while (resultSet.next()) {
String definitionId = resultSet.getString("ID_");
byte[] resourceBytes = getResourceBytes(resultSet, databaseConfiguration);
if (resourceBytes == null) {
continue;
}

ChannelModel model = configuration.getChannelJsonConverter().convertToChannelModel(new String(resourceBytes, encodingCharset));
String type = model.getChannelType();
if (type == null) {
updateStatement.setNull(1, java.sql.Types.VARCHAR);
} else {
updateStatement.setString(1, type);
}
String implementation = model.getType();
if (implementation == null) {
updateStatement.setNull(2, java.sql.Types.VARCHAR);
} else {
updateStatement.setString(2, implementation);
}
updateStatement.setString(3, definitionId);
updateStatement.executeUpdate();
}
entityManager.updateChannelDefinitionTypeAndImplementation(channelDefinition.getId(), model.getChannelType(), model.getType());
} catch (SQLException e) {
throw new RuntimeException(e);
}

return null;
}

protected byte[] getResourceBytes(ResultSet resultSet, SchemaManagerDatabaseConfiguration databaseConfiguration) throws SQLException {
if ("postgres".equals(databaseConfiguration.getDatabaseType())) {
return resultSet.getBytes("RESOURCE_BYTES_");
}

Blob blob = resultSet.getBlob("RESOURCE_BYTES_");
if (blob != null) {
return blob.getBytes(1, (int) blob.length());
}
return null;
}

protected SchemaManagerDatabaseConfiguration getDatabaseConfiguration(CommandContext commandContext) {
return commandContext.getSession(SchemaManagerDatabaseConfiguration.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public interface ChannelDefinitionEntityManager extends EntityManager<ChannelDef

void updateChannelDefinitionTenantIdForDeployment(String deploymentId, String newTenantId);

void updateChannelDefinitionTypeAndImplementation(String channelDefinitionId, String type, String implementation);

void deleteChannelDefinitionsByDeploymentId(String deploymentId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,4 @@ public void updateChannelDefinitionTenantIdForDeployment(String deploymentId, St
dataManager.updateChannelDefinitionTenantIdForDeployment(deploymentId, newTenantId);
}

@Override
public void updateChannelDefinitionTypeAndImplementation(String channelDefinitionId, String type, String implementation) {
dataManager.updateChannelDefinitionTypeAndImplementation(channelDefinitionId, type, implementation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,4 @@ public interface ChannelDefinitionDataManager extends DataManager<ChannelDefinit

void updateChannelDefinitionTenantIdForDeployment(String deploymentId, String newTenantId);

void updateChannelDefinitionTypeAndImplementation(String channelDefinitionId, String type, String implementation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,4 @@ public void updateChannelDefinitionTenantIdForDeployment(String deploymentId, St
getDbSqlSession().directUpdate("updateChannelDefinitionTenantIdForDeploymentId", params);
}

@Override
public void updateChannelDefinitionTypeAndImplementation(String channelDefinitionId, String type, String implementation) {
Map<String, Object> params = new HashMap<>();
params.put("id", channelDefinitionId);
params.put("type", type);
params.put("implementation", implementation);
getDbSqlSession().directUpdate("updateChannelDefinitionTypeAndImplementationById", params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@
DEPLOYMENT_ID_ = #{deploymentId, jdbcType=VARCHAR}
</update>

<update id="updateChannelDefinitionTypeAndImplementationById" parameterType="java.util.Map">
update ${prefix}FLW_CHANNEL_DEFINITION
<set>
TYPE_ = #{type, jdbcType=VARCHAR},
IMPLEMENTATION_ = #{implementation, jdbcType=VARCHAR},
</set>
where
ID_ = #{id, jdbcType=VARCHAR}
</update>

<!-- DEFINITION DELETE -->

<delete id="deleteChannelDefinitionsByDeploymentId" parameterType="string">
Expand Down

0 comments on commit 5388dce

Please sign in to comment.