Updated discovery source code example.

This commit is contained in:
Paulo Pires 2015-01-07 12:04:59 +00:00
parent 2c6211e692
commit cd946abaf0

View File

@ -211,8 +211,6 @@ kubecfg resize hazelcast 4
### Hazelcast Discovery Source ### Hazelcast Discovery Source
```java ```java
import static com.github.pires.hazelcast.Constants.hazelcastPodLabelKey;
import static com.github.pires.hazelcast.Constants.hazelcastPodLabelValue;
import com.hazelcast.config.Config; import com.hazelcast.config.Config;
import com.hazelcast.config.GroupConfig; import com.hazelcast.config.GroupConfig;
import com.hazelcast.config.JoinConfig; import com.hazelcast.config.JoinConfig;
@ -221,15 +219,14 @@ import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.SSLConfig; import com.hazelcast.config.SSLConfig;
import com.hazelcast.config.TcpIpConfig; import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Hazelcast; import com.hazelcast.core.Hazelcast;
import io.fabric8.kubernetes.api.Kubernetes; import io.fabric8.kubernetes.api.KubernetesClient;
import io.fabric8.kubernetes.api.KubernetesFactory; import io.fabric8.kubernetes.api.KubernetesFactory;
import io.fabric8.kubernetes.api.model.PodSchema; import io.fabric8.kubernetes.api.model.Pod;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
@ -242,46 +239,45 @@ public class HazelcastDiscoveryController implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger( private static final Logger log = LoggerFactory.getLogger(
HazelcastDiscoveryController.class); HazelcastDiscoveryController.class);
// TODO load this from env vars private static final String HAZELCAST_LABEL_NAME = "name";
private static final String HC_GROUP_NAME = "someGroup"; private static final String HAZELCAST_LABEL_VALUE = "hazelcast";
private static final String HC_GROUP_PASSWORD = "someSecret";
private static final int HC_PORT = 5701;
@Value("#{systemEnvironment.KUBERNETES_RO_SERVICE_HOST}") private static String getEnvOrDefault(String var, String def) {
private String kubeMasterHost; final String val = System.getenv(var);
return (val == null || val.isEmpty())
@Value("#{systemEnvironment.KUBERNETES_RO_SERVICE_PORT}") ? def
private String kubeMasterPort; : val;
private String getKubeApi() {
return "http://" + kubeMasterHost + ":" + kubeMasterPort;
} }
@Override @Override
public void run(String... args) { public void run(String... args) {
log.info("Asking k8s registry at {}..", getKubeApi()); final String kubeApiHost = getEnvOrDefault("KUBERNETES_RO_SERVICE_HOST",
KubernetesFactory kubernetesFactory = new KubernetesFactory(getKubeApi()); "localhost");
final List<PodSchema> hazelcastPods = retrieveHazelcastPods( final String kubeApiPort = getEnvOrDefault("KUBERNETES_RO_SERVICE_PORT",
kubernetesFactory.createKubernetes()); "8080");
log.info("Found {} pods running Hazelcast.", hazelcastPods.size()); final String kubeUrl = "http://" + kubeApiHost + ":" + kubeApiPort;
log.info("Asking k8s registry at {}..", kubeUrl);
final KubernetesClient kube = new KubernetesClient(new KubernetesFactory(
kubeUrl));
final List<Pod> hazelcastPods = new CopyOnWriteArrayList<>();
kube.getPods().getItems().parallelStream().filter(pod
-> pod.getLabels().get(HAZELCAST_LABEL_NAME).equals(
HAZELCAST_LABEL_VALUE)).forEach(hazelcastPods::add);
log.info("Found {} pods running Hazelcast.", hazelcastPods.size());
if (!hazelcastPods.isEmpty()) { if (!hazelcastPods.isEmpty()) {
runHazelcast(hazelcastPods); runHazelcast(hazelcastPods);
} }
} }
public List<PodSchema> retrieveHazelcastPods(final Kubernetes kubernetes) { private void runHazelcast(final List<Pod> hazelcastPods) {
final List<PodSchema> hazelcastPods = new CopyOnWriteArrayList<>();
kubernetes.getPods().getItems().parallelStream().filter(pod -> pod.
getLabels().get(hazelcastPodLabelKey).equals(hazelcastPodLabelValue)).
forEach(hazelcastPods::add);
return hazelcastPods;
}
private void runHazelcast(final List<PodSchema> hazelcastPods) {
// configure Hazelcast instance // configure Hazelcast instance
final Config cfg = new Config(); final Config cfg = new Config();
cfg.setInstanceName(UUID.randomUUID().toString()); cfg.setInstanceName(UUID.randomUUID().toString());
// group configuration // group configuration
final String HC_GROUP_NAME = getEnvOrDefault("HC_GROUP_NAME", "someGroup");
final String HC_GROUP_PASSWORD = getEnvOrDefault("HC_GROUP_PASSWORD",
"someSecret");
final int HC_PORT = Integer.parseInt(getEnvOrDefault("HC_PORT", "5701"));
cfg.setGroupConfig(new GroupConfig(HC_GROUP_NAME, HC_GROUP_PASSWORD)); cfg.setGroupConfig(new GroupConfig(HC_GROUP_NAME, HC_GROUP_PASSWORD));
// network configuration initialization // network configuration initialization
final NetworkConfig netCfg = new NetworkConfig(); final NetworkConfig netCfg = new NetworkConfig();
@ -292,8 +288,7 @@ public class HazelcastDiscoveryController implements CommandLineRunner {
mcCfg.setEnabled(false); mcCfg.setEnabled(false);
// tcp // tcp
final TcpIpConfig tcpCfg = new TcpIpConfig(); final TcpIpConfig tcpCfg = new TcpIpConfig();
hazelcastPods.stream().filter( hazelcastPods.parallelStream().forEach(pod -> {
pod -> pod.getCurrentState().getPodIP() != null).forEach(pod -> {
tcpCfg.addMember(pod.getCurrentState().getPodIP()); tcpCfg.addMember(pod.getCurrentState().getPodIP());
}); });
tcpCfg.setEnabled(true); tcpCfg.setEnabled(true);
@ -306,11 +301,9 @@ public class HazelcastDiscoveryController implements CommandLineRunner {
netCfg.setSSLConfig(new SSLConfig().setEnabled(false)); netCfg.setSSLConfig(new SSLConfig().setEnabled(false));
// set it all // set it all
cfg.setNetworkConfig(netCfg); cfg.setNetworkConfig(netCfg);
// run // run
Hazelcast.newHazelcastInstance(cfg); Hazelcast.newHazelcastInstance(cfg);
} }
} }
``` ```