User Defined Connector

GeaFlow support user defined connector using the java SPI.

Interface

TableConnector

User should implement a TableConnector. We support TableReadableConnector for read and TableWritableConnector for write. If you implement both of them, the connector will support both read and write.

/**
 * The interface for table connector.
 */
public interface TableConnector {

    /**
     * Return table connector type.
     */
    String getType();
}

/**
 * A readable table connector.
 */
public interface TableReadableConnector extends TableConnector {

    TableSource createSource(Configuration conf);
}

/**
 * A writable table connector.
 */
public interface TableWritableConnector extends TableConnector {

    /**
     * Create the {@link TableSink} for the table connector.
     */
    TableSink createSink(Configuration conf);
}

TableSource

TableSource is the inferface for read data from the connector.

/**
 * Interface for table source.
 */
public interface TableSource extends Serializable {

    /**
     * The init method for compile time.
     */
    void init(Configuration tableConf, TableSchema tableSchema);

    /**
     * The init method for runtime.
     */
    void open(RuntimeContext context);

    /**
     * List all the partitions for the source.
     */
    List<Partition> listPartitions();

    /**
     * Returns the {@link TableDeserializer} for the source to convert data read from
     * the source to {@link Row}.
     */
    <IN> TableDeserializer<IN> getDeserializer(Configuration conf);

    /**
     * Fetch data for the partition from start offset. if the windowSize is -1, it represents an
     * all-window which will read all the data from the source, else return widow size for data.
     */
    <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, long windowSize) throws IOException;

    /**
     * The close callback for the job finish the execution.
     */
    void close();
}

TableSink

TableSink is the interface for write data to the connector.

/**
 * Interface for table sink.
 */
public interface TableSink extends Serializable {

    /**
     * The init method for compile time.
     */
    void init(Configuration tableConf, StructType schema);

    /**
     * The init method for runtime.
     */
    void open(RuntimeContext context);

    /**
     * The write method for writing row to the table.
     */
    void write(Row row) throws IOException;

    /**
     * The finish callback for each window finished.
     */
    void finish() throws IOException;

    /**
     * The close callback for the job finish the execution.
     */
    void close();
}

Example

Here is an example for console table connector.

Implement TableConnector

public class ConsoleTableConnector implements TableWritableConnector {

    @Override
    public String getType() {
        return "CONSOLE";
    }

    @Override
    public TableSink createSink(Configuration conf) {
        return new ConsoleTableSink();
    }
}

public class ConsoleTableSink implements TableSink {

    private static final Logger LOGGER = 
            LoggerFactory.getLogger(ConsoleTableSink.class);

    private boolean skip;

    @Override
    public void init(Configuration tableConf, StructType schema) {
        skip = tableConf.getBoolean(ConsoleConfigKeys.GEAFLOW_DSL_CONSOLE_SKIP);
    }

    @Override
    public void open(RuntimeContext context) {

    }

    @Override
    public void write(Row row) {
        if (!skip) {
            LOGGER.info(row.toString());
        }
    }

    @Override
    public void finish() {

    }

    @Override
    public void close() {

    }
}

After implement the ConsoleTableConnector, you should put the full class name to the resources/META-INF.services/com.antgroup.geaflow.dsl.connector.api.TableConnector

Usage

CREATE TABLE file_source (
  id BIGINT,
  name VARCHAR,
  age INT
) WITH (
    type='file',
    geaflow.dsl.file.path = '/path/to/file'
);

CREATE TABLE console_sink (
  id BIGINT,
  name VARCHAR,
  age INT
) WITH (
    type='console'
);

INSERT INTO console_sink
SELECT * FROM file_source;