Java ile Multithreading İşlemleri-2 (Fork-Join)

Merhaba arkadaşlar, bugün sizinle fork join işlemleri üzerinde duracağız. Geçtiğimiz derste geleneksel threading yöntemiyle gitmiştik. Peki ya çok büyük veriler üzerinde çalışıyorsak. Şimdi sizinle 1’den 10 milyon’a kadar olan sayıların toplamını alan bir program yazalım bunu yaparken de Fork Join yaklaşımını kullanacağız.

Öncelikle fork join yaklaşımı hakkında biraz bilgi vereyim. Fork Join bir iş yükünü hiyerarşik olarak alt görevlere bölen bir yaklaşımdır. Örneğin 1000 elemanlı bir dizimiz olsun ve her bir fork join nesnesi en fazla 50 elemanın sorumluluğunu işleme alabilsin. İlk 1000 elemanlı dizi ikiye bölünmüş halde sağ ve sol join olmak üzere 2 alt nesneye 500’er eleman olarak verilir. Onlar da sayının limitin üstünde olduğundan dolayı 2’ye böler ve 250 elemanlı yeni sorumluluklar alt elemanlara bildirilir.Bu işlem sorumlu olunan sayılar, limitin altında kalıncaya kadar devam eder ve sonunda hiyerarşide altta kalan tüm nesneler hesaplama sonuçlarını üste bildirir.Daha iyi anlaşılabilmesi için aşağıdaki görseli paylaşıyorum.

Fork join geliştirmesi yaparken kullandığımız 2 temel sınıf vardır. Bunlar, RecursiveTask ve RecursiveAction. RecursiveTask değer döndürür ama RecursiveAction döndürmez.

Öncelikle Fork Join Sınıfımızı aşağıdaki gibi oluşturalım. Başta söylediğimiz gibi RecursiveTask’lar değer döndürür ve bizim Long tipinde bir değerimiz olacağı için gerek sınıfı deklere ederken gerekse Compute methodunu kullanırken bunu belirtmemiz gerekir.

class Sum extends RecursiveTask<Long>
{
    static final int BlockSize = 5000;
    int start, end;
    int[] data;

    Sum(int[] data, int start, int end)
    {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    protected Long compute()
    {
        /* 
            Sorumluluk limitten küçükse direk hesapla yoksa alt görevlere dağıt ve 
            bunu recursive olarak yinele
       */
        if (end - start <= BlockSize)
        {
            long sum = 0;
            for (int i = start; i < end; ++i)
            {
                sum = sum + data[i];
            }
            return sum;
        }
        else
        {
            // Sorumluluğu böl ve alt görevler oluşturarak onlara paylaştır.
            int mid = start + (end - start) / 2;
            Sum left = new Sum(data, start, mid);
            Sum right = new Sum(data, mid, end);
            left.fork();
            right.fork();
            long rightSum = right.join();
            long leftSum = left.join();
            return leftSum + rightSum;
        }
    }
}

Main thread içerisinde diziyi oluştur. Diziyi,sorumluluk başlangıç noktası olarak 0’ı ve bitiş noktası olarak dizinin uzunluğunu parametre olarak fork join nesnesine ver.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main
{

    public static void main( String[] args ) throws Exception
    {
        int numbers[] = new int[10000000];

        for(int i = 0; i < numbers.length; i++)
        {
            numbers[i] = i;
        }

        ForkJoinPool fjPool = new ForkJoinPool();
        long sum = fjPool.invoke(new Sum(numbers,0,numbers.length));
        System.out.println("Sum of numbers in the array = "+sum);
    }

}

Sonuç

Sum of numbers in the array = 49999995000000

Leave a Reply

Your email address will not be published. Required fields are marked *