diff --git a/.gitignore b/.gitignore index 1e6c9124..a145b235 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,14 @@ /target /gnupg + +# Intellij IDEA (see https://intellij-support.jetbrains.com/entries/23393067) +.idea/ +release.properties +.navigation/ +captures/ +*.iws +*.iml +*.ipr +*~ +*.swp \ No newline at end of file diff --git a/src/main/java/com/corundumstudio/socketio/AuthorizationListener.java b/src/main/java/com/corundumstudio/socketio/AuthorizationListener.java index 886a076c..44df8df0 100644 --- a/src/main/java/com/corundumstudio/socketio/AuthorizationListener.java +++ b/src/main/java/com/corundumstudio/socketio/AuthorizationListener.java @@ -18,11 +18,11 @@ public interface AuthorizationListener { /** - * Checks is client with handshake data is authorized + * Checks if client with handshake data is authorized * * @param data - handshake data - * @return - true if client is authorized of false otherwise + * @return - AuthorizationResponse */ - boolean isAuthorized(HandshakeData data); + AuthorizationResponse authorize(HandshakeData data); } diff --git a/src/main/java/com/corundumstudio/socketio/AuthorizationResponse.java b/src/main/java/com/corundumstudio/socketio/AuthorizationResponse.java new file mode 100644 index 00000000..a5c4c1da --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/AuthorizationResponse.java @@ -0,0 +1,32 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public abstract class AuthorizationResponse { + + private final HttpResponseStatus httpResponseStatus; + + protected AuthorizationResponse(HttpResponseStatus httpResponseStatus) { + this.httpResponseStatus = httpResponseStatus; + } + + public HttpResponseStatus getHttpResponseStatus() { + return httpResponseStatus; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/AuthorizedResponse.java b/src/main/java/com/corundumstudio/socketio/AuthorizedResponse.java new file mode 100644 index 00000000..2550d282 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/AuthorizedResponse.java @@ -0,0 +1,56 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.AsciiString; + +import java.util.HashMap; +import java.util.Map; + +/* + * Used to authorize client and add data to the client store + */ +public class AuthorizedResponse extends AuthorizationResponse { + + private final Map clientData = new HashMap(); + + public AuthorizedResponse() { + super(HttpResponseStatus.OK); + } + + public static AuthorizedResponse OK() { + return new AuthorizedResponse(); + } + + public AuthorizedResponse setClientData(String key, Object value) { + clientData.put(key, value); + return this; + } + + public AuthorizedResponse setClientData(Map map) { + clientData.putAll(map); + return this; + } + + + public Map getClientData() { + return clientData; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/Configuration.java b/src/main/java/com/corundumstudio/socketio/Configuration.java index d39e8c82..7b00fce4 100644 --- a/src/main/java/com/corundumstudio/socketio/Configuration.java +++ b/src/main/java/com/corundumstudio/socketio/Configuration.java @@ -15,10 +15,6 @@ */ package com.corundumstudio.socketio; -import java.io.InputStream; -import java.util.Arrays; -import java.util.List; - import com.corundumstudio.socketio.handler.SuccessAuthorizationListener; import com.corundumstudio.socketio.listener.DefaultExceptionListener; import com.corundumstudio.socketio.listener.ExceptionListener; @@ -27,6 +23,9 @@ import com.corundumstudio.socketio.store.StoreFactory; import javax.net.ssl.KeyManagerFactory; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; public class Configuration { @@ -40,8 +39,6 @@ public class Configuration { private int workerThreads = 0; // 0 = current_processors_amount * 2 private boolean useLinuxNativeEpoll; - private boolean allowCustomRequests = false; - private int upgradeTimeout = 10000; private int pingTimeout = 60000; private int pingInterval = 25000; @@ -122,7 +119,6 @@ public Configuration() { setJsonSupport(new JsonSupportWrapper(conf.getJsonSupport())); setContext(conf.getContext()); - setAllowCustomRequests(conf.isAllowCustomRequests()); setKeyStorePassword(conf.getKeyStorePassword()); setKeyStore(conf.getKeyStore()); @@ -239,22 +235,6 @@ public void setContext(String context) { this.context = context; } - public boolean isAllowCustomRequests() { - return allowCustomRequests; - } - - /** - * Allow to service custom requests differs from socket.io protocol. - * In this case it's necessary to add own handler which handle them - * to avoid hang connections. - * Default is {@code false} - * - * @param allowCustomRequests - {@code true} to allow - */ - public void setAllowCustomRequests(boolean allowCustomRequests) { - this.allowCustomRequests = allowCustomRequests; - } - /** * SSL key store password * @@ -460,7 +440,7 @@ public void setKeyManagerFactoryAlgorithm(String keyManagerFactoryAlgorithm) { /** * Set maximum websocket frame content length limit * - * @param maxContentLength + * @param maxFramePayloadLength */ public void setMaxFramePayloadLength(int maxFramePayloadLength) { this.maxFramePayloadLength = maxFramePayloadLength; diff --git a/src/main/java/com/corundumstudio/socketio/HttpParams.java b/src/main/java/com/corundumstudio/socketio/HttpParams.java new file mode 100644 index 00000000..ee7a8353 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/HttpParams.java @@ -0,0 +1,37 @@ +package com.corundumstudio.socketio; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class HttpParams { + private final Map> params; + + public HttpParams(Map> params) { + this.params = params; + } + + public Set getNames() { + return params.keySet(); + } + + public String get(String name) { + List values = getAll(name); + if (values == null || values.isEmpty()) return null; + + return values.get(0); + } + + public List getAll(String name) { + return params.get(name); + } + + public Map> asMap() { + return params; + } + + @Override + public String toString() { + return params.toString(); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/HttpRequestBody.java b/src/main/java/com/corundumstudio/socketio/HttpRequestBody.java new file mode 100644 index 00000000..9d9249a1 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/HttpRequestBody.java @@ -0,0 +1,25 @@ +package com.corundumstudio.socketio; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.util.CharsetUtil; + +import java.nio.charset.Charset; + +public class HttpRequestBody { + + private final FullHttpRequest req; + + public HttpRequestBody(FullHttpRequest req) { + this.req = req; + } + + public String toString() { + return toString(CharsetUtil.UTF_8); + } + + public String toString(Charset charset) { + ByteBuf buffer = req.content(); + return buffer.toString(charset); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/HttpRequestSignature.java b/src/main/java/com/corundumstudio/socketio/HttpRequestSignature.java new file mode 100644 index 00000000..cc35f1fc --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/HttpRequestSignature.java @@ -0,0 +1,39 @@ +package com.corundumstudio.socketio; + +import io.netty.handler.codec.http.HttpMethod; + +public class HttpRequestSignature { + private final HttpMethod httpMethod; + private final String path; + + public HttpRequestSignature(HttpMethod httpMethod, String path) { + this.httpMethod = httpMethod; + this.path = path; + } + + public HttpMethod getHttpMethod() { + return httpMethod; + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof HttpRequestSignature)) return false; + + HttpRequestSignature that = (HttpRequestSignature) o; + + if (!httpMethod.equals(that.httpMethod)) return false; + return path.equals(that.path); + } + + @Override + public int hashCode() { + int result = httpMethod.hashCode(); + result = 31 * result + path.hashCode(); + return result; + } +} diff --git a/src/main/java/com/corundumstudio/socketio/HttpResponse.java b/src/main/java/com/corundumstudio/socketio/HttpResponse.java new file mode 100644 index 00000000..5f80dba2 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/HttpResponse.java @@ -0,0 +1,120 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.AsciiString; +import io.netty.util.CharsetUtil; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/* + * Used to return a result from HttpListener + */ +public class HttpResponse { + + private final HttpResponseStatus httpResponseStatus; + private final HttpHeaders httpHeaders = new DefaultHttpHeaders(); + private String body; + private String contentType = "text/plain"; + private Charset charset = CharsetUtil.UTF_8; + + public HttpResponse(HttpResponseStatus httpResponseStatus) { + this.httpResponseStatus = httpResponseStatus; + } + + public static HttpResponse OK() { + return new HttpResponse(HttpResponseStatus.OK); + } + + public static HttpResponse TEMPORARY_REDIRECT(String locationUrl) { + HttpResponse authorizationResponse = new HttpResponse(HttpResponseStatus.TEMPORARY_REDIRECT); + authorizationResponse.getHeaders().add(HttpHeaderNames.LOCATION, locationUrl); + return authorizationResponse; + } + + public static HttpResponse UNAUTHORIZED() { + return new HttpResponse(HttpResponseStatus.UNAUTHORIZED); + } + + public static HttpResponse INTERNAL_SERVER_ERROR() { + return new HttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + + public HttpResponse setHeader(AsciiString name, String value) { + httpHeaders.add(name, value); + return this; + } + + public HttpResponse setHeaders(HttpHeaders headers) { + httpHeaders.setAll(headers); + return this; + } + + public HttpResponse setHeaders(Map> headers) { + for (Map.Entry> header : headers.entrySet()) { + AsciiString name = header.getKey(); + List values = header.getValue(); + for (String value : values) { + httpHeaders.set(name, value); + } + } + return this; + } + + public HttpHeaders getHeaders() { + return httpHeaders; + } + + public HttpResponse setBody(String body) { + this.body = body; + return this; + } + + public HttpResponse setBody(String body, String contentType) { + this.body = body; + this.contentType = contentType; + return this; + } + + public HttpResponse setBody(String body, String contentType, Charset charset) { + this.body = body; + this.contentType = contentType; + this.charset = charset; + return this; + } + + public HttpResponseStatus getHttpResponseStatus() { + return httpResponseStatus; + } + + public String getBody() { + return body; + } + + public String getContentType() { + return contentType; + } + + public Charset getCharset() { + return charset; + } +} diff --git a/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java b/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java index d93d9cb2..566cecee 100644 --- a/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java +++ b/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java @@ -43,7 +43,7 @@ public AckArgs readAckArgs(ByteBufInputStream src, AckCallback callback) thro return delegate.readAckArgs(src, callback); } catch (Exception e) { src.reset(); - log.error("Can't read ack args: " + src.readLine() + " for type: " + callback.getResultClass(), e); + log.error("Can't read ack args: " + src.readLine() + " for type: " + (callback != null ? callback.getResultClass() : "null"), e); throw new IOException(e); } } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java index 988306b4..a6185104 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java @@ -23,17 +23,12 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import com.corundumstudio.socketio.handler.*; +import com.corundumstudio.socketio.namespace.HttpNamespace; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.ack.AckManager; -import com.corundumstudio.socketio.handler.AuthorizeHandler; -import com.corundumstudio.socketio.handler.ClientHead; -import com.corundumstudio.socketio.handler.ClientsBox; -import com.corundumstudio.socketio.handler.EncoderHandler; -import com.corundumstudio.socketio.handler.InPacketHandler; -import com.corundumstudio.socketio.handler.PacketListener; -import com.corundumstudio.socketio.handler.WrongUrlHandler; import com.corundumstudio.socketio.namespace.NamespacesHub; import com.corundumstudio.socketio.protocol.JsonSupport; import com.corundumstudio.socketio.protocol.PacketDecoder; @@ -74,6 +69,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl public static final String SSL_HANDLER = "ssl"; public static final String RESOURCE_HANDLER = "resourceHandler"; + public static final String HTTP_REQUEST_HANDLER = "httpRequestHandler"; public static final String WRONG_URL_HANDLER = "wrongUrlBlocker"; private static final Logger log = LoggerFactory.getLogger(SocketIOChannelInitializer.class); @@ -82,6 +78,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl private ClientsBox clientsBox = new ClientsBox(); private AuthorizeHandler authorizeHandler; + private HttpRequestHandler httpRequestHandler; private PollingTransport xhrPollingTransport; private WebSocketTransport webSocketTransport; private WebSocketServerCompressionHandler webSocketTransportCompression = new WebSocketServerCompressionHandler(); @@ -99,7 +96,7 @@ public void handlerAdded(ChannelHandlerContext ctx) { scheduler.update(ctx); } - public void start(Configuration configuration, NamespacesHub namespacesHub) { + public void start(Configuration configuration, NamespacesHub namespacesHub, HttpNamespace httpNamespace) { this.configuration = configuration; ackManager = new AckManager(scheduler); @@ -120,14 +117,13 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) { } StoreFactory factory = configuration.getStoreFactory(); - authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub, factory, this, ackManager, clientsBox); + authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub, httpNamespace, factory, this, ackManager, clientsBox); + httpRequestHandler = new HttpRequestHandler(httpNamespace); factory.init(namespacesHub, authorizeHandler, jsonSupport); - xhrPollingTransport = new PollingTransport(decoder, authorizeHandler, clientsBox); - webSocketTransport = new WebSocketTransport(isSsl, authorizeHandler, configuration, scheduler, clientsBox); + xhrPollingTransport = new PollingTransport(connectPath, decoder, authorizeHandler, clientsBox); + webSocketTransport = new WebSocketTransport(connectPath, isSsl, authorizeHandler, configuration, scheduler, clientsBox); PacketListener packetListener = new PacketListener(ackManager, namespacesHub, xhrPollingTransport, scheduler); - - packetHandler = new InPacketHandler(packetListener, decoder, namespacesHub, configuration.getExceptionListener()); try { @@ -190,6 +186,8 @@ protected Object newContinueResponse(HttpMessage start, int maxContentLength, } pipeline.addLast(WEB_SOCKET_TRANSPORT, webSocketTransport); + pipeline.addLast(HTTP_REQUEST_HANDLER, httpRequestHandler); + pipeline.addLast(SOCKETIO_ENCODER, encoderHandler); pipeline.addLast(WRONG_URL_HANDLER, wrongUrlHandler); diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index 329d8ec3..7d8a6914 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -15,6 +15,10 @@ */ package com.corundumstudio.socketio; +import com.corundumstudio.socketio.listener.*; +import com.corundumstudio.socketio.namespace.HttpNamespace; +import com.corundumstudio.socketio.namespace.Namespace; +import com.corundumstudio.socketio.namespace.NamespacesHub; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -24,29 +28,21 @@ import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpMethod; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collection; import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.corundumstudio.socketio.listener.ClientListeners; -import com.corundumstudio.socketio.listener.ConnectListener; -import com.corundumstudio.socketio.listener.DataListener; -import com.corundumstudio.socketio.listener.DisconnectListener; -import com.corundumstudio.socketio.listener.MultiTypeEventListener; -import com.corundumstudio.socketio.namespace.Namespace; -import com.corundumstudio.socketio.namespace.NamespacesHub; - /** * Fully thread-safe. * */ -public class SocketIOServer implements ClientListeners { +public class SocketIOServer implements ClientListeners, HttpListeners { private static final Logger log = LoggerFactory.getLogger(SocketIOServer.class); @@ -55,6 +51,7 @@ public class SocketIOServer implements ClientListeners { private final NamespacesHub namespacesHub; private final SocketIONamespace mainNamespace; + private final HttpNamespace httpNamespace; private SocketIOChannelInitializer pipelineFactory = new SocketIOChannelInitializer(); @@ -66,6 +63,7 @@ public SocketIOServer(Configuration configuration) { this.configCopy = new Configuration(configuration); namespacesHub = new NamespacesHub(configCopy); mainNamespace = addNamespace(Namespace.DEFAULT_NAME); + httpNamespace = new HttpNamespace(configuration); } public void setPipelineFactory(SocketIOChannelInitializer pipelineFactory) { @@ -130,7 +128,7 @@ public Future startAsync() { log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); initGroups(); - pipelineFactory.start(configCopy, namespacesHub); + pipelineFactory.start(configCopy, namespacesHub, httpNamespace); Class channelClass = NioServerSocketChannel.class; if (configCopy.isUseLinuxNativeEpoll()) { @@ -256,5 +254,8 @@ public void addListeners(Object listeners, Class listenersClass) { mainNamespace.addListeners(listeners, listenersClass); } - + @Override + public void addHttpListener(HttpMethod method, String path, HttpListener listener) { + httpNamespace.addHttpListener(method, path, listener); + } } diff --git a/src/main/java/com/corundumstudio/socketio/UnauthorizedResponse.java b/src/main/java/com/corundumstudio/socketio/UnauthorizedResponse.java new file mode 100644 index 00000000..805d1bd3 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/UnauthorizedResponse.java @@ -0,0 +1,119 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.AsciiString; +import io.netty.util.CharsetUtil; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/* + * Used to reject client + */ +public class UnauthorizedResponse extends AuthorizationResponse { + + private final HttpHeaders httpHeaders = new DefaultHttpHeaders(); + private String body; + private String contentType = "text/plain"; + private Charset charset = CharsetUtil.UTF_8; + + public UnauthorizedResponse(HttpResponseStatus httpResponseStatus) { + super(validateHttpResponseStatus(httpResponseStatus)); + } + + private static HttpResponseStatus validateHttpResponseStatus(HttpResponseStatus httpResponseStatus) { + if (HttpResponseStatus.OK.equals(httpResponseStatus)) { + throw new RuntimeException("Use 'AuthorizedResponse' for httpResponseStatus 'OK'."); + } + return httpResponseStatus; + } + + public static UnauthorizedResponse TEMPORARY_REDIRECT(String locationUrl) { + UnauthorizedResponse authorizationResponse = new UnauthorizedResponse(HttpResponseStatus.TEMPORARY_REDIRECT); + authorizationResponse.getHeaders().add(HttpHeaderNames.LOCATION, locationUrl); + return authorizationResponse; + } + + public static UnauthorizedResponse UNAUTHORIZED() { + return new UnauthorizedResponse(HttpResponseStatus.UNAUTHORIZED); + } + + public static UnauthorizedResponse INTERNAL_SERVER_ERROR() { + return new UnauthorizedResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + + public UnauthorizedResponse setHeader(AsciiString name, String value) { + httpHeaders.add(name, value); + return this; + } + + public UnauthorizedResponse setHeaders(HttpHeaders headers) { + httpHeaders.setAll(headers); + return this; + } + + public UnauthorizedResponse setHeaders(Map> headers) { + for (Map.Entry> header : headers.entrySet()) { + AsciiString name = header.getKey(); + List values = header.getValue(); + for (String value : values) { + httpHeaders.set(name, value); + } + } + return this; + } + + public HttpHeaders getHeaders() { + return httpHeaders; + } + + public UnauthorizedResponse setBody(String body) { + this.body = body; + return this; + } + + public UnauthorizedResponse setBody(String body, String contentType) { + this.body = body; + this.contentType = contentType; + return this; + } + + public UnauthorizedResponse setBody(String body, String contentType, Charset charset) { + this.body = body; + this.contentType = contentType; + this.charset = charset; + return this; + } + + public String getBody() { + return body; + } + + public String getContentType() { + return contentType; + } + + public Charset getCharset() { + return charset; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java index 80bfe3f9..dfe7cff6 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java @@ -15,27 +15,10 @@ */ package com.corundumstudio.socketio.handler; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.corundumstudio.socketio.Configuration; -import com.corundumstudio.socketio.Disconnectable; -import com.corundumstudio.socketio.DisconnectableHub; -import com.corundumstudio.socketio.HandshakeData; -import com.corundumstudio.socketio.SocketIOClient; -import com.corundumstudio.socketio.Transport; +import com.corundumstudio.socketio.*; import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.messages.HttpErrorMessage; +import com.corundumstudio.socketio.namespace.HttpNamespace; import com.corundumstudio.socketio.namespace.Namespace; import com.corundumstudio.socketio.namespace.NamespacesHub; import com.corundumstudio.socketio.protocol.AuthPacket; @@ -47,19 +30,26 @@ import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.pubsub.ConnectMessage; import com.corundumstudio.socketio.store.pubsub.PubSubType; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.codec.http.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; @Sharable public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Disconnectable { @@ -71,18 +61,20 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di private final String connectPath; private final Configuration configuration; private final NamespacesHub namespacesHub; + private final HttpNamespace httpNamespace; private final StoreFactory storeFactory; private final DisconnectableHub disconnectable; private final AckManager ackManager; private final ClientsBox clientsBox; - public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Configuration configuration, NamespacesHub namespacesHub, StoreFactory storeFactory, + public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Configuration configuration, NamespacesHub namespacesHub, HttpNamespace httpNamespace, StoreFactory storeFactory, DisconnectableHub disconnectable, AckManager ackManager, ClientsBox clientsBox) { super(); this.connectPath = connectPath; this.configuration = configuration; this.disconnectScheduler = scheduler; this.namespacesHub = namespacesHub; + this.httpNamespace = httpNamespace; this.storeFactory = storeFactory; this.disconnectable = disconnectable; this.ackManager = ackManager; @@ -112,9 +104,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception Channel channel = ctx.channel(); QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri()); - if (!configuration.isAllowCustomRequests() - && !queryDecoder.path().startsWith(connectPath)) { - HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST); + if (!httpNamespace.hasListeners() && !queryDecoder.path().startsWith(connectPath)) { + io.netty.handler.codec.http.HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST); channel.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); req.release(); return; @@ -142,32 +133,52 @@ private boolean authorize(ChannelHandlerContext ctx, Channel channel, String ori headers.put(name, values); } - HandshakeData data = new HandshakeData(req.headers(), params, + HandshakeData handshakeData = new HandshakeData(req.headers(), params, (InetSocketAddress)channel.remoteAddress(), req.uri(), origin != null && !origin.equalsIgnoreCase("null")); - boolean result = false; + AuthorizationResponse authorizationResponse = null; try { - result = configuration.getAuthorizationListener().isAuthorized(data); + authorizationResponse = configuration.getAuthorizationListener().authorize(handshakeData); } catch (Exception e) { - log.error("Authorization error", e); + log.warn("AuthorizationListener threw exception.", e); } + if (authorizationResponse == null) { + authorizationResponse = UnauthorizedResponse.UNAUTHORIZED(); + } + + if (authorizationResponse instanceof UnauthorizedResponse) { + // unauthorized + UnauthorizedResponse unauthorizedResponse = (UnauthorizedResponse) authorizationResponse; + + DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, unauthorizedResponse.getHttpResponseStatus()); + if (unauthorizedResponse.getBody() != null) { + ByteBuf buf = Unpooled.copiedBuffer(unauthorizedResponse.getBody(), unauthorizedResponse.getCharset()); + res.content().writeBytes(buf); + buf.release(); + res.headers().set(HttpHeaderNames.CONTENT_TYPE, unauthorizedResponse.getContentType() + "; charset=" + unauthorizedResponse.getCharset().displayName().toLowerCase()); + res.headers().set(HttpHeaderNames.CONTENT_LENGTH, res.content().readableBytes()); + } + if (unauthorizedResponse.getHeaders() != null) { + res.headers().add(unauthorizedResponse.getHeaders()); + } - if (!result) { - HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); - channel.writeAndFlush(res) - .addListener(ChannelFutureListener.CLOSE); - log.debug("Handshake unauthorized, query params: {} headers: {}", params, headers); + channel.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); + log.debug("Handshake UNAUTHORIZED, query params: {} headers: {}", params, headers); return false; } + // authorized + AuthorizedResponse authorizedResponse = (AuthorizedResponse) authorizationResponse; + Map storeData = authorizedResponse.getClientData(); + UUID sessionId = this.generateOrGetSessionIdFromRequest(req.headers()); List transportValue = params.get("transport"); if (transportValue == null) { log.error("Got no transports for request {}", req.uri()); - HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); + io.netty.handler.codec.http.HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); channel.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); return false; } @@ -183,7 +194,8 @@ private boolean authorize(ChannelHandlerContext ctx, Channel channel, String ori return false; } - ClientHead client = new ClientHead(sessionId, ackManager, disconnectable, storeFactory, data, clientsBox, transport, disconnectScheduler, configuration); + ClientHead client = new ClientHead(sessionId, ackManager, disconnectable, storeFactory, storeData, + handshakeData, clientsBox, transport, disconnectScheduler, configuration); channel.attr(ClientHead.CLIENT).set(client); clientsBox.addClient(client); @@ -192,8 +204,7 @@ private boolean authorize(ChannelHandlerContext ctx, Channel channel, String ori transports = new String[] {"websocket"}; } - AuthPacket authPacket = new AuthPacket(sessionId, transports, configuration.getPingInterval(), - configuration.getPingTimeout()); + AuthPacket authPacket = new AuthPacket(sessionId, transports, configuration.getPingInterval(), configuration.getPingTimeout()); Packet packet = new Packet(PacketType.OPEN); packet.setData(authPacket); client.send(packet); diff --git a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java index 82cd8600..6cadf6ff 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java +++ b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java @@ -15,23 +15,7 @@ */ package com.corundumstudio.socketio.handler; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.corundumstudio.socketio.Configuration; -import com.corundumstudio.socketio.DisconnectableHub; -import com.corundumstudio.socketio.HandshakeData; -import com.corundumstudio.socketio.Transport; +import com.corundumstudio.socketio.*; import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.messages.OutPacketMessage; import com.corundumstudio.socketio.namespace.Namespace; @@ -43,13 +27,20 @@ import com.corundumstudio.socketio.store.Store; import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.transport.NamespaceClient; - import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.util.AttributeKey; import io.netty.util.internal.PlatformDependent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.SocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class ClientHead { @@ -76,8 +67,8 @@ public class ClientHead { private volatile Transport currentTransport; public ClientHead(UUID sessionId, AckManager ackManager, DisconnectableHub disconnectable, - StoreFactory storeFactory, HandshakeData handshakeData, ClientsBox clientsBox, Transport transport, CancelableScheduler disconnectScheduler, - Configuration configuration) { + StoreFactory storeFactory, Map storeData, HandshakeData handshakeData, ClientsBox clientsBox, Transport transport, CancelableScheduler disconnectScheduler, + Configuration configuration) { this.sessionId = sessionId; this.ackManager = ackManager; this.disconnectableHub = disconnectable; @@ -88,6 +79,10 @@ public ClientHead(UUID sessionId, AckManager ackManager, DisconnectableHub disco this.disconnectScheduler = disconnectScheduler; this.configuration = configuration; + for (Map.Entry entry : storeData.entrySet()) { + this.store.set(entry.getKey(), entry.getValue()); + } + channels.put(Transport.POLLING, new TransportState()); channels.put(Transport.WEBSOCKET, new TransportState()); } @@ -263,5 +258,4 @@ public void setLastBinaryPacket(Packet lastBinaryPacket) { public Packet getLastBinaryPacket() { return lastBinaryPacket; } - } diff --git a/src/main/java/com/corundumstudio/socketio/handler/HttpRequestHandler.java b/src/main/java/com/corundumstudio/socketio/handler/HttpRequestHandler.java new file mode 100644 index 00000000..48d2b9be --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/handler/HttpRequestHandler.java @@ -0,0 +1,88 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.handler; + +import com.corundumstudio.socketio.HttpParams; +import com.corundumstudio.socketio.HttpRequestBody; +import com.corundumstudio.socketio.HttpRequestSignature; +import com.corundumstudio.socketio.HttpResponse; +import com.corundumstudio.socketio.namespace.HttpNamespace; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.handler.codec.http.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +@Sharable +public class HttpRequestHandler extends ChannelInboundHandlerAdapter { + + private static final Logger log = LoggerFactory.getLogger(HttpRequestHandler.class); + + private final HttpNamespace httpNamespace; + + public HttpRequestHandler(HttpNamespace httpNamespace) { + this.httpNamespace = httpNamespace; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest && httpNamespace.hasListeners()) { + FullHttpRequest req = (FullHttpRequest) msg; + Channel channel = ctx.channel(); + QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri()); + + HttpMethod method = req.method(); + String path = queryDecoder.path(); + HttpParams params = new HttpParams(queryDecoder.parameters()); + HttpHeaders headers = req.headers(); + HttpRequestBody body = new HttpRequestBody(req); + + HttpRequestSignature httpRequestSignature = new HttpRequestSignature(method, path); + HttpResponse httpResponse = null; + try { + httpResponse = httpNamespace.onRequest(httpRequestSignature, params, headers, body); + } catch (Exception e) { + log.warn("HttpListener threw exception.", e); + } + if (httpResponse != null) { + DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, httpResponse.getHttpResponseStatus()); + if (httpResponse.getBody() != null) { + ByteBuf buf = Unpooled.wrappedBuffer(httpResponse.getBody().getBytes(httpResponse.getCharset())); + res.content().writeBytes(buf); + buf.release(); + res.headers().set(HttpHeaderNames.CONTENT_TYPE, httpResponse.getContentType() + "; charset=" + httpResponse.getCharset().displayName().toLowerCase()); + res.headers().set(HttpHeaderNames.CONTENT_LENGTH, res.content().readableBytes()); + } + if (httpResponse.getHeaders() != null) { + res.headers().add(httpResponse.getHeaders()); + } + + channel.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); + + log.debug("Http response, query params: {} headers: {}", params, headers); + return; + } + } + super.channelRead(ctx, msg); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java b/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java index dfded95c..44fe3e9f 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java +++ b/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java @@ -16,13 +16,15 @@ package com.corundumstudio.socketio.handler; import com.corundumstudio.socketio.AuthorizationListener; +import com.corundumstudio.socketio.AuthorizationResponse; +import com.corundumstudio.socketio.AuthorizedResponse; import com.corundumstudio.socketio.HandshakeData; public class SuccessAuthorizationListener implements AuthorizationListener { @Override - public boolean isAuthorized(HandshakeData data) { - return true; + public AuthorizationResponse authorize(HandshakeData data) { + return AuthorizedResponse.OK(); } } diff --git a/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java b/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java index 7322b644..d70d71c3 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java @@ -44,11 +44,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception Channel channel = ctx.channel(); QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri()); - HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST); + HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NOT_FOUND); ChannelFuture f = channel.writeAndFlush(res); f.addListener(ChannelFutureListener.CLOSE); req.release(); - log.warn("Blocked wrong socket.io-context request! url: {}, params: {}, ip: {}", queryDecoder.path(), queryDecoder.parameters(), channel.remoteAddress()); + log.warn("Blocked wrong path url: {}, params: {}, ip: {}", queryDecoder.path(), queryDecoder.parameters(), channel.remoteAddress()); return; } super.channelRead(ctx, msg); diff --git a/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java b/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java index 684195b3..479cd762 100644 --- a/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java +++ b/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java @@ -15,14 +15,13 @@ */ package com.corundumstudio.socketio.listener; +import com.corundumstudio.socketio.HttpRequestSignature; +import com.corundumstudio.socketio.SocketIOClient; import io.netty.channel.ChannelHandlerContext; - -import java.util.List; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.corundumstudio.socketio.SocketIOClient; +import java.util.List; public class DefaultExceptionListener extends ExceptionListenerAdapter { @@ -33,6 +32,11 @@ public void onEventException(Exception e, List args, SocketIOClient clie log.error(e.getMessage(), e); } + @Override + public void onHttpException(Exception e, HttpRequestSignature signature) { + log.error(e.getMessage(), e); + } + @Override public void onDisconnectException(Exception e, SocketIOClient client) { log.error(e.getMessage(), e); diff --git a/src/main/java/com/corundumstudio/socketio/listener/ExceptionListener.java b/src/main/java/com/corundumstudio/socketio/listener/ExceptionListener.java index 8d50038e..86bfa6b0 100644 --- a/src/main/java/com/corundumstudio/socketio/listener/ExceptionListener.java +++ b/src/main/java/com/corundumstudio/socketio/listener/ExceptionListener.java @@ -15,16 +15,18 @@ */ package com.corundumstudio.socketio.listener; +import com.corundumstudio.socketio.HttpRequestSignature; +import com.corundumstudio.socketio.SocketIOClient; import io.netty.channel.ChannelHandlerContext; import java.util.List; -import com.corundumstudio.socketio.SocketIOClient; - public interface ExceptionListener { void onEventException(Exception e, List args, SocketIOClient client); + void onHttpException(Exception e, HttpRequestSignature signature); + void onDisconnectException(Exception e, SocketIOClient client); void onConnectException(Exception e, SocketIOClient client); diff --git a/src/main/java/com/corundumstudio/socketio/listener/ExceptionListenerAdapter.java b/src/main/java/com/corundumstudio/socketio/listener/ExceptionListenerAdapter.java index 89efc0f5..84656080 100644 --- a/src/main/java/com/corundumstudio/socketio/listener/ExceptionListenerAdapter.java +++ b/src/main/java/com/corundumstudio/socketio/listener/ExceptionListenerAdapter.java @@ -15,12 +15,12 @@ */ package com.corundumstudio.socketio.listener; +import com.corundumstudio.socketio.HttpRequestSignature; +import com.corundumstudio.socketio.SocketIOClient; import io.netty.channel.ChannelHandlerContext; import java.util.List; -import com.corundumstudio.socketio.SocketIOClient; - /** * Base callback exceptions listener * @@ -32,6 +32,10 @@ public abstract class ExceptionListenerAdapter implements ExceptionListener { public void onEventException(Exception e, List data, SocketIOClient client) { } + @Override + public void onHttpException(Exception e, HttpRequestSignature signature) { + } + @Override public void onDisconnectException(Exception e, SocketIOClient client) { } diff --git a/src/main/java/com/corundumstudio/socketio/listener/HttpListener.java b/src/main/java/com/corundumstudio/socketio/listener/HttpListener.java new file mode 100644 index 00000000..b30fc103 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/listener/HttpListener.java @@ -0,0 +1,13 @@ +package com.corundumstudio.socketio.listener; + +import com.corundumstudio.socketio.HttpParams; +import com.corundumstudio.socketio.HttpRequestBody; +import com.corundumstudio.socketio.HttpRequestSignature; +import com.corundumstudio.socketio.HttpResponse; +import io.netty.handler.codec.http.HttpHeaders; + +public interface HttpListener { + + HttpResponse onRequest(HttpRequestSignature signature, HttpParams params, HttpHeaders headers, HttpRequestBody body) throws Exception; + +} diff --git a/src/main/java/com/corundumstudio/socketio/listener/HttpListeners.java b/src/main/java/com/corundumstudio/socketio/listener/HttpListeners.java new file mode 100644 index 00000000..43f712b4 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/listener/HttpListeners.java @@ -0,0 +1,24 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.listener; + +import io.netty.handler.codec.http.HttpMethod; + +public interface HttpListeners { + + void addHttpListener(HttpMethod method, String path, HttpListener listener); + +} diff --git a/src/main/java/com/corundumstudio/socketio/namespace/HttpNamespace.java b/src/main/java/com/corundumstudio/socketio/namespace/HttpNamespace.java new file mode 100644 index 00000000..6a716d97 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/namespace/HttpNamespace.java @@ -0,0 +1,43 @@ +package com.corundumstudio.socketio.namespace; + +import com.corundumstudio.socketio.*; +import com.corundumstudio.socketio.listener.ExceptionListener; +import com.corundumstudio.socketio.listener.HttpListener; +import com.corundumstudio.socketio.listener.HttpListeners; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.util.internal.PlatformDependent; + +import java.util.concurrent.ConcurrentMap; + +public class HttpNamespace implements HttpListeners { + + private final ConcurrentMap httpListeners = PlatformDependent.newConcurrentHashMap(); + private final ExceptionListener exceptionListener; + + public HttpNamespace(Configuration configuration) { + this.exceptionListener = configuration.getExceptionListener(); + } + + public void addHttpListener(HttpMethod method, String path, HttpListener listener) { + HttpRequestSignature signature = new HttpRequestSignature(method, path); + httpListeners.put(signature, listener); + } + + public boolean hasListeners() { + return !httpListeners.isEmpty(); + } + + public HttpResponse onRequest(HttpRequestSignature httpRequestSignature, HttpParams params, HttpHeaders headers, HttpRequestBody body) { + HttpListener httpListener = httpListeners.get(httpRequestSignature); + if (httpListener == null) return null; + + try { + return httpListener.onRequest(httpRequestSignature, params, headers, body); + } catch (Exception e) { + exceptionListener.onHttpException(e, httpRequestSignature); + return HttpResponse.INTERNAL_SERVER_ERROR(); + } + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java index c3e838cd..47576d0d 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java @@ -283,7 +283,7 @@ private void parseBody(ClientHead head, ByteBuf frame, Packet packet) throws IOE if (packet.getType() == PacketType.MESSAGE) { if (packet.getSubType() == PacketType.CONNECT || packet.getSubType() == PacketType.DISCONNECT) { - packet.setNsp(readString(frame)); + packet.setNsp(readNamespace(frame)); } if (packet.hasAttachments() && !packet.isAttachmentsLoaded()) { @@ -314,4 +314,21 @@ private void parseBody(ClientHead head, ByteBuf frame, Packet packet) throws IOE } } + private String readNamespace(ByteBuf frame){ + /** + * namespace post request with url queryString, like + * /message?a=1, + * /message, + */ + int endIndex = frame.bytesBefore((byte)'?'); + if(endIndex > 0){ + return readString(frame, endIndex); + } + endIndex = frame.bytesBefore((byte)','); + if(endIndex > 0){ + return readString(frame, endIndex); + } + return readString(frame); + } + } diff --git a/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java index 2f70ce21..f1a7f923 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java @@ -58,7 +58,10 @@ public class PollingTransport extends ChannelInboundHandlerAdapter { private final ClientsBox clientsBox; private final AuthorizeHandler authorizeHandler; - public PollingTransport(PacketDecoder decoder, AuthorizeHandler authorizeHandler, ClientsBox clientsBox) { + private final String connectPath; + + public PollingTransport(String connectPath, PacketDecoder decoder, AuthorizeHandler authorizeHandler, ClientsBox clientsBox) { + this.connectPath = connectPath; this.decoder = decoder; this.authorizeHandler = authorizeHandler; this.clientsBox = clientsBox; @@ -69,10 +72,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (msg instanceof FullHttpRequest) { FullHttpRequest req = (FullHttpRequest) msg; QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri()); - + String path = queryDecoder.path(); List transport = queryDecoder.parameters().get("transport"); - if (transport != null && NAME.equals(transport.get(0))) { + if (transport != null && NAME.equals(transport.get(0)) && path.startsWith(connectPath)) { List sid = queryDecoder.parameters().get("sid"); List j = queryDecoder.parameters().get("j"); List b64 = queryDecoder.parameters().get("b64"); @@ -88,7 +91,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.channel().attr(EncoderHandler.JSONP_INDEX).set(index); } if (b64 != null && b64.get(0) != null) { - Integer enable = Integer.valueOf(b64.get(0)); + int enable = Integer.valueOf(b64.get(0)); ctx.channel().attr(EncoderHandler.B64).set(enable == 1); } diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index 691bcb7a..5a611b14 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -63,11 +63,13 @@ public class WebSocketTransport extends ChannelInboundHandlerAdapter { private final Configuration configuration; private final ClientsBox clientsBox; + private final String connectPath; private final boolean isSsl; - public WebSocketTransport(boolean isSsl, + public WebSocketTransport(String connectPath, boolean isSsl, AuthorizeHandler authorizeHandler, Configuration configuration, CancelableScheduler scheduler, ClientsBox clientsBox) { + this.connectPath = connectPath; this.isSsl = isSsl; this.authorizeHandler = authorizeHandler; this.configuration = configuration; @@ -100,7 +102,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception List transport = queryDecoder.parameters().get("transport"); List sid = queryDecoder.parameters().get("sid"); - if (transport != null && NAME.equals(transport.get(0))) { + if (transport != null && NAME.equals(transport.get(0)) && path.startsWith(connectPath)) { try { if (!configuration.getTransports().contains(Transport.WEBSOCKET)) { log.debug("{} transport not supported by configuration.", Transport.WEBSOCKET); @@ -201,7 +203,7 @@ public void run() { }, configuration.getUpgradeTimeout(), TimeUnit.MILLISECONDS); } - log.debug("сlient {} handshake completed", sessionId); + log.debug("client {} handshake completed", sessionId); } private String getWebSocketLocation(HttpRequest req) {