博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty reactor线程模型分析
阅读量:7002 次
发布时间:2019-06-27

本文共 16981 字,大约阅读时间需要 56 分钟。

netty4线程模型

ServerBootstrap http示例

// Configure the server.        EventLoopGroup bossGroup = new EpollEventLoopGroup(1);        EventLoopGroup workerGroup = new EpollEventLoopGroup();                try {            ServerBootstrap b = new ServerBootstrap();            b.channel(EpollServerSocketChannel.class);            b.option(ChannelOption.SO_BACKLOG, 1024);            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);            b.group(bossGroup, workerGroup)            // .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new HttpHelloWorldServerInitializer(sslCtx));            Channel ch = b.bind(PORT).sync().channel();/*            System.err.println("Open your web browser and navigate to " +                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');*/            ch.closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }

绑定过程:

private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister();        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }        if (regFuture.isDone()) {            // At this point we know that the registration was complete and successful.            ChannelPromise promise = channel.newPromise();            doBind0(regFuture, channel, localAddress, promise);            return promise;        } else {            // Registration future is almost always fulfilled already, but just in case it's not.            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);            regFuture.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    Throwable cause = future.cause();                    if (cause != null) {                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                        // IllegalStateException once we try to access the EventLoop of the Channel.                        promise.setFailure(cause);                    } else {                        // Registration was successful, so set the correct executor to use.                        // See https://github.com/netty/netty/issues/2586                        promise.executor = channel.eventLoop();                    }                    doBind0(regFuture, channel, localAddress, promise);                }            });            return promise;        }    }

初始化过程:

final ChannelFuture initAndRegister() {        final Channel channel = channelFactory().newChannel();        try {            init(channel);        } catch (Throwable t) {            channel.unsafe().closeForcibly();            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);        }        ChannelFuture regFuture = group().register(channel);        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        // If we are here and the promise is not failed, it's one of the following cases:        // 1) If we attempted registration from the event loop, the registration has been completed at this point.        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.        // 2) If we attempted registration from the other thread, the registration request has been successfully        //    added to the event loop's task queue for later execution.        //    i.e. It's safe to attempt bind() or connect() now:        //         because bind() or connect() will be executed *after* the scheduled registration task is executed        //         because register(), bind(), and connect() are all bound to the same thread.        return regFuture;    }

ServerBootstrap的初始化过程:

@Override    void init(Channel channel) throws Exception {        final Map
, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map
, Object> attrs = attrs(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer
() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }

接收器ServerBootstrapAcceptor

@Override        @SuppressWarnings("unchecked")        public void channelRead(ChannelHandlerContext ctx, Object msg) {            final Channel child = (Channel) msg;            child.pipeline().addLast(childHandler);            for (Entry
, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption
) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry
, Object> e: childAttrs) { child.attr((AttributeKey
) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }

ThreadPerChannelEventLoopGroup实现注册

/**
 * Registers the channel with an event loop obtained from {@code nextChild()}.
 *
 * @param channel the channel to register; must not be {@code null}
 * @return the future returned by the chosen event loop, or a failed future carrying
 *         whatever was thrown while obtaining the loop or registering
 * @throws NullPointerException if {@code channel} is {@code null}
 */
@Override
public ChannelFuture register(Channel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }

    try {
        final EventLoop child = nextChild();
        final DefaultChannelPromise registration = new DefaultChannelPromise(channel, child);
        return child.register(channel, registration);
    } catch (Throwable failure) {
        return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, failure);
    }
}

获取子eventLoop

/**
 * Returns an event loop for a new channel: an idle one if available, otherwise a newly
 * created one, and records it in {@code activeChildren}.
 *
 * @return the event loop to use
 * @throws RejectedExecutionException if this group is shutting down
 * @throws Exception {@code tooManyChannels} when {@code maxChannels} is positive and already
 *         reached, or whatever {@code newChild} throws
 */
private EventLoop nextChild() throws Exception {
    if (shuttingDown) {
        throw new RejectedExecutionException("shutting down");
    }

    EventLoop child = idleChildren.poll();
    if (child != null) {
        activeChildren.add(child);
        return child;
    }

    // No idle loop available: enforce the limit (if any) before creating a new one.
    if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
        throw tooManyChannels;
    }
    child = newChild(childArgs);
    child.terminationFuture().addListener(childTerminationListener);

    activeChildren.add(child);
    return child;
}

产生新子eventLoop(SingleThreadEventExecutor.java)

/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @throws NullPointerException if {@code executor} is {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    super(parent);

    if (executor == null) {
        throw new NullPointerException("executor");
    }

    this.executor = executor;
    this.addTaskWakesUp = addTaskWakesUp;
    // Created last, as in the original ordering.
    taskQueue = newTaskQueue();
}

其执行方法(SingleThreadEventExecutor.java):

/**
 * Queues {@code task} for execution. When called from outside the event loop thread the
 * worker thread is started first, and the task is rejected if the executor turns out to be
 * shut down and the task could still be removed from the queue.
 *
 * @param task the task to run; must not be {@code null}
 * @throws NullPointerException if {@code task} is {@code null}
 */
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    final boolean onLoopThread = inEventLoop();
    if (!onLoopThread) {
        startThread();
    }
    addTask(task);
    if (!onLoopThread && isShutdown() && removeTask(task)) {
        reject();
    }

    // Wake the loop explicitly only when adding a task does not already do so.
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(onLoopThread);
    }
}

启动处理线程(SingleThreadEventExecutor.java):

/**
 * Starts the worker thread if the state is still {@code ST_NOT_STARTED}. The state is
 * moved to {@code ST_STARTED} with a compare-and-set, and {@link #doStartThread()} is
 * called only by the caller whose compare-and-set succeeds.
 */
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

/**
 * Submits to {@code executor} a runnable that records the executing thread, invokes
 * {@code run()}, and afterwards performs the shutdown sequence: move the state to at least
 * {@code ST_SHUTTING_DOWN}, loop until {@code confirmShutdown()} returns true, call
 * {@code cleanup()}, set the state to {@code ST_TERMINATED}, release {@code threadLock}
 * and complete {@code terminationFuture}.
 */
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Remember which thread is running this executor.
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // Advance the state to ST_SHUTTING_DOWN unless it is already at or past it;
                // retry while the compare-and-set fails.
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Runs even if cleanup() throws: mark terminated and release the lock.
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

其中的run方法由其子类(DefaultEventLoop,EpollEventLoop,NioEventLoop,ThreadPerChannelEventLoop)分别实现,以NioEventLoop为例:

/**
 * Event loop body. Each iteration selects (without blocking when tasks are pending),
 * processes the selected keys, runs queued tasks, and, when shutting down, closes
 * everything and exits once {@code confirmShutdown()} returns true.
 */
@Override
protected void run() {
    for (;;) {
        // Reset the flag for this round and remember its previous value for select().
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // No time budget: process I/O, then run every queued task.
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                // Task time budget is derived from the measured I/O time and ioRatio.
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
                // NOTE(review): the interrupt status is not restored here.
            }
        }
    }
}

运行所有任务(SingleThreadEventExecutor.java)

/**     * Poll all tasks from the task queue and run them via {
@link Runnable#run()} method. This method stops running * the tasks in the task queue and returns if it ran longer than {
@code timeoutNanos}. */ protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }

小结

  本文从一个简单的示例程序出发,一步步分析了netty4的线程模型:从ServerBootstrapAcceptor到SingleThreadEventExecutor的源码环环相扣,读者可以沿着上面的分析链,理解一个请求到来后netty的处理流程。

 

转载地址:http://dsrvl.baihongyu.com/

你可能感兴趣的文章
spark scala学习笔记
查看>>
NeHe OpenGL教程 第三课:颜色渲染
查看>>
qml demo分析(photosurface-图片涅拉)
查看>>
BZOJ 2463: [中山市选2009]谁能赢呢?[智慧]
查看>>
Solidworks如何创建投影曲线
查看>>
鼠标悬浮tip 显示
查看>>
java反射基础
查看>>
使用Word2016发布随笔到博客园
查看>>
数组类型
查看>>
Database数据库切片模式
查看>>
深入分析事务的隔离级别
查看>>
基于Vue2 搭建移动端 webapp 框架
查看>>
Android View体系(四)从源码解析Scroller
查看>>
Cannot lock storage /tmp/hadoop-root/dfs/name. The directory is already locked.
查看>>
BFS和DFS的java实现
查看>>
Struts2框架起源
查看>>
Chromosome coordinate systems: 0-based, 1-based
查看>>
Myeclipse10集成Flex4.6
查看>>
java电影站点开发经验3
查看>>
notepad++自动对齐使用空格代替Tab并将空格显示为小点
查看>>