Skip to content

Commit

Permalink
fixed thread exception
Browse files Browse the repository at this point in the history
  • Loading branch information
Panizghi committed Aug 25, 2024
1 parent 897fac7 commit 7551989
Showing 1 changed file with 42 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Anserini: A Lucene toolkit for reproducible information retrieval research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.anserini.index.generator;

import io.anserini.collection.SourceDocument;
Expand All @@ -18,7 +34,7 @@
/**
* A document generator for creating Lucene documents with SafeTensors dense vector data.
* Implements the LuceneDocumentGenerator interface.
*
*
* @param <T> the type of SourceDocument
*/
public class SafeTensorsDenseVectorDocumentGenerator<T extends SourceDocument> implements LuceneDocumentGenerator<T> {
Expand All @@ -36,20 +52,14 @@ public class SafeTensorsDenseVectorDocumentGenerator<T extends SourceDocument> i
@Override
public Document createDocument(T src) throws InvalidDocumentException {
String docId = src.id();
AtomicBoolean alreadyProcessed = processedDocuments.putIfAbsent(docId, new AtomicBoolean(true));

// Check if the document is already being processed by another thread
if (alreadyProcessed != null && alreadyProcessed.get()) {
LOG.warn("Document ID: " + docId + " is already being processed by another thread.");
return null;
}

try {
LOG.info("Processing document ID: " + src.id() + " with thread: " + Thread.currentThread().getName());

// Parse vector data from document contents
float[] contents = parseVectorFromContents(src.contents());
if (contents == null) {
if (contents == null || contents.length == 0) {
LOG.error("Vector data is null or empty for document ID: " + src.id());
throw new InvalidDocumentException();
}

Expand All @@ -62,14 +72,18 @@ public Document createDocument(T src) throws InvalidDocumentException {
document.add(new KnnFloatVectorField(Constants.VECTOR, contents, VectorSimilarityFunction.DOT_PRODUCT));

LOG.info("Document created for ID: " + src.id());

return document;

} catch (Exception e) {
LOG.error("Error creating document for ID: " + src.id(), e);
throw new InvalidDocumentException();

} finally {
// Mark processing as complete
processedDocuments.get(docId).set(false);
// Ensure the processed flag is reset if needed
AtomicBoolean processed = processedDocuments.get(docId);
if (processed != null) {
processed.set(false);
}
}
}

Expand All @@ -80,11 +94,21 @@ public Document createDocument(T src) throws InvalidDocumentException {
* @return the parsed vector as an array of floats
*/
private float[] parseVectorFromContents(String contents) {
String[] parts = contents.replace("[", "").replace("]", "").split(",");
float[] vector = new float[parts.length];
for (int i = 0; i < parts.length; i++) {
vector[i] = Float.parseFloat(parts[i].trim());
if (contents == null || contents.isEmpty()) {
LOG.error("Contents are null or empty, cannot parse vectors.");
return null;
}

try {
String[] parts = contents.replace("[", "").replace("]", "").split(",");
float[] vector = new float[parts.length];
for (int i = 0; i < parts.length; i++) {
vector[i] = Float.parseFloat(parts[i].trim());
}
return vector;
} catch (NumberFormatException e) {
LOG.error("Error parsing vector contents: " + contents, e);
return null;
}
return vector;
}
}
}

0 comments on commit 7551989

Please sign in to comment.