Title: Java ConcurrentBlockingQueue,For ME[原创]

Categories: ConcurrentBlockingQueue

Description: 笔记

Keywords: ConcurrentBlockingQueue,java

ConcurrentBlockingQueue,自己写了一份增加理解,很久之前写代码,很地方不工整不要在意。

Java代码复习

date:2017-02-01 author:zhangjianxin

1. ConcurrentBlockingQueue

package me.firsh.tools;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentBlockingQueue<T extends Exchengeable> {
        public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
        private volatile long proCursor = 0;
        public long p8, p9, p10, p11, p12, p13, p14; // cache line padding
        private volatile long comsumeCursor = 0;
        private final AtomicLong produceSequence = new AtomicLong(0);
        private final AtomicLong comsumeSequence = new AtomicLong(0);

        private final Exchengeable[] entries;
        private final int ringModMask;

        private final int nextHighestPowerOfTwo(int v) {
                v--;
                v |= v >> 1;
                v |= v >> 2;
                v |= v >> 4;
                v |= v >> 8;
                v |= v >> 16;
                v++;
                return v;
        }

        public ConcurrentBlockingQueue(final int size) {
                int sizeAsPowerOfTwo = Math.max(nextHighestPowerOfTwo(size), 1);
                ringModMask = sizeAsPowerOfTwo - 1;
                entries = new Exchengeable[sizeAsPowerOfTwo];
        }

        @SuppressWarnings("unchecked")
        private T getEntry(final long sequence) {
                return (T) entries[(int) (sequence & ringModMask)];
        }

        private void setEntry(final long sequence, T t) {
                final int index = (int) (sequence & ringModMask);
                if (entries[index] == null) {
                        entries[index] = t;
                } else {
                        entries[index].exchange(t);
                }
        }

        private final long waitForNextProduce() {
                while (true) {
                        if (produceSequence.get() - comsumeCursor < entries.length) {
                                return produceSequence.incrementAndGet();
                        } else {
                                try {
                                        Thread.yield();
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
                }
        }

        public void put(T t) {
                long sequence = waitForNextProduce();
                setEntry(sequence - 1, t);
                proCursor = sequence;
        }

        private final long waitForComsume() {
                while (true) {
                        long comsume = proCursor - comsumeSequence.get();
                        if (comsume > 0 && comsume <= entries.length) {
                                return comsumeSequence.incrementAndGet();
                        } else {
                                try {
                                        Thread.yield();
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
                }
        }

        public T take() {
                long nextSequence = waitForComsume();
                comsumeCursor = nextSequence;
                return getEntry(nextSequence - 1);
        }

        public boolean put(T t, long time, TimeUnit unit) throws IOException {
                put(t);
                return true;
        }

        public T take(long time, TimeUnit unit) throws IOException {
                return take();
        }

}

2.LengthByteArrayEncoder

@Sharable
public class LengthByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg,
            List<Object> out) throws Exception {
        byte[] b1 = toBytes(msg.length);
        // out.add(Unpooled.wrappedBuffer(b1)); // length
        // out.add(Unpooled.wrappedBuffer(msg)); // data

        ByteBuf buf = Unpooled.wrappedBuffer(b1, msg); // length , data
        out.add(buf);
    }

    static final byte[] N1 = { B2Type.INT_N1 };
    static final byte[] I0 = { B2Type.INT_0 };
    static final byte[] I1 = { B2Type.INT_1 };
    static final byte[] I2 = { B2Type.INT_2 };
    static final byte[] I3 = { B2Type.INT_3 };
    static final byte[] I4 = { B2Type.INT_4 };
    static final byte[] I5 = { B2Type.INT_5 };
    static final byte[] I6 = { B2Type.INT_6 };
    static final byte[] I7 = { B2Type.INT_7 };
    static final byte[] I8 = { B2Type.INT_8 };
    static final byte[] I9 = { B2Type.INT_9 };
    static final byte[] I10 = { B2Type.INT_10 };
    static final byte[] I11 = { B2Type.INT_11 };
    static final byte[] I12 = { B2Type.INT_12 };
    static final byte[] I13 = { B2Type.INT_13 };
    static final byte[] I14 = { B2Type.INT_14 };
    static final byte[] I15 = { B2Type.INT_15 };
    static final byte[] I16 = { B2Type.INT_16 };
    static final byte[] I17 = { B2Type.INT_17 };
    static final byte[] I18 = { B2Type.INT_18 };
    static final byte[] I19 = { B2Type.INT_19 };
    static final byte[] I20 = { B2Type.INT_20 };
    static final byte[] I21 = { B2Type.INT_21 };
    static final byte[] I22 = { B2Type.INT_22 };
    static final byte[] I23 = { B2Type.INT_23 };
    static final byte[] I24 = { B2Type.INT_24 };
    static final byte[] I25 = { B2Type.INT_25 };
    static final byte[] I26 = { B2Type.INT_26 };
    static final byte[] I27 = { B2Type.INT_27 };
    static final byte[] I28 = { B2Type.INT_28 };
    static final byte[] I29 = { B2Type.INT_29 };
    static final byte[] I30 = { B2Type.INT_30 };
    static final byte[] I31 = { B2Type.INT_31 };
    static final byte[] I32 = { B2Type.INT_32 };

    static final byte[] toBytes(int v) {
        switch (v) {
        case -1:
            return N1;
        case 0:
            return I0;
        case 1:
            return I1;
        case 2:
            return I2;
        case 3:
            return I3;
        case 4:
            return I4;
        case 5:
            return I5;
        case 6:
            return I6;
        case 7:
            return I7;
        case 8:
            return I8;
        case 9:
            return I9;
        case 10:
            return I10;
        case 11:
            return I11;
        case 12:
            return I12;
        case 13:
            return I13;
        case 14:
            return I14;
        case 15:
            return I15;
        case 16:
            return I16;
        case 17:
            return I17;
        case 18:
            return I18;
        case 19:
            return I19;
        case 20:
            return I20;
        case 21:
            return I21;
        case 22:
            return I22;
        case 23:
            return I23;
        case 24:
            return I24;
        case 25:
            return I25;
        case 26:
            return I26;
        case 27:
            return I27;
        case 28:
            return I28;
        case 29:
            return I29;
        case 30:
            return I30;
        case 31:
            return I31;
        case 32:
            return I32;
        default:
            if (v >= NumEx.BYTE_MIN_VALUE && v <= NumEx.BYTE_MAX_VALUE) {
                // byte[] r2 = new byte[2];
                // r2[0] = B2Type.INT_8B;
                byte[] r2 = { B2Type.INT_8B, 1 };
                r2[1] = (byte) (v & 0xff);
                return r2;
            } else if (v >= NumEx.SHORT_MIN_VALUE && v <= NumEx.SHORT_MAX_VALUE) {
                // byte[] r2 = new byte[3];
                // r2[0] = B2Type.INT_16B;
                byte[] r2 = { B2Type.INT_16B, 1, 2 };
                r2[1] = (byte) ((v >> 8) & 0xff);
                r2[2] = (byte) ((v >> 0) & 0xff);
                return r2;
            } else {
                // byte[] r2 = new byte[5];
                // r2[0] = B2Type.INT_32B;
                byte[] r2 = { B2Type.INT_32B, 1, 2, 3, 4 };
                r2[1] = (byte) ((v >> 24) & 0xff);
                r2[2] = (byte) ((v >> 16) & 0xff);
                r2[3] = (byte) ((v >> 8) & 0xff);
                r2[4] = (byte) ((v >> 0) & 0xff);
                return r2;
            }
        }
    }
}

推荐在必要时候使用移位。