金曜日, 7月 04, 2014

Java8 - Collectors 平均関連

業務用のコードでは使うことはあまりないが,あったら便利な,ストリームの要素から平均を抽出するコード.


        System.out.println(

            Stream.of(50d, 62d, 79.4d, 62d, 80.0d, 89.25d, 5d)
                .collect( Collectors.averagingDouble( (x)->x) ).doubleValue() + " " +
            Stream.of(20,15,30,45,50,25)
                .collect( Collectors.averagingInt( (x)->x ) ).intValue() + " " +
            Stream.of(300L, 200L, 100L, 400L, 500L )
                .collect( Collectors.averagingLong( (x)->x ) ).longValue()

        );


現実的な用途の一つとして考えられるのはメッセージキュー中のデータの平均サイズを取得するようなものだろう.以下は5つの別スレッド上で動作するデータプロバイダーがデータキューにデータを書き込んで、サンプラーが先頭から30個のデータの長さの平均を計算する例.

package org.tanuneko;

import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DataSampler {

    private static final int MAX_SAMPLESIZE = 30;
    private CopyOnWriteArrayList<Data> q;

    public DataSampler() {
        q = new CopyOnWriteArrayList<>();
        Executors.newFixedThreadPool(5).execute( new DataProvider( this ) );
    }

    public void doSampling() {

        while( true ) {

            System.out.println(
            q.stream().limit( MAX_SAMPLESIZE ).mapToInt( Data::getDataSize )
                    .summaryStatistics().getAverage()
            );

            try {
                Thread.sleep( 2000 );
            } catch( InterruptedException iE ) {
                iE.printStackTrace();
                System.exit(1);
            }
            System.out.println( "(QSIZE=" + q.size() + ",MAXSAMPLESIZE=" + MAX_SAMPLESIZE + ")" );

        }

    }

    public void addData( Data d ) {
        q.add( d );
    }

    public static void main (String args[] ) {

        DataSampler d = new DataSampler();
        d.doSampling();

    }

}

class DataProvider implements Runnable {

    private DataSampler sampler;

    public DataProvider( DataSampler sampler ) {
        this.sampler = sampler;
    }

    public void run() {

        Random r = new Random();
        r.setSeed( System.currentTimeMillis() );

        while( true ) {

            sampler.addData( new Data( genRandBytes() ) );
            try {
                Thread.sleep( r.nextInt( 3000 ) );
            } catch (InterruptedException iE) {
                throw new RuntimeException(iE);
            }

        }
    }

    private byte[] genRandBytes() {

        Random r = new Random();
        r.setSeed( System.currentTimeMillis() );
        return Stream.generate(()->"a").limit( r.nextInt( 100 ) )
                .collect( Collectors.joining() )
                .getBytes();

    }
}

class Data {

    private byte[] buffer;
    public Data( byte[] buffer ) {
        this.buffer = new byte[ buffer.length ];
        System.arraycopy( this.buffer, 0, buffer, 0, buffer.length );
    }
    public byte[] getData() {
        return buffer;
    }
    public int getDataSize() {
        return buffer.length;
    }

}

0 件のコメント: