Paul Rogers edited this page Dec 4, 2019 · 6 revisions

Create the Storage Plugin Schema and Table

A storage plugin represents a storage engine. The storage plugin operates at both plan and run times.

The first step is to define one or more schemas that define the tables available to users. The details of the schema are unique to your plugin; here we'll cover the generalities. (Note that the term "schema" here is in the sense of a collection of tables, not in the sense of the set of columns within a table.) Drill presents each plugin's tables as a collection of one or more schemas, forming a three-part hierarchy:

  • Storage engine (plugin)
  • Schema (to any depth)
  • Tables

We've defined the plugin. Now we need to define the schema and tables.

Schema and Table Resolution

To put these classes in perspective, it helps to understand how your classes work in the Calcite planner.

Drill uses Apache Calcite as its planner. Calcite is a bit of a black art, but here are some basics.

Calcite defines a "default" schema for each storage plugin which is the one used if you simply use the plugin name itself followed by a table name:

SELECT * FROM examplePlugin.aTable

In this case, Calcite will look up the table aTable in our default schema.

Calcite also allows you to define nested schemas. For example, if we are connecting to MySQL, we must first specify a database, then a table within the database:

SELECT * FROM examplePlugin.aSchema.aTable

Here, Calcite will first resolve the schema name (aSchema) within the default schema, then will resolve the table name within the nested schema.

The complete table resolution process is:

  • Calcite resolves the storage plugin name.
  • Drill uses the plugin name to find the storage plugin config.
  • Drill uses the plugin config class to find your plugin.
  • Calcite asks the storage plugin for a schema factory. The schema factory can be shared across queries.
  • Calcite asks the schema factory for the default schema. Calcite will ask for the schema once per query; the schema is assumed to be scoped to that one query.
  • Calcite uses the default schema to resolve a nested schema (optional.)
  • Calcite uses the schema to resolve tables (multiple times). Calcite expects that, each time it asks the schema for a table, it gets the same table object. (This is important given how Drill "dynamically" creates columns as Calcite asks about them.)
  • Calcite asks the table for an object to represent a scan of that table (called a "scan spec"), which we will discuss further in the next section.
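The resolution chain above can be sketched with plain Java maps. This is an illustrative model only; the class names (ResolutionSketch, Plugin, Schema) are stand-ins invented here, not Drill or Calcite types.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the name-resolution chain: plugin name ->
// plugin -> default schema -> table. Simplified stand-ins, not Drill types.
public class ResolutionSketch {

  // A "table" is just its name here; Drill would return a DrillTable.
  static class Schema {
    final Map<String, String> tables = new HashMap<>();
    String getTable(String name) { return tables.get(name); }
  }

  // A "plugin" owns a default schema, as the schema factory would create it.
  static class Plugin {
    final Schema defaultSchema = new Schema();
  }

  public static void main(String[] args) {
    // The plugin name resolves to a plugin instance.
    Map<String, Plugin> registry = new HashMap<>();
    Plugin example = new Plugin();
    example.defaultSchema.tables.put("myTable", "myTable");
    registry.put("example", example);

    // SELECT * FROM example.myTable walks the chain.
    Plugin plugin = registry.get("example");
    String table = plugin.defaultSchema.getTable("myTable");
    System.out.println(table); // myTable
  }
}
```

A nested schema simply adds one more map lookup between the plugin and the table.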

To keep this example simple, we assume our tables reside in the default schema.

Schema Factory

Calcite uses a schema factory class to create your schemas as needed. For our example, we'll create a default schema that contains a single table, "myTable".

Create the Schema Factory class:

public class ExampleSchemaFactory extends AbstractSchemaFactory {

  public static final String MY_TABLE = "myTable";

  private final ExampleStoragePlugin plugin;

  public ExampleSchemaFactory(ExampleStoragePlugin plugin) {
    super(plugin.getName());
    this.plugin = plugin;
  }

  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
      throws IOException {
    parent.add(getName(), new DefaultSchema(getName()));
  }
}

We pass along the plugin because a real implementation will likely obtain the list of schemas or tables from an external system.

Default Schema

Create the default schema. Here we assume that the class is a nested class in the same file as the schema factory. If you put the class in its own file, perhaps change the class name to ExampleDefaultSchema.

  class DefaultSchema extends AbstractSchema {

    private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();

    DefaultSchema(String name) {
      super(Collections.emptyList(), name);
    }

    @Override
    public Table getTable(String name) {
      DynamicDrillTable table = activeTables.get(name);
      if (table != null) {
        return table;
      }
      if (MY_TABLE.contentEquals(name)) {
        return null; // TODO
      }
      return null; // Unknown table
    }

    private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
      activeTables.put(name, table);
      return table;
    }

    @Override
    public Set<String> getTableNames() {
      return Sets.newHashSet(MY_TABLE);
    }

    @Override
    public String getTypeName() {
      return ExampleStoragePluginConfig.NAME;
    }
  }
}

The schema must hold onto any planning-time tables it creates and return those same tables on each request.

getTableNames() appears not to be critical: your schema can resolve tables dynamically in getTable() without having to declare all of them in getTableNames().

Create the Schema Factory Instance

Create the schema factory in the storage plugin:

  public ExampleStoragePlugin(ExampleStoragePluginConfig config,
      DrillbitContext context, String name) throws IOException {
    ...
    schemaFactory = new ExampleSchemaFactory(this);
  }

The schemaFactory is provided by the base class. The base class will return this schema factory when Calcite requests it so you don't need to provide the registerSchemas() method. (If you review existing plugins, you will see they do provide this method; they were created before the base class existed.)

Test Schema Resolution

We can now test that the schema resolution works. Add the following test:

  @Test
  public void test() throws Exception {
    String sql = "SELECT * FROM example.myTable";
    queryBuilder().sql(sql).run();
  }

Set breakpoints in the following methods:

  • BaseStoragePlugin.registerSchemas
  • DefaultSchema.getTable

Run the test. You should see the debugger stop in each method (multiple times.) If so, then the name resolution works and we are ready to move on to creating tables.

If not, then double-check that the naming works correctly:

  • Register schemas in the storage plugin.
  • Give the default schema the same name as given to your plugin in its constructor (not necessarily the name defined in your config class.)
  • Give the default schema no parent path.
  • If you include a nested schema, in the default schema getSubSchema method, be sure to give it the sub-schema name and the path of its parent (the default schema.)
  • Provide the getTable method (which does nothing yet.)

Debugging through Calcite is a bit tricky; best to sort out any issues using the hints above.

Table Definition

The next step in the name resolution process is to provide the class that represents the resolved table name in a query:

SELECT * FROM example.myTable

Drill gives you complete flexibility: you can match existing tables, you can have a fixed set of tables, or you can even create tables on the fly (as the "mock" storage plugin does.) You can also store as little or as much information as you want in your table definition.

For our example, we will support a single table, and the only information we need is the table name. As a result, we can use Drill's table class directly. However, if we needed more table information or logic, we could create our own table class.

In our nested schema we will resolve our one and only table name:

  class DefaultSchema extends AbstractSchema {
    ...
    @Override
    public Table getTable(String tableName) {
      DynamicDrillTable table = activeTables.get(tableName);
      if (table != null) {
        return table;
      }
      if (MY_TABLE.contentEquals(tableName)) {
        return registerTable(tableName,
            new DynamicDrillTable(plugin, plugin.getName(),
            null /* TO DO */));
      }
      return null; // Unknown table
    }
  }

Dynamic Table Schema

Note that the logic to cache tables is essential (this author found that out the hard way.) It is useful to understand why.

In most databases, a table carries a known, fixed schema: the set of columns in a table is independent of any observer (query) on that table.

Drill, however, supports a dynamic schema: columns don't exist until we look at them. (There is some deep philosophical tangent we could go onto here, but we'll resist.) Said another way, since Drill does not know the schema of, say, your JSON file, Drill uses "hints" to infer the schema. One of those hints is the fact that you asked for a set of columns:

SELECT a, b FROM myTable WHERE c > 10

The above tells us that, very likely, the table does have columns a, b and c. Drill's DynamicDrillTable uses that fact to create columns on read. When Calcite resolves a column, it asks the table for a description of the table's row structure as a RelDataType. The dynamic table provides a RelDataTypeHolder which, in its getField() method, adds the column to the row schema if it does not exist, thus materializing the column when we look for it.
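The materialize-on-lookup idea can be illustrated with a simplified row-type holder. This is plain Java invented for illustration, not the actual RelDataTypeHolder API, which works with Calcite's RelDataType machinery.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of materialize-on-lookup: a row "schema" that
// adds a column the first time anyone asks for it.
public class DynamicRowType {
  private final List<String> columns = new ArrayList<>();

  // Return the column, creating it if this is the first reference.
  public String getField(String name) {
    if (!columns.contains(name)) {
      columns.add(name);  // materialize the column on first lookup
    }
    return name;
  }

  public List<String> columns() { return columns; }

  public static void main(String[] args) {
    DynamicRowType rowType = new DynamicRowType();
    // SELECT a, b FROM myTable WHERE c > 10 touches a, b, then c,
    // so all three columns spring into existence.
    rowType.getField("a");
    rowType.getField("b");
    rowType.getField("c");
    System.out.println(rowType.columns()); // [a, b, c]
  }
}
```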

This magic works as long as you return the same table object each time that Calcite asks for that table, hence the caching above. If you do not cache, but instead create a new DynamicDrillTable each time, then you will get obscure errors at plan time from Calcite as columns seemingly appear and disappear.
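Why object identity matters can be shown with a generic caching sketch (not Drill code; the Table class here is a stand-in for DynamicDrillTable): cached lookups return the identical object, while fresh construction does not.

```java
import java.util.HashMap;
import java.util.Map;

// Generic sketch of the caching rule: getTable() must hand Calcite the
// identical object on every lookup of the same name.
public class TableCacheSketch {
  static class Table {}  // stand-in for DynamicDrillTable

  private final Map<String, Table> activeTables = new HashMap<>();

  public Table getTable(String name) {
    // computeIfAbsent creates the table once, then returns the cached one.
    return activeTables.computeIfAbsent(name, n -> new Table());
  }

  public static void main(String[] args) {
    TableCacheSketch schema = new TableCacheSketch();
    // Same name, same object: this is what Calcite relies on.
    System.out.println(schema.getTable("myTable") == schema.getTable("myTable")); // true
    // Without caching, each lookup would yield a distinct object:
    System.out.println(new Table() == new Table()); // false
  }
}
```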

The above makes one wonder: what if the query performs a self join?

SELECT t1.a, t1.b, t2.c FROM myTable t1, myTable t2 WHERE t1.a = t2.a

In this case, we have two references to the same table. Alias t1 refers to columns a and b, while alias t2 refers to columns a and c. How will we know which columns are needed from each alias?

As it turns out, the columns-per-scan question is answered later (via the scan object.) Here, we simply infer that if both aliases refer to the same underlying table, then that table must have all three columns.

Note also that, because of the way we infer the existence of columns, we don't know the column's data type. There may be a way to infer that from the information in the query, but that is beyond the scope of this post.

Finally, you may be building a plugin in which you know the schema. In this case, you would not use DynamicDrillTable, but would instead create your own class that extends DrillTable to hold your pre-defined schema. Consider the Hive storage plugin as an example.
