UDTF介绍

UDTF（User Defined Table Function，用户自定义表函数）可将一行输入扩展为多行输出。

接口

public abstract class UserDefinedFunction implements Serializable {

    /**
     * Initialization hook for the user defined function.
     *
     * <p>The base implementation is a no-op; subclasses may override it to
     * perform any setup they need using the supplied {@code context}.
     *
     * @param context the function context passed in by the caller
     */
    public void open(FunctionContext context) {
        // No-op by default: nothing to initialize.
    }

    /**
     * Shutdown hook for the user defined function.
     *
     * <p>The base implementation is a no-op; subclasses may override it to
     * release anything they set up in {@link #open(FunctionContext)}.
     */
    public void close() {
        // No-op by default: nothing to release.
    }
}

public abstract class UDTF extends UserDefinedFunction {

    /**
     * Rows emitted by {@link #collect(Object[])}, in the order they were collected.
     * NOTE(review): presumably read and drained by the framework after each eval call — confirm.
     */
    protected List<Object[]> collector;

    public UDTF() {
        this.collector = Lists.newArrayList();
    }

    /**
     * Collects one output row of the function.
     *
     * <p>Each call appends exactly one row to {@link #collector}, so an {@code eval}
     * method may call this zero or more times to expand a single input into
     * any number of output rows.
     *
     * @param output the values of the output row, one element per output field
     * @throws IllegalArgumentException if {@code output} is {@code null}
     */
    protected void collect(Object[] output) {
        if (output == null) {
            throw new IllegalArgumentException("UDTF's output must not be null.");
        }
        this.collector.add(output);
    }

    /**
     * Returns the output types of the function.
     *
     * @param paramTypes The parameter types of the function.
     * @param outFieldNames The output field names of the function as written in the SQL.
     * @return the Java type of each output field
     */
    public abstract List<Class<?>> getReturnType(List<Class<?>> paramTypes,
                                                 List<String> outFieldNames);
}

每个UDTF都应该定义一个或多个公共（public）的eval方法，可通过重载支持不同的参数列表；在eval中每调用一次collect即输出一行结果。

示例

/**
 * Table function that splits a string and emits one single-column row per token.
 *
 * <p>{@code eval(text)} splits on {@code ","}. {@code eval(text, separator)} splits on
 * {@code separator}, falling back to {@code ","} when it is {@code null} or empty.
 */
public class Split extends UDTF {

    /** Separator used when the caller does not supply a non-empty one. */
    private static final String DEFAULT_SPLIT_CHAR = ",";

    public void eval(String text) {
        evalInternal(text, DEFAULT_SPLIT_CHAR);
    }

    public void eval(String text, String separator) {
        // Resolve the separator per call. Storing it in a field would let a custom
        // separator leak into later eval(text) calls on the same instance.
        String effective = StringUtils.isNotEmpty(separator) ? separator : DEFAULT_SPLIT_CHAR;
        evalInternal(text, effective);
    }

    /**
     * Splits {@code text} on {@code separator} and collects each token as a row.
     *
     * <p>NOTE(review): assuming {@code StringUtils} is Apache Commons Lang's, every
     * character of {@code separator} acts as a separator, and empty tokens are dropped.
     *
     * @param text the input string; a {@code null} input emits no rows
     * @param separator the non-empty separator characters
     */
    private void evalInternal(String text, String separator) {
        if (text == null) {
            // StringUtils.split returns null for null input; iterating it would throw an NPE.
            return;
        }
        for (String token : StringUtils.split(text, separator)) {
            collect(new Object[]{token});
        }
    }

    @Override
    public List<Class<?>> getReturnType(List<Class<?>> paramTypes,
                                        List<String> outFieldNames) {
        // A single String-typed output column holding each token.
        return Collections.singletonList(String.class);
    }
}
-- Register the Split class as a table function named my_split.
CREATE FUNCTION my_split AS 'com.antgroup.geaflow.dsl.udf.Split';

-- Expand each user's ids into one row per id, joined laterally with that user.
SELECT t.id, u.name
FROM users u, LATERAL TABLE(my_split(u.ids)) AS t(id);