La position actuelle:Accueil du site>Résumé du websocket minier

Résumé du websocket minier

2022-07-24 00:24:38Ne t'arrête pas.

Besoins:

  1. Oui.NginxAgents(Ne pas exposer le numéro de port)
  2. CréationnamespaceÉcouter(Nom de l'événement:connectTask,namespaceNom:/remote)
  3. .La logique de service peut être interrompue à tout moment,Mais ne pas fermer le client(Exécution asynchrone de la logique de service)
  4. Même navigateur différentes pages s'ouvrent pour créer un nouveau client(sessionC'est différent.)

socketioFosse minière

Version

 <!--netty socketio-->
      <dependency>
        <groupId>com.corundumstudio.socketio</groupId>
        <artifactId>netty-socketio</artifactId>
        <version>1.7.18</version>
      </dependency>

 properties

netty.io.host=0.0.0.0
netty.io.port=9092
# Définir la longueur maximale des données traitées par image,Empêcher d'autres d'utiliser le Big Data pour attaquer le serveur
socketio.maxFramePayloadLength=1048576
# ParamètreshttpLongueur maximale du contenu interactif
socketio.maxHttpContentLength=1048576
# socketNombre de connexions taille(Si vous n'écoutez qu'un seul portbossLe Groupe de thread est1C'est tout.)
socketio.bossCount=1
socketio.workCount=100
socketio.allowCustomRequests=true
# Délai de mise à jour du Protocole(MS),Par défaut10Secondes.HTTPPoignée de main mise à jour àwsDélai d'exécution du Protocole
socketio.upgradeTimeout=10000
# PingMessage timeout(MS),Par défaut60Secondes,Aucun message de battement de cœur n'est reçu dans cet intervalle de temps et un événement de temporisation est envoyé
socketio.pingTimeout=60000
# PingIntervalle de message(MS),Par défaut10Secondes.Le client envoie un intervalle de battements de cœur au serveur
socketio.pingInterval=10000

Configurer la classe

Même navigateur différentes pages s'ouvrent pour créer un nouveau client, Donc chaque demande doit créer un aléatoire session id.

Fosse

Il y a un trou ici, Les anciennes versions comme 1.7.14, Pour générer au hasard session id Il faut réécrire la logique

1.LocalAuthorizeHandler extends AuthorizeHandler------->
Et réécrire la méthode:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
 Et ensuite réaliser le hasard UUIDGénération de

2.class LocalSocketIoChannelInitializer extends SocketIOChannelInitializer------->
 La réécriture nécessite un réglage à l'intérieur AuthorizeHandlerMéthode,Remplacer par:LocalAuthorizeHandler 
3.InconfigLieutenant - généralLocalSocketIoChannelInitializer setÀSocketIOServer À l'intérieur,Comme ça
SocketIOServer server = new SocketIOServer(config);
server.setPipelineFactory(new SksSocketIoChannelInitializer());

 C'est tout.lowDieu soit loué, Enfin, découvrez avec une nouvelle version seulement dans configMoyennesetRandomSession(true)C'est fait.

config.setRandomSession(true);

 configCatégorie

@org.springframework.context.annotation.Configuration
public class SocketIoConfig {
    private final Logger LOGGER = LoggerFactory.getLogger(SocketIoConfig.class);


    @Value("${netty.io.host}")
    private String host;

    @Value("${netty.io.port}")
    private Integer port;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Bean
    public SocketIOServer socketIOServer() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        Configuration config = new Configuration();
        config.setRandomSession(true);
        config.setAllowCustomRequests(true);
        config.setSocketConfig(socketConfig);
        config.setPort(port);
        config.setHostname(host);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setMaxHttpContentLength(maxHttpContentLength);
        //Si nécessaireNginxAgents, Un chemin doit être ajouté /localPathDifférencierwebsocket, Sinon, le chemin par défaut est /socket.ioC'est dur.
        config.setContext("/localPath" + config.getContext());
        config.setMaxFramePayloadLength(maxFramePayloadLength);
        // Il peut être utilisé pour l'authentification 
        config.setAuthorizationListener(new AuthorizationListener() {
            @Override
            public boolean isAuthorized(HandshakeData data) {
               ........
        });
        return new SocketIOServer(config);
    }

}

 WebSocketHandler Catégorie

Ajouternamespace, Il n'y a pas de room.namespace C'est comme classer les connexions , Faciliter la gestion détaillée et la diffusion . Je ne vais pas plus loin namespaceEtroom

@Component
public class WebSocketHandler {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private final SocketIOServer socketIOServer;
    private final ConnectService connectService;
    public final static String DEV_CONNECT = "connectTask";

    /**
     * LiensclientEtwebSocket session
     */
    public static final Map<UUID, SocketIOClient> CLIENT_MAP = new ConcurrentHashMap<>();

    public WebSocketHandler(SocketIOServer socketIOServer,ConnectService connectService) {
        this.socketIOServer = socketIOServer;
        this.connectService= connectService;
    }

    @PostConstruct
    public void autoStart() {
        this.start();
    }

    @PreDestroy
    private void onDestroy() {
        if (socketIOServer != null) {
            socketIOServer.stop();
        }
    }


    public void start() {
        socketIOServer.addConnectListener(client -> {
            if (CLIENT_MAP.containsKey(client.getSessionId())) {
                client.disconnect();
            } else {
                CLIENT_MAP.put(client.getSessionId(), client);
            }
        });
        socketIOServer.addDisconnectListener(client -> {
            remoteWebHandler.onDisconnect(client);
            remoteWebHandler.cleanContext(client);
            log.info("clean client:{}", client.getSessionId());
            client.disconnect();
        });
        socketIOServer.start();
        log.info("start finish");
        addOnDataNamespace();
    }


    private void addOnDataNamespace() {
        final SocketIONamespace namespace = socketIOServer.addNamespace("/remote");
        namespace.addEventListener(WebSocketHandler.DEV_CONNECT, String.class, new DataListener<String>() {
            /**
             * Invokes when data object received from client
             *
             * @param client    - receiver
             * @param data      - received object
             * @param ackSender - ack request
             * @throws Exception
             */
            @Override
            public void onData(SocketIOClient client, String data, AckRequest ackSender) {
                try {
                    connectService.onData(client, data);
                } catch (Exception e) {
                    log.error("onData error", e);
                }
            }
        });
    }
}

service La classe a écrit un peu de logique

connectMessage de classe

  1. Utiliser un pool de Threads pour implémenter la logique de corrélation d'exécution asynchrone
  2. CacheFuture Attendre l'information comme contexte
  3. Logique d'exécution

disconnectMessage de classe

  1. 1. Définir le drapeau d'interruption pour le contexte isStopPourtrue()
  2. 2.future.cancel(true)Annuler la tâche
  • Paramètrestrue: Si la tâche a été exécutée , Vous essayez d'interrompre le traitement .( Le traitement des interruptions modifie le BIT du drapeau d'interruption , La Mission devrait: JugementisInterrupted()Ou Utilisation pendant la Mission sleep, Pour que la Mission puisse être interrompue )
  • Paramètresfalse: Si la tâche a été exécutée , Attendez que la tâche soit terminée . Si c'est une mission sans fil , Ça ne s'arrêtera pas .

3.Envoyer un message au client, Si vous voulez fermer la connexion client.disconnect(); Et effacer le cache contextuel

@Component
public class DeviceRemoteWebHandler {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ThreadPoolTaskExecutor threadPool;
    private Map<UUID, RemoteWebContext> remoteWebContexts = new ConcurrentHashMap<>();

    public DeviceRemoteWebHandler(@Qualifier(BeanConsts.USER_DATA_EXECUTOR_SERVICE) ThreadPoolTaskExecutor threadPool) {

        this.threadPool = threadPool;
    }


    public void onData(final SocketIOClient client, final String data) {
        getContext(client);
        ClientMessage message = JsonUtils.deserializeJson(JsonUtils.getObjectMapper(), data, ClientMessage.class);
        if (message == null) {
            log.info("client message is empty");
            return;
        }
		//remoteDeconnectInformation
        if (connect message) {
            try {
                connect(client, message.getData());
            } catch (InterruptedException i) {
                return;
            } catch (Exception e) {
                log.error("connect event failed", e);
                sendMsgToClient(client, getFailResult(DeviceConnectResultType.SERVER_FAIL, client));
                return;
            }
        }
		//remoteDedisconnectInformation
        if (disconnect message) {
            onDisconnect(client);
        }
    }

    private RemoteWebContext getContext(final SocketIOClient client) {
        if (remoteWebContexts.containsKey(client.getSessionId())) {
            return remoteWebContexts.get(client.getSessionId());
        }
        RemoteWebContext remoteWebContext = new RemoteWebContext(client.getSessionId());
        remoteWebContexts.put(client.getSessionId(), remoteWebContext);
        return remoteWebContext;
    }

    public void cleanContext(final SocketIOClient client) {
        if (remoteWebContexts.containsKey(client.getSessionId())) {
            log.info("remove remoteWebContext by sessionId:{}", client.getSessionId());
            remoteWebContexts.remove(client.getSessionId());
        }
    }

    public void onDisconnect(final SocketIOClient client) {
        if (remoteWebContexts.containsKey(client.getSessionId())) {
            RemoteWebContext context = getContext(client);
            context.setIsStop();
            Future future = remoteWebContexts.get(client.getSessionId()).getFuture();
            log.info("Try to Interrupt if running");
            if (future == null) {
               //Traitement vide
                return;
            }
            try {
                future.cancel(true);
            } catch (Exception e) {
                log.error("DisConnect Failed.", e);
                return;
            }
            sendMsgToClient(client);
        }
    }


    /**
     *  Post - traitement de l'établissement de la connexion 
     */
    public void connect(final SocketIOClient client, final DeviceConnectModel connectData) throws InterruptedException {
        log.info("Paramètre reçu:{}", client.getHandshakeData().getUrlParams());
        Future<?> deviceConnectFuture = threadPool.submit(() -> {
           @Todo  Logique nécessitant une exécution asynchrone 
        });
		//C'est exact.FuturePour mettre en cache
        remoteWebContexts.put(client.getSessionId(), new RemoteWebContext(deviceConnectFuture));
    }



    /**
     * send msg
     *
     * @param client socket.io client
     */
    private void sendMsgToClient(SocketIOClient client, DeviceConnectResult connectResult) {
        MessageBase<DeviceConnectResult> result = new MessageBase(configInstance.getDeviceConnectType(), connectResult);
        log.info("starting send connect message,sid={}\nvalue {}", client.getSessionId(), result);
        try {
            client.sendEvent(WebSocketHandler.DEV_CONNECT, result);
        } catch (Exception e) {
            log.error("send message {} to client failed", result, e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

  
}

Mentions de copyright
Auteur de cet article [Ne t'arrête pas.],Réimpression s’il vous plaît apporter le lien vers l’original, merci
https://fra.chowdera.com/2022/205/202207240024094053.html

Recommandé au hasard