Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve dimension table handling #13967

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
Expand All @@ -29,9 +30,19 @@ public interface DimensionTable extends Closeable {

List<String> getPrimaryKeyColumns();

GenericRow get(PrimaryKey pk);
@Nullable
FieldSpec getFieldSpecFor(String columnName);

boolean isEmpty();

FieldSpec getFieldSpecFor(String columnName);
boolean containsKey(PrimaryKey pk);

@Nullable
GenericRow getRow(PrimaryKey pk);

@Nullable
Object getValue(PrimaryKey pk, String columnName);

@Nullable
Object[] getValues(PrimaryKey pk, String[] columnNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,26 @@ public boolean isPopulated() {
return !_dimensionTable.get().isEmpty();
}

public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
return _dimensionTable.get().get(pk);
/**
 * Checks whether the dimension table currently held by {@code _dimensionTable} has an entry for the key.
 *
 * @param pk primary key to probe
 * @return whatever the table's own {@code containsKey} check reports
 */
public boolean containsKey(PrimaryKey pk) {
  boolean present = _dimensionTable.get().containsKey(pk);
  return present;
}

/**
 * Fetches the full row stored under the given primary key from the table held by {@code _dimensionTable}.
 *
 * @param pk primary key to look up
 * @return the row returned by the table's {@code getRow}, which may be {@code null}
 */
@Nullable
public GenericRow lookupRow(PrimaryKey pk) {
  GenericRow row = _dimensionTable.get().getRow(pk);
  return row;
}

/**
 * Fetches a single column value for the given primary key from the table held by {@code _dimensionTable}.
 *
 * @param pk primary key to look up
 * @param columnName column whose value is requested
 * @return the value returned by the table's {@code getValue}, which may be {@code null}
 */
@Nullable
public Object lookupValue(PrimaryKey pk, String columnName) {
  Object value = _dimensionTable.get().getValue(pk, columnName);
  return value;
}

/**
 * Fetches several column values for the given primary key from the table held by {@code _dimensionTable}.
 *
 * @param pk primary key to look up
 * @param columnNames columns whose values are requested
 * @return the array returned by the table's {@code getValues}, which may be {@code null}
 */
@Nullable
public Object[] lookupValues(PrimaryKey pk, String[] columnNames) {
  Object[] values = _dimensionTable.get().getValues(pk, columnNames);
  return values;
}

/**
 * Resolves the field spec of a column through the table held by {@code _dimensionTable}.
 *
 * @param columnName column to describe
 * @return the spec returned by the table's {@code getFieldSpecFor}, which may be {@code null}
 */
@Nullable
public FieldSpec getColumnFieldSpec(String columnName) {
  FieldSpec fieldSpec = _dimensionTable.get().getFieldSpecFor(columnName);
  return fieldSpec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
Expand All @@ -43,9 +44,10 @@ public List<String> getPrimaryKeyColumns() {
return _primaryKeyColumns;
}

@Nullable
@Override
public GenericRow get(PrimaryKey pk) {
return _lookupTable.get(pk);
public FieldSpec getFieldSpecFor(String columnName) {
  // Column metadata is answered by the table schema rather than by the lookup data.
  FieldSpec fieldSpec = _tableSchema.getFieldSpecFor(columnName);
  return fieldSpec;
}

@Override
Expand All @@ -54,8 +56,36 @@ public boolean isEmpty() {
}

@Override
public FieldSpec getFieldSpecFor(String columnName) {
return _tableSchema.getFieldSpecFor(columnName);
public boolean containsKey(PrimaryKey pk) {
  // The membership check is delegated to _lookupTable.
  boolean present = _lookupTable.containsKey(pk);
  return present;
}

/**
 * Returns the row that {@code _lookupTable} holds for the given primary key.
 *
 * @param pk primary key to look up
 * @return the result of {@code _lookupTable.get(pk)}, which may be {@code null}
 */
@Nullable
@Override
public GenericRow getRow(PrimaryKey pk) {
  GenericRow row = _lookupTable.get(pk);
  return row;
}

/**
 * Returns one column value of the row stored under the given primary key.
 *
 * @param pk primary key to look up
 * @param columnName column whose value is requested
 * @return the row's value for {@code columnName}, or {@code null} when no row is stored for {@code pk}
 */
@Nullable
@Override
public Object getValue(PrimaryKey pk, String columnName) {
  GenericRow matchedRow = _lookupTable.get(pk);
  if (matchedRow == null) {
    // No row for this key: report absence instead of dereferencing.
    return null;
  }
  return matchedRow.getValue(columnName);
}

/**
 * Returns the values of several columns of the row stored under the given primary key.
 *
 * @param pk primary key to look up
 * @param columnNames columns whose values are requested
 * @return a new array with one value per entry of {@code columnNames}, in the same order, or {@code null} when no
 *         row is stored for {@code pk}
 */
@Nullable
@Override
public Object[] getValues(PrimaryKey pk, String[] columnNames) {
  GenericRow matchedRow = _lookupTable.get(pk);
  if (matchedRow != null) {
    Object[] result = new Object[columnNames.length];
    int idx = 0;
    for (String columnName : columnNames) {
      result[idx++] = matchedRow.getValue(columnName);
    }
    return result;
  }
  // No row for this key.
  return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public GenericRow getRecord(GenericRow reuse) {
_pinotSegmentRecordReader.getRecord(_docId, reuse);
return reuse;
}

/**
 * Reads a single column value for the document at {@code _docId} through {@code _pinotSegmentRecordReader}.
 *
 * @param column column whose value is requested
 * @return the value the record reader returns for that document and column
 */
public Object getValue(String column) {
  Object columnValue = _pinotSegmentRecordReader.getValue(_docId, column);
  return columnValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
Expand Down Expand Up @@ -59,7 +60,23 @@ public List<String> getPrimaryKeyColumns() {
}

@Override
public GenericRow get(PrimaryKey pk) {
public FieldSpec getFieldSpecFor(String columnName) {
  // Column metadata is answered by the table schema rather than by the lookup data.
  FieldSpec fieldSpec = _tableSchema.getFieldSpecFor(columnName);
  return fieldSpec;
}

/**
 * Reports whether {@code _lookupTable} currently has no entries.
 *
 * @return the result of {@code _lookupTable.isEmpty()}
 */
@Override
public boolean isEmpty() {
  boolean empty = _lookupTable.isEmpty();
  return empty;
}

/**
 * Reports whether {@code _lookupTable} has an entry for the given primary key.
 *
 * @param pk primary key to probe
 * @return the result of {@code _lookupTable.containsKey(pk)}
 */
@Override
public boolean containsKey(PrimaryKey pk) {
  boolean present = _lookupTable.containsKey(pk);
  return present;
}

@Nullable
@Override
public GenericRow getRow(PrimaryKey pk) {
LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
if (lookupRecordLocation == null) {
return null;
Expand All @@ -69,14 +86,26 @@ public GenericRow get(PrimaryKey pk) {
return lookupRecordLocation.getRecord(reuse);
}

@Nullable
@Override
public boolean isEmpty() {
return _lookupTable.isEmpty();
public Object getValue(PrimaryKey pk, String columnName) {
  LookupRecordLocation location = _lookupTable.get(pk);
  if (location == null) {
    // No record location for this key: report absence instead of dereferencing.
    return null;
  }
  // Read only the requested column from the located record.
  return location.getValue(columnName);
}

@Nullable
@Override
public FieldSpec getFieldSpecFor(String columnName) {
return _tableSchema.getFieldSpecFor(columnName);
public Object[] getValues(PrimaryKey pk, String[] columnNames) {
  LookupRecordLocation location = _lookupTable.get(pk);
  if (location != null) {
    // One value per requested column, in the order of columnNames.
    Object[] result = new Object[columnNames.length];
    int idx = 0;
    for (String columnName : columnNames) {
      result[idx++] = location.getValue(columnName);
    }
    return result;
  }
  // No record location for this key.
  return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand Down Expand Up @@ -219,8 +218,7 @@ private void lookup(ValueBlock valueBlock, ValueAcceptor valueAcceptor) {
}
}
// lookup
GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
Object value = row == null ? null : row.getValue(_dimColumnName);
Object value = _dataManager.lookupValue(primaryKey, _dimColumnName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

but where is the lookupValues() method used? Or was it added for future use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sharp eyes! Yes, it is preparation for #13966.

valueAcceptor.accept(i, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
import static org.testng.Assert.*;


@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -196,17 +193,29 @@ public void testLookup()
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

// try fetching data BEFORE loading segment
GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
PrimaryKey key = new PrimaryKey(new String[]{"SF"});
assertFalse(tableDataManager.containsKey(key));
assertNull(tableDataManager.lookupRow(key));
assertNull(tableDataManager.lookupValue(key, "teamID"));
assertNull(tableDataManager.lookupValue(key, "teamName"));
assertNull(tableDataManager.lookupValues(key, new String[]{"teamID", "teamName"}));

tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, _indexLoadingConfig));

// Confirm table is loaded and available for lookup
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNotNull(resp, "Should return response after segment load");
assertEquals(resp.getFieldToValueMap().size(), 2);
assertEquals(resp.getValue("teamID"), "SF");
assertEquals(resp.getValue("teamName"), "San Francisco Giants");
assertTrue(tableDataManager.containsKey(key));
GenericRow row = tableDataManager.lookupRow(key);
assertNotNull(row);
assertEquals(row.getFieldToValueMap().size(), 2);
assertEquals(row.getValue("teamID"), "SF");
assertEquals(row.getValue("teamName"), "San Francisco Giants");
assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco Giants");
Object[] values = tableDataManager.lookupValues(key, new String[]{"teamID", "teamName"});
assertNotNull(values);
assertEquals(values.length, 2);
assertEquals(values[0], "SF");
assertEquals(values[1], "San Francisco Giants");

// Confirm we can get FieldSpec for loaded tables columns.
FieldSpec spec = tableDataManager.getColumnFieldSpec("teamName");
Expand All @@ -224,8 +233,11 @@ public void testLookup()
String segmentName = segMgr.getSegmentName();
tableDataManager.offloadSegment(segmentName);
// confirm table is cleaned up
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
assertFalse(tableDataManager.containsKey(key));
assertNull(tableDataManager.lookupRow(key));
assertNull(tableDataManager.lookupValue(key, "teamID"));
assertNull(tableDataManager.lookupValue(key, "teamName"));
assertNull(tableDataManager.lookupValues(key, new String[]{"teamID", "teamName"}));
}

@Test
Expand All @@ -240,11 +252,15 @@ public void testReloadTable()
tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, _indexLoadingConfig));

// Confirm table is loaded and available for lookup
GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNotNull(resp, "Should return response after segment load");
assertEquals(resp.getFieldToValueMap().size(), 2);
assertEquals(resp.getValue("teamID"), "SF");
assertEquals(resp.getValue("teamName"), "San Francisco Giants");
PrimaryKey key = new PrimaryKey(new String[]{"SF"});
assertTrue(tableDataManager.containsKey(key));
GenericRow row = tableDataManager.lookupRow(key);
assertNotNull(row);
assertEquals(row.getFieldToValueMap().size(), 2);
assertEquals(row.getValue("teamID"), "SF");
assertEquals(row.getValue("teamName"), "San Francisco Giants");
assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco Giants");

// Confirm the new column does not exist
FieldSpec teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
Expand All @@ -261,11 +277,16 @@ public void testReloadTable()
teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
assertNotNull(teamCitySpec, "Should return spec for existing column");
assertEquals(teamCitySpec.getDataType(), DataType.STRING, "Should return correct data type for teamCity column");
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertEquals(resp.getFieldToValueMap().size(), 3);
assertEquals(resp.getValue("teamID"), "SF");
assertEquals(resp.getValue("teamName"), "San Francisco Giants");
assertEquals(resp.getValue("teamCity"), "null");
assertTrue(tableDataManager.containsKey(key));
row = tableDataManager.lookupRow(key);
assertNotNull(row, "Should return response after segment reload");
assertEquals(row.getFieldToValueMap().size(), 3);
assertEquals(row.getValue("teamID"), "SF");
assertEquals(row.getValue("teamName"), "San Francisco Giants");
assertEquals(row.getValue("teamCity"), "null");
assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco Giants");
assertEquals(tableDataManager.lookupValue(key, "teamCity"), "null");
}

@Test
Expand All @@ -281,17 +302,21 @@ public void testLookupWithoutPreLoad()
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

// try fetching data BEFORE loading segment
GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
PrimaryKey key = new PrimaryKey(new String[]{"SF"});
assertFalse(tableDataManager.containsKey(key));
assertNull(tableDataManager.lookupRow(key));

tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, _indexLoadingConfig));

// Confirm table is loaded and available for lookup
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNotNull(resp, "Should return response after segment load");
assertEquals(resp.getFieldToValueMap().size(), 2);
assertEquals(resp.getValue("teamID"), "SF");
assertEquals(resp.getValue("teamName"), "San Francisco Giants");
assertTrue(tableDataManager.containsKey(key));
GenericRow row = tableDataManager.lookupRow(key);
assertNotNull(row, "Should return response after segment load");
assertEquals(row.getFieldToValueMap().size(), 2);
assertEquals(row.getValue("teamID"), "SF");
assertEquals(row.getValue("teamName"), "San Francisco Giants");
assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco Giants");

// Confirm we can get FieldSpec for loaded tables columns.
FieldSpec spec = tableDataManager.getColumnFieldSpec("teamName");
Expand All @@ -309,8 +334,8 @@ public void testLookupWithoutPreLoad()
String segmentName = segMgr.getSegmentName();
tableDataManager.offloadSegment(segmentName);
// confirm table is cleaned up
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
assertFalse(tableDataManager.containsKey(key));
assertNull(tableDataManager.lookupRow(key));
}

@Test
Expand All @@ -326,8 +351,7 @@ public void testLookupErrorOnDuplicatePrimaryKey()
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

// try fetching data BEFORE loading segment
GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
assertFalse(tableDataManager.containsKey(new PrimaryKey(new String[]{"SF"})));

try {
tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, _indexLoadingConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ public void setUp()
_doubleMV2Values[i][j] = 1.0;
}

float range = 1.0f - 0.0f;
Random random = new Random();
for (int j = 0; j < VECTOR_DIM_SIZE; j++) {
_vector1Values[i][j] = random.nextFloat();;
Expand Down
Loading
Loading