Service Fabric on Linux

  • 8/24/2018

Using Communication Listeners

To create a service that listens to client requests, you need to create and register a CommunicationListener implementation. This process is very similar to what you did in Chapter 2. Perform the following steps to create a Java-based calculator service that provides a REST API for add and subtract calculations:

  1. In Eclipse, create a new Service Fabric application named CalculatorApplication with a stateless service named Calculator.

  2. Add a new CalculatorServer class to the project. This class contains nothing specific to Service Fabric. It uses com.sun.net.httpserver.HttpServer to handle add and subtract requests from clients.

    package statelessservice;
    
    import com.sun.net.httpserver.*;
    import java.net.InetSocketAddress;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Minimal HTTP calculator built on {@code com.sun.net.httpserver.HttpServer}.
     *
     * <p>Exposes two GET endpoints, {@code /api/add} and {@code /api/subtract}, each
     * reading integer query parameters {@code a} and {@code b} and replying with the
     * result as plain text. The class contains nothing specific to Service Fabric.
     */
    public class CalculatorServer {
        /** Maximum time, in seconds, to wait for in-flight exchanges when stopping. */
        private static final int STOP_DELAY_SECONDS = 10;

        /** The running server, or {@code null} when not started (or already stopped). */
        private HttpServer server;
        private final int port;

        /**
         * @param port TCP port the server binds to when {@link #start()} is called
         */
        public CalculatorServer(int port) {
            this.port = port;
        }

        /**
         * Binds to the configured port, registers both calculation endpoints and starts
         * serving requests.
         *
         * @throws IOException if the server socket cannot be created or bound
         */
        public void start() throws IOException {
            HttpServer created = HttpServer.create(new InetSocketAddress(port), 0);
            created.createContext("/api/add", calculationHandler("add"));
            created.createContext("/api/subtract", calculationHandler("subtract"));
            // A null executor makes HttpServer fall back to its default executor.
            created.setExecutor(null);
            created.start();
            // Publish the instance only once it is actually running, so stop() never
            // sees a half-initialised server.
            server = created;
        }

        /**
         * Stops the server if it is running. Safe to call before {@link #start()} or
         * more than once; such calls are no-ops.
         */
        public void stop() {
            if (server != null) {
                server.stop(STOP_DELAY_SECONDS);
                server = null;
            }
        }

        /**
         * Builds the handler shared by both endpoints; only the operation differs.
         *
         * @param type operation name passed through to {@link #handleCalculation}
         */
        private static HttpHandler calculationHandler(final String type) {
            return exchange -> {
                byte[] body = handleCalculation(exchange.getRequestURI().getQuery(), type);
                exchange.sendResponseHeaders(200, body.length);
                // try-with-resources closes the body even if the write fails.
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            };
        }

        /**
         * Parses a URL query string of the form {@code k1=v1&k2=v2} into a map.
         *
         * <p>A parameter without a value (for example {@code a} or {@code a=}) is stored
         * with the value {@code "0"}. Empty segments and segments consisting only of
         * {@code =} are skipped.
         *
         * @param query the query string; may be {@code null} when the request URI has none
         * @return a mutable map of parameter names to values, never {@code null}
         */
        public static Map<String, String> queryToMap(String query) {
            Map<String, String> map = new HashMap<String, String>();
            if (query == null) {
                // URI.getQuery() returns null when the request has no "?..." part.
                return map;
            }
            for (String param : query.split("&")) {
                if (param.isEmpty()) {
                    continue;
                }
                String[] pair = param.split("=");
                if (pair.length == 0) {
                    // "=".split("=") yields an empty array; there is no key to store.
                    continue;
                }
                map.put(pair[0], pair.length > 1 ? pair[1] : "0");
            }
            return map;
        }

        /**
         * Evaluates one calculation request.
         *
         * @param query query string carrying integer parameters {@code a} and {@code b};
         *              may be {@code null}
         * @param type  {@code "add"} to add; any other value subtracts {@code b} from
         *              {@code a}
         * @return the UTF-8 encoded decimal result, or the UTF-8 encoded text
         *         {@code "Invalid parameters"} when {@code a} or {@code b} is missing or
         *         is not a valid {@code int}
         * @throws UnsupportedEncodingException declared for compatibility with callers
         */
        public static byte[] handleCalculation(String query, String type)
                throws UnsupportedEncodingException {
            Map<String, String> parameters = queryToMap(query);
            try {
                int a = Integer.parseInt(parameters.get("a"));
                int b = Integer.parseInt(parameters.get("b"));
                // Widen to long so sums/differences of two ints can never wrap around.
                long result = "add".equals(type) ? (long) a + b : (long) a - b;
                return Long.toString(result).getBytes("UTF-8");
            } catch (NumberFormatException e) {
                // Integer.parseInt also throws this for a null (missing) parameter.
                return "Invalid parameters".getBytes("UTF-8");
            }
        }
    }
  3. Add a WebCommunicationListener class to the project. This class implements microsoft.servicefabric.services.communication.runtime.CommunicationListener and overrides the openAsync, closeAsync, and abort methods.

    package statelessservice;
    
    import java.util.concurrent.CompletableFuture;
    import java.io.IOException;
    import microsoft.servicefabric.services.communication.runtime.CommunicationListener;
    import microsoft.servicefabric.services.runtime.StatelessServiceContext;
    import system.fabric.description.EndpointResourceDescription;
    import system.fabric.CancellationToken;
    
    /**
     * Communication listener that hosts a {@link CalculatorServer} on the port of the
     * endpoint resource named {@code ServiceEndpoint}.
     */
    public class WebCommunicationListener implements CommunicationListener {
        private final StatelessServiceContext context;
        /** The running server, or {@code null} when none has been started successfully. */
        private CalculatorServer server;
        private final String webEndpointName = "ServiceEndpoint";
        private final int port;

        /**
         * Looks up the endpoint resource and remembers its port.
         *
         * @param context the service context used to resolve the endpoint and node address
         */
        public WebCommunicationListener(StatelessServiceContext context) {
            this.context = context;
            EndpointResourceDescription endpoint =
                this.context.getCodePackageActivationContext().getEndpoint(webEndpointName);
            this.port = endpoint.getPort();
        }

        /**
         * Starts the calculator server and reports the address it listens on.
         *
         * @param cancellationToken not consulted by this implementation
         * @return an already-completed future holding {@code http://<node>:<port>/api}
         * @throws RuntimeException wrapping the {@link IOException} if the server cannot start
         */
        @Override
        public CompletableFuture<String> openAsync(CancellationToken cancellationToken) {
            String address = String.format("http://%s:%d/api",
                this.context.getNodeContext().getIpAddressOrFQDN(), this.port);
            CalculatorServer candidate = new CalculatorServer(port);
            try {
                candidate.start();
            } catch (IOException e) {
                throw new RuntimeException(
                    "Failed to start calculator server on port " + port, e);
            }
            // Keep the reference only after a successful start, so a later
            // closeAsync()/abort() never tries to stop a server that failed to bind.
            server = candidate;
            return CompletableFuture.completedFuture(address);
        }

        /**
         * Stops the server, if one is running.
         *
         * @param cancellationToken not consulted by this implementation
         * @return an already-completed future holding {@link Boolean#TRUE}
         */
        @Override
        public CompletableFuture<?> closeAsync(CancellationToken cancellationToken) {
            stopServer();
            return CompletableFuture.completedFuture(Boolean.TRUE);
        }

        /** Stops the server, if one is running. */
        @Override
        public void abort() {
            stopServer();
        }

        /** Stops and forgets the server so that repeated close/abort calls are no-ops. */
        private void stopServer() {
            CalculatorServer running = server;
            server = null;
            if (running != null) {
                running.stop();
            }
        }
    }
  4. Modify CalculatorService to return WebCommunicationListener from the overridden createServiceInstanceListeners method:

    import java.util.ArrayList;
    ...
    /** Stateless service that exposes the calculator through a {@code WebCommunicationListener}. */
    public class CalculatorService extends StatelessService {
        /**
         * Registers a single instance listener whose factory creates a
         * {@code WebCommunicationListener} for the supplied service context.
         *
         * @return a mutable list containing that one listener
         */
        @Override
        protected List<ServiceInstanceListener> createServiceInstanceListeners() {
            // Diamond operator instead of the raw ArrayList avoids an unchecked warning.
            List<ServiceInstanceListener> listeners = new ArrayList<>();
            listeners.add(new ServiceInstanceListener(
                context -> new WebCommunicationListener(context)));
            return listeners;
        }
    }
  5. Modify the CalculatorApplicationApplication\CalculatorPkg\ServiceManifest.xml file to define an endpoint resource named ServiceEndpoint:

    <Resources>
        <Endpoints>
            <!-- HTTP endpoint on port 8182. The name "ServiceEndpoint" is the one
                 WebCommunicationListener passes to getEndpoint() to obtain its port. -->
            <Endpoint Name="ServiceEndpoint" Protocol="http" Port="8182" />
        </Endpoints>
    </Resources>
  6. Build and deploy the application. Afterward, you should be able to use a browser to send requests such as http://localhost:8182/api/add?a=100&b=200 and http://localhost:8182/api/subtract?a=100&b=200, and you should get the corresponding outputs (300 and -100).