首页 图片 查字 
所属分类:Flink
关键词: Flink 分区 阅读
浏览:580
内容:

1、
GlobalPartitioner
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }
}

2、
ShufflePartitioner
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private Random random = new Random();
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }
}

3、
RebalancePartitioner
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private int nextChannelToSendTo;
    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }
}

4、
KeyGroupStreamPartitioner
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T>
        implements ConfigurableStreamPartitioner {
    private int maxParallelism;
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not extract key from " + record.getInstance().getValue(), e);
        }
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);
    }
}

5、
KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }
    public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }    
}

6、
BroadcastPartitioner
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException(
                "Broadcast partitioner does not support select channels.");
    }
}

7、
RescalePartitioner
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private int nextChannelToSendTo = -1;
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }
    @Override
    public boolean isPointwise() {
        return true;
    }
}

8、
ForwardPartitioner
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }
    @Override
    public boolean isPointwise() {
        return true;
    }
}

9、
CustomPartitionerWrapper
public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
        return partitioner.partition(key, numberOfChannels);
    }
}
/**
 * Contract for assigning a key to one of a given number of partitions.
 *
 * <p>{@code CustomPartitionerWrapper} calls this with the extracted record key and its
 * {@code numberOfChannels}, and returns the result as the selected channel.
 *
 * @param <K> type of the key being partitioned
 */
public interface Partitioner<K> extends java.io.Serializable, Function {
    /**
     * Computes the partition for the given key.
     *
     * @param key the key to place
     * @param numPartitions the number of partitions to choose from
     * @return the chosen partition index; presumably expected to lie in
     *     {@code [0, numPartitions)} -- TODO confirm, nothing in this excerpt enforces it
     */
    int partition(K key, int numPartitions);
}