内容:
1、
GlobalPartitioner
/**
 * Partitioner whose channel selection is constant: every record is assigned channel index 0.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class GlobalPartitioner<T> extends StreamPartitioner<T> {

    /**
     * Selects the output channel for the given record.
     *
     * @param record serialization delegate wrapping the record; it is not inspected
     * @return always {@code 0}
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        final int firstChannel = 0;
        return firstChannel;
    }
}
2、
ShufflePartitioner
/**
 * Partitioner that picks a pseudo-random output channel for every record.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class ShufflePartitioner<T> extends StreamPartitioner<T> {

    /** Source of channel indices; assigned once and never replaced, hence {@code final}. */
    private final Random random = new Random();

    /**
     * Selects the output channel for the given record by drawing from {@link Random}.
     *
     * @param record serialization delegate wrapping the record; it is not inspected
     * @return a value in the range {@code [0, numberOfChannels)}
     * @throws IllegalArgumentException raised by {@link Random#nextInt(int)} when
     *     {@code numberOfChannels} is not positive
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // numberOfChannels is not declared in this class; presumably inherited from
        // StreamPartitioner -- confirm against the base class.
        return random.nextInt(numberOfChannels);
    }
}
3、
RebalancePartitioner
/**
 * Partitioner that cycles through the channel indices one record at a time, beginning from a
 * randomly chosen position.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class RebalancePartitioner<T> extends StreamPartitioner<T> {

    /** Index returned by the most recent {@link #selectChannel} call (or the random seed index). */
    private int nextChannelToSendTo;

    /**
     * Delegates to the superclass setup and then seeds the cycle with a random index in
     * {@code [0, numberOfChannels)}.
     *
     * @param numberOfChannels number of channels handed to the superclass and used as the
     *     exclusive upper bound of the random start index
     */
    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        final ThreadLocalRandom rng = ThreadLocalRandom.current();
        this.nextChannelToSendTo = rng.nextInt(numberOfChannels);
    }

    /**
     * Advances the cycle by one position, wrapping around via modulo, and returns the new index.
     *
     * @param record serialization delegate wrapping the record; it is not inspected
     * @return the stored index plus one, modulo {@code numberOfChannels}
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // The index is advanced before it is returned, so the very first record goes to the
        // channel after the randomly seeded one.
        final int advanced = (nextChannelToSendTo + 1) % numberOfChannels;
        nextChannelToSendTo = advanced;
        return advanced;
    }
}
4、
KeyGroupStreamPartitioner
/**
 * Partitioner that derives a key from each record and maps that key to a channel through
 * {@code KeyGroupRangeAssignment.assignKeyToParallelOperator}.
 *
 * <p>NOTE(review): {@code keySelector} and {@code numberOfChannels} are used below but not
 * declared in this excerpt, and nothing visible here assigns {@code maxParallelism}; confirm
 * against the complete class.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 * @param <K> type of the extracted key
 */
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T>
        implements ConfigurableStreamPartitioner {

    /** Passed as the {@code maxParallelism} argument when assigning a key to a channel. */
    private int maxParallelism;

    /**
     * Extracts the record's key and returns the channel index computed for it.
     *
     * @param record serialization delegate wrapping the record whose value supplies the key
     * @return result of {@code assignKeyToParallelOperator(key, maxParallelism, numberOfChannels)}
     * @throws RuntimeException if key extraction throws any {@link Exception}; the original
     *     exception is attached as the cause
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        final K extractedKey = extractKey(record);
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                extractedKey, maxParallelism, numberOfChannels);
    }

    /** Runs the key selector on the record's value, wrapping any failure in a RuntimeException. */
    private K extractKey(SerializationDelegate<StreamRecord<T>> record) {
        try {
            return keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not extract key from " + record.getInstance().getValue(), e);
        }
    }
}
5、
KeyGroupRangeAssignment
/**
 * Static helpers that map a key to a key group and a key group to a parallel operator index.
 *
 * <p>NOTE(review): {@code computeKeyGroupForKeyHash} is called below but is not part of this
 * excerpt; its behavior must be checked in the complete class.
 */
public final class KeyGroupRangeAssignment {

    /**
     * Maps a key to an operator index by first assigning it to a key group and then converting
     * that key group to an index.
     *
     * @param key the key to assign; checked with {@code Preconditions.checkNotNull}
     * @param maxParallelism forwarded to both the key-group assignment and the index computation
     * @param parallelism forwarded to the index computation
     * @return the operator index computed for the key's key group
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        final int keyGroupId = assignToKeyGroup(key, maxParallelism);
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId);
    }

    /**
     * Computes {@code keyGroupId * parallelism / maxParallelism} using {@code int} arithmetic
     * (multiplication first, then truncating division).
     *
     * @param maxParallelism the divisor
     * @param parallelism the factor applied to the key group id
     * @param keyGroupId the key group id to convert
     * @return the quotient described above
     */
    public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        final int scaled = keyGroupId * parallelism;
        return scaled / maxParallelism;
    }

    /**
     * Assigns a key to a key group based on the key's {@code hashCode()}.
     *
     * @param key the key to assign; checked with {@code Preconditions.checkNotNull}
     * @param maxParallelism forwarded to {@code computeKeyGroupForKeyHash}
     * @return the value returned by {@code computeKeyGroupForKeyHash}
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        final int keyHash = key.hashCode();
        return computeKeyGroupForKeyHash(keyHash, maxParallelism);
    }
}
6、
BroadcastPartitioner
/**
 * Partitioner for which selecting a single channel is not a supported operation.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {

    /**
     * Always fails; this partitioner never returns a channel index.
     *
     * @param record serialization delegate wrapping the record; it is not inspected
     * @return never returns normally
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        final String message = "Broadcast partitioner does not support select channels.";
        throw new UnsupportedOperationException(message);
    }
}
7、
RescalePartitioner
/**
 * Partitioner that walks through the channel indices in order, starting at 0 and wrapping back
 * to 0 after the last index.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class RescalePartitioner<T> extends StreamPartitioner<T> {

    /** Index returned by the previous call; starts at -1 so that the first call yields 0. */
    private int nextChannelToSendTo = -1;

    /**
     * Moves to the next channel index, resetting to 0 once the index reaches
     * {@code numberOfChannels}.
     *
     * @param record serialization delegate wrapping the record; it is not inspected
     * @return the updated channel index
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        final int candidate = nextChannelToSendTo + 1;
        nextChannelToSendTo = candidate < numberOfChannels ? candidate : 0;
        return nextChannelToSendTo;
    }

    /**
     * Reports this partitioner as pointwise.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isPointwise() {
        final boolean pointwise = true;
        return pointwise;
    }
}
8、
ForwardPartitioner
/**
 * Pointwise partitioner whose channel selection is constant: every record is assigned channel
 * index 0.
 *
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class ForwardPartitioner<T> extends StreamPartitioner<T> {

    /**
     * Selects the output channel for the given record.
     *
     * @param record serialization delegate wrapping the record; it is not inspected
     * @return always {@code 0}
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        final int onlyChannel = 0;
        return onlyChannel;
    }

    /**
     * Reports this partitioner as pointwise.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isPointwise() {
        final boolean pointwise = true;
        return pointwise;
    }
}
9、
CustomPartitionerWrapper
/**
 * Partitioner that extracts a key from each record and hands the channel decision to a
 * {@code Partitioner}.
 *
 * <p>NOTE(review): {@code keySelector}, {@code partitioner} and {@code numberOfChannels} are
 * used below but not declared in this excerpt; confirm against the complete class.
 *
 * @param <K> type of the extracted key
 * @param <T> type parameter of the {@code StreamRecord} being partitioned
 */
public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {

    /**
     * Extracts the record's key and returns whatever the wrapped partitioner computes for it.
     *
     * @param record serialization delegate wrapping the record whose value supplies the key
     * @return result of {@code partitioner.partition(key, numberOfChannels)}
     * @throws RuntimeException if key extraction throws any {@link Exception}; the original
     *     exception is attached as the cause
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        final K extractedKey = extractKey(record);
        return partitioner.partition(extractedKey, numberOfChannels);
    }

    /** Runs the key selector on the record's value, wrapping any failure in a RuntimeException. */
    private K extractKey(SerializationDelegate<StreamRecord<T>> record) {
        try {
            return keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            // The message reports the whole record instance rather than only its value.
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
    }
}
/**
 * Serializable callback that turns a key and a partition count into an {@code int}.
 *
 * <p>In this file, {@code CustomPartitionerWrapper.selectChannel} calls {@link #partition} with
 * the extracted key and {@code numberOfChannels}, and returns the result as the channel index.
 *
 * <p>NOTE(review): {@code Function} is referenced unqualified; which type it resolves to depends
 * on imports not shown here.
 *
 * @param <K> type of the key
 */
public interface Partitioner<K> extends java.io.Serializable, Function {
    /**
     * Computes the partition for the given key.
     *
     * @param key the key to partition
     * @param numPartitions the partition count supplied by the caller
     * @return the partition as an {@code int}; presumably expected to lie in
     *     {@code [0, numPartitions)} -- not enforced here, confirm with callers
     */
int partition(K key, int numPartitions);
}