QuickSelectsort,Median Finding Algorithm


最強・最速のクイックソート,最速median関数
① クイックソートにクイックセレクトの仕組みを加え,さらに,pivot値を工夫し,攻撃に強くしました。Quicksortの単なる改良版ですが,あえて名付けるならば、QuickSelectSortと いうところでしょうか。
② クイックソートにおいて,再帰の深さが 2*log2(n) になった時点でQuickSelectSortに切り替えることにより攻撃に強くした,安全クイックソートです
③ クイックソートのの改良の過程で,中央値を求める関数を考えました。この関数は,3D median filter用 としては世界最速と思われます。
④ クイックソートのを変形して,クイックセレクト(quickselect)の関数を作り ました。

① QuickSelectSort

スタックオーバーフローが起こらず,攻撃にも強いQuicksortの改良を考えました。この改良によってもO(n2) となる確率は0ではありませんが,現実的には絶対に起こりえません。言うならば,宇宙が明日消滅する確率より小さいというところでしょうか。
②の「安全クイックソート」と比較して,Median-Of-Three Killer Sequenceに対して高速に動作します。

改良点
 分割が進んでデータ数が50個未満になったときに「挿入ソー ト」に切り替えるもので,私の環境では,Wikipediaに掲載されている原型の実装例よ り 2割近く高速です。
「挿入ソート」では,まず先頭に最小値を持って来てから挿入ソートを行う事により,普通の挿入ソートより条件判定を簡略化して高速化しています。
 クイックソートにクイックセレクトの仕組みを加え,スタックオーバーフ ローを回避するソートを考案しました。この設定では,再帰呼出しの深さはlog2(N)/log2(32/31)≒22*log2(N) で抑えられます。
再帰呼出しの深さを抑える仕組みは,「データを二分割するときに,分割された大きい方が常に31/32以下であれば,再帰呼出しの深さは,log2(N)/log2(32/31) 以下になる」というものです。quickselectは再帰呼出しではないの で, 再帰呼出しの深さを抑えることができるのです。速 度を優先した設定にしてありますので,再帰呼び出しの深さを優先する場合は,range=range/32をrange=range/4と変更すれば,深 さはlog2(N)/log2(4/3)≒2.4*log2(N) で抑えられます。
 普通,pivot値は3値の中央値や5値の中央値のように, 中央値を求めるデ―タの個数が3や5などに固定されていますが,分割された大きい方が 31/32以下にならなかった場合は,31/32以下になるまで,中央値を求めるデ―タの個数を,3値から始めて2値ずつ増やすようにしました。これによ り,Median-Of-Three Killer SequenceのようなQuicksort killer sequenceを実際に作ることが不可能になります。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SWAP(a,b) {value_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
typedef unsigned long value_type; // ソートするキーの型

value_type median(value_type *array,int begin,int range,int kosuu)
{
    value_type a[49];
    int i,j,imax,mid=kosuu/2,haba=range/(kosuu-1);
    for (i=0;i<kosuu;i++)
    {
        a[i]=array[begin];
        begin=begin+haba;
    }
    imax=mid;
    for (j=mid-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
    for (i=kosuu-1;i>=mid+1;i--){
        if (a[i]<a[imax]) {
            a[imax]=a[i];
            imax=mid;
            for (j=mid-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
        }
    }
    return a[imax];
}

value_type median3(value_type *a,int x,int y,int z)
{
    COMPSWAP(a[x],a[y])
    COMPSWAP(a[x],a[z])
    COMPSWAP(a[y],a[z])
    return a[y];
}

#define UTIKIRI 50
void qs_sort(value_type *array,int begin,int end)
{
    int range=end-begin;
    if (range<UTIKIRI)return;
    int begin2=begin,end2=end,kosuu=5;
    value_type pivot=median3(array,begin,begin+range/2,end);
    range=range/32;
    while(1){
        int i=begin2,j=end2;
        while(1){
            while(array[i]<pivot) i++;
            while(array[j]>pivot) j--;
            if(i>=j) break;
            SWAP(array[i],array[j])
            i++;j--;
        }
        if(i-begin<range)
            begin2=j+1;
        else if(end-j<range)
            end2=i-1;
        else {
            begin2=j+1;
            end2=i-1;
            break;
        }
        pivot=median(array,begin2,end2-begin2,kosuu);
        kosuu=kosuu+2;
    }
    qs_sort(array,begin,end2);
    qs_sort(array,begin2,end);
}

void insert_sort(value_type *array,int n)
{
    int i,j,min;
    if (n<UTIKIRI)min=n-1;else min=UTIKIRI-1;
    for (i=min-1;i>=0;i--)if (array[i]<array[min])min=i;
    SWAP(array[0],array[min])

    for (i=2;i<n;i++) {
        value_type work=array[i];
        for (j=i;array[j-1]>work;j--)array[j]=array[j-1];
        array[j]=work;
    }
}

void sort(value_type *array,int n)
{
    qs_sort(array,0,n-1);
    insert_sort(array,n);
}

int main()
{
    int n=100000000;
    int i;
    clock_t start,end;
    value_type *array=malloc(n*sizeof(value_type));
    srand((unsigned) time(NULL));
    for (i=0;i<n;i++)
        array[i]=rand()*(RAND_MAX+1)+rand();
    start=clock();
    sort(array,n);
    end=clock();
    printf("%f秒  \n", (double)(end-start)/CLOCKS_PER_SEC);
    for (i=1; i<n; i++) if(array[i-1]>array[i]) printf("%u\n", array[i]);
    free(array);
    return 0;
}

② 安全クイックソート
Quicksortの改良判で,作為的なデータによる攻撃を防ぐためのものです。

改良点
Quicksortにおいて,Median-Of-Three Killer SequenceのようなQuicksort killer sequenceによる作為的な攻撃を防ぐために,再帰の深さが 2*log2(n) になった時点で,QuickSelectSortに切り替える ようにしました。heapsortに切り替えるものはintorosortと呼ばれていますが,通常のデータに対してはQuicksortとして 動作しますので,新たなソートアルゴリズムではありません。
なお,野崎昭弘氏と杉本俊彦氏が,introsortと同じアルゴリズムを1979年に考案しており,情報処理学会論文誌 21(2), 164-166, 1980-03-15に「安全クイックソート」として掲載されています。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SWAP(a,b) {value_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
typedef unsigned long value_type; //ソートするキーの型

value_type median(value_type *array,int begin,int range,int kosuu)
{
    value_type a[49];
    int i,j,imax,mid=kosuu/2,haba=range/(kosuu-1);
    for (i=0;i<kosuu;i++)
    {
        a[i]=array[begin];
        begin=begin+haba;
    }
    imax=mid;
    for (j=mid-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
    for (i=kosuu-1;i>=mid+1;i--){
        if (a[i]<a[imax]) {
            a[imax]=a[i];
            imax=mid;
            for (j=mid-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
        }
    }
    return a[imax];
}

#define UTIKIRI 50
void qs_sort(value_type *array,int begin,int end)
{
    int range=end-begin;
    if (range<UTIKIRI)return;
    int begin2=begin,end2=end,kosuu=5;
    range=range/4;
    while(1){  
        value_type pivot=median(array,begin2,end2-begin2,kosuu);
        int i=begin2,j=end2;
        while(1){
            while(array[i]<pivot) i++;
            while(array[j]>pivot) j--;
            if(i>=j) break;
            SWAP(array[i],array[j])
            i++;j--;
        }
        if(i-begin<range)
            begin2=j+1;
        else if(end-j<range)
            end2=i-1;
        else {
            begin2=j+1;
            end2=i-1;
            break;
        }
        kosuu=kosuu+2;
    }  
    qs_sort(array,begin,end2);
    qs_sort(array,begin2,end);
}

value_type median3(value_type *a,int x,int y,int z)
{
    COMPSWAP(a[x],a[y])
    COMPSWAP(a[x],a[z])
    COMPSWAP(a[y],a[z])
    return a[y];
}

void quicksort(value_type *array, int begin,int end, int maxdepth)
{
    int range=end-begin;
    if (range<UTIKIRI) return;
    if (maxdepth==0){
        qs_sort(array,begin,end);
        return;
    }
    value_type pivot=median3(array,begin,begin+range/2,end);
    int i=begin,j=end;
    while(1){
        while(array[i]<pivot) i++;
        while(array[j]>pivot) j--;
        if(i>=j) break;
        SWAP(array[i],array[j])
        i++;j--;
    }
    maxdepth--;
    quicksort(array,begin,i-1,maxdepth);
    quicksort(array,j+1,end,maxdepth);
}

void insert_sort(value_type *array,int n)
{
    int i,j,min;
    if (n<UTIKIRI)min=n-1;else min=UTIKIRI-1;
    for (i=min-1;i>=0;i--)if (array[i]<array[min])min=i;
    SWAP(array[0],array[min]);

    for (i=2;i<n;i++) {
        value_type work=array[i];
        for (j=i;array[j-1]>work;j--)array[j]=array[j-1];
        array[j]=work;
    }
}

void sort(value_type *array,int n)
{
    int maxdepth=0,i=n;
    while (i>>=1)maxdepth=maxdepth+2;//maxdepth=2*log2(n)
    quicksort(array,0,n-1,maxdepth);
    insert_sort(array,n);
}

int main()
{
    int n=100000000;
    int i;
    clock_t start,end;
    value_type *array=malloc(n*sizeof(value_type));
    srand((unsigned) time(NULL));
    for (i=0;i<n;i++)
        array[i]=rand()*(RAND_MAX+1)+rand();
    start=clock();
    sort(array,n);
    end=clock();
    printf("%f秒  \n", (double)(end-start)/CLOCKS_PER_SEC);
    for (i=1; i<n; i++) if(array[i-1]>array[i]) printf("%u\n", array[i]);
    free(array);
    return 0;
}

③ 3D median filter 用の中央値を求める関数

中央値を返す関数 median について
原理は,例えば9個のデータから,小さい方から5個のデータを抽出し,それら5個のデータの最大値を求めれば,それが中央値であるというものです。
実装では,高速化のための工夫により複雑ですが,原型は次のものです。
#define KOSUU 9
#define MID (KOSUU-1)/2
value_type median(value_type *a)
{
    int i,j,imax;
    for (i=KOSUU-1;i>=MID+1;i--){
        imax=MID;
        for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
        if (a[imax]>a[i])a[imax]=a[i];
    }
    imax=MID;
    for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
    return a[imax];
}
この関数が中央値を返すことは,次のように分かります。
データを
    a0,a1,a2,a3,a4,a5,a6,a7,a8
とします。まず,
    imax=4;
    for (j=3;j>=0;j--) if (a[j]>a[imax]) imax=j;
により,a[imax]が a0,a1,a2,a3,a4 の最大値になります。したがって,
    if (a[i]<a[imax]) a[imax]=a[i];
により,一回目のループで
   a0,a1,a2,a3,a4≦a8 …①
となります。次に2回目のループで
    a0,a1,a2,a3,a4≦a7
となりますが,2回目のループでa0,a1,a2,a3,a4は非増加ですから,①も成り立ちます。
同様にして繰り返すと,4回目のループで
    a0,a1,a2,a3,a4≦a5,a6,a7,a8
となるので,中央値は,a0,a1,a2,a3,a4 の最大値です。よって,次の
    imax=4;
    for (j=3;j>=0;j--) if (a[j]>a[imax]) imax=j;
により,a[imax]がa0,a1,a2,a3,a4 の最大値となるので,a[imax]は中央値です。
なお,この関数は,2n+1個のデータの中央値を求めるのに,(n+1)^2-1 回の比較を行います。したがって,KOSUU=9,すなわち,n=4の時の比較回数は24回です。

実装した関数で,赤で示した部分が,高速化のための工夫です。
value_type median(value_type *a)
{
    int i,j,imax;
    for (i=0;i<=MID-1;i++) COMPSWAP(a[i],a[i+MID+1]);
    for (i=1;i<=MID;i++) COMPSWAP(a[i],a[i+MID]);
    imax=MID;
    for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
    for (i=KOSUU-1;i>=MID+1;i--){
        if (a[i]<a[imax]) {
            a[imax]=a[i];
            imax=MID;
            for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
        }
    }
    return a[imax];
}
まず,次のように,a[i]>=a[imax]であるときは,if文の中の処理を行うようにします。ただし,ランダムなデータでは,a[i] <a[imax]である確率は低く,if文が増えた分だけ処理が重くなってしまいます。
if (a[i]<a[imax]) {
    a[imax]=a[i];
    imax=MID;
    for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
}
そこで,前処理により,a0,a1,a2,a3,a4になるべく小さいデータを,a5,a6,a7,a8になるべく大きいデータを入れることを考えまし た。それが次の処理です。
for (i=0;i<=MID-1;i++) COMPSWAP(a[i],a[i+MID+1]);
for (i=1;i<=MID;i++) COMPSWAP(a[i],a[i+MID]);
これにより,a[i]>=a[imax]である確率が高くなり,if文の中の処理を省略するため高速化します。ただし,データの数が9個のときは, 原型のコードより速度の改善は若干だけです。

2D median filter,3D median filter用として
中央値を求める関数medianは,2D median filter,3D median filter 用の中央値を求める関数として使え,データ数が27個の3D median filter 用としては,他に良いアルゴリズムがないため世界最速かと思われます。
実際,http://ndevilla.free.fr/median/median/index.html や http://www.disnetwork.info/the-blog/median-value-selection-algorithm にある関数より3割くらい高速です。
なお,gccのコンパイラの最適化レベルを最高にして実行速度を測定しています。最適化を行わない場合は,この関数は最速にはなりません。
ベンチマークのために, 下記に他の関数と共に示しました。

medianiq     QuickSelectを,対象データ数が少なくなったところでmedian に変更するもので,データ数が大きい場合でも高速です。
medianq0    QuickSelectよって途中まで並べ替えてから,挿入ソートによって並べ替えて中央値を求める関数です。
medianb     バブルソートによって途中まで並べ替えて中央値を求める関数です。
medians     選択ソートによって途中まで並べ替えて中央値を求める関数です。
mediani      挿入ソートによってすべてを並べ替えてから中央値を求める関数です。

なお,cpuとコンパイラとに依りますが,CPUがamdのFX8370eでgccのコンパイラの最適化レベルを最高で行ったとき,データ数が27個で は,処理時間の大小関係は
        median<QuickSelect<挿入ソート <選択ソート<バブルソート
です。ただし,データ数が37個以上ではmedianqが最速となります。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SWAP(a,b) {value_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
#define SORT3(a,b,c) {COMPSWAP(a,b);COMPSWAP(a,c);COMPSWAP(b,c)}
typedef unsigned long value_type; // ソートするキーの型

#define KOSUU 27
#define MID (KOSUU-1)/2

value_type median(value_type *a)
{
    int i,j,imax;
    value_type work;
    for (i=0;i<=MID-1;i++) COMPSWAP(a[i],a[i+MID+1])
    for (i=1;i<=MID;i++) COMPSWAP(a[i],a[i+MID])
    imax=MID;
    for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
    for (i=KOSUU-1;i>=MID+1;i--){      
        if (a[i]<a[imax]) {
            a[imax]=a[i];
            imax=MID;
            for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
        }
    }
    return a[imax];
}

#define KYORI 10
value_type medianq(value_type *a)
{
    int begin=0,end=KOSUU-1;
    while (1){
        int middle=(begin+end)/2;
        SORT3(a[middle],a[begin],a[end]) //pivot=a[begin];
        int i=begin,j=end;
        while(1){
            do i++; while(a[i]<a[begin]);
            do j--; while(a[j]>a[begin]);
            if(i>=j) break;
            SWAP(a[i],a[j])
        }
        if (i>=MID+2){
            a[begin]=a[i-1];
            end=i-2;
            if (end<=MID+KYORI){
                int i,j,imax=MID;
                for (j=MID-1;j>=begin;j--) if (a[j]>a[imax]) imax=j;
                for (i=end;i>=MID+1;i--){      
                    if (a[i]<a[imax]) {
                        a[imax]=a[i];
                        imax=MID;
                        for (j=MID-1;j>=begin;j--) if (a[j]>a[imax]) imax=j;
                    }
                }
                return a[imax];
            }
        }else if (j<=MID-1){
            begin=j+1;
            if (begin>=MID-KYORI){
                int i,j,imin=MID;
                for (j=MID+1;j<=end;j++) if (a[j]<a[imin]) imin=j;
                for (i=begin;i<=MID-1;i++){      
                    if (a[i]>a[imin]) {
                        a[imin]=a[i];
                        imin=MID;
                        for (j=MID+1;j<=end;j++) if (a[j]<a[imin]) imin=j;
                    }
                }
                return a[imin];
            }   
        }else{           
            return a[begin];
        }   
    }
}

value_type medianq0(value_type *a)
{
    int begin=0,end=KOSUU-1,range,i,j;
    do{
        int middle=(begin+end)/2;
        SORT3(a[middle],a[begin],a[end])//pivot=a[begin];
        i=begin+1,j=end;
        SWAP(a[i],a[middle])
        while(1){
            do i++; while(a[i]<a[begin]);
            do j--; while(a[j]>a[begin]);
            if(i>=j) break;
            SWAP(a[i],a[j])
        }
        if (i>=MID+2){
            a[begin]=a[i-1];
            end=i-2;
        }else if (j<=MID-1){
            begin=j+1;
        }else{           
            return a[begin];
        }   
        range=end-begin;
    }while(range>9);
    for (i=begin+1;i<=end;i++) {
        value_type work=a[i];
        j=i;
        while (a[j-1]>work){
            a[j]=a[j-1];
            j--;
        }
        a[j]=work;
    }
    return a[MID];
}

value_type mediani(value_type *a)
{
    int i;
    for (i=1;i<KOSUU;i++) {
        value_type work=a[i];
        int j=i;
        while (j>0 && a[j-1]>work){
            a[j]=a[j-1];
            j--;
        }
        a[j]=work;
    }
    return a[MID];
}

value_type medians(value_type *a)
{
    int i,j;
    for (i=0;i<=MID;i++){
        int min=i;
        for (j=KOSUU-1;j>i;j--){
            if (a[j]<a[min])min=j;
        }
        SWAP(a[i],a[min])
    }
    return a[MID];
}

value_type medianb(value_type *a)
{
    int i,j;
    for (i=0;i<=MID;i++){
        for (j=KOSUU-1;j>i;j--){
            COMPSWAP(a[i],a[j])
        }
    }
    return a[MID];
}

int main()
{
    int i,j;
    clock_t start,end;
    value_type a[KOSUU],s=0;
    start=clock();
    value_type x=314159265;
    for (i=0;i<10000000;i++){
        for (j=0;j<KOSUU;j++){
            x=x*1103515245+12345;
            a[j]=x>>20;
        }
        s=s+median(a);
    }
    end=clock();
    printf("%f秒  \n", (double)(end-start)/CLOCKS_PER_SEC);
    printf("%u\n", s);
    return 0;
}

④ QuickSelect
データ数が多いときのQuickSelectのコードで,QuickSortのコードを変形したものです。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SWAP(a,b) {value_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
typedef unsigned long value_type; // ソートするキーの型

#define KOSUU 13
#define MID (KOSUU-1)/2
value_type median(value_type *array,int begin,int range)
{
    int i,j,imax;
    value_type a[KOSUU];
    int haba=range/(KOSUU-1);
    for (i=0;i<KOSUU;i++)
    {
        a[i]=array[begin];
        begin=begin+haba;
    }
    for (i=0;i<=MID-1;i++) COMPSWAP(a[i],a[i+MID+1])
    for (i=1;i<=MID;i++) COMPSWAP(a[i],a[i+MID])
    imax=MID;
    for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
    for (i=KOSUU-1;i>=MID+1;i--){
        if (a[i]<a[imax]) {
            a[imax]=a[i];
            imax=MID;
            for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
        }
    }
    return a[imax];
}

value_type median3(value_type *a,int x,int y,int z)
{
    COMPSWAP(a[x],a[y])
    COMPSWAP(a[x],a[z])
    COMPSWAP(a[y],a[z])
    return a[y];
}

value_type quick_select(value_type *array,int n)
{
    int begin=0,end =n-1,middle=end/2;
    while (1){
        int range=end-begin;
        value_type pivot;
        if (range>=500){
            pivot=median(array,begin,range);
        }else{
            pivot=median3(array,begin,begin+range/2,end);
        }
        int i=begin,j=end;
        while(1){
            while(array[i]<pivot) i++;
            while(array[j]>pivot) j--;
            if(i>=j) break;
            SWAP(array[i],array[j])
            i++;j--;
        }
        if (i>=middle+1){
            end=i-1;
        }else if (j<=middle-1){
            begin=j+1;
        }else{          
            return pivot;
        }  
    }
}

int main()
{
    int n=100000001;
    int i;
    clock_t start,end;
    value_type *array=malloc(n*sizeof(value_type));
    for (i=0;i<n;i++)
        array[i]=rand()*(RAND_MAX+1)+rand();
    start=clock();
    value_type median=quick_select(array,n);
    end=clock();
    printf("%f秒  \n", (double)(end-start)/CLOCKS_PER_SEC);
    printf("%u\n", median);
    free(array);
    return 0;
}


最初のページ に戻る